Files for Java Internal Poller Example


What does the Java code "JIntPoller.java" look like?

[Suggestion: Do not copy and paste this listing directly from a web browser, because some characters (e.g. the "less-than" operator) may not display correctly. Instead, open the HTML file itself in an editor, copy this section, and save it as JIntPoller.java.]

package OPUS.UTIL;

import OPUS.OPUS_API.*;          // CORBA OAPI support
import OPUS.OPUS_API.BB.*;       // CORBA Blackboard
import OPUS.OPUS_API.OPUSUSER.*; // CORBA application services
import OPUS.UTIL.Msg;            // Message reporting

//////////////////////////////////////////////////////////////////////////
//
// An OPUS Java Internal Poller w/ message reporting & resource value
// fetches using an OSF trigger.
//
// The process resource file for this application is described in the
// OPUS FAQ under "OPUS Applications" -> "Can I create an internal poller
// in a language besides C++ ?" -> "Java"
//
// Note that, although some C++ internal pollers are able to be run
// independently (on the command-line), this example cannot, because
// it requires the stringified IOR to be passed to it by an OPUS server.
// That is how it gains a PipelineAppContext which gives it access to OPUS
// resources, whereas C++ apps simply use an Opus_env object for this.
// Therefore this example can only be run via the PMG or the broker.
//
// Modification History:
//
// Date      PR       Who         Reason
// --------- -------- ----------- ----------------------------------------
// 09/20/02  46438    Sontag      First version, based on OAPI doc
// 02/12/04  49903    Sontag      Add use of Event.eventName
// 07/28/04  49860    Sontag      Rename member for tao_idl keyword clash
// 01/21/05  52462    Sontag      Use a 100% Java message class
//
//////////////////////////////////////////////////////////////////////////


class JIntPoller
{
   //
   // The dataset processing routine - this is where the OPUS user's work is
   // actually performed.  Put task-specific processing here.
   //

   private static void processDataset(final String dataset) throws Exception
   {
      Msg msgObj = Msg.getInstance();
      msgObj.print("Serious processing being done on: " + dataset + "...");

      Thread.sleep(5000); // 5 secs
   }


   //
   // The main routine.
   //

   public static void main(String[] args)
   {
      // Get the message reporting object.  It is a singleton, so the
      // static method getInstance() is the only way to obtain a
      // reference (i.e., no constructors are public).

      Msg msgObj = Msg.getInstance();

      // Must handle message report level manually (at least for now)

      msgObj.setReportLevel(Msg.ReportLevel.ALL);

      // Initialize the CORBA orb (this is always the same)

      org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, null);

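      // Need to grab the -i option from the command line: it has
      // the object reference for the UserContext CORBA object.
      // Stop one short of the end so that a trailing "-i" with no
      // value is reported as a missing IOR instead of throwing an
      // ArrayIndexOutOfBoundsException.

      String ior = null;
      for(int i = 0; i < args.length - 1; ++i)
      {
         if (args[i].equals("-i"))
         {
            ior = args[i + 1];
            break;
         }
      }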

      if (ior == null)
      {
         msgObj.print(Msg.Severity.E, Msg.Type.MISSING,
            "IOR not found on command-line");
         System.exit(0);
      }

      // Convert the stringified object reference to a CORBA object

      org.omg.CORBA.Object obj = orb.string_to_object(ior);

      // Narrow the object to its actual type (like a cast)

      UserContext uc = UserContextHelper.narrow(obj);
      if (uc == null)
      {
         msgObj.print(Msg.Severity.E, Msg.Type.MISSING,
            "Could not obtain a UserContext from the IOR");
         System.exit(0);
      }

      // Use the UserContext object to "register" this application with OPUS.
      // Must do for internal pollers; xpoll does this for external pollers.

      String[] cargs = new String[args.length + 1];
      cargs[0] = "JIntPoller"; // note you must create the C-like 0th
                               // argument for registerPipelineApp()
      System.arraycopy(args, 0, cargs, 1, args.length);
      PipelineAppContext papp = null;

      try
      {
         msgObj.print("About to register pipeline application...");
         papp = uc.registerPipelineApp(cargs);
         msgObj.print("Registered successfully");
      }
      catch(Exception e)
      {
         msgObj.print(Msg.Severity.E, Msg.Type.SEVERE,
            "Attempt to register application with UserContext failed:\n" +
            e.toString());
         System.exit(0);
      }

      // As an example of fetching values from the process resource file,
      // get all key/val pairs.  Just an example, this is not required.

      KeyValPair[] res = null;
      try
      {
         res = papp.getResourceClass("*.*"); // or, e.g. "OSF_SUCCESS."
      }
      catch(Exception e)
      {
         msgObj.print(Msg.Severity.E, Msg.Type.FIO,
            "An error occurred while attempting to fetch resource values");
         System.exit(0);
      }

      // Print out resources

      msgObj.print("The key/val pairs from the process resource file:\n");
      for (int i = 0; i < res.length; ++i)
      {
         msgObj.print(res[i].key + " = " + res[i].val);
      }
      msgObj.print("\n");

      // Event-handling loop for internal pollers only.  Unlike C++, there
      // is no callback scheme for the events. You must determine the event
      // type and handle it accordingly.

      Event anEvent = null;
      while (true)
      {
         try
         {
            anEvent = papp.poll();

            // Can print this diagnostic message:
            msgObj.print(Msg.Severity.D, Msg.Type.INFO,
                   "Got an event, id: " + anEvent.eid +
                   ", name: " + anEvent.eventName);
         }
         catch (Exception ex)
         {
            msgObj.print(Msg.Severity.E, Msg.Type.SEVERE,
               "An error occurred during polling:\n" + ex.toString());
            System.exit(1);
         }

         //
         // Determine event type and handle appropriately
         //

         if (anEvent.theEventType == EVENT_TYPE.HALT)
         {
            msgObj.print("Received HALT event.  Stopping.");
            System.exit(0); // exit
         }
         // This Java example includes the code to handle both a FILE
         // trigger and an OSF trigger, though the OSF trigger section
         // farther below is the focus of this example.
         else if (anEvent.theEventType == EVENT_TYPE.FILE)
         {
            // If file contains "bad", set FILE_ERROR, otherwise, file success

            // Get special entry that contains fields with the
            // field names, sizes, id's from blackboard state
            BBStateInfo bbData = anEvent.bb.getState();

            // Locate ROOTNAME field
            int rootnameID = -999;
            int dirnameID = -999;
            for (int i = 0; i < bbData.entryInfo.length; ++i)
            {
               if (bbData.entryInfo[i].value.equals("ROOTNAME"))
               {
                  rootnameID = bbData.entryInfo[i].id;
                  break;
               }
            }
            for (int i = 0; i < bbData.entryInfo.length; ++i)
            {
               if (bbData.entryInfo[i].value.equals("DIRECTORY"))
               {
                  dirnameID = bbData.entryInfo[i].id;
                  break;
               }
            }

            if (rootnameID == -999)  // something wrong: not file entry
            {
               msgObj.print(Msg.Severity.E, Msg.Type.MISUSE,
                  "File event contains non-file entry!");
               System.exit(1);
            }
            if (dirnameID == -999)  // something wrong: not file entry
            {
               msgObj.print(Msg.Severity.E, Msg.Type.MISUSE,
                  "File event contains non-file entry!");
               System.exit(1);
            }
       
            // Get dataset name from event trigger
            String dataset = "unknown";
            for (int i = 0; i < anEvent.triggers[0].length; ++i)
            {
               Field[] ent = anEvent.triggers[0];
               if (ent[i].id == rootnameID)
               {
                  dataset = ent[i].value.trim(); // rm trailing whitespace pad
                  break;
               }
            }

            // Starting message
            msgObj.print("--- Processing started for: " + dataset + " ---");

            // Check for "bad" in the dataset name, otherwise process it
            try
            {
               if (dataset.indexOf("bad") >= 0)
               {
                  papp.closeEvent("FILE_ERROR", anEvent.eid);
               }
               else
               {
                  // Normal looking rootname, let's process the data
                  processDataset(dataset);

                  // Close event with success
                  papp.closeEvent("FILE_SUCCESS", anEvent.eid);
               }
            }
            catch(Exception ex)
            {
               // Reached if processDataset() or closeEvent() throws
               msgObj.print(Msg.Severity.E, Msg.Type.EXEC,
                  "Error processing or closing file event:\n" + ex.toString());
            }

            // Completion message
            msgObj.print("--- Processing completed for: " + dataset + " ---");
         }
         // Handle an OSF trigger, as in our example usage.
         else if (anEvent.theEventType == EVENT_TYPE.OSF)
         {
            // Get special entry that contains fields with the
            // field names, sizes, id's from blackboard state
            BBStateInfo bbData = anEvent.bb.getState();

            // Locate DATASET field
            int datasetID = -999;
            for (int i = 0; i < bbData.entryInfo.length; ++i)
            {
               if (bbData.entryInfo[i].value.equals("DATASET"))
               {
                  datasetID = bbData.entryInfo[i].id;
                  break;
               }
            }

            if (datasetID == -999)  // something wrong: not osf entry
            {
               msgObj.print(Msg.Severity.E, Msg.Type.MISUSE,
                  "OSF event contains incorrect entry!");
               System.exit(1);
            }
       
            // Get dataset name from event trigger
            String dataset = "unknown";
            for (int i = 0; i < anEvent.triggers[0].length; ++i)
            {
               Field[] ent = anEvent.triggers[0];
               if (ent[i].id == datasetID)
               {
                  dataset = ent[i].value;
                  // This value comes with a suffix of underscores
                  // which we remove to get the dataset name.
                  while (dataset.endsWith("_") && dataset.length() > 1)
                  {  // this is not the most efficient algorithm, but it
                     // serves the example well enough
                     dataset = dataset.substring(0, dataset.length()-1);
                  }
                  break;
               }
            }

            // Starting message
            msgObj.print("--- Processing started for: " + dataset + " ---");

            // Everything looks good, let's process the data
            boolean success = true;
            try
            {
               processDataset(dataset);
            }
            catch(Exception ex)
            {
               msgObj.print(Msg.Severity.E, Msg.Type.EXEC,
                  "Error processing dataset:\n" + ex.toString());
               success = false;
            }

            // Now close the event
            try
            {
               if (success)
                  papp.closeEvent("OSF_SUCCESS", anEvent.eid);
               else
                  papp.closeEvent("OSF_ERROR", anEvent.eid);
            }
            catch(Exception ex)
            {
               msgObj.print(Msg.Severity.E, Msg.Type.EXEC,
                  "Error closing osf event");
            }

            // Completion message
            msgObj.print("--- Processing completed for: " + dataset + " ---");
         }
         else // ignore all other event types
         {
            msgObj.print(Msg.Severity.W, Msg.Type.BADVAL,
               "Unhandled event type received; ignoring");
            try
            {
               papp.closeEvent("IGNEVT", anEvent.eid);
            }
            catch(Exception ex)
            {
               msgObj.print(Msg.Severity.E, Msg.Type.EXEC,
                  "Error closing unhandled event");
            }
         } // checks on theEventType

      } // event loop / while loop

   } // main()
}
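
The listing scans for the -i option and strips the underscore padding from the OSF DATASET value inline. As a minimal sketch, those two steps could be pulled into small helpers that can be exercised without an OPUS server. The names PollerHelpers, findOptionValue and stripTrailingUnderscores are hypothetical and not part of the OPUS API, and "IOR:0123" is only a placeholder string, not a real object reference.

```java
// Hypothetical helper class for illustration; not part of OPUS.
final class PollerHelpers
{
   private PollerHelpers() {}

   // Return the argument that follows `flag`, or null if the flag is
   // absent or is the last argument (i.e. it has no value).
   static String findOptionValue(String[] args, String flag)
   {
      for (int i = 0; i < args.length - 1; ++i)
      {
         if (args[i].equals(flag))
         {
            return args[i + 1];
         }
      }
      return null;
   }

   // Remove the underscore padding from the end of a field value with a
   // single substring() call.  Like the loop in the listing, this always
   // keeps at least one character of a non-empty value.
   static String stripTrailingUnderscores(String value)
   {
      int end = value.length();
      while (end > 1 && value.charAt(end - 1) == '_')
      {
         --end;
      }
      return value.substring(0, end);
   }

   public static void main(String[] args)
   {
      String[] sample = {"-p", "quick.path", "-i", "IOR:0123"};
      System.out.println(findOptionValue(sample, "-i"));
      System.out.println(stripTrailingUnderscores("my_osf_____"));
   }
}
```

Run on its own, main() prints IOR:0123 followed by my_osf. Returning null for a missing value lets the caller keep the existing "IOR not found on command-line" check unchanged.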

What does the resource file "jpoller.resource" look like?

!--------------------------------------------------------------------
!
! An OPUS Java Internal Poller example with resource value fetches,
! using an OSF trigger
!
!--------------------------------------------------------------------
!
PROCESS_NAME = jpoller
TASK = < java -classpath $OPUS_CLASSPATH OPUS.UTIL.JIntPoller -p $PATH_FILE -r jpoller -ssrl INFO >
DESCRIPTION = 'Example Java Internal Poller'
SYSTEM = SAMPLE
CLASS = ALL
!
OSF_RANK          = 1            ! Sets this to use an OSF trigger
OSF_TRIGGER1.KW   = w            ! when OSF column KW = "w" (waiting)
OSF_PROCESSING.KW = p            ! then set column KW to "p" when triggered
OSF_SUCCESS.KW    = c            ! then set column KW to "c" on success
OSF_SUCCESS.HD    = w            ! then set column HD to "w", to start it
OSF_ERROR.KW      = e            ! or to "e" if the process has a failure
!
POLLING_TIME      = 10           ! Wait (seconds) before polling for next
MINBLOCKS         = 2000         ! blocks required on output disk
INPATH            = input_data   ! make INPATH available to process
OUTPATH           = success_dir  ! check for space in success directory
ERRPATH           = error_dir    ! make ERRPATH available to process
!
! And the path file should contain the following key-value pairs:
!
! STAGE_FILE             = OPUS_DEFINITIONS_DIR:g2f_pipeline.stage
! OPUS_OBSERVATIONS_DIR  = (set this to your OSF directory)
! input_data             = (set this to the location of input files)
! success_dir            = (set this to the dir to copy files on success)
! error_dir              = (set this to the dir to copy files on error)
!
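
Inside a running poller, getResourceClass() already returns these values, as the Java listing shows. To inspect such a file outside of OPUS, the "KEY = value ! comment" layout can be read into a map. The following is a minimal sketch whose syntax rules are inferred only from the example above (a "!" starts a comment, the first "=" separates key from value, and surrounding single quotes are dropped); it is not the OPUS parser, it does not resolve path-file mnemonics such as input_data, and the names ResourceFileSketch and parse are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reader for illustration; not part of OPUS.
final class ResourceFileSketch
{
   private ResourceFileSketch() {}

   // Parse "KEY = value ! comment" lines into an ordered map.
   static Map<String, String> parse(String text)
   {
      Map<String, String> result = new LinkedHashMap<>();
      for (String rawLine : text.split("\\r?\\n"))
      {
         String line = rawLine.trim();
         if (line.isEmpty() || line.startsWith("!"))
         {
            continue; // blank line or full-line comment
         }
         int eq = line.indexOf('=');
         if (eq < 0)
         {
            continue; // not a key/value line
         }
         String key = line.substring(0, eq).trim();
         String value = line.substring(eq + 1);

         // Assumption: the first '!' in the value starts a comment.
         int bang = value.indexOf('!');
         if (bang >= 0)
         {
            value = value.substring(0, bang);
         }
         value = value.trim();

         // Drop surrounding single quotes, as in the DESCRIPTION line.
         if (value.length() >= 2 && value.startsWith("'") && value.endsWith("'"))
         {
            value = value.substring(1, value.length() - 1);
         }
         result.put(key, value);
      }
      return result;
   }

   public static void main(String[] args)
   {
      String sample =
         "! comment line\n" +
         "PROCESS_NAME = jpoller\n" +
         "OSF_SUCCESS.KW    = c            ! then set column KW to \"c\" on success\n";
      System.out.println(parse(sample).get("OSF_SUCCESS.KW"));
   }
}
```

A LinkedHashMap keeps the keys in file order, which makes it easy to compare the result against the file by eye.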

What does the output look like in the log file?

OPUS: +++ opus_run_process.csh started +++
OPUS:
OPUS: Pipeline Software Release OPUS xx.x SHARE x.x ********* dd Mon YYYY *********
OPUS:
OPUS: Input parameters:
OPUS:     PROCESS_NAME = jpoller
ODCL:     PATH_FILE    = /home/sontag/opus_test/definitions//quick.path
ODCL:     PATH_BASENAME = quick.path
ODCL:     PATH_BASEROOT = quick
OPUS:     TIME_STAMP   = 41f1328f (date: 21-Jan-05 16:49:19)
OPUS:     PASSWORD? No
OPUS:
OPUS: Fetching TASK line from process resource file...(odcl_get_resource_command)
OPUS:
OPUS:
OPUS: Task to be run: java (/usr/local/j2sdk1.4.2_03/bin/sparcv9/java)
OPUS:
OPUS:
OPUS: Creating process PSTAT...(odcl_create_psffile)
OPUS:
OPUS:
OPUS: Running the process...
OPUS:
2005021164938-I-INFO About to register pipeline application...
2005021164938-I-INFO Registered successfully
2005021164938-I-INFO The key/val pairs from the process resource file:

2005021164938-I-INFO CLASS = ALL
2005021164938-I-INFO DESCRIPTION = Example Java Internal Poller
2005021164938-I-INFO ERRPATH = /home/sontag/opus_test/quick/error/
2005021164938-I-INFO INPATH = /home/sontag/opus_test/quick/input/
2005021164938-I-INFO MINBLOCKS = 2000
2005021164938-I-INFO OSF_ERROR.KW = e
2005021164938-I-INFO OSF_PROCESSING.KW = p
2005021164938-I-INFO OSF_RANK = 1
2005021164938-I-INFO OSF_SUCCESS.HD = w
2005021164938-I-INFO OSF_SUCCESS.KW = c
2005021164938-I-INFO OSF_TRIGGER1.KW = w
2005021164938-I-INFO OUTPATH = /home/sontag/opus_test/quick/input/
2005021164938-I-INFO POLLING_TIME = 10
2005021164938-I-INFO PROCESS_NAME = jpoller
2005021164938-I-INFO SYSTEM = SAMPLE
2005021164938-I-INFO TASK = < java -classpath $OPUS_CLASSPATH OPUS.UTIL.JIntPoller -p $PATH_FILE -r jpoller -ssrl INFO >
2005021164938-I-INFO 

2005021183049-D-INFO Got an event, id: 1, name: OSF_TRIGGER1
2005021183049-I-INFO --- Processing started for: my_osf ---
2005021183049-I-INFO Serious processing being done on: my_osf...
2005021183054-I-INFO --- Processing completed for: my_osf ---
2005021183134-D-INFO Got an event, id: 2, name: halt
2005021183134-I-INFO Received HALT event.  Stopping.
OPUS:
OPUS: Process exited. Cleaning up...(odcl_cleanup)
OPUS:
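
The message lines in this log appear to follow a "YYYYDDDHHMMSS-severity-type text" layout: 2005021164938 reads as year 2005, day 021 (21 January), 16:49:38, which agrees with the date on the TIME_STAMP line. That reading is inferred from this sample, not from a format specification. Under that assumption, a minimal sketch that splits one message line into its parts might look like this (LogLineSketch is a hypothetical name, not an OPUS class):

```java
import java.time.DateTimeException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log-line parser for illustration; not part of OPUS.
final class LogLineSketch
{
   // Assumed layout: year, day-of-year, hour, minute, second, then
   // "-<severity letter>-<type> <text>".
   private static final Pattern LINE = Pattern.compile(
      "^(\\d{4})(\\d{3})(\\d{2})(\\d{2})(\\d{2})-([A-Z])-([A-Z]+) ?(.*)$");

   final LocalDateTime time;
   final String severity;
   final String type;
   final String text;

   private LogLineSketch(LocalDateTime time, String severity,
                         String type, String text)
   {
      this.time = time;
      this.severity = severity;
      this.type = type;
      this.text = text;
   }

   // Return the parsed line, or null if the line does not match the
   // assumed layout (e.g. the "OPUS:" lines written by the wrapper).
   static LogLineSketch parse(String line)
   {
      Matcher m = LINE.matcher(line);
      if (!m.matches())
      {
         return null;
      }
      try
      {
         LocalDateTime time = LocalDate
            .ofYearDay(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)))
            .atTime(Integer.parseInt(m.group(3)),
                    Integer.parseInt(m.group(4)),
                    Integer.parseInt(m.group(5)));
         return new LogLineSketch(time, m.group(6), m.group(7), m.group(8));
      }
      catch (DateTimeException ex)
      {
         return null; // digits present but not a valid date or time
      }
   }

   public static void main(String[] args)
   {
      LogLineSketch l = parse("2005021183049-D-INFO Got an event, id: 1, name: OSF_TRIGGER1");
      System.out.println(l.time + " " + l.severity + " " + l.type + " " + l.text);
   }
}
```

Returning null for non-matching lines makes it simple to skip the "OPUS:" and "ODCL:" wrapper output when filtering a log for the poller's own messages.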
