#!/usr/bin/env python import sys # access to command-line args import string # string functions from omniORB import CORBA # omniORBpy used to generate Python client stubs import OPUS # client stubs # the following imports are only for our example code in processDataset import time import os #------------------------------------------------------------------------------ # # An OPUS Python Internal Poller w/ resource value fetches; much applies # to external pollers as well # # Note that, although some C++ internal pollers are able to be run # independently (on the command-line), this example cannot, because # it requires the stringified IOR to be passed to it by the OPUS servers. # That is how it gains a PipelineAppContext which gives it access to OPUS # resources, whereas C++ apps simply use an Opus_env object for this. # Therefore this example can only be run via the PMG or the broker. # #------------------------------------------------------------------------------ #------------------------------------------------------------------------------ # The dataset processing routine - this is where the OPUS user's work is # actually performed. Put task-specific processing here. # def processDataset(datasetName, inputDir): print "Serious processing being done on: " + datasetName + "..." time.sleep(5) file = inputDir + os.sep + datasetName + "*" sys.stdout.flush() os.system("ls -l " + file) # okay, so its a little OS-specific #------------------------------------------------------------------------------ # The main routine # # Initialize the CORBA orb (this is always the same) orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID) # Need to grab the -i option from the command line: it has # the object reference for the UserContext CORBA object. # This is a new OPUS argument that is also forwarded to any commands # executed by XPOLL, by the way. 
ior = "" for i in range(len(sys.argv)): if sys.argv[i] == "-i": ior = sys.argv[i+1] break if (ior == ""): print "Improper invocation" sys.exit(1) # Convert the stringified object reference to a CORBA object obj = orb.string_to_object(ior) # Narrow the object to its actual type (like a cast) uc = obj._narrow(OPUS.OPUS_API.OPUSUSER.UserContext) if uc == None: print "Narrow failed" sys.exit(1) # Use the UserContext object to "register" this application with OPUS. # Must do for internal pollers; xpoll does this for external pollers. try: papp = uc.registerPipelineApp(sys.argv) except: print("Failed to register application") sys.exit(1) # As an example of fetching values from the process resource file, # get all key/val pairs. Just an example, this is not required. try: res = papp.getResourceClass("*.*") except: print("unable to get resource file contents") sys.exit(1) # Print out resources inputDir = "unknown" print "\nHere are the key/val pairs from the process resource file:\n" for pair in res: print pair.key, pair.val if (pair.key == "FILE_DIRECTORY1"): inputDir = pair.val print "\n" # Event-handling loop for internal pollers (only); unlike C++, there # is no callback scheme for the events: you must determine the event # type and handle it accordingly. while 1: # Make sure that the output from the last event goes to the log right away sys.stdout.flush() # Poll for the next event try: event = papp.poll() except: print("Polling failed") sys.exit(1) # Handle the possible event types if event.theEventType == OPUS.OPUS_API.OPUSUSER.HALT: print "Received HALT event. Stopping." 
sys.exit(0) elif event.theEventType == OPUS.OPUS_API.OPUSUSER.FILE: # If file contains "bad" in rootname, set FILE_ERROR; # otherwise set FILE_SUCCESS # Get config entry w/Field data bbData = event.bb.getState() rootnameID = -999 for field in bbData.entryInfo: if field.value == "ROOTNAME": rootnameID = field.id break # Get dataset name from event trigger dataset = "unknown" for field in event.triggers[0]: if field.id == rootnameID: # get field.value, use strip to rm trailing whitespace padding dataset = field.value.strip() break print "----- Processing started for: " + dataset + " -----" # Check for "bad" in the dataset name, otherwise process it if string.find(dataset, 'bad') >= 0: print "Bad data in: " + dataset try: papp.closeEvent("FILE_ERROR", event.eid) except: print("Failed to close event") else: processDataset(dataset, inputDir) try: papp.closeEvent("FILE_SUCCESS", event.eid) except: print("Failed to close event") print "----- Processing completed for: " + dataset + " -----" else: # ignore other events print("Ignoring unhandled event type") try: papp.closeEvent("IGNEVT", event.eid) except: print("Failed to close event")
!--------------------------------------------------------------------
!
! An OPUS Python Internal Poller example with resource value fetches
! using a FILE trigger
!
!--------------------------------------------------------------------
PROCESS_NAME = pypoller
TASK = <pyintpoller.py -p $PATH_FILE -r pypoller>
DESCRIPTION = 'Example Python Internal Poller'
SYSTEM = SAMPLE
CLASS = ALL
!
FILE_RANK = 1                          ! File Trigger
FILE_DIRECTORY1 = input_data           ! Path file indirection
FILE_OBJECT1 = *.try                   ! File specification for searches
FILE_PROCESSING = _proc                ! Extension addition during processing
FILE_SUCCESS.DIRECTORY = success_dir   ! Move to a different directory on success
FILE_SUCCESS.DANGLE = _done            ! ...and change _proc to _done
FILE_ERROR.DIRECTORY = error_dir       ! Move to a different directory on error
FILE_ERROR.DANGLE = _bad               ! ...and change _proc to _bad
!
POLLING_TIME = 10                      ! Wait (seconds) before polling for the next event
MINBLOCKS = 2000                       ! Blocks required on the output disk
OUTPATH = success_dir                  ! Check for space in the success directory
!
! And the path file should contain the following key-value pairs:
!
!   STAGE_FILE = OPUS_DEFINITIONS_DIR:g2f_pipeline.stage
!   OPUS_OBSERVATIONS_DIR = (set this to your OSF directory)
!   input_data = (set this to the location of input files)
!   success_dir = (set this to the dir to copy files on success)
!   error_dir = (set this to the dir to copy files on error)
!
ODCL: +++ odcl_run_process.csh started +++
ODCL:
ODCL: Pipeline Software Release OPUS xx.x SHARE x.x ********* dd MMM YYYY *********
ODCL:
ODCL: Input parameters:
ODCL: PROCESS_NAME = pypoller
ODCL: PATH_FILE = /home/sontag/dev/dst/defs//quick.path
ODCL: PATH_BASENAME = quick.path
ODCL: PATH_BASEROOT = quick
ODCL: TIME_STAMP = 3d89fefb (date: 19-Sep-02 12:44:43)
ODCL: PASSWORD? Yes
ODCL:
ODCL: Fetching TASK line from process resource file...(odcl_get_resource_command)
ODCL:
ODCL:
ODCL: Task to be run: pyintpoller.py (/home/sontag/bin/pyintpoller.py)
ODCL:
ODCL:
ODCL: Creating process PSTAT...(odcl_create_psffile)
ODCL:
ODCL: Running the process...

Here are the key/val pairs from the process resource file:

CLASS ALL
DESCRIPTION Example Python Internal Poller
FILE_DIRECTORY1 /home/sontag/opus_test/quick/input/
FILE_ERROR.DANGLE _bad
FILE_ERROR.DIRECTORY /home/sontag/opus_test/quick/error/
FILE_OBJECT1 *.try
FILE_PROCESSING _proc
FILE_RANK 1
FILE_SUCCESS.DANGLE _done
FILE_SUCCESS.DIRECTORY /home/sontag/opus_test/quick/input/
MINBLOCKS 2000
OUTPATH /home/sontag/opus_test/quick/input/
POLLING_TIME 10
PROCESS_NAME pypoller
SYSTEM SAMPLE
TASK <pyintpoller.py -p $PATH_FILE -r pypoller>

----- Processing started for: workfile1 -----
Serious processing being done on: workfile1...
-rw-r--r--  1 sontag  opus  0 Sep 18 16:26 /home/sontag/opus_test/quick/input//workfile1.try_proc
----- Processing completed for: workfile1 -----
----- Processing started for: workfile2 -----
Serious processing being done on: workfile2...
-rw-r--r--  1 sontag  opus  0 Sep 18 16:26 /home/sontag/opus_test/quick/input//workfile2.try_proc
----- Processing completed for: workfile2 -----
Received HALT event. Stopping.
ODCL:
ODCL: Process exited. Cleaning up...(odcl_cleanup)
ODCL: