#include "dataset.h"
#include "msg.h"
#include "opus_lock.h"
#include "osf.h"
#include "rootname.h"
#include "CxxPoller.h"

using namespace std;

////////////////////////////////////////////////////////////////////////////////
//
// Name : main
//
// Purpose:
//   This is the main program.  See the CxxPoller class later in this file.
//
// Description:
//   This program contains all the routines which require an interface
//   with the OAPI.  This main routine can be run in a pipeline or in
//   interactive (command-line) mode.
//
//   If this is operating in the pipeline mode, then this just
//   establishes a connection with opus, and registers the required
//   callbacks.  If called interactively from the command line, this
//   processes the dataset and exits.
//
// Returns:
//   0 once the task's run() call has returned.  If the OPUS environment
//   cannot be initialized the process terminates via exit(1) instead.
//
// Exceptions thrown:
//   None
//
//////////////////////////////////////////////////////////////////////////
int main (int argc, char *argv[])
{
   Msg m;
   m << "Starting c++poller processing." << endm;

   // Build the OPUS environment from the command line; nothing else can
   // proceed without it, so bail out immediately on failure.
   Opus_env opus(argc, argv);
   if (!opus.is_initialized()) {
      m << sev(Msg::E) << "Failed to initialize OPUS environment." << endm;
      exit(1);
   }
   m << "Initialized OPUS environment." << endm;

   // Obtain the single CxxPoller task, wire it up and hand control to it.
   Pipeline_task* task = CxxPoller::get_instance();
   task->init(opus);   // register callbacks, etc
   task->run(opus);    // main event loop, handles exceptions

   // NOTE(review): deleting through Pipeline_task* presumably relies on a
   // virtual destructor in the base class - confirm in the OAPI headers.
   delete task;
   return 0;

} // main

//////////////////////////////////////////////////////////////////////////
//
// Method: CxxPoller::CxxPoller
//
// Purpose:
//   Constructor.  Starts both event counters at zero.
//
// Exceptions thrown:
//   none
//
//////////////////////////////////////////////////////////////////////////
CxxPoller::CxxPoller()
   : _numFileEvents(0),
     _numOsfEvents(0)
{
   // nothing further to set up
}

//////////////////////////////////////////////////////////////////////////
//
// Method: CxxPoller::~CxxPoller
//
// Purpose:
//   Destructor
//
// Exceptions thrown:
//   none
//
//////////////////////////////////////////////////////////////////////////
CxxPoller::~CxxPoller()
{
   // nothing to release
}

//////////////////////////////////////////////////////////////////////////
//
// Method: CxxPoller::get_instance
//
// Purpose:
//   Get or create the only instance of this class.
//
// Description:
//   Subclasses should override this method to return a new instance of
//   themselves.  Subclasses MUST retain the logic used in this method
//   that checks and sets the member "_theInstance".
//
// Returns:
//   A pointer to the single instance of CxxPoller.
//
// Exceptions thrown:
//   none
//
//////////////////////////////////////////////////////////////////////////
Pipeline_task* const CxxPoller::get_instance()
{
   // Create the instance on first use only; later calls reuse it.
   if (!_theInstance)
      _theInstance = new CxxPoller();

   return _theInstance;
}

//////////////////////////////////////////////////////////////////////////////
//
// Name: process_osf_event
//
// Purpose:
//   This is called for each OSF event.
//
// Description:
//   When the CxxPoller task is triggered by an OSF, this method
//   will be called.  We first get the dataset name from the
//   event, and then process it.
// // Returns: // None // // Exceptions thrown: // None // ////////////////////////////////////////////////////////////////////////////// void CxxPoller::process_osf_event(const string& title, Event* evt, const Opus_env& opus) { Msg m; // Get the dataset name Dataset* ds = new Dataset(); Event::iterator ei = evt->begin(); (*ei)->get_field(ds); string datasetName = ds->ustr(); delete ds; string endState("C++POLLER_OK"); // Process the dataset try { m << sev(Msg::I) << "Processing OSF dataset: " << datasetName << ". Event #" << _numOsfEvents++ << ". " << endm; process_dataset(opus, datasetName, false); } catch (Opus_exceptions &oe) { m << "Error processing: " << datasetName << ": " << oe.which() << endl << oe.str() << endm; endState = "C++POLLER_ERROR"; } catch (...) { m << "Unknown error processing: " << datasetName << endm; endState = "C++POLLER_ERROR"; } // Lock the event entries & close the event. Update OSF to final status vectorlocks; try { evt->lock_list(locks); // populate vector opus.close_event(endState, evt, ei); } catch (...) { m << sev(Msg::E) << "Error updating OSF for " << datasetName << endm; } for (int i = 0; i < locks.size(); i++) delete locks[i]; m << sev(Msg::I) << "Completed processing request for " << datasetName << endm; } // process_osf_event ////////////////////////////////////////////////////////////////////////////// // // Name: process_file_event // // Purpose: // This is called when this task is invoked from the command line, // which generates File Events. // // Description: // When the CxxPoller task is triggered by a File Event, this method // will be called. We first get the dataset name from the // event (as specified on the command line), and then process it. // There is no OSF processing involved. 
// // Returns: // None // // Exceptions thrown: // None // ////////////////////////////////////////////////////////////////////////////// void CxxPoller::process_file_event(const string& title, Event* evt, const Opus_env& opus) { Msg m; // Get the rootname from the file event Rootname* rn = new Rootname(); Event::iterator ei = evt->begin(); (*ei)->get_field(rn); string datasetName = rn->ustr(); delete rn; // Process the dataset try { m << sev(Msg::I) << "Processing File-Event dataset: " << datasetName << ". Event #" << _numFileEvents++ << ". " << endm; process_dataset(opus, datasetName, true); } catch (Opus_exceptions &oe) { m << "Error processing: " << datasetName << ": " << oe.which() << endl << oe.str() << endm; } catch (...) { m << "Unknown error processing: " << datasetName << endm; } // Lock the event entries & close the event vector locks; try { evt->lock_list(locks); // populate vector opus.close_event(Opus_env::IGNORE_EVENT, evt); } catch (...) { m << sev(Msg::E) << "Error closing event for " << datasetName << endm; } for (int i = 0; i < locks.size(); i++) delete locks[i]; m << sev(Msg::I) << "Completed processing request for " << datasetName << endm; } // process_file_event ////////////////////////////////////////////////////////////////////////////// // // Name: process_halt_event // // Purpose: // This is called for each halt event encountered. // // Description: // When this process receives a 'halt' event, it just exits. // // Returns: // None // // Exceptions thrown: // None // ////////////////////////////////////////////////////////////////////////////// void CxxPoller::process_halt_event(const string& title, Event* evt, const Opus_env& opus) { Msg m; m << sev(Msg::I) << "Exiting for halt event." << endm; exit(0); } ////////////////////////////////////////////////////////////////////////////// // // Name: process_dataset // // Purpose: // Main routine that does the actual CxxPoller processing. 
// // Returns: // none // // Exceptions thrown: // Severe : if anything fails during processing // ////////////////////////////////////////////////////////////////////////////// void CxxPoller::process_dataset(const Opus_env& opus, const string& datasetName, const bool datasetIsAbsolutePath) { Msg m; // Get the INPATH resource, as an example resource value. Say this is // the path to our data. string inpath; try { inpath = opus.get_res_item("INPATH"); } catch(No_entry ) { m << sev(Msg::W) << "No INPATH value found in resource file." << endm; inpath = "."; } // Find the path to the dataset itself. string fullpath = datasetName; if (!datasetIsAbsolutePath) fullpath = inpath + "/" + datasetName; // Let's say that our process simply does a system call 'ls' on // the dataset name, which it assumes to be a filename. Your internal // poller can do anything here, and the dataset name need not be a // filename - it can represent anything depending on how your pipeline // is designed. string cmd = "ls -ld " + fullpath + "*"; m << "Running command: " << cmd << endm; if (0 != system(cmd.c_str())) { m << sev(Msg::E) << "Error listing dataset: " << fullpath << endm; throw Severe ("Failure during c++poller processing."); } } // process_dataset
!--------------------------------------------------------------------
!
! c++poller Resource File
!
! Example C++ Internal Poller using an OSF trigger
!
!--------------------------------------------------------------------
PROCESS_NAME = c++poller
TASK = <c++poller -p $PATH_FILE -r c++poller>
DESCRIPTION = 'Example C++ Internal Poller'
SYSTEM = SAMPLE
CLASS = ALL
!
OSF_RANK = 1              ! Sets this to use an OSF trigger
OSF_TRIGGER1.KW = w       ! when OSF column KW = "w" (waiting)
OSF_PROCESSING.KW = p     ! then set column KW to "p" when triggered
C++POLLER_OK.KW = c       ! then set column KW to "c" on success
C++POLLER_ERROR.KW = e    ! or to "e" if the process has a failure
!
POLLING_TIME = 10         ! Wait (seconds) before polling for next
MINBLOCKS = 2000          ! blocks required on output disk
INPATH = input_data       ! make INPATH available to process
OUTPATH = input_data      ! just throw it all in the same dir
ERRPATH = input_data      ! for this example
!
! And the path file should contain the following key-value pairs:
!
! STAGE_FILE = OPUS_DEFINITIONS_DIR:g2f_pipeline.stage
! OPUS_OBSERVATIONS_DIR = (set this to your OSF directory)
! input_data = (set this to the location of input files)
######################################################################
#
# c++poller.make
#
# Use this simplified make file (e.g: gmake -f c++poller.make)
# to build the 'c++poller' example internal poller under:
#
#    Solaris 2.8 (GCC 3.2), and
#    RH Linux 8.0 (GCC 3.2)
#
# Note to OPUS developers - this is intentionally not a part
# of the normal OPUS build.  It is intended to be built and
# tested outside of the OPUS environment, as a Sample Pipeline
# user would encounter it.
#
######################################################################
#
# !!!
# EDIT THE FOLLOWING LINE !  Set OPUS_DIST to your OPUS installation path
OPUS_DIST = /usr/local/opus
# !!!
#
BIN = c++poller
OPUS_INCS = $(OPUS_DIST)/inc

# Pick the platform defines, threading flag and library directory.
ifeq ($(shell uname),SunOS)
C++FLAGS = -DITS_UNIX -DITS_SPARC_SOLARIS -DACE_HAS_EXCEPTIONS -pthreads
OPUS_LIBS = $(OPUS_DIST)/lib/sparc_solaris
else
C++FLAGS = -DITS_UNIX -DITS_LINUX -DACE_HAS_EXCEPTIONS -pthread
OPUS_LIBS = $(OPUS_DIST)/lib/linux
endif

COMMON = -g -I$(OPUS_INCS) -L$(OPUS_LIBS) \
   -loapi -losfile -ldscp -losys -lstr -ltime_utils -ldes -lstcrypt -lerr_pkg \
   -lACE -lTAO -lTAO_CosEvent -lTAO_CosNaming -lTAO_IORTable \
   -lTAO_Svc_Utils -lTAO_PortableServer

SRCS = CxxPoller.cpp

# 'clean' names an action, not a file.
.PHONY: clean

# Use $^ (all prerequisites) rather than $? (only those newer than the
# target) so every source is always compiled into the binary.  Sources are
# listed before the -l options because the linker resolves symbols in
# command-line order.
$(BIN): $(SRCS)
	g++ $(C++FLAGS) $^ $(COMMON) -o $@

clean:
	-rm -f $(BIN) *~ *.o *.a *.so
ODCL: +++ odcl_run_process.csh started +++ ODCL: ODCL: OPUS 6.1 ODCL: ODCL: Input parameters: ODCL: PROCESS_NAME = c++poller ODCL: PATH_FILE = /home/sontag/opus_test/definitions//quick.path ODCL: PATH_BASENAME = quick.path ODCL: PATH_BASEROOT = quick ODCL: TIME_STAMP = 3f71cf7b (date: 24-Sep-03 17:08:11) ODCL: PASSWORD? Yes ODCL: ODCL: Fetching TASK line from process resource file...(odcl_get_resource_command) ODCL: ODCL: ODCL: Task to be run: c++poller (/home/sontag/bin/c++poller) ODCL: ODCL: ODCL: Creating process PSTAT...(odcl_create_psffile) ODCL: ODCL: ODCL: Running the process... ODCL: 2003267170831-I-INFO Starting c++poller processing. (1) 2003267170832-I-INFO Opus_env::get_res_item input_data substituted with path value /home/sontag/opus_test/quick/input/ (1) 2003267170832-I-INFO Initialized OPUS environment. (1) 2003267170832-I-INFO Registered callbacks. (1) 2003267171242-I-INFO Processing OSF dataset: my_osf. Event #0. (1) 2003267171242-I-INFO Opus_env::get_res_item input_data substituted with path value /home/sontag/opus_test/quick/input/ (1) 2003267171242-I-INFO Running command: ls -ld /home/sontag/opus_test/quick/input//my_osf* (1) -rw-r--r-- 1 sontag 0 Sep 24 17:12 /home/sontag/opus_test/quick/input//my_osf_file.txt 2003267171242-I-INFO Completed processing request for my_osf (1) 2003267171723-I-INFO Halt event: halt registered (1) 2003267171723-I-INFO Exiting for halt event. (1) ODCL: ODCL: Process exited. Cleaning up...(odcl_cleanup) ODCL: