Files for Python Internal Poller Example


What does the Python script "pyintpoller.py" look like ?

#!/usr/bin/env python

import sys                  # access to command-line args
import string               # string functions
from omniORB import CORBA   # omniORBpy used to generate Python client stubs
import OPUS                 # client stubs
# the following imports are only for our example code in processDataset
import time
import os

#------------------------------------------------------------------------------
#
# An OPUS Python Internal Poller w/ resource value fetches; much applies
# to external pollers as well
#
# Note that, although some C++ internal pollers are able to be run
# independently (on the command-line), this example cannot, because
# it requires the stringified IOR to be passed to it by the OPUS servers.
# That is how it gains a PipelineAppContext which gives it access to OPUS
# resources, whereas C++ apps simply use an Opus_env object for this.
# Therefore this example can only be run via the PMG or the broker.
#
#------------------------------------------------------------------------------


#------------------------------------------------------------------------------
# The dataset processing routine - this is where the OPUS user's work is
# actually performed.  Put task-specific processing here.
#

def processDataset(datasetName, inputDir):
   print "Serious processing being done on: " + datasetName + "..."
   time.sleep(5)
   file = inputDir + os.sep + datasetName + "*"
   sys.stdout.flush()
   os.system("ls -l " + file) # okay, so its a little OS-specific


#------------------------------------------------------------------------------
# The main routine
#

# Initialize the CORBA orb (this is always the same)

orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)


# Need to grab the -i option from the command line: it has
# the object reference for the UserContext CORBA object.
# This is a new OPUS argument that is also forwarded to any commands
# executed by XPOLL, by the way.

ior = ""
for i in range(len(sys.argv)):
   if sys.argv[i] == "-i":
      ior = sys.argv[i+1]
      break
if (ior == ""):
   print "Improper invocation"
   sys.exit(1)

# Convert the stringified object reference to a CORBA object

obj = orb.string_to_object(ior)


# Narrow the object to its actual type (like a cast)

uc = obj._narrow(OPUS.OPUS_API.OPUSUSER.UserContext)
if uc == None:
   print "Narrow failed"
   sys.exit(1)


# Use the UserContext object to "register" this application with OPUS.
# Must do for internal pollers; xpoll does this for external pollers.

try:
   papp = uc.registerPipelineApp(sys.argv)
except:
   print("Failed to register application")
   sys.exit(1)


# As an example of fetching values from the process resource file,
# get all key/val pairs.  Just an example, this is not required.

try:
   res = papp.getResourceClass("*.*")
except:
   print("unable to get resource file contents")
   sys.exit(1)

# Print out resources

inputDir = "unknown"
print "\nHere are the key/val pairs from the process resource file:\n"
for pair in res:
   print pair.key, pair.val
   if (pair.key == "FILE_DIRECTORY1"):
      inputDir = pair.val
print "\n"


# Event-handling loop for internal pollers (only); unlike C++, there
# is no callback scheme for the events: you must determine the event
# type and handle it accordingly.

while 1:
   # Make sure that the output from the last event goes to the log right away
   sys.stdout.flush()

   # Poll for the next event
   try:
      event = papp.poll()
   except:
      print("Polling failed")
      sys.exit(1)

   # Handle the possible event types
   if event.theEventType == OPUS.OPUS_API.OPUSUSER.HALT:
      print "Received HALT event. Stopping."
      sys.exit(0) 

   elif event.theEventType == OPUS.OPUS_API.OPUSUSER.FILE:
      # If file contains "bad" in rootname, set FILE_ERROR;
      # otherwise set FILE_SUCCESS

      # Get config entry w/Field data
      bbData = event.bb.getState()
      rootnameID = -999
      for field in bbData.entryInfo:
         if field.value == "ROOTNAME":
            rootnameID = field.id
            break

      # Get dataset name from event trigger
      dataset = "unknown"
      for field in event.triggers[0]:
         if field.id == rootnameID:
            # get field.value, use strip to rm trailing whitespace padding
            dataset = field.value.strip()
            break

      print "----- Processing started for: " + dataset + " -----"

      # Check for "bad" in the dataset name, otherwise process it
      if string.find(dataset, 'bad') >= 0:
         print "Bad data in: " + dataset
         try:
            papp.closeEvent("FILE_ERROR", event.eid)
         except:
            print("Failed to close event")
      else:
         processDataset(dataset, inputDir)
         try:
            papp.closeEvent("FILE_SUCCESS", event.eid)
         except:
            print("Failed to close event")

      print "----- Processing completed for: " + dataset + " -----"

   else: # ignore other events
      print("Ignoring unhandled event type")
      try:
         papp.closeEvent("IGNEVT", event.eid)
      except:
         print("Failed to close event")


What does the resource file "pypoller.resource" look like ?

!--------------------------------------------------------------------
!
! An OPUS Python Internal Poller example with resource value fetches
! using a FILE trigger
!
!--------------------------------------------------------------------

PROCESS_NAME = pypoller
TASK = <pyintpoller.py -p $PATH_FILE -r pypoller>
DESCRIPTION = 'Example Python Internal Poller'
SYSTEM = SAMPLE
CLASS = ALL
!
FILE_RANK       = 1                  ! File Trigger
FILE_DIRECTORY1 = input_data         ! Path file indirection
FILE_OBJECT1    = *.try              ! File specification for searches
FILE_PROCESSING = _proc              ! Extension addition during processing
FILE_SUCCESS.DIRECTORY = success_dir ! Move to different directory on success
FILE_SUCCESS.DANGLE = _done          ! ...and change _proc to _done
FILE_ERROR.DIRECTORY = error_dir     ! Move to different directory on error
FILE_ERROR.DANGLE = _bad             ! ...and change _proc to _bad
!
POLLING_TIME  = 10           ! Wait (seconds) before polling for next
MINBLOCKS     = 2000         ! blocks required on output disk
OUTPATH       = success_dir  ! check for space in success directory
!
! And the path file should contain the following key-value pairs:
!
! STAGE_FILE             = OPUS_DEFINITIONS_DIR:g2f_pipeline.stage
! OPUS_OBSERVATIONS_DIR  = (set this to your OSF directory)
! input_data             = (set this to the location of input files)
! success_dir            = (set this to the dir to copy files on success)
! error_dir              = (set this to the dir to copy files on error)
!

What does the output look like in the log file ?

ODCL: +++ odcl_run_process.csh started +++
ODCL:
ODCL: Pipeline Software Release OPUS xx.x SHARE x.x ********* dd MMM YYYY *********
ODCL:
ODCL: Input parameters:
ODCL:     PROCESS_NAME = pypoller
ODCL:     PATH_FILE    = /home/sontag/dev/dst/defs//quick.path
ODCL:     PATH_BASENAME = quick.path
ODCL:     PATH_BASEROOT = quick
ODCL:     TIME_STAMP   = 3d89fefb (date: 19-Sep-02 12:44:43)
ODCL:     PASSWORD? Yes
ODCL:
ODCL: Fetching TASK line from process resource file...(odcl_get_resource_command)
ODCL:
ODCL:
ODCL: Task to be run: pyintpoller.py (/home/sontag/bin/pyintpoller.py)
ODCL:
ODCL:
ODCL: Creating process PSTAT...(odcl_create_psffile)
ODCL:
ODCL: Running the process...

Here are the key/val pairs from the process resource file:

CLASS ALL
DESCRIPTION Example Python Internal Poller
FILE_DIRECTORY1 /home/sontag/opus_test/quick/input/
FILE_ERROR.DANGLE _bad
FILE_ERROR.DIRECTORY /home/sontag/opus_test/quick/error/
FILE_OBJECT1 *.try
FILE_PROCESSING _proc
FILE_RANK 1
FILE_SUCCESS.DANGLE _done
FILE_SUCCESS.DIRECTORY /home/sontag/opus_test/quick/input/
MINBLOCKS 2000
OUTPATH /home/sontag/opus_test/quick/input/
POLLING_TIME 10
PROCESS_NAME pypoller
SYSTEM SAMPLE
TASK <pyintpoller.py -p $PATH_FILE -r pypoller>

----- Processing started for: workfile1 -----
Serious processing being done on: workfile1...
-rw-r--r--   1 sontag   opus           0 Sep 18 16:26 /home/sontag/opus_test/quick/input//workfile1.try_proc
----- Processing completed for: workfile1 -----
----- Processing started for: workfile2 -----
Serious processing being done on: workfile2...
-rw-r--r--   1 sontag   opus           0 Sep 18 16:26 /home/sontag/opus_test/quick/input//workfile2.try_proc
----- Processing completed for: workfile2 -----
Received HALT event. Stopping.
ODCL:
ODCL: Process exited. Cleaning up...(odcl_cleanup)
ODCL:

Back to the Top

Back to Example