Files for C++ Internal Poller Example


What does the C++ code "CxxPoller.cpp" look like ?

#include "dataset.h"
#include "msg.h"
#include "opus_lock.h"
#include "osf.h"
#include "rootname.h"
#include "CxxPoller.h"

using namespace std;


////////////////////////////////////////////////////////////////////////////////
//
// Name : main
//
// Purpose:
//    This is the main program.  See the CxxPoller class later in this file.
//
// Description:
//    This program contains all the routines which require an interface
//    with the OAPI.  This main routine can be run in a pipeline or in
//    interactive (command-line) mode.
//
//    If this is operating in the pipeline mode, then this just
//    establishes a connection with opus, and registers the required
//    callbacks.  If called interactively from the command line, this
//    processes the dataset and exits.
//
// Exceptions thrown:
//    None
//
//////////////////////////////////////////////////////////////////////////

int main (int argc, char *argv[])
{
   Msg m;
   m << "Starting c++poller processing." << endm;

   // Create an opus environment

   Opus_env opus(argc, argv);
   if (!opus.is_initialized())
   {
      m << sev(Msg::E)
        << "Failed to initialize OPUS environment." << endm;
      exit(1);
   }
   else
   {
      m << "Initialized OPUS environment." << endm;
   }

   // Create and run the CxxPoller task.

   Pipeline_task* task = 0;

   task = CxxPoller::get_instance();

   task->init(opus); // register callbacks, etc
   task->run(opus);  // main event loop, handles exceptions

   delete task;
   return 0;

} // main


//////////////////////////////////////////////////////////////////////////
//
// Method: CxxPoller::CxxPoller
//
// Purpose:
//    Constructor
//
// Exceptions thrown:
//    none
//
//////////////////////////////////////////////////////////////////////////

CxxPoller::CxxPoller():
 _numFileEvents(0),
 _numOsfEvents(0)
{
   // empty for now
}


//////////////////////////////////////////////////////////////////////////
//
// Method: CxxPoller::~CxxPoller
//
// Purpose:
//    Destructor
//
// Exceptions thrown:
//    none
//
//////////////////////////////////////////////////////////////////////////

CxxPoller::~CxxPoller()
{
   // empty for now
}


//////////////////////////////////////////////////////////////////////////
//
// Method: CxxPoller::get_instance
//
// Purpose:
//    Get or create the only instance of this class.
//
// Description:
//    Subclasses should override this method to return a new instance of
//    themselves.  Subclasses MUST retain the logic used in this method
//    that checks and sets the member "_theInstance".
//
// Returns:
//    A pointer to the single instance of CxxPoller.
//
// Exceptions thrown:
//    none
//
//////////////////////////////////////////////////////////////////////////

Pipeline_task* const
CxxPoller::get_instance()
{
   if (0 == _theInstance)
   {
      _theInstance = new CxxPoller();
   }
   return _theInstance;
}


//////////////////////////////////////////////////////////////////////////////
//
// Name: process_osf_event
//
// Purpose:
//      This is called for each OSF event.
//
// Description:
//      When the CxxPoller task is triggered by an OSF, this method
//      will be called.  We first get the dataset name from the
//      event, and then process it.
//
// Returns:
//      None
//
// Exceptions thrown:
//      None
//
//////////////////////////////////////////////////////////////////////////////

void
CxxPoller::process_osf_event(const string& title, Event* evt,
                             const Opus_env& opus)
{
   Msg m;

   // Get the dataset name

   Dataset* ds = new Dataset();
   Event::iterator ei = evt->begin();
   (*ei)->get_field(ds);
   string datasetName = ds->ustr();
   delete ds;

   string endState("C++POLLER_OK");

   // Process the dataset

   try
   {
      m << sev(Msg::I) << "Processing OSF dataset: " << datasetName
        << ". Event #" << _numOsfEvents++ << ". " << endm;

      process_dataset(opus, datasetName, false);
   }

   catch (Opus_exceptions &oe)
   {
      m << "Error processing: " << datasetName << ": " << oe.which()
        << endl << oe.str() << endm;
      endState = "C++POLLER_ERROR";
   }
   catch (...)
   {
      m << "Unknown error processing: " << datasetName << endm;
      endState = "C++POLLER_ERROR";
   }


   // Lock the event entries & close the event.  Update OSF to final status

   vector locks;
   try
   {
      evt->lock_list(locks);  // populate vector
      opus.close_event(endState, evt, ei);
   }
   catch (...)
   {
      m << sev(Msg::E) << "Error updating OSF for " << datasetName << endm;
   }

   for (int i = 0; i < locks.size(); i++) delete locks[i];

   m << sev(Msg::I) << "Completed processing request for "
     << datasetName << endm;

} // process_osf_event


//////////////////////////////////////////////////////////////////////////////
//
// Name: process_file_event
//
// Purpose:
//      This is called when this task is invoked from the command line,
//      which generates File Events.
//
// Description:
//      When the CxxPoller task is triggered by a File Event, this method
//      will be called.  We first get the dataset name from the
//      event (as specified on the command line), and then process it.
//      There is no OSF processing involved.
//
// Returns:
//      None
//
// Exceptions thrown:
//      None
//
//////////////////////////////////////////////////////////////////////////////

void
CxxPoller::process_file_event(const string& title, Event* evt,
                              const Opus_env& opus)
{
   Msg m;

   // Get the rootname from the file event

   Rootname* rn = new Rootname();
   Event::iterator ei = evt->begin();
   (*ei)->get_field(rn);
   string datasetName = rn->ustr();
   delete rn;

   // Process the dataset

   try
   {
      m << sev(Msg::I) << "Processing File-Event dataset: " << datasetName
        << ". Event #" << _numFileEvents++ << ". " << endm;

      process_dataset(opus, datasetName, true);
   }

   catch (Opus_exceptions &oe)
   {
      m << "Error processing: " << datasetName << ": " << oe.which()
        << endl << oe.str() << endm;
   }
   catch (...)
   {
      m << "Unknown error processing: " << datasetName << endm;
   }

   // Lock the event entries & close the event

   vector locks;
   try
   {
      evt->lock_list(locks); // populate vector
      opus.close_event(Opus_env::IGNORE_EVENT, evt);
   }
   catch (...)
   {
      m << sev(Msg::E) << "Error closing event for " << datasetName << endm;
   }

   for (int i = 0; i < locks.size(); i++) delete locks[i];

   m << sev(Msg::I) << "Completed processing request for "
     << datasetName << endm;

} // process_file_event


//////////////////////////////////////////////////////////////////////////////
//
// Name: process_halt_event
//
// Purpose:
//      This is called for each halt event encountered.
//
// Description:
//      When this process receives a 'halt' event, it just exits.
//
// Returns:
//      None
//
// Exceptions thrown:
//      None
//
//////////////////////////////////////////////////////////////////////////////

void
CxxPoller::process_halt_event(const string& title, Event* evt,
                              const Opus_env& opus)
{
   Msg m;
   m << sev(Msg::I) << "Exiting for halt event." << endm;
   exit(0);
}


//////////////////////////////////////////////////////////////////////////////
//
// Name: process_dataset
//
// Purpose:
//    Main routine that does the actual CxxPoller processing.
//
// Returns:
//    none
//
// Exceptions thrown:
//    Severe:    if anything fails during processing
//
//////////////////////////////////////////////////////////////////////////////

void
CxxPoller::process_dataset(const Opus_env& opus,
                           const string& datasetName,
                           const bool datasetIsAbsolutePath)
{
   Msg m;

   // Get the INPATH resource, as an example resource value.  Say this is
   // the path to our data.

   string inpath;
   try
   {
      inpath = opus.get_res_item("INPATH");
   }
   catch(No_entry)
   {
      m << sev(Msg::W) << "No INPATH value found in resource file." << endm;
      inpath = ".";
   }

   // Find the path to the dataset itself.

   string fullpath = datasetName;
   if (!datasetIsAbsolutePath)
      fullpath = inpath + "/" + datasetName;

   // Let's say that our process simply does a system call 'ls' on
   // the dataset name, which it assumes to be a filename.  Your internal
   // poller can do anything here, and the dataset name need not be a
   // filename - it can represent anything depending on how your pipeline
   // is designed.

   string cmd = "ls -ld " + fullpath + "*";
   m << "Running command: " << cmd << endm;

   if (0 != system(cmd.c_str()))
   {
      m << sev(Msg::E) << "Error listing dataset: " << fullpath << endm;

      throw Severe("Failure during c++poller processing.");
   }

} // process_dataset

What does the resource file "c++poller.resource" look like ?

!--------------------------------------------------------------------
!
! c++poller Resource File
!
! Example C++ Internal Poller using an OSF trigger
!
!--------------------------------------------------------------------

PROCESS_NAME = c++poller
TASK = 
DESCRIPTION = 'Example C++ Internal Poller'
SYSTEM = SAMPLE
CLASS = ALL
!
OSF_RANK           = 1           ! Sets this to use an OSF trigger
OSF_TRIGGER1.KW    = w           ! when OSF column KW = "w" (waiting)
OSF_PROCESSING.KW  = p           ! then set column KW to "p" when triggered
C++POLLER_OK.KW    = c           ! then set column KW to "c" on success
C++POLLER_ERROR.KW = e           ! or to "e" if the process has a failure
!
POLLING_TIME       = 10          ! Wait (seconds) before polling for next
MINBLOCKS          = 2000        ! blocks required on output disk
INPATH             = input_data  ! make INPATH available to process
OUTPATH            = input_data  ! just throw it all in the same dir
ERRPATH            = input_data  !    for this example
!
! And the path file should contain the following key-value pairs:
!
! STAGE_FILE             = OPUS_DEFINITIONS_DIR:g2f_pipeline.stage
! OPUS_OBSERVATIONS_DIR  = (set this to your OSF directory)
! input_data             = (set this to the location of input files)

What does the make file "c++poller.make" look like ?

######################################################################
# 
# c++poller.make
#
# Use this simplified make file (e.g: gmake -f c++poller.make)
# to build the 'c++poller' example internal poller under:
#
#      Solaris 2.8  (GCC 3.2), and
#      RH Linux 8.0 (GCC 3.2)
#
# Note to OPUS developers - this is intentionally not a part
# of the normal OPUS build.  It is intended to be built and
# tested outside of the OPUS environmnt, as a Sample Pipeline
# user would encounter it.
#
######################################################################

#
# !!!
# EDIT THE FOLLOWING LINE !  Set OPUS_DIST to your OPUS installation path
OPUS_DIST = /usr/local/opus
# !!!
#

BIN = c++poller
OPUS_INCS = $(OPUS_DIST)/inc

ifeq ($(shell uname),SunOS)
   C++FLAGS = -DITS_UNIX -DITS_SPARC_SOLARIS -DACE_HAS_EXCEPTIONS -pthreads
   OPUS_LIBS = $(OPUS_DIST)/lib/sparc_solaris
else
   C++FLAGS = -DITS_UNIX -DITS_LINUX -DACE_HAS_EXCEPTIONS -pthread
   OPUS_LIBS = $(OPUS_DIST)/lib/linux
endif

COMMON = -g -I$(OPUS_INCS) -L$(OPUS_LIBS) \
   -loapi -losfile -ldscp -losys -lstr -ltime_utils -ldes -lstcrypt -lerr_pkg \
   -lACE -lTAO -lTAO_CosEvent -lTAO_CosNaming -lTAO_IORTable \
   -lTAO_Svc_Utils -lTAO_PortableServer

SRCS = CxxPoller.cpp

$(BIN): $(SRCS)
	g++ $(C++FLAGS) $(COMMON) $? -o $@

clean:
	-rm -f $(BIN) *~ *.o *.a *.so

What does the output look like in the log file ?

ODCL: +++ odcl_run_process.csh started +++
ODCL:
ODCL: OPUS 6.1
ODCL:
ODCL: Input parameters:
ODCL:     PROCESS_NAME = c++poller
ODCL:     PATH_FILE    = /home/sontag/opus_test/definitions//quick.path
ODCL:     PATH_BASENAME = quick.path
ODCL:     PATH_BASEROOT = quick
ODCL:     TIME_STAMP   = 3f71cf7b (date: 24-Sep-03 17:08:11)
ODCL:     PASSWORD? Yes
ODCL:
ODCL: Fetching TASK line from process resource file...(odcl_get_resource_command)
ODCL:
ODCL:
ODCL: Task to be run: c++poller (/home/sontag/bin/c++poller)
ODCL:
ODCL:
ODCL: Creating process PSTAT...(odcl_create_psffile)
ODCL:
ODCL:
ODCL: Running the process...
ODCL:
2003267170831-I-INFO Starting c++poller processing. (1)
2003267170832-I-INFO Opus_env::get_res_item input_data substituted with path value /home/sontag/opus_test/quick/input/ (1)
2003267170832-I-INFO Initialized OPUS environment. (1)
2003267170832-I-INFO Registered callbacks. (1)
2003267171242-I-INFO Processing OSF dataset: my_osf. Event #0.  (1)
2003267171242-I-INFO Opus_env::get_res_item input_data substituted with path value /home/sontag/opus_test/quick/input/ (1)
2003267171242-I-INFO Running command: ls -ld /home/sontag/opus_test/quick/input//my_osf* (1)
-rw-r--r--   1 sontag          0 Sep 24 17:12 /home/sontag/opus_test/quick/input//my_osf_file.txt
2003267171242-I-INFO Completed processing request for my_osf (1)
2003267171723-I-INFO Halt event: halt registered (1)
2003267171723-I-INFO Exiting for halt event. (1)
ODCL:
ODCL: Process exited. Cleaning up...(odcl_cleanup)
ODCL:

Back to the Top

Back to Example