#include <errno.h>
#include <sys/time.h>
#include "entry_event_mgr.h"
#include "persistent_store.h"
#include "event_id.h"
#include "system_time.h"
#include "os_specific.h"
#include "msg.h"
#include "opus_exceptions.h"
#include "file_persistent_store.h"
#include "file_entry.h"

//////////////////////////////////////////////////////////////////////////
//
// File: caching_bb.templates
//
// Description:
//
//    Template definitions for the Caching_bb class.
//
// Example:
//
//    none
//
// Modification History:
//
// Date      OPR      Who         Reason
// --------- -------- ----------- ----------------------------------------
// ???       ???      WMiller     Initial code
// 07/25/03  46127    Sontag      Special case for File_persistent_store
// 10/30/03  49601    Sontag      Add call to check entry during post
// 04/26/06  55700    Sontag      Can not use endl here without namespace
// 03/03/07  57369    Sontag      Use size_t to loop over collections
//////////////////////////////////////////////////////////////////////////

// Name of this blackboard type; str() returns this string unchanged.
template<class T>
const std::string Caching_bb<T>::CACHING_BB_NAME("Caching_bb/");

// Multiplied by MAX_LOCK_ATTEMPTS and added to a tv_sec value in
// get_lock_cache_lock(), so the product is a number of seconds.
template<class T>
const int Caching_bb<T>::RETRY_INTERVAL = 10;

// Together with RETRY_INTERVAL, bounds how long get_lock_cache_lock() waits
// for a locked entry (MAX_LOCK_ATTEMPTS * RETRY_INTERVAL seconds). Not
// referenced when ITS_IBM_AIX is defined; that build hard-codes 30 seconds.
template<class T>
const int Caching_bb<T>::MAX_LOCK_ATTEMPTS = 3;

// Constructs the blackboard and primes the in-memory cache with every entry
// currently held by the persistent store.
//
//   em - event manager used to publish CREATED/DELETED/MODIFIED events
//        (deleted by the destructor)
//   pe - persistent store backing the cache (deleted by the destructor)
//   mp - pending-operation threshold passed to persist->sync(); 0 disables
//        the check in post()/erase()/replace()
//
// If loading the cache throws, everything this constructor itself acquired
// (template entry, search results, partially built cache, event id and the
// pthread objects) is released before the exception is re-thrown.
// NOTE(review): em and pe are NOT deleted on that path, as before; confirm
// who is expected to own them when construction fails.
template<class T>
Caching_bb<T>::Caching_bb(Entry_event_mgr* em, Persistent_store* pe,
                          const int mp) : persist(pe), evt_mgr(em),
                                          max_pend(mp), event_id(new Event_id)
{
   pthread_cond_init(&locked_entry, 0);
   pthread_mutex_init(&lock_cache_lock, 0);
   pthread_mutex_init(&cache_lock, 0);

   // load cache from persistent store
   T* e = 0;
   std::vector<Entry*> ents;
   try {
      e = new T;

      // Only wildcard when not File_entry with a File_persistent_store, since
      // the empty entry would use './*' in cwd, which may not be at all OPUS
      // related.
      if (0 == dynamic_cast<File_persistent_store*>(persist) ||
          0 == dynamic_cast<File_entry*>(e))
      {
         e->search_fill_all(); // adds appropriate wildcarding
      }

      persist->search(e, ents);
      delete e;
      e = 0;

      for (size_t i = 0; i < ents.size(); ++i) {
         add_to_cache(ents[i]); // the cache stores its own clone
         delete ents[i];
         ents[i] = 0;           // keeps the cleanup below from double-deleting
      }
   }
   catch(...) {
      // The destructor will not run for a partially constructed object, so
      // undo by hand whatever has been acquired so far.
      delete e;
      for (size_t i = 0; i < ents.size(); ++i) delete ents[i];
      remove_all_from_cache();
      pthread_mutex_destroy(&cache_lock);
      pthread_mutex_destroy(&lock_cache_lock);
      pthread_cond_destroy(&locked_entry);
      delete event_id;
      event_id = 0;
      throw;
   }
}

// Tears the blackboard down: deletes the event manager and persistent store
// received by the constructor, destroys the pthread objects, frees every
// entry held in the cache and in the lock cache, and deletes the event id.
template<class T>
Caching_bb<T>::~Caching_bb()
{
   delete evt_mgr;
   delete persist;

   pthread_cond_destroy(&locked_entry);
   pthread_mutex_destroy(&lock_cache_lock);
   pthread_mutex_destroy(&cache_lock);

   // destroy caches
   remove_all_from_cache();
   for (std::list<Entry*>::iterator held = lock_cache.begin();
        held != lock_cache.end(); ++held) {
      delete *held;
   }

   delete event_id;
}

// Adds a new entry to the blackboard.
//
// If the persistent store reports an inconsistency, write_store() is called
// first. The entry is validated with check(), must be of dynamic type T, and
// must not already be present in the cache (compared after search_mask() has
// been applied to a private copy). On success a clone of e is appended to the
// cache, e is saved to the persistent store and a CREATED event is published.
//
// Throws:
//   Bad_val<void*>         - e is null
//   Type<const Entry*>     - e is not a T
//   Already<const Entry*>  - a matching entry is already cached
//   anything raised by check(), the persistent store or the event manager
//
// cache_lock is released on every exit path, including when sync() or
// add_to_cache() throws.
template<class T>
void Caching_bb<T>::post(const Entry* e)
{
   if (!persist->is_consistent()) {
      Msg m;
      m << sev(Msg::E) << type(Msg::CORRUPT) <<
         "Caching_bb::post - The persistent store reports an inconsistency."
         << endm;
      write_store();
   }
   if (!e) {
      Msg m;
      m << sev(Msg::E) << type(Msg::BADVAL) <<
         "Caching_bb::post - Null argument." << endm;
      throw Bad_val<void*>(0);
   }
   e->check();

   // Work on a private copy so search_mask() does not disturb the caller's
   // entry; the copy also proves the dynamic type.
   Entry* tmp = e->clone();
   T* t;
   if (!(t = dynamic_cast<T*>(tmp))) {
      delete tmp;
      Msg m;
      m << sev(Msg::E) << type(Msg::BADVAL) <<
         "Caching_bb::post - Argument of wrong type: " << e->str() << endm;
      throw Type<const Entry*>(e, e->str());
   }

   // check if a duplicate
   t->search_mask();
   pthread_mutex_lock(&cache_lock);
   if (find_in_cache(t) != end_of_cache()) {
      pthread_mutex_unlock(&cache_lock);
      delete t;
      Msg m;
      m << sev(Msg::E) << type(Msg::DUPL) <<
         "Caching_bb::post - Entry already exists: " << e->str() << endm;
      throw Already<const Entry*>(e, e->str());
   }
   delete t;

   // insert in cache; everything that can throw while the mutex is held
   // lives inside the try block so the mutex is always released
   try {
      if (max_pend && persist->queue_size() > max_pend) persist->sync(max_pend);
      add_to_cache(e);
      persist->save(e);
      evt_mgr->notify(e, 0, Entry_event_mgr::CREATED, ++*event_id);
      pthread_mutex_unlock(&cache_lock);
   }
   catch(...) {
      pthread_mutex_unlock(&cache_lock);
      throw;
   }

   Msg m;
   m << sev(Msg::D) << "Caching_bb::post(" << e->str() << ") - Completed."
     << endm;
}

// Removes an entry from the blackboard.
//
// If the persistent store reports an inconsistency, write_store() is called
// first. The cached entry matching e is deleted and dropped from the cache,
// e is removed from the persistent store and a DELETED event is published.
//
// Throws:
//   Bad_val<void*>          - e is null
//   Type<const Entry*>      - e is not a T
//   No_entry<const Entry*>  - no matching entry is cached
//   anything raised by the persistent store or the event manager
//
// cache_lock is released on every exit path, including when sync() throws.
template<class T>
void Caching_bb<T>::erase(const Entry* e)
{
   if (!persist->is_consistent()) {
      Msg m;
      m << sev(Msg::E) << type(Msg::CORRUPT) <<
         "Caching_bb::erase - The persistent store reports an inconsistency."
      << endm;
      write_store();
   }
   if (!e) {
      Msg m;
      m << sev(Msg::E) << type(Msg::BADVAL) <<
         "Caching_bb::erase - Null argument." << endm;
      throw Bad_val<void*>(0);
   }

   if (!dynamic_cast<const T*>(e)) {
      Msg m;
      m << sev(Msg::E) << type(Msg::BADVAL) <<
         "Caching_bb::erase - Argument of wrong type: " << e->str() << endm;
      throw Type<const Entry*>(e, e->str());
   }

   pthread_mutex_lock(&cache_lock);
   std::list<Entry*>::iterator li;
   try {
      // Same order as before (flush, then look up), but now guarded so a
      // failing sync cannot leave the mutex locked.
      if (max_pend && persist->queue_size() > max_pend) persist->sync(max_pend);
      li = find_in_cache(e);
   }
   catch(...) {
      pthread_mutex_unlock(&cache_lock);
      throw;
   }
   if (li == end_of_cache()) {
      pthread_mutex_unlock(&cache_lock);
      Msg m;
      m << sev(Msg::E) << type(Msg::MISSING) <<
         "Caching_bb::erase - Entry not found: " << e->str() << endm;
      throw No_entry<const Entry*>(e, e->str());
   }
   delete *li;
   remove_from_cache(li);
   try {
      persist->remove(e);
      evt_mgr->notify(e, 0, Entry_event_mgr::DELETED, ++*event_id);
      pthread_mutex_unlock(&cache_lock);
   }
   catch(...) {
      pthread_mutex_unlock(&cache_lock);
      throw;
   }

   Msg m;
   m << sev(Msg::D) << "Caching_bb::erase(" << e->str() << ") - Completed."
     << endm;
}

// Replaces the cached entry matching old_ent with a clone of new_ent.
//
// If the persistent store reports an inconsistency, write_store() is called
// first. After the cache slot has been swapped, the change is forwarded to
// the persistent store and a MODIFIED event is published.
//
// Throws:
//   Bad_val<void*>          - old_ent or new_ent is null
//   Type<const Entry*>      - either argument is not a T (the offending
//                             argument is reported)
//   No_entry<const Entry*>  - old_ent is not cached
//   anything raised by clone(), the persistent store or the event manager
//
// cache_lock is released on every exit path. The replacement is cloned
// BEFORE the old cached entry is deleted, so a failing clone() leaves the
// cache untouched rather than holding a dangling pointer.
template<class T>
void Caching_bb<T>::replace(const Entry* old_ent, const Entry* new_ent)
{
   if (!persist->is_consistent()) {
      Msg m;
      m << sev(Msg::E) << type(Msg::CORRUPT) <<
         "Caching_bb::replace - The persistent store reports an inconsistency."
        << endm;
      write_store();
   }
   if (!old_ent || !new_ent) {
      Msg m;
      m << sev(Msg::E) << type(Msg::BADVAL) <<
         "Caching_bb::replace - Null argument." << endm;
      throw Bad_val<void*>(0);
   }

   const T* t = 0;
   if (!(t = dynamic_cast<const T*>(old_ent)) ||
       !dynamic_cast<const T*>(new_ent)) {
      // t is non-null only when old_ent passed, so the culprit is new_ent
      const Entry* e = (t) ? new_ent : old_ent;
      Msg m;
      m << sev(Msg::E) << type(Msg::BADVAL) <<
         "Caching_bb::replace - Argument of wrong type: " << e->str() << endm;
      throw Type<const Entry*>(e, e->str());
   }

   pthread_mutex_lock(&cache_lock);
   std::list<Entry*>::iterator li;
   try {
      if (max_pend && persist->queue_size() > max_pend) persist->sync(max_pend);
      li = find_in_cache(old_ent);
   }
   catch(...) {
      pthread_mutex_unlock(&cache_lock);
      throw;
   }
   if (li == end_of_cache()) {
      pthread_mutex_unlock(&cache_lock);
      Msg m;
      m << sev(Msg::E) << type(Msg::MISSING) <<
         "Caching_bb::replace - Entry not found: " << old_ent->str() << endm;
      throw No_entry<const Entry*>(old_ent, old_ent->str());
   }
   try {
      Entry* replacement = new_ent->clone();
      delete *li;
      *li = replacement;
      persist->replace(old_ent, new_ent);
      evt_mgr->notify(old_ent, new_ent, Entry_event_mgr::MODIFIED, ++*event_id);
      pthread_mutex_unlock(&cache_lock);
   }
   catch(...) {
      pthread_mutex_unlock(&cache_lock);
      throw;
   }

   Msg m;
   m << sev(Msg::D) << "Caching_bb::replace(" << old_ent->str() << 
     ") - Completed." << endm;
}

// Appends to res a clone of every cached entry that matches e and returns
// the next event id (the id is incremented under the same lock as the
// search). The clones are allocated by clone(); this function does not
// free them.
//
// Throws:
//   Bad_val<void*>      - e is null
//   Type<const Entry*>  - e is not a T
//   anything raised while cloning/logging the matches; cache_lock is
//   released before the exception propagates (res may then hold the clones
//   appended so far)
template<class T>
int Caching_bb<T>::search(const Entry* e, std::vector<Entry*>& res) const
{
   if (!e) {
      Msg m;
      m << sev(Msg::E) << type(Msg::BADVAL) <<
         "Caching_bb::search - Null argument." << endm;
      throw Bad_val<void*>(0);
   }

   if (!dynamic_cast<const T*>(e)) {
      Msg m;
      m << sev(Msg::E) << type(Msg::BADVAL) <<
         "Caching_bb::search - Argument of wrong type: " << e->str() << endm;
      throw Type<const Entry*>(e, e->str());
   }

   int id = 0;
   pthread_mutex_lock(&cache_lock);
   try {
      find_all_in_cache(e, res);
      // get next event id
      id = ++*event_id;
   }
   catch(...) {
      pthread_mutex_unlock(&cache_lock);
      throw;
   }
   pthread_mutex_unlock(&cache_lock);
   return id;
}

// Locks an entry and returns a newly allocated Cached_entry_lock<T> for it.
//
// A masked clone of e is recorded in lock_cache; while it is there,
// get_lock_cache_lock() makes other callers for a matching entry wait (up to
// its timeout). The returned lock object is built from the cached version of
// the entry when one is found in the cache, otherwise from e itself.
//
// Throws:
//   Bad_val<void*>        - e is null
//   Type<const Entry*>    - e is not a T
//   Locked<const Entry*>  - the entry stayed locked until the wait timed out
//
// NOTE(review): the returned object is created with new; the caller
// presumably owns it - confirm against Cached_entry_lock / Opus_lock.
// NOTE(review): not exception safe. If t->clone() or push_front() throws,
// lock_cache_lock stays locked and t leaks; if constructing the
// Cached_entry_lock throws, cache_lock stays locked, t leaks and the marker
// stays in lock_cache.
template<class T>
Opus_lock* Caching_bb<T>::lock_entry(const Entry* e)
{
   if (!e) {
      Msg m;
      m << sev(Msg::E) << type(Msg::BADVAL) <<
         "Caching_bb::lock_entry - Null argument." << endm;
      throw Bad_val<void*>(0);
   }

   // Private copy: verifies the dynamic type and lets mask()/search_mask()
   // run without touching the caller's entry.
   Entry* tmp = e->clone();
   T* t;
   if (!(t = dynamic_cast<T*>(tmp))) {
      delete tmp;
      Msg m;
      m << sev(Msg::E) << type(Msg::BADVAL) <<
         "Caching_bb::lock_entry - Argument of wrong type: " << e->str()
        << endm;
      throw Type<const Entry*>(e, e->str());
   }

   t->mask();
   // On success get_lock_cache_lock() returns with lock_cache_lock HELD; on
   // failure it has already released it.
   if (!get_lock_cache_lock(t)) {
      delete t;
      Msg m;
      m << sev(Msg::W) << type(Msg::LOCK) <<
         "Caching_bb::lock_entry - Entry already locked: " << e->str() << endm;
      throw Locked<const Entry*>(e, e->str());
   }
   // Record the lock marker, then release the lock-cache mutex.
   lock_cache.push_front(t->clone());
   pthread_mutex_unlock(&lock_cache_lock);
   pthread_cond_signal(&locked_entry);
   std::list<Entry*>::const_iterator li;
   t->search_mask();
   Cached_entry_lock<T>* cl = 0;
   // Look the entry up in the cache under cache_lock.
   pthread_mutex_lock(&cache_lock);
   if ((li = find_in_cache(t)) == end_of_cache()) {
      cl = new Cached_entry_lock<T>(e, this);
   } else {
      cl = new Cached_entry_lock<T>(*li, this);
   }
   pthread_mutex_unlock(&cache_lock);
   delete t;
   Msg m;
   m << sev(Msg::D) << "Caching_bb::lock_entry(" << str() << ") - Locked "
     << e->str() << endm;
   return cl;
}

// Acquires lock_cache_lock and waits until no entry matching e is present in
// lock_cache.
//
// Returns true with lock_cache_lock STILL HELD (the caller must unlock it)
// once no matching entry is locked. Returns false, with lock_cache_lock
// released, if the entry is still locked after the wait timed out or
// pthread_cond_timedwait() reported any other error.
//
// The deadline is absolute: now + MAX_LOCK_ATTEMPTS * RETRY_INTERVAL seconds
// (a fixed 30 seconds on AIX), computed once, so repeated wake-ups do not
// extend the total wait.
template<class T>
bool Caching_bb<T>::get_lock_cache_lock(const Entry* e)
{
   pthread_mutex_lock(&lock_cache_lock);
   struct timeval tval;
   gettimeofday(&tval, 0); // get current time in UTC(!)
   struct timespec timer;
#ifdef ITS_IBM_AIX
// AIX link error: Undefined symbols MAX_LOCK_ATTEMPTS and RETRY_INTERVAL.
// This is kludgy but there is not enough time now to fix this correctly.
   timer.tv_sec = tval.tv_sec + 30;
#else
   timer.tv_sec = tval.tv_sec + MAX_LOCK_ATTEMPTS * RETRY_INTERVAL;
#endif
   timer.tv_nsec = 0;

   // The predicate is re-tested after EVERY return from the wait, including
   // a timeout, so an entry released just as the deadline passes is still
   // obtained. Any non-zero status with the entry still locked ends the
   // wait; previously only ETIMEDOUT did, so another error (e.g. EINVAL)
   // would spin forever while holding the mutex.
   int wait_status = 0;
   while (find_in_lock_cache(e) != lock_cache.end()) {
      if (wait_status != 0) {
         pthread_mutex_unlock(&lock_cache_lock);
         return false;
      }
      wait_status = pthread_cond_timedwait(&locked_entry,
                                           &lock_cache_lock,
                                           &timer);
   }
   return true; // lock_cache_lock is locked at this point!
}

// Releases the lock previously taken on e by lock_entry(): the matching
// marker is removed from lock_cache and deleted, and all threads waiting in
// get_lock_cache_lock() are woken.
//
// Throws:
//   Bad_val<void*>          - e is null
//   Type<const Entry*>      - e is not a T
//   No_entry<const Entry*>  - no lock is recorded for e
template<class T>
void Caching_bb<T>::unlock_entry(const Entry* e)
{
   if (!e) {
      Msg m;
      m << sev(Msg::E) << type(Msg::BADVAL) <<
         "Caching_bb::unlock_entry - Null argument." << endm;
      throw Bad_val<void*>(0);
   }

   // Private copy: verifies the dynamic type and lets mask() run without
   // touching the caller's entry.
   Entry* tmp = e->clone();
   T* t;
   if (!(t = dynamic_cast<T*>(tmp))) {
      delete tmp;
      Msg m;
      m << sev(Msg::E) << type(Msg::BADVAL) <<
         "Caching_bb::unlock_entry - Argument of wrong type: " << e->str()
        << endm;
      throw Type<const Entry*>(e, e->str());
   }

   t->mask();
   pthread_mutex_lock(&lock_cache_lock);
   std::list<Entry*>::iterator li;
   if ((li = find_in_lock_cache(t)) == lock_cache.end()) {
      pthread_mutex_unlock(&lock_cache_lock);
      delete t;
      Msg m;
      m << sev(Msg::E) << type(Msg::MISSING) <<
         "Caching_bb::unlock_entry - Entry not found: " << e->str() << endm;
      throw No_entry<const Entry*>(e, e->str());
   }
   delete t;
   delete *li;
   lock_cache.erase(li);
   pthread_mutex_unlock(&lock_cache_lock);
   // Every waiter shares this one condition variable but each waits for a
   // different entry. Waking a single arbitrary waiter could pick one whose
   // entry is still locked and leave the right one asleep until it times
   // out, so wake them all and let each re-test its own predicate.
   pthread_cond_broadcast(&locked_entry);
   Msg m;
   m << sev(Msg::D) << "Caching_bb::unlock_entry(" << str() << ") - Unlocked "
     << e->str() << endm;
}

// Returns a copy of CACHING_BB_NAME, the textual name of this blackboard.
template<class T>
std::string Caching_bb<T>::str() const
{
   const std::string& name = CACHING_BB_NAME;
   return name;
}

// Self-test hook. No checks are performed here; it always reports success.
template<class T>
bool Caching_bb<T>::test_driver()
{
   const bool passed = true;
   return passed;
}

// Linear search of the cache for the first entry that compares equal to e
// (using the Entry comparison `*cached == e`). Returns an iterator to that
// entry, or cache.end() when nothing matches. Performs no locking itself.
template<class T>
std::list<Entry*>::iterator Caching_bb<T>::find_in_cache(const Entry* e)
{
   for (std::list<Entry*>::iterator pos = cache.begin();
        pos != cache.end(); ++pos) {
      if (**pos == e) {
         return pos;
      }
   }
   return cache.end();
}

// Linear search of lock_cache for the first marker that compares equal to e
// (using the Entry comparison `*marker == e`). Returns an iterator to it, or
// lock_cache.end() when nothing matches. Performs no locking itself.
template<class T>
std::list<Entry*>::iterator Caching_bb<T>::find_in_lock_cache(const Entry* e)
{
   for (std::list<Entry*>::iterator pos = lock_cache.begin();
        pos != lock_cache.end(); ++pos) {
      if (**pos == e) {
         return pos;
      }
   }
   return lock_cache.end();
}

// Returns the past-the-end iterator of the cache; this is the "not found"
// value that find_in_cache() results are compared against.
template<class T>
std::list<Entry*>::iterator Caching_bb<T>::end_of_cache()
{
   std::list<Entry*>::iterator past_end = cache.end();
   return past_end;
}

// Appends a clone of every cached entry that compares equal to e to res,
// logging a debug message for each match. Returns res.size() after the
// scan, i.e. the total number of elements in res, which includes anything
// res already held on entry. Performs no locking itself.
template<class T>
int Caching_bb<T>::find_all_in_cache(const Entry* e, std::vector<Entry*>& res)
   const
{
   for (std::list<Entry*>::const_iterator pos = cache.begin();
        pos != cache.end(); ++pos) {
      if (!(**pos == e)) {
         continue;
      }
      res.push_back((*pos)->clone());
      Msg m;
      m << sev(Msg::D) << "Caching_bb::search(" << str() <<
         ") - Search for " << e->str() << " found " << (*pos)->str()
        << "." << endm;
   }
   return static_cast<int>(res.size());
}

// Appends a clone of e to the end of the cache; the caller's object is not
// stored. Performs no locking and no duplicate check itself.
template<class T>
void Caching_bb<T>::add_to_cache(const Entry* e)
{
   Entry* copy = e->clone();
   cache.push_back(copy);
}

// Unlinks the element at `it` from the cache. Only the list node is
// removed; the Entry it points to is NOT deleted here (erase() deletes it
// before calling this).
template<class T>
void Caching_bb<T>::remove_from_cache(std::list<Entry*>::iterator& it)
{
   std::list<Entry*>::iterator victim = it;
   cache.erase(victim);
}

// Deletes every Entry held by the cache and then empties the list.
// Performs no locking itself.
template<class T>
void Caching_bb<T>::remove_all_from_cache()
{
   for (std::list<Entry*>::iterator pos = cache.begin();
        pos != cache.end(); ++pos) {
      delete *pos;
   }
   cache.clear();
}

// Rebuilds the persistent store from the in-memory cache.
//
// Called by post()/erase()/replace() when the persistent store reports an
// inconsistency. Under cache_lock it:
//   1. returns immediately if the store has become consistent meanwhile;
//   2. flushes pending store operations, then removes every entry found in
//      the store (logging those that are not in the cache, together with
//      the cache's version of them when one exists);
//   3. writes a copy of every cached entry back to the store;
//   4. marks the store consistent.
//
// Throws Corrupt<Persistent_store*> if any step of 2-3 fails.
//
// cache_lock is released on EVERY exit path. Callers lock cache_lock again
// right after this returns, so returning with it held (as the success and
// failure paths used to) would self-deadlock on the default mutex.
template<class T>
void Caching_bb<T>::write_store()
{
   pthread_mutex_lock(&cache_lock);
   if (persist->is_consistent()) {
      Msg m;
      m << sev(Msg::I) << "Caching_bb::write_store() - Cache/persistent store"
         " now syncronized. No action taken." << endm;
      pthread_mutex_unlock(&cache_lock);
      return;
   }
   T* e = 0;
   std::vector<Entry*> ents;
   Msg m;
   std::string sync("SYNC");
   try {
      m << sev(Msg::I) << type(sync) << "Caching_bb::write_store - "
         "Examining/deleting persistent store entries..." << endm;
      persist->sync(); // flush pending persistent store operations
      e = new T;
      e->search_fill_all();
      persist->search(e, ents);
      std::list<Entry*>::iterator li;
      for (size_t i = 0; i < ents.size(); ++i) {
         if ((li = find_in_cache(ents[i])) == end_of_cache()) {
            m << sev(Msg::I) << type(sync) << "Caching_bb::write_store - "
               "The following persistent store entry was not found in the "
               "cache:\n" << ents[i]->str() << endm;
            // Diagnostic only: look for a cached entry that matches once
            // search_mask() has been applied. Skipped if the store handed
            // back something that is not a T.
            T* t = dynamic_cast<T*>(ents[i]);
            if (t) {
               t->search_mask();
               if ((li = find_in_cache(t)) != end_of_cache()) {
                  m << sev(Msg::I) << type(sync) << "Caching_bb::write_store - "
                     "Cache version of entry in previous " << sync <<
                      " message:\n" << (*li)->str() << endm;
               }
            }
         }
         persist->remove(ents[i]);
         delete ents[i]; ents[i] = 0;
      }
      ents.clear();
      persist->sync(); // flush pending persistent store removes
      // e still carries the wildcards from search_fill_all(); ents receives
      // a clone of each cached entry it matches.
      int num = find_all_in_cache(e, ents);
      delete e; e = 0;
      m << sev(Msg::I) << type(sync) << "Caching_bb::write_store - "
         "Writing cache (" << num << " entries) to the persistent store" 
        << endm;
      for (size_t i = 0; i < ents.size(); ++i) {
         persist->save(ents[i]);
         delete ents[i]; ents[i] = 0;
      }
   }
   catch(Opus_exceptions& oe) {
      // Only locals are touched below, so the mutex can go first.
      pthread_mutex_unlock(&cache_lock);
      m << sev(Msg::E) << type(sync) << "Caching_bb::write_store - "
         "Unable to refresh persistent store:\n" << oe.full() << endm;
      delete e;
      for (size_t i = 0; i < ents.size(); ++i) delete ents[i];
      throw Corrupt<Persistent_store*>(persist);
   }
   catch(...) {
      pthread_mutex_unlock(&cache_lock);
      m << sev(Msg::E) << type(sync) << "Caching_bb::write_store - "
         "Unable to refresh persistent store." << endm;
      delete e;
      for (size_t i = 0; i < ents.size(); ++i) delete ents[i];
      throw Corrupt<Persistent_store*>(persist);
   }
   try {
      persist->set_consistency(true);
   }
   catch(...) {
      pthread_mutex_unlock(&cache_lock);
      throw;
   }
   pthread_mutex_unlock(&cache_lock);
}
