Files for Python Internal Poller Example
What does the Python script "pyintpoller.py" look like?
#!/usr/bin/env python
import sys # access to command-line args
import string # string functions
from omniORB import CORBA # omniORBpy used to generate Python client stubs
import OPUS # client stubs
# the following imports are only for our example code in processDataset
import time
import os
#------------------------------------------------------------------------------
#
# An OPUS Python Internal Poller w/ resource value fetches; much applies
# to external pollers as well
#
# Note that, although some C++ internal pollers are able to be run
# independently (on the command-line), this example cannot, because
# it requires the stringified IOR to be passed to it by the OPUS servers.
# That is how it gains a PipelineAppContext which gives it access to OPUS
# resources, whereas C++ apps simply use an Opus_env object for this.
# Therefore this example can only be run via the PMG or the broker.
#
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# The dataset processing routine - this is where the OPUS user's work is
# actually performed. Put task-specific processing here.
#
def processDataset(datasetName, inputDir):
    print("Serious processing being done on: " + datasetName + "...")
    time.sleep(5)
    pattern = inputDir + os.sep + datasetName + "*"
    sys.stdout.flush()
    os.system("ls -l " + pattern)  # okay, so it's a little OS-specific
#------------------------------------------------------------------------------
# The main routine
#
# Initialize the CORBA orb (this is always the same)
orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
# Need to grab the -i option from the command line: it has
# the object reference for the UserContext CORBA object.
# This is a new OPUS argument that is also forwarded to any commands
# executed by XPOLL, by the way.
ior = ""
for i in range(len(sys.argv) - 1):  # stop one short so sys.argv[i+1] always exists
    if sys.argv[i] == "-i":
        ior = sys.argv[i+1]
        break
if ior == "":
    print("Improper invocation")
    sys.exit(1)
# Convert the stringified object reference to a CORBA object
obj = orb.string_to_object(ior)
# Narrow the object to its actual type (like a cast)
uc = obj._narrow(OPUS.OPUS_API.OPUSUSER.UserContext)
if uc is None:
    print("Narrow failed")
    sys.exit(1)
# Use the UserContext object to "register" this application with OPUS.
# Must do for internal pollers; xpoll does this for external pollers.
try:
    papp = uc.registerPipelineApp(sys.argv)
except:
    print("Failed to register application")
    sys.exit(1)
# As an example of fetching values from the process resource file,
# get all key/val pairs. Just an example, this is not required.
try:
    res = papp.getResourceClass("*.*")
except:
    print("Unable to get resource file contents")
    sys.exit(1)
# Print out resources
inputDir = "unknown"
print("\nHere are the key/val pairs from the process resource file:\n")
for pair in res:
    print("%s %s" % (pair.key, pair.val))
    if pair.key == "FILE_DIRECTORY1":
        inputDir = pair.val
print("\n")
# Event-handling loop for internal pollers (only); unlike C++, there
# is no callback scheme for the events: you must determine the event
# type and handle it accordingly.
while True:
    # Make sure that the output from the last event goes to the log right away
    sys.stdout.flush()
    # Poll for the next event
    try:
        event = papp.poll()
    except:
        print("Polling failed")
        sys.exit(1)
    # Handle the possible event types
    if event.theEventType == OPUS.OPUS_API.OPUSUSER.HALT:
        print("Received HALT event. Stopping.")
        sys.exit(0)
    elif event.theEventType == OPUS.OPUS_API.OPUSUSER.FILE:
        # If file contains "bad" in rootname, set FILE_ERROR;
        # otherwise set FILE_SUCCESS
        # Get config entry w/Field data
        bbData = event.bb.getState()
        rootnameID = -999
        for field in bbData.entryInfo:
            if field.value == "ROOTNAME":
                rootnameID = field.id
                break
        # Get dataset name from event trigger
        dataset = "unknown"
        for field in event.triggers[0]:
            if field.id == rootnameID:
                # get field.value, use strip to rm trailing whitespace padding
                dataset = field.value.strip()
                break
        print("----- Processing started for: " + dataset + " -----")
        # Check for "bad" in the dataset name, otherwise process it
        if "bad" in dataset:
            print("Bad data in: " + dataset)
            try:
                papp.closeEvent("FILE_ERROR", event.eid)
            except:
                print("Failed to close event")
        else:
            processDataset(dataset, inputDir)
            try:
                papp.closeEvent("FILE_SUCCESS", event.eid)
            except:
                print("Failed to close event")
        print("----- Processing completed for: " + dataset + " -----")
    else:  # ignore other events
        print("Ignoring unhandled event type")
        try:
            papp.closeEvent("IGNEVT", event.eid)
        except:
            print("Failed to close event")
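The least obvious step in the script is the two-pass lookup of the dataset name: first find the id of the field whose value is "ROOTNAME" in the blackboard entry info, then find the field with that id in the trigger. The sketch below isolates that logic so it can be tried without an OPUS server. It is only an illustration under stated assumptions: the Field tuple is a hypothetical stand-in for the CORBA structs (modeling only the id and value attributes the script reads), find_dataset_name is a name invented here, and the "EXTENSION" field and sample values are made up.

```python
from collections import namedtuple

# Hypothetical stand-in for the CORBA field structs; only the two
# attributes the script reads (id, value) are modeled.
Field = namedtuple("Field", ["id", "value"])


def find_dataset_name(entry_info, trigger, default="unknown"):
    """Return the stripped ROOTNAME value from a trigger, or `default`."""
    # Pass 1: which field id holds the rootname?
    rootname_id = None
    for field in entry_info:
        if field.value == "ROOTNAME":
            rootname_id = field.id
            break
    if rootname_id is None:
        return default
    # Pass 2: pull that field's value out of the trigger.
    for field in trigger:
        if field.id == rootname_id:
            return field.value.strip()  # drop whitespace padding
    return default


# Made-up sample data shaped like what the script iterates over.
entry_info = [Field(0, "ROOTNAME"), Field(1, "EXTENSION")]
trigger = [Field(0, "workfile1   "), Field(1, "try")]
print(find_dataset_name(entry_info, trigger))
```

Returning a default instead of relying on a sentinel id such as -999 keeps the "not found" case explicit, which may be easier to test.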
What does the resource file "pypoller.resource" look like?
!--------------------------------------------------------------------
!
! An OPUS Python Internal Poller example with resource value fetches
! using a FILE trigger
!
!--------------------------------------------------------------------
PROCESS_NAME = pypoller
TASK = <pyintpoller.py -p $PATH_FILE -r pypoller>
DESCRIPTION = 'Example Python Internal Poller'
SYSTEM = SAMPLE
CLASS = ALL
!
FILE_RANK = 1 ! File Trigger
FILE_DIRECTORY1 = input_data ! Path file indirection
FILE_OBJECT1 = *.try ! File specification for searches
FILE_PROCESSING = _proc ! Extension addition during processing
FILE_SUCCESS.DIRECTORY = success_dir ! Move to different directory on success
FILE_SUCCESS.DANGLE = _done ! ...and change _proc to _done
FILE_ERROR.DIRECTORY = error_dir ! Move to different directory on error
FILE_ERROR.DANGLE = _bad ! ...and change _proc to _bad
!
POLLING_TIME = 10 ! Wait (seconds) before polling for next
MINBLOCKS = 2000 ! blocks required on output disk
OUTPATH = success_dir ! check for space in success directory
!
! And the path file should contain the following key-value pairs:
!
! STAGE_FILE = OPUS_DEFINITIONS_DIR:g2f_pipeline.stage
! OPUS_OBSERVATIONS_DIR = (set this to your OSF directory)
! input_data = (set this to the location of input files)
! success_dir = (set this to the dir to copy files on success)
! error_dir = (set this to the dir to copy files on error)
!
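To make the layout of the resource file concrete, here is a minimal sketch of reading such a file into key/value pairs. It is not the OPUS parser; in the poller itself the values come from papp.getResourceClass. The sketch assumes only what the listing shows: "!" starts a comment, entries have the form KEY = value, and single quotes around a value are dropped (as the DESCRIPTION line in the log output suggests). The function name parse_resource_text is hypothetical, and path file indirection is not handled.

```python
def parse_resource_text(text):
    """Parse 'KEY = value ! comment' lines into a dict (simplified sketch)."""
    pairs = {}
    for line in text.splitlines():
        # Assumption: everything from the first '!' onward is a comment.
        line = line.split("!", 1)[0].strip()
        if not line or "=" not in line:
            continue
        key, value = line.split("=", 1)
        value = value.strip()
        # Drop surrounding single quotes, e.g. on DESCRIPTION.
        if len(value) >= 2 and value[0] == "'" and value[-1] == "'":
            value = value[1:-1]
        pairs[key.strip()] = value
    return pairs


sample = """\
!--- comment line ---
PROCESS_NAME = pypoller
DESCRIPTION = 'Example Python Internal Poller'
FILE_OBJECT1 = *.try ! File specification for searches
POLLING_TIME = 10 ! Wait (seconds) before polling for next
"""
resources = parse_resource_text(sample)
for key in sorted(resources):
    print("%s %s" % (key, resources[key]))
```

All values stay strings here; converting POLLING_TIME or MINBLOCKS to integers is left to the caller.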
What does the output look like in the log file?
ODCL: +++ odcl_run_process.csh started +++
ODCL:
ODCL: Pipeline Software Release OPUS xx.x SHARE x.x ********* dd MMM YYYY *********
ODCL:
ODCL: Input parameters:
ODCL: PROCESS_NAME = pypoller
ODCL: PATH_FILE = /home/sontag/dev/dst/defs//quick.path
ODCL: PATH_BASENAME = quick.path
ODCL: PATH_BASEROOT = quick
ODCL: TIME_STAMP = 3d89fefb (date: 19-Sep-02 12:44:43)
ODCL: PASSWORD? Yes
ODCL:
ODCL: Fetching TASK line from process resource file...(odcl_get_resource_command)
ODCL:
ODCL:
ODCL: Task to be run: pyintpoller.py (/home/sontag/bin/pyintpoller.py)
ODCL:
ODCL:
ODCL: Creating process PSTAT...(odcl_create_psffile)
ODCL:
ODCL: Running the process...
Here are the key/val pairs from the process resource file:
CLASS ALL
DESCRIPTION Example Python Internal Poller
FILE_DIRECTORY1 /home/sontag/opus_test/quick/input/
FILE_ERROR.DANGLE _bad
FILE_ERROR.DIRECTORY /home/sontag/opus_test/quick/error/
FILE_OBJECT1 *.try
FILE_PROCESSING _proc
FILE_RANK 1
FILE_SUCCESS.DANGLE _done
FILE_SUCCESS.DIRECTORY /home/sontag/opus_test/quick/input/
MINBLOCKS 2000
OUTPATH /home/sontag/opus_test/quick/input/
POLLING_TIME 10
PROCESS_NAME pypoller
SYSTEM SAMPLE
TASK <pyintpoller.py -p $PATH_FILE -r pypoller>
----- Processing started for: workfile1 -----
Serious processing being done on: workfile1...
-rw-r--r-- 1 sontag opus 0 Sep 18 16:26 /home/sontag/opus_test/quick/input//workfile1.try_proc
----- Processing completed for: workfile1 -----
----- Processing started for: workfile2 -----
Serious processing being done on: workfile2...
-rw-r--r-- 1 sontag opus 0 Sep 18 16:26 /home/sontag/opus_test/quick/input//workfile2.try_proc
----- Processing completed for: workfile2 -----
Received HALT event. Stopping.
ODCL:
ODCL: Process exited. Cleaning up...(odcl_cleanup)
ODCL:
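The ls lines in the log show each input file carrying the FILE_PROCESSING suffix while it is being worked on (workfile1.try_proc). According to the resource file comments, that suffix is replaced by the FILE_SUCCESS.DANGLE or FILE_ERROR.DANGLE value when the event is closed. OPUS performs the renaming itself; the sketch below only illustrates the naming scheme, and the function names are hypothetical. The accompanying move to the success or error directory is not modeled.

```python
def processing_name(filename, processing="_proc"):
    """Name of a file while it is being processed (FILE_PROCESSING appended)."""
    return filename + processing


def final_name(proc_name, dangle, processing="_proc"):
    """Swap the processing suffix for the success/error dangle."""
    if proc_name.endswith(processing):
        return proc_name[:-len(processing)] + dangle
    return proc_name  # not in the processing state; leave unchanged


in_progress = processing_name("workfile1.try")
print(in_progress)
print(final_name(in_progress, "_done"))  # FILE_SUCCESS.DANGLE
print(final_name(in_progress, "_bad"))   # FILE_ERROR.DANGLE
```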