Files for C++ Internal Poller Example
What does the C++ code "CxxPoller.cpp" look like?
#include "dataset.h"
#include "msg.h"
#include "opus_lock.h"
#include "osf.h"
#include "rootname.h"
#include "CxxPoller.h"
using namespace std;
////////////////////////////////////////////////////////////////////////////////
//
// Name : main
//
// Purpose:
// This is the main program. See the CxxPoller class later in this file.
//
// Description:
// This program contains all the routines which require an interface
// with the OAPI. This main routine can be run in a pipeline or in
// interactive (command-line) mode.
//
// If this is operating in the pipeline mode, then this just
// establishes a connection with opus, and registers the required
// callbacks. If called interactively from the command line, this
// processes the dataset and exits.
//
// Exceptions thrown:
// None
//
//////////////////////////////////////////////////////////////////////////
int main (int argc, char *argv[])
{
Msg m;
m << "Starting c++poller processing." << endm;
// Create an opus environment
Opus_env opus(argc, argv);
if (!opus.is_initialized())
{
m << sev(Msg::E)
<< "Failed to initialize OPUS environment." << endm;
exit(1);
}
else
{
m << "Initialized OPUS environment." << endm;
}
// Create and run the CxxPoller task.
Pipeline_task* task = 0;
task = CxxPoller::get_instance();
task->init(opus); // register callbacks, etc
task->run(opus); // main event loop, handles exceptions
delete task;
return 0;
} // main
//////////////////////////////////////////////////////////////////////////
//
// Method: CxxPoller::CxxPoller
//
// Purpose:
// Default constructor. Starts the file-event and OSF-event counters
// at zero.
//
// Exceptions thrown:
// none
//
//////////////////////////////////////////////////////////////////////////
CxxPoller::CxxPoller()
    : _numFileEvents(0)
    , _numOsfEvents(0)
{
    // no further set-up is needed
}
//////////////////////////////////////////////////////////////////////////
//
// Method: CxxPoller::~CxxPoller
//
// Purpose:
// Destructor. Performs no work of its own.
//
// Exceptions thrown:
// none
//
//////////////////////////////////////////////////////////////////////////
CxxPoller::~CxxPoller()
{
    // nothing to release
}
//////////////////////////////////////////////////////////////////////////
//
// Method: CxxPoller::get_instance
//
// Purpose:
// Return the one instance of this class, creating it on first use.
//
// Description:
// A subclass is expected to override this method so that it returns a
// new instance of the subclass instead. Any override MUST keep the
// check-then-set handling of the member "_theInstance" shown here.
//
// Returns:
// A pointer to the single instance of CxxPoller.
//
// Exceptions thrown:
// none
//
//////////////////////////////////////////////////////////////////////////
Pipeline_task* const
CxxPoller::get_instance()
{
    // Only the first call constructs the object; later calls reuse it.
    if (!_theInstance)
        _theInstance = new CxxPoller();

    return _theInstance;
}
//////////////////////////////////////////////////////////////////////////////
//
// Name: process_osf_event
//
// Purpose:
// This is called for each OSF event.
//
// Description:
// When the CxxPoller task is triggered by an OSF, this method
// will be called. We first get the dataset name from the
// event, and then process it.
//
// Returns:
// None
//
// Exceptions thrown:
// None
//
//////////////////////////////////////////////////////////////////////////////
void
CxxPoller::process_osf_event(const string& title, Event* evt,
const Opus_env& opus)
{
Msg m;
// Get the dataset name
Dataset* ds = new Dataset();
Event::iterator ei = evt->begin();
(*ei)->get_field(ds);
string datasetName = ds->ustr();
delete ds;
string endState("C++POLLER_OK");
// Process the dataset
try
{
m << sev(Msg::I) << "Processing OSF dataset: " << datasetName
<< ". Event #" << _numOsfEvents++ << ". " << endm;
process_dataset(opus, datasetName, false);
}
catch (Opus_exceptions &oe)
{
m << "Error processing: " << datasetName << ": " << oe.which()
<< endl << oe.str() << endm;
endState = "C++POLLER_ERROR";
}
catch (...)
{
m << "Unknown error processing: " << datasetName << endm;
endState = "C++POLLER_ERROR";
}
// Lock the event entries & close the event. Update OSF to final status
vector locks;
try
{
evt->lock_list(locks); // populate vector
opus.close_event(endState, evt, ei);
}
catch (...)
{
m << sev(Msg::E) << "Error updating OSF for " << datasetName << endm;
}
for (int i = 0; i < locks.size(); i++) delete locks[i];
m << sev(Msg::I) << "Completed processing request for "
<< datasetName << endm;
} // process_osf_event
//////////////////////////////////////////////////////////////////////////////
//
// Name: process_file_event
//
// Purpose:
// This is called when this task is invoked from the command line,
// which generates File Events.
//
// Description:
// When the CxxPoller task is triggered by a File Event, this method
// will be called. We first get the dataset name from the
// event (as specified on the command line), and then process it.
// There is no OSF processing involved.
//
// Returns:
// None
//
// Exceptions thrown:
// None
//
//////////////////////////////////////////////////////////////////////////////
void
CxxPoller::process_file_event(const string& title, Event* evt,
const Opus_env& opus)
{
Msg m;
// Get the rootname from the file event
Rootname* rn = new Rootname();
Event::iterator ei = evt->begin();
(*ei)->get_field(rn);
string datasetName = rn->ustr();
delete rn;
// Process the dataset
try
{
m << sev(Msg::I) << "Processing File-Event dataset: " << datasetName
<< ". Event #" << _numFileEvents++ << ". " << endm;
process_dataset(opus, datasetName, true);
}
catch (Opus_exceptions &oe)
{
m << "Error processing: " << datasetName << ": " << oe.which()
<< endl << oe.str() << endm;
}
catch (...)
{
m << "Unknown error processing: " << datasetName << endm;
}
// Lock the event entries & close the event
vector locks;
try
{
evt->lock_list(locks); // populate vector
opus.close_event(Opus_env::IGNORE_EVENT, evt);
}
catch (...)
{
m << sev(Msg::E) << "Error closing event for " << datasetName << endm;
}
for (int i = 0; i < locks.size(); i++) delete locks[i];
m << sev(Msg::I) << "Completed processing request for "
<< datasetName << endm;
} // process_file_event
//////////////////////////////////////////////////////////////////////////////
//
// Name: process_halt_event
//
// Purpose:
// This is called for each halt event encountered.
//
// Description:
// When this process receives a 'halt' event, it just exits.
//
// Returns:
// None
//
// Exceptions thrown:
// None
//
//////////////////////////////////////////////////////////////////////////////
void
CxxPoller::process_halt_event(const string& title, Event* evt,
const Opus_env& opus)
{
Msg m;
m << sev(Msg::I) << "Exiting for halt event." << endm;
exit(0);
}
//////////////////////////////////////////////////////////////////////////////
//
// Name: process_dataset
//
// Purpose:
// Main routine that does the actual CxxPoller processing.
//
// Returns:
// none
//
// Exceptions thrown:
// Severe: if anything fails during processing
//
//////////////////////////////////////////////////////////////////////////////
void
CxxPoller::process_dataset(const Opus_env& opus,
const string& datasetName,
const bool datasetIsAbsolutePath)
{
Msg m;
// Get the INPATH resource, as an example resource value. Say this is
// the path to our data.
string inpath;
try
{
inpath = opus.get_res_item("INPATH");
}
catch(No_entry)
{
m << sev(Msg::W) << "No INPATH value found in resource file." << endm;
inpath = ".";
}
// Find the path to the dataset itself.
string fullpath = datasetName;
if (!datasetIsAbsolutePath)
fullpath = inpath + "/" + datasetName;
// Let's say that our process simply does a system call 'ls' on
// the dataset name, which it assumes to be a filename. Your internal
// poller can do anything here, and the dataset name need not be a
// filename - it can represent anything depending on how your pipeline
// is designed.
string cmd = "ls -ld " + fullpath + "*";
m << "Running command: " << cmd << endm;
if (0 != system(cmd.c_str()))
{
m << sev(Msg::E) << "Error listing dataset: " << fullpath << endm;
throw Severe("Failure during c++poller processing.");
}
} // process_dataset
What does the resource file "c++poller.resource" look like?
!--------------------------------------------------------------------
!
! c++poller Resource File
!
! Example C++ Internal Poller using an OSF trigger
!
! Lines beginning with "!" and text following "!" on a line are comments.
!
!--------------------------------------------------------------------
PROCESS_NAME = c++poller
! NOTE(review): the TASK value is empty in this listing; it may have been
! lost when the page was extracted. Confirm the command line that should
! launch c++poller before using this file.
TASK =
DESCRIPTION = 'Example C++ Internal Poller'
SYSTEM = SAMPLE
CLASS = ALL
!
OSF_RANK = 1 ! Sets this to use an OSF trigger
OSF_TRIGGER1.KW = w ! when OSF column KW = "w" (waiting)
OSF_PROCESSING.KW = p ! then set column KW to "p" when triggered
! The next two keyword names match the end-state strings that
! CxxPoller.cpp passes when it closes an OSF event.
C++POLLER_OK.KW = c ! then set column KW to "c" on success
C++POLLER_ERROR.KW = e ! or to "e" if the process has a failure
!
POLLING_TIME = 10 ! Wait (seconds) before polling for next
MINBLOCKS = 2000 ! blocks required on output disk
! INPATH is the resource CxxPoller.cpp reads to locate its datasets.
INPATH = input_data ! make INPATH available to process
OUTPATH = input_data ! just throw it all in the same dir
ERRPATH = input_data ! for this example
!
! And the path file should contain the following key-value pairs:
!
! STAGE_FILE = OPUS_DEFINITIONS_DIR:g2f_pipeline.stage
! OPUS_OBSERVATIONS_DIR = (set this to your OSF directory)
! input_data = (set this to the location of input files)
What does the make file "c++poller.make" look like?
######################################################################
#
# c++poller.make
#
# Use this simplified make file (e.g: gmake -f c++poller.make)
# to build the 'c++poller' example internal poller under:
#
# Solaris 2.8 (GCC 3.2), and
# RH Linux 8.0 (GCC 3.2)
#
# Note to OPUS developers - this is intentionally not a part
# of the normal OPUS build. It is intended to be built and
# tested outside of the OPUS environment, as a Sample Pipeline
# user would encounter it.
#
# Recipe lines (under "$(BIN):" and "clean:") must start with a TAB.
#
######################################################################
#
# !!!
# EDIT THE FOLLOWING LINE ! Set OPUS_DIST to your OPUS installation path
OPUS_DIST = /usr/local/opus
# !!!
#
BIN = c++poller
OPUS_INCS = $(OPUS_DIST)/inc

# Platform-specific defines, thread option and library directory.
ifeq ($(shell uname),SunOS)
C++FLAGS = -DITS_UNIX -DITS_SPARC_SOLARIS -DACE_HAS_EXCEPTIONS -pthreads
OPUS_LIBS = $(OPUS_DIST)/lib/sparc_solaris
else
C++FLAGS = -DITS_UNIX -DITS_LINUX -DACE_HAS_EXCEPTIONS -pthread
OPUS_LIBS = $(OPUS_DIST)/lib/linux
endif

# Debug, include-path and library-path options shared by both platforms.
COMMON = -g -I$(OPUS_INCS) -L$(OPUS_LIBS)

# Libraries are kept apart from COMMON so that they can follow the source
# file on the link line; the linker resolves symbols in command-line order.
LIBS = -loapi -losfile -ldscp -losys -lstr -ltime_utils -ldes -lstcrypt -lerr_pkg \
       -lACE -lTAO -lTAO_CosEvent -lTAO_CosNaming -lTAO_IORTable \
       -lTAO_Svc_Utils -lTAO_PortableServer

SRCS = CxxPoller.cpp

# Compile and link in one step.
$(BIN): $(SRCS)
	g++ $(C++FLAGS) $(COMMON) $(SRCS) $(LIBS) -o $@

# "clean" names no file, so always run it when asked.
.PHONY: clean
clean:
	-rm -f $(BIN) *~ *.o *.a *.so
What does the output look like in the log file?
ODCL: +++ odcl_run_process.csh started +++
ODCL:
ODCL: OPUS 6.1
ODCL:
ODCL: Input parameters:
ODCL: PROCESS_NAME = c++poller
ODCL: PATH_FILE = /home/sontag/opus_test/definitions//quick.path
ODCL: PATH_BASENAME = quick.path
ODCL: PATH_BASEROOT = quick
ODCL: TIME_STAMP = 3f71cf7b (date: 24-Sep-03 17:08:11)
ODCL: PASSWORD? Yes
ODCL:
ODCL: Fetching TASK line from process resource file...(odcl_get_resource_command)
ODCL:
ODCL:
ODCL: Task to be run: c++poller (/home/sontag/bin/c++poller)
ODCL:
ODCL:
ODCL: Creating process PSTAT...(odcl_create_psffile)
ODCL:
ODCL:
ODCL: Running the process...
ODCL:
2003267170831-I-INFO Starting c++poller processing. (1)
2003267170832-I-INFO Opus_env::get_res_item input_data substituted with path value /home/sontag/opus_test/quick/input/ (1)
2003267170832-I-INFO Initialized OPUS environment. (1)
2003267170832-I-INFO Registered callbacks. (1)
2003267171242-I-INFO Processing OSF dataset: my_osf. Event #0. (1)
2003267171242-I-INFO Opus_env::get_res_item input_data substituted with path value /home/sontag/opus_test/quick/input/ (1)
2003267171242-I-INFO Running command: ls -ld /home/sontag/opus_test/quick/input//my_osf* (1)
-rw-r--r-- 1 sontag 0 Sep 24 17:12 /home/sontag/opus_test/quick/input//my_osf_file.txt
2003267171242-I-INFO Completed processing request for my_osf (1)
2003267171723-I-INFO Halt event: halt registered (1)
2003267171723-I-INFO Exiting for halt event. (1)
ODCL:
ODCL: Process exited. Cleaning up...(odcl_cleanup)
ODCL: