Jump to: navigation, search

Difference between revisions of "SMILA/Documentation/HowTo/How to write a Pipelet"

(Implementation)
m (Typo only)
Line 20: Line 20:
 
* Create a class that implements the interface <tt>org.eclipse.smila.processing.Pipelet</tt> and make sure that the class has a public no-argument constructor.
 
* Create a class that implements the interface <tt>org.eclipse.smila.processing.Pipelet</tt> and make sure that the class has a public no-argument constructor.
  
* Implement <tt>void configure(AnyMap configuration)</tt>. This method is called prior to process. Here you can read the configuration provided for the pipelet in the [[SMILA/Glossary#P|pipeline]]. To share those properties store the whole configuration in a member variable or better check the configuration for validity and completeness and then store the settings in a member variable. You can later access the configuration with the ParameterAccessor (see code sample below). The advantage of using the parameter accessor is that you can also override cour configuration with configuration parameters set in the single records (e.g. the task parameters are included in the record by <tt>PipelineProcessingWorker</tt>, so you could override the pipelets configuration with job parameters as long as you use the <tt>ParameterAccessor</tt> to access parameters in the <tt>process</tt> method).
+
* Implement <tt>void configure(AnyMap configuration)</tt>. This method is called prior to process. Here you can read the configuration provided for the pipelet in the [[SMILA/Glossary#P|pipeline]]. To share those properties store the whole configuration in a member variable or better check the configuration for validity and completeness and then store the settings in a member variable. You can later access the configuration with the ParameterAccessor (see code sample below). The advantage of using the parameter accessor is that you can also override your configuration with configuration parameters set in the single records (e.g. the task parameters are included in the record by <tt>PipelineProcessingWorker</tt>, so you could override the pipelets configuration with job parameters as long as you use the <tt>ParameterAccessor</tt> to access parameters in the <tt>process</tt> method).
  
 
* Implement <tt>String[] process(Blackboard blackboard, String[] recordIds)</tt>. Here you have to place the "business logic" of your pipelet. In most cases the result is the same as the input recordIDs. But it is possible to return another list of IDs as result. Use the <tt>ResultCollector.addResult(...)</tt> to collect the ids you want to return and <tt>ResultCollector.addFailedResult(...)</tt> to control the error handling behaviour of your pipelet. When creating the ResultCollector you have to determine if the Collector should drop the ids that caused errors from the result id list it creates (using <tt>resultCollector.getResultIds()</tt>)or if these ids should still be part of the result id set. The ResultCollector will also control exception handling for you when setting failed record ids (depending on the existence and value of the parameter <tt>_failOnError</tt>, default is <tt>false</tt>). If <tt>_failOnError</tt> is set to true the ResultCollector will throw a ProcessingException, if not, the error will just be logged (on how to use the ResultCollector see the code sample below).
 
* Implement <tt>String[] process(Blackboard blackboard, String[] recordIds)</tt>. Here you have to place the "business logic" of your pipelet. In most cases the result is the same as the input recordIDs. But it is possible to return another list of IDs as result. Use the <tt>ResultCollector.addResult(...)</tt> to collect the ids you want to return and <tt>ResultCollector.addFailedResult(...)</tt> to control the error handling behaviour of your pipelet. When creating the ResultCollector you have to determine if the Collector should drop the ids that caused errors from the result id list it creates (using <tt>resultCollector.getResultIds()</tt>)or if these ids should still be part of the result id set. The ResultCollector will also control exception handling for you when setting failed record ids (depending on the existence and value of the parameter <tt>_failOnError</tt>, default is <tt>false</tt>). If <tt>_failOnError</tt> is set to true the ResultCollector will throw a ProcessingException, if not, the error will just be logged (on how to use the ResultCollector see the code sample below).

Revision as of 04:49, 20 September 2011

This page describes how to implement and configure your own pipelet in case you wish to add functionality to SMILA.

What are pipelets?

Pipelets are not standalone services, but their lifecycle and configuration is managed by the workflow engine. Each occurrence of a pipelet in a workflow uses a different pipelet instance. For more information take a look at Pipelets.

Before writing your own pipelet we recommend you to take a look at the HelloWorldPipelet.java. This pipelet is an example of a very simple processing pipelet and can be used as a template for your pipelets.

Implementation

Follow these instructions to implement a pipelet in SMILA:

  • In the MANIFEST.MF, add at least these as "Imported Packages" (of course, you will need more to develop your pipelet, depending on what you want to do):
    • org.eclipse.smila.blackboard
    • org.eclipse.smila.datamodel
    • org.eclipse.smila.processing
    • org.eclipse.smila.processing.parameters (to use ParameterAccessor)
    • org.eclipse.smila.processing.util (to use ResultCollector)
  • Create a class that implements the interface org.eclipse.smila.processing.Pipelet and make sure that the class has a public no-argument constructor.
  • Implement void configure(AnyMap configuration). This method is called prior to process. Here you can read the configuration provided for the pipelet in the pipeline. To share those properties store the whole configuration in a member variable or better check the configuration for validity and completeness and then store the settings in a member variable. You can later access the configuration with the ParameterAccessor (see code sample below). The advantage of using the parameter accessor is that you can also override your configuration with configuration parameters set in the single records (e.g. the task parameters are included in the record by PipelineProcessingWorker, so you could override the pipelets configuration with job parameters as long as you use the ParameterAccessor to access parameters in the process method).
  • Implement String[] process(Blackboard blackboard, String[] recordIds). Here you have to place the "business logic" of your pipelet. In most cases the result is the same as the input recordIDs. But it is possible to return another list of IDs as result. Use the ResultCollector.addResult(...) to collect the ids you want to return and ResultCollector.addFailedResult(...) to control the error handling behaviour of your pipelet. When creating the ResultCollector you have to determine if the Collector should drop the ids that caused errors from the result id list it creates (using resultCollector.getResultIds())or if these ids should still be part of the result id set. The ResultCollector will also control exception handling for you when setting failed record ids (depending on the existence and value of the parameter _failOnError, default is false). If _failOnError is set to true the ResultCollector will throw a ProcessingException, if not, the error will just be logged (on how to use the ResultCollector see the code sample below).
  • Register your class in META-INF/MANIFEST.MF of the providing bundle using the header name "SMILA-Pipelets". Then they can be detected by the PipeletTracker service. If you would like to register multiple classes, separate them with commas.
  • Consider thread-safe-ness. Because pipelets may be accessed by multiple threads, make sure that access to member variables (e.g. the configuration) is read-only. For best practices: use local variables instead of member variables if possible.

further reading

For more information on the ResultCollector see ResultCollector

Configuration

If your pipelet requires a configuration:

  • Add a <configuration> element to the <extensionActivity> section of your pipelet in the BPEL pipeline.

Examples

Pipelet Usage

This is a template for MyPipelet.java:

package org.eclipse.smila.mypackage

import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.Pipelet;
import org.eclipse.smila.datamodel.AnyMap;
 
public class MyPipelet implements Pipelet {
 
  public void configure(AnyMap configuration) throws ProcessingException {
    // read the configuration properties
  }
 
  public String[] process(Blackboard blackboard, String[] recordIds) throws ProcessingException {
    // process the records and create a result
  }
}

And this is how to register the pipelet class in the bundle's manifest MANIFEST.MF:

...
SMILA-Pipelets: org.eclipse.smila.mypackage.MyPipelet
...

And finally, this is a sample showing how a pipelet is invoked in the BPEL pipeline using an <extensionActivity>. It also shows how the pipelet is configured using a <configuration>.

...
<extensionActivity>
    <proc:invokePipelet name="invokeMyPipelet">
        <proc:pipelet class="org.eclipse.smila.mypackage.MyPipelet" />
        <proc:variables input="request" output="request" />
        <proc:configuration>
            <rec:Val key="aStringParam">some value</rec:Val>
            <rec:Val key="aDateParam" type="datetime">2008-06-11T16:08:00.000+0200</rec:Val>
        </proc:configuration>       
    </proc:invokePipelet>
</extensionActivity>
...

Piplet configuration usage

The following example shows the usage of multiple values for properties:

...
<extensionActivity>
  <proc:invokePipelet name="addValuesToNonExistingAttribute">
    <proc:pipelet class="org.eclipse.smila.processing.pipelets.AddValuesPipelet" />
    <proc:variables input="request" output="request"/>
    <proc:configuration>
      <rec:Val key="outputAttribute">out</rec:Val>
      <rec:Seq key="valuesToAdd">
        <rec:Val>value1</rec:Val>
        <rec:Val>value2</rec:Val>
      </rec:Seq>
    </proc:configuration>
  </proc:invokePipelet>
</extensionActivity>
...
public class AddValuesPipelet implements Pipelet {
  /** parameter for source attribute name. */
  private static final String SOURCE_ATT = "source_attribute_name";
 
  /** parameter for target attribute name. */
  private static final String TARGET_ATT = "target_attribute_name";
 
  /** the pipelet's configuration. */
  private AnyMap _config;
 
  /** local logger. */
  private final Log _log = LogFactory.getLog(getClass());
 
  /** {@inheritDoc} */
  @Override
  public String[] process(final Blackboard blackboard, final String[] recordIds) throws ProcessingException {
    final ParameterAccessor paramAccessor = new ParameterAccessor(blackboard, _config);
    final ResultCollector resultCollector =
      new ResultCollector(paramAccessor, _log, ProcessingConstants.DROP_ON_ERROR_DEFAULT);
    for (final String id : recordIds) {
      try {
        paramAccessor.setCurrentRecord(id);
        String inValue = "";
        String outValue = "";
        // get parameter either from record or configuration via parameter accessor.
        final String sourceAttr = paramAccessor.getRequiredParameter(SOURCE_ATT);
        final String targetAttr = paramAccessor.getRequiredParameter(TARGET_ATT);
 
        if (blackboard.getMetadata(id).containsKey(sourceAttr)) {
          inValue = blackboard.getMetadata(id).getStringValue(sourceAttr);
        }
 
        outValue = inValue + " --- Hello world!!!";
 
        final Value outLiteral = blackboard.getDataFactory().createStringValue(outValue);
        blackboard.getMetadata(id).put(targetAttr, outLiteral);
        resultCollector.addResult(id);
      } catch (final Exception e) {
        resultCollector.addFailedResult(id, e);
      }
    }
    return resultCollector.getResultIds();
  }
 
  /** {@inheritDoc} */
  @Override
  public void configure(final AnyMap config) throws ProcessingException {
    _config = config;
  }
}