Jump to: navigation, search

Difference between revisions of "SMILA/Documentation/HowTo/How to write a Pipelet"

(Pipelet Usage)
(Implementation)
(44 intermediate revisions by 7 users not shown)
Line 4: Line 4:
 
Pipelets are not standalone services, but their lifecycle and configuration is managed by the workflow engine. Each occurrence of a pipelet in a workflow uses a different pipelet instance. For more information  take a look at [[SMILA/Documentation/Pipelets|Pipelets]].
 
Pipelets are not standalone services, but their lifecycle and configuration is managed by the workflow engine. Each occurrence of a pipelet in a workflow uses a different pipelet instance. For more information  take a look at [[SMILA/Documentation/Pipelets|Pipelets]].
  
'''Before writing your own pipelet we recommend you to take a look at the''' [http://dev.eclipse.org/svnroot/rt/org.eclipse.smila/trunk/core/org.eclipse.smila.integration.helloworld/ HelloWorldPipelet]. This pipelet is an example of a very simple processing pipelet and can be used as a template for your pipelets.
+
'''Before writing your own pipelet we recommend you to take a look at the '''[https://dev.eclipse.org/svnroot/rt/org.eclipse.smila/trunk/examples/org.eclipse.smila.integration.pipelet/code/src/org/eclipse/smila/integration/pipelet/HelloWorldPipelet.java HelloWorldPipelet.java]. This pipelet is an example of a very simple processing pipelet and can be used as a template for your pipelets. You can also get it by exporting the [http://dev.eclipse.org/svnroot/rt/org.eclipse.smila/trunk/examples example bundles] from SMILA's repository and import it into your SMILA workspace and use them as templates:
 +
* [http://dev.eclipse.org/svnroot/rt/org.eclipse.smila/trunk/examples/org.eclipse.smila.integration.pipelet org.eclipse.smila.integration.pipelet]: template bundle for pipelet development
 +
You can also download these examples from the [http://www.eclipse.org/smila/downloads.php release downloads] or the [http://build.eclipse.org/rt/smila/nightly/ nightly build downloads].
  
 
== Implementation ==
 
== Implementation ==
 
Follow these instructions to implement a pipelet in SMILA:
 
Follow these instructions to implement a pipelet in SMILA:
  
* If needed, create a new plugin project. You can add multiple pipelets to a single project. See [http://wiki.eclipse.org/SMILA/Development_Guidelines#Creation_of_bundles] for details.
+
* If needed, create a new plugin project. You can add multiple pipelets to a single project. See [http://wiki.eclipse.org/SMILA/Development_Guidelines#Creation_of_bundles Development Guidelines: Bundle creation] for details.
  
 
* In the MANIFEST.MF, add at least these as "Imported Packages" (of course, you will need more to develop your pipelet, depending on what you want to do):
 
* In the MANIFEST.MF, add at least these as "Imported Packages" (of course, you will need more to develop your pipelet, depending on what you want to do):
Line 15: Line 17:
 
** <tt>org.eclipse.smila.datamodel</tt>
 
** <tt>org.eclipse.smila.datamodel</tt>
 
** <tt>org.eclipse.smila.processing</tt>
 
** <tt>org.eclipse.smila.processing</tt>
 +
** <tt>org.eclipse.smila.utils</tt>
 +
** <tt>org.eclipse.smila.processing.parameters</tt> (to use <tt>ParameterAccessor</tt>)
 +
** <tt>org.eclipse.smila.processing.util</tt> (to use <tt>ResultCollector</tt>)
  
* Create a class that implements the interface <tt>org.eclipse.smila.processing.Pipelet</tt> and make sure that the class has a public no-argument constructor.
+
* Create a class that implements the interface <tt>org.eclipse.smila.processing.Pipelet</tt> and make sure that the class has a public no-argument constructor (or none at all).
  
* Implement <tt>void configure(AnyMap configuration)</tt>. This method is called prior to process. Here you can read the configuration provided for the pipelet in the [[SMILA/Glossary#P|pipeline]]. To share those properties either store the whole configuration in a member variable or better check the configuration for validity and completeness and store the settings in separate member variables.
+
* Implement <tt>void configure(AnyMap configuration)</tt>. This method is called prior to process. Here you can read the configuration provided for the pipelet in the [[SMILA/Glossary#P|pipeline]]. To share those properties store the whole configuration in a member variable or better check the configuration for validity and completeness and then store the settings in a member variable. You can later access the configuration with the ParameterAccessor (see code sample below). The advantage of using the parameter accessor is that you can also override your configuration with configuration parameters set in the single records (e.g. the task parameters are included in the record by <tt>PipelineProcessorWorker</tt>, so you could override the pipelets configuration with job parameters as long as you use the <tt>ParameterAccessor</tt> to access parameters in the <tt>process</tt> method).
 +
{{Tip|Do not access OSGi Services in configure()
 +
You should not try to use OSGi services in the <tt>configure()</tt> method, because during startup of SMILA <tt>configure()</tt> is called immediately during initialization of the BPEL engine, and it is easily possible that the required service has not been activated at that time, and if <tt>configure()</tt> throws an exception because it cannot lookup the required service, the pipeline will not be deployed and cannot be executed. It's better to lookup the services later in the <tt>process()</tt> method, because all services should be available then.}}
 +
* Implement <tt>String[] process(Blackboard blackboard, String[] recordIds)</tt>. Here you have to place the "business logic" of your pipelet. In most cases the result is the same as the input recordIDs. But it is possible to return another list of IDs as result. Use the <tt>ResultCollector.addResult(...)</tt> to collect the ids you want to return and <tt>ResultCollector.addFailedResult(...)</tt> to control the error handling behaviour of your pipelet. When creating the ResultCollector you have to determine if the Collector should drop the ids that caused errors from the result id list it creates (using <tt>resultCollector.getResultIds()</tt>)or if these ids should still be part of the result id set. The ResultCollector will also control exception handling for you when setting failed record ids (depending on the existence and value of the parameter <tt>_failOnError</tt>, default is <tt>false</tt>). If <tt>_failOnError</tt> is set to true the ResultCollector will throw a ProcessingException, if not, the error will just be logged (on how to use the ResultCollector see the code sample below).
 +
* Create a pipelet description json file for your pipelet in the <tt>SMILA-INF</tt> folder of the providing bundle. The file name is arbitrary but must have ".json" extension and the file content must contain at least the class name of the pipelet (<tt>{"class" : "your.class.name.here"}</tt>). Then they can be detected by the <tt>PipeletTracker</tt> service. If you would like to register multiple classes, use one separate description file for each class. Don't forget to add the <tt>SMILA-INF</tt> folder to the <tt>bin.includes</tt> entry of the bundle's <tt>build.properties</tt> file.
  
* Implement <tt>String[] process(Blackboard blackboard, String[] recordIds)</tt>. Here you have to place the "business logic" of your pipelet. In most cases the result is the same as the input recordIDs, so just return it. But it is possible to return another list of IDs as result.
+
* Consider thread-safe-ness. Because pipelets may be accessed by multiple threads, make sure that access to member variables (e.g. the configuration) is read-only. For best practices: use local variables instead of member variables if possible.
  
* Register your class in <tt>META-INF/MANIFEST.MF</tt> of the providing bundle using the header name "<tt>SMILA-Pipelets</tt>". Then they can be detected by the <tt>PipeletTracker</tt> service. If you would like to register multiple classes, separate them with commas.
+
=== Further reading ===
 
+
For more information on the <tt>ResultCollector</tt> see [http://build.eclipse.org/rt/smila/javadoc/current/org/eclipse/smila/processing/util/ResultCollector.html JavaDoc].
* Consider thread-safe-ness. Because pipelets may be accessed by multiple threads, make sure that access to member variables (e.g. the configuration) is read-only. For best practices: use local variables instead of member variables if possible.
+
  
 
== Configuration ==
 
== Configuration ==
Line 45: Line 53:
  
 
public class MyPipelet implements Pipelet {
 
public class MyPipelet implements Pipelet {
 
  public MyPipelet() {
 
  }
 
  
 
   public void configure(AnyMap configuration) throws ProcessingException {
 
   public void configure(AnyMap configuration) throws ProcessingException {
Line 59: Line 64:
 
</source>
 
</source>
  
And this is how to register the pipelet class in the bundle's manifest <tt>MANIFEST.MF</tt>:
+
And this is how to register the pipelet class in the bundle:
 +
Create a folder <tt>SMILA-INF</tt> in the bundle and add a file <tt>MyPipelet.json</tt> to this folder:
  
 
<pre>
 
<pre>
...
+
{
SMILA-Pipelets: org.eclipse.smila.mypackage.MyPipelet
+
  "class": "org.eclipse.smila.mypackage.MyPipelet",
...
+
  "parameters": [
 +
    <put your parameter descriptions here>
 +
  ],
 +
  "description": "The textual description of my pipelet. This is optional but good style."
 +
}
 
</pre>
 
</pre>
  
And finally, this is a sample showing how a pipelet is invoked in the BPEL pipeline using an <tt>&lt;extensionActivity&gt;</tt>. It also shows how the pipelet is configured using a <tt>&lt;configuration&gt;</tt>.
+
{{note|
 +
The description as well as the parameters section is optional but gives users a hint on which parameters you pipelet uses and what they mean as well as what your pipelet does. The parameters section follows the same restrictions as the workers' parameter descriptions, see [[SMILA/Documentation/ParameterDefinition]] for details.}}
 +
 
 +
Now add the folder <tt>SMILA-INF</tt> to the build.properties (or just check it in the <tt>Build</tt> view of the <tt>MANIFEST.MF</tt> file in your IDE.
 +
 
 +
And finally, this is a sample showing how a pipelet is invoked in the BPEL pipeline using an <tt>&lt;extensionActivity&gt;</tt>. It also shows how the pipelet is configured using a <tt>&lt;configuration&gt;</tt> section.
  
 
<source lang="xml">
 
<source lang="xml">
Line 76: Line 91:
 
         <proc:variables input="request" output="request" />
 
         <proc:variables input="request" output="request" />
 
         <proc:configuration>
 
         <proc:configuration>
             <rec:Val name="aStringParam">some value</rec:Val>
+
             <rec:Val key="aStringParam">some value</rec:Val>
             <rec:Vale name="aDateParam" type="datetime">2008-06-11 16:08:00</rec:Val>
+
             <rec:Val key="aDateParam" type="datetime">2008-06-11T16:08:00.000+0200</rec:Val>
 
         </proc:configuration>       
 
         </proc:configuration>       
 
     </proc:invokePipelet>
 
     </proc:invokePipelet>
Line 83: Line 98:
 
...
 
...
 
</source>
 
</source>
 +
 +
{{note|
 +
You can also configure Pipelines with SMILA's BPEL designer. See [[SMILA/BPEL_Designer|BPEL Designer]] for more information and how to install it.}}
  
 
=== Piplet configuration usage ===
 
=== Piplet configuration usage ===
Line 93: Line 111:
 
   <proc:invokePipelet name="addValuesToNonExistingAttribute">
 
   <proc:invokePipelet name="addValuesToNonExistingAttribute">
 
     <proc:pipelet class="org.eclipse.smila.processing.pipelets.AddValuesPipelet" />
 
     <proc:pipelet class="org.eclipse.smila.processing.pipelets.AddValuesPipelet" />
     <proc:variables input="request" />
+
     <proc:variables input="request" output="request"/>
 
     <proc:configuration>
 
     <proc:configuration>
 
       <rec:Val key="outputAttribute">out</rec:Val>
 
       <rec:Val key="outputAttribute">out</rec:Val>
Line 107: Line 125:
  
 
<source lang="java">
 
<source lang="java">
 +
public class AddValuesPipelet implements Pipelet {
  
private static final String PARAM_ATTRIBUTE = "outputAttribute";
+
  /** config property name for attribute name to add values to. */
private static final String PARAM_VALUES = "valuesToAdd";
+
  private static final String PARAM_ATTRIBUTE = "outputAttribute";
  
private String _outputAttribute;
+
  /** config property name for the values to add. */
private Any _values;
+
  private static final String PARAM_VALUES = "valuesToAdd";
  
public void configure(final AnyMap configuration) throws ProcessingException {
+
   /** the pipelet's configuration. */
   _outputAttribute = configuration.getStringValue(PARAM_ATTRIBUTE);
+
   private AnyMap _config;
   _values = configuration.get(PARAM_VALUES);
+
}
+
  
public String[] process(Blackboard blackboard, String[] recordIds) throws ProcessingException {
+
  /** local logger. */
  if (_outputAttribute != null && _values != null) {
+
  private final Log _log = LogFactory.getLog(getClass());
     try {
+
 
       for (final String id : recordIds) {
+
  /** add Any values to an attribute as described in pipelet config or parameters. */
         for (final Any value : _values) {
+
  @Override
          blackboard.getMetadata(id).add(_outputAttribute, value);
+
  public String[] process(final Blackboard blackboard, final String[] recordIds) throws ProcessingException {
 +
    final ParameterAccessor paramAccessor = new ParameterAccessor(blackboard, _config);
 +
     final ResultCollector resultCollector =
 +
       new ResultCollector(paramAccessor, _log, ProcessingConstants.DROP_ON_ERROR_DEFAULT);
 +
    for (final String id : recordIds) {
 +
      paramAccessor.setCurrentRecord(id);
 +
      // the attribute to which to add the values.
 +
      final String outputAttribute = paramAccessor.getParameter(PARAM_ATTRIBUTE, null);
 +
      // the values to add.
 +
      final Any values = paramAccessor.getParameterAny(PARAM_VALUES);
 +
      if (values != null && outputAttribute != null) {
 +
         try {
 +
          for (final Any value : values) {
 +
            blackboard.getMetadata(id).add(outputAttribute, value);
 +
          }
 +
          resultCollector.addResult(id);
 +
        } catch (final Exception ex) {
 +
          resultCollector.addFailedResult(id, ex);
 
         }
 
         }
 
       }
 
       }
    } catch (final Exception ex) {
 
      throw new ProcessingException(ex);
 
 
     }
 
     }
 +
    return resultCollector.getResultIds();
 +
  }
 +
 +
  /** {@inheritDoc} */
 +
  @Override
 +
  public void configure(final AnyMap configuration) throws ProcessingException {
 +
    _config = configuration;
 
   }
 
   }
  return recordIds;
 
 
}
 
}
 
</source>
 
</source>
  
 
[[Category:SMILA]]
 
[[Category:SMILA]]

Revision as of 11:07, 20 September 2012

This page describes how to implement and configure your own pipelet in case you wish to add functionality to SMILA.

What are pipelets?

Pipelets are not standalone services, but their lifecycle and configuration is managed by the workflow engine. Each occurrence of a pipelet in a workflow uses a different pipelet instance. For more information take a look at Pipelets.

Before writing your own pipelet we recommend you to take a look at the HelloWorldPipelet.java. This pipelet is an example of a very simple processing pipelet and can be used as a template for your pipelets. You can also get it by exporting the example bundles from SMILA's repository and import it into your SMILA workspace and use them as templates:

You can also download these examples from the release downloads or the nightly build downloads.

Implementation

Follow these instructions to implement a pipelet in SMILA:

  • In the MANIFEST.MF, add at least these as "Imported Packages" (of course, you will need more to develop your pipelet, depending on what you want to do):
    • org.eclipse.smila.blackboard
    • org.eclipse.smila.datamodel
    • org.eclipse.smila.processing
    • org.eclipse.smila.utils
    • org.eclipse.smila.processing.parameters (to use ParameterAccessor)
    • org.eclipse.smila.processing.util (to use ResultCollector)
  • Create a class that implements the interface org.eclipse.smila.processing.Pipelet and make sure that the class has a public no-argument constructor (or none at all).
  • Implement void configure(AnyMap configuration). This method is called prior to process. Here you can read the configuration provided for the pipelet in the pipeline. To share those properties store the whole configuration in a member variable or better check the configuration for validity and completeness and then store the settings in a member variable. You can later access the configuration with the ParameterAccessor (see code sample below). The advantage of using the parameter accessor is that you can also override your configuration with configuration parameters set in the single records (e.g. the task parameters are included in the record by PipelineProcessorWorker, so you could override the pipelets configuration with job parameters as long as you use the ParameterAccessor to access parameters in the process method).
Idea.png
Do not access OSGi Services in configure() You should not try to use OSGi services in the configure() method, because during startup of SMILA configure() is called immediately during initialization of the BPEL engine, and it is easily possible that the required service has not been activated at that time, and if configure() throws an exception because it cannot lookup the required service, the pipeline will not be deployed and cannot be executed. It's better to lookup the services later in the process() method, because all services should be available then.
  • Implement String[] process(Blackboard blackboard, String[] recordIds). Here you have to place the "business logic" of your pipelet. In most cases the result is the same as the input recordIDs. But it is possible to return another list of IDs as result. Use the ResultCollector.addResult(...) to collect the ids you want to return and ResultCollector.addFailedResult(...) to control the error handling behaviour of your pipelet. When creating the ResultCollector you have to determine if the Collector should drop the ids that caused errors from the result id list it creates (using resultCollector.getResultIds())or if these ids should still be part of the result id set. The ResultCollector will also control exception handling for you when setting failed record ids (depending on the existence and value of the parameter _failOnError, default is false). If _failOnError is set to true the ResultCollector will throw a ProcessingException, if not, the error will just be logged (on how to use the ResultCollector see the code sample below).
  • Create a pipelet description json file for your pipelet in the SMILA-INF folder of the providing bundle. The file name is arbitrary but must have ".json" extension and the file content must contain at least the class name of the pipelet ({"class" : "your.class.name.here"}). Then they can be detected by the PipeletTracker service. If you would like to register multiple classes, use one separate description file for each class. Don't forget to add the SMILA-INF folder to the bin.includes entry of the bundle's build.properties file.
  • Consider thread-safe-ness. Because pipelets may be accessed by multiple threads, make sure that access to member variables (e.g. the configuration) is read-only. For best practices: use local variables instead of member variables if possible.

Further reading

For more information on the ResultCollector see JavaDoc.

Configuration

If your pipelet requires a configuration:

  • Add a <configuration> element to the <extensionActivity> section of your pipelet in the BPEL pipeline.

Examples

Pipelet Usage

This is a template for MyPipelet.java:

package org.eclipse.smila.mypackage

import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.processing.ProcessingException;
import org.eclipse.smila.processing.Pipelet;
import org.eclipse.smila.datamodel.AnyMap;
 
public class MyPipelet implements Pipelet {
 
  public void configure(AnyMap configuration) throws ProcessingException {
    // read the configuration properties
  }
 
  public String[] process(Blackboard blackboard, String[] recordIds) throws ProcessingException {
    // process the records and create a result
  }
}

And this is how to register the pipelet class in the bundle: Create a folder SMILA-INF in the bundle and add a file MyPipelet.json to this folder:

{
   "class": "org.eclipse.smila.mypackage.MyPipelet",
   "parameters": [
     <put your parameter descriptions here>
   ],
   "description": "The textual description of my pipelet. This is optional but good style."
}
Note.png
The description as well as the parameters section is optional but gives users a hint on which parameters you pipelet uses and what they mean as well as what your pipelet does. The parameters section follows the same restrictions as the workers' parameter descriptions, see SMILA/Documentation/ParameterDefinition for details.


Now add the folder SMILA-INF to the build.properties (or just check it in the Build view of the MANIFEST.MF file in your IDE.

And finally, this is a sample showing how a pipelet is invoked in the BPEL pipeline using an <extensionActivity>. It also shows how the pipelet is configured using a <configuration> section.

...
<extensionActivity>
    <proc:invokePipelet name="invokeMyPipelet">
        <proc:pipelet class="org.eclipse.smila.mypackage.MyPipelet" />
        <proc:variables input="request" output="request" />
        <proc:configuration>
            <rec:Val key="aStringParam">some value</rec:Val>
            <rec:Val key="aDateParam" type="datetime">2008-06-11T16:08:00.000+0200</rec:Val>
        </proc:configuration>       
    </proc:invokePipelet>
</extensionActivity>
...
Note.png
You can also configure Pipelines with SMILA's BPEL designer. See BPEL Designer for more information and how to install it.


Piplet configuration usage

The following example shows the usage of multiple values for properties:

...
<extensionActivity>
  <proc:invokePipelet name="addValuesToNonExistingAttribute">
    <proc:pipelet class="org.eclipse.smila.processing.pipelets.AddValuesPipelet" />
    <proc:variables input="request" output="request"/>
    <proc:configuration>
      <rec:Val key="outputAttribute">out</rec:Val>
      <rec:Seq key="valuesToAdd">
        <rec:Val>value1</rec:Val>
        <rec:Val>value2</rec:Val>
      </rec:Seq>
    </proc:configuration>
  </proc:invokePipelet>
</extensionActivity>
...
public class AddValuesPipelet implements Pipelet {
 
  /** config property name for attribute name to add values to. */
  private static final String PARAM_ATTRIBUTE = "outputAttribute";
 
  /** config property name for the values to add. */
  private static final String PARAM_VALUES = "valuesToAdd";
 
  /** the pipelet's configuration. */
  private AnyMap _config;
 
  /** local logger. */
  private final Log _log = LogFactory.getLog(getClass());
 
  /** add Any values to an attribute as described in pipelet config or parameters. */
  @Override
  public String[] process(final Blackboard blackboard, final String[] recordIds) throws ProcessingException {
    final ParameterAccessor paramAccessor = new ParameterAccessor(blackboard, _config);
    final ResultCollector resultCollector =
      new ResultCollector(paramAccessor, _log, ProcessingConstants.DROP_ON_ERROR_DEFAULT);
    for (final String id : recordIds) {
      paramAccessor.setCurrentRecord(id);
      // the attribute to which to add the values.
      final String outputAttribute = paramAccessor.getParameter(PARAM_ATTRIBUTE, null);
      // the values to add.
      final Any values = paramAccessor.getParameterAny(PARAM_VALUES);
      if (values != null && outputAttribute != null) {
        try {
          for (final Any value : values) {
            blackboard.getMetadata(id).add(outputAttribute, value);
          }
          resultCollector.addResult(id);
        } catch (final Exception ex) {
          resultCollector.addFailedResult(id, ex);
        }
      }
    }
    return resultCollector.getResultIds();
  }
 
  /** {@inheritDoc} */
  @Override
  public void configure(final AnyMap configuration) throws ProcessingException {
    _config = configuration;
  }
}