Skip to main content

Notice: this Wiki will be going read only early in 2024 and edits will no longer be possible. Please see: https://gitlab.eclipse.org/eclipsefdn/helpdesk/-/wikis/Wiki-shutdown-plan for the plan.

Jump to: navigation, search

Difference between revisions of "SMILA/Documentation/BPEL Workflow Processor"

(Advanced process definition)
(Changing the number of records in a pipeline)
 
(8 intermediate revisions by 3 users not shown)
Line 21: Line 21:
 
If you want to use a "real" database you will have to make the JDBC driver available to bundle "org.apache.ode", and check the ODE documentation on how to prepare the database schema for ODE.
 
If you want to use a "real" database you will have to make the JDBC driver available to bundle "org.apache.ode", and check the ODE documentation on how to prepare the database schema for ODE.
  
Additional to the initial setup you may update existing pipelines if they are not predefined (system pipelines). You can do that with the [http://wiki.eclipse.org/SMILA/Documentation/Processing/JSON_REST_API REST API] or internally with the [http://build.eclipse.org/rt/smila/javadoc/current/org/eclipse/smila/processing/WorkflowProcessor.html WorkflowProcessor].
+
Additional to the initial setup you may update existing pipelines if they are not predefined (system pipelines). You can do that with the [http://wiki.eclipse.org/SMILA/Documentation/Processing/JSON_REST_API REST API] or internally with the [http://build.eclipse.org/rt/smila/javadoc/current/org/eclipse/smila/processing/WorkflowProcessor.html WorkflowProcessor]. Such pipelines are stored in the [[SMILA/Documentation/ObjectStore/Bundle org.eclipse.smila.objectstore|ObjectStore  service]] in store <tt>bpel</tt>.
  
 
== Pipeline definition using BPEL ==
 
== Pipeline definition using BPEL ==
Line 67: Line 67:
 
# Copy the above snippet to a new file with the suffix <tt>.bpel</tt> and saved it to the folder <tt>configuration/org.eclipse.smila.processing.bpel/''$pipeline.dir''</tt>.
 
# Copy the above snippet to a new file with the suffix <tt>.bpel</tt> and saved it to the folder <tt>configuration/org.eclipse.smila.processing.bpel/''$pipeline.dir''</tt>.
 
# Then replace ''$PIPELINENAME'' by the desired name of your pipeline.
 
# Then replace ''$PIPELINENAME'' by the desired name of your pipeline.
 +
## Please note that the pipeline name must only contain characters from the following range: "a-zA-Z._-". If the pipeline does not conform to this naming restrictions, it will not be accessible and SMILA will print a warning in the log file that the pipeline name is invalid.
 
# Next, copy the files <tt>record.xsd</tt>, and <tt>processor.wsdl</tt> from the <tt>xml</tt> directory in bundle <tt>org.eclipse.smila.processing.bpel</tt> to the same folder next to your .bpel file (if not already there).
 
# Next, copy the files <tt>record.xsd</tt>, and <tt>processor.wsdl</tt> from the <tt>xml</tt> directory in bundle <tt>org.eclipse.smila.processing.bpel</tt> to the same folder next to your .bpel file (if not already there).
 
# Then, still in the same folder, create or edit a file named <tt>deploy.xml</tt> containing the following content but replace ''$PIPELINENAME'' by the name of the new pipeline:
 
# Then, still in the same folder, create or edit a file named <tt>deploy.xml</tt> containing the following content but replace ''$PIPELINENAME'' by the name of the new pipeline:
Line 129: Line 130:
  
 
Replace the class name with the class name of the pipelet to use and add configuration parameters as needed - this should be documented by the pipelet provider. The configuration is a generic AnyMap object like the one used as record metadata, see [[SMILA/Documentation/Data Model and Serialization Formats]] for details. If the output variable is the same as the input variable (which is usually sufficient), you can omit the ''output'' attribute.
 
Replace the class name with the class name of the pipelet to use and add configuration parameters as needed - this should be documented by the pipelet provider. The configuration is a generic AnyMap object like the one used as record metadata, see [[SMILA/Documentation/Data Model and Serialization Formats]] for details. If the output variable is the same as the input variable (which is usually sufficient), you can omit the ''output'' attribute.
 +
 +
==== Pipelet invocations in BPEL loops (forEach) ====
  
 
In versions later than 0.9 the invokePipelet activity supports to give an additional "index" variable. This is to support advanced pipelines that can be invoked with multiple records at once (usually used in the PipelineProcessingWorker to reduce BPEL overhead) and still can use conditions to invoke pipelets based on attribute values. You can then do forEach loops on the record list and evaluate conditions on each record. For example the following loop would invoke the HtmlToTextPipelet only for records that have the mime type "text/html":
 
In versions later than 0.9 the invokePipelet activity supports to give an additional "index" variable. This is to support advanced pipelines that can be invoked with multiple records at once (usually used in the PipelineProcessingWorker to reduce BPEL overhead) and still can use conditions to invoke pipelets based on attribute values. You can then do forEach loops on the record list and evaluate conditions on each record. For example the following loop would invoke the HtmlToTextPipelet only for records that have the mime type "text/html":
Line 174: Line 177:
 
</source>
 
</source>
 
See the [https://dev.eclipse.org/svnroot/rt/org.eclipse.smila/trunk/core/SMILA.application/configuration/org.eclipse.smila.processing.bpel/pipelines/AddPipeline.bpel AddPipeline.bpel] in the standard SMILA configuration for an example with complete context.
 
See the [https://dev.eclipse.org/svnroot/rt/org.eclipse.smila/trunk/core/SMILA.application/configuration/org.eclipse.smila.processing.bpel/pipelines/AddPipeline.bpel AddPipeline.bpel] in the standard SMILA configuration for an example with complete context.
 +
 +
==== Changing the number of records in a pipeline ====
 +
 +
A special case are pipelines that change the number of records processed in a call. Examples would be:
 +
 +
* filtering out records, i.e. removing records that should not be processed further.
 +
* document splitting, i.e. creating a set of records from a single one and replacing the source records with the "splits", e.g. using the [[SMILA/Documentation/Bundle_org.eclipse.smila.processing.pipelets#org.eclipse.smila.processing.pipelets.DocumentSplitterPipelet|DocumentSplitterPipelet]].
 +
 +
This works fine only in pipelines that do not contain a loop over the records: You can use a single variable for all pipelets that always reflects the current set of records and can be used in the <tt><reply></tt> activity at the end to return the final set of processed records. In a loop it is not possible (or at least not simple) to maintain this single variable if the record set changes all the time (and even in parallel), so you would have to construct it somehow at the end of the loop (descriptions how to do this are highly appreciated ;-).
 +
 +
So, if you do record splitting or filtering, there are two feasible alternatives:
 +
* If all records are processed in the same way (i.e. no BPEL elements like <tt><if></tt> are necessary): no problem, just create the sequence of pipelets and use one variable all the time. You can invoke the pipeline with a set of records in one call and everything will be fine (assuming that all pipelets support processing multiple records, which usually is the case).
 +
* If you need conditions in the pipeline, you can process only a single record per invocation. Be sure to set the Parameter "pipelineRunBulkSize" of the [[SMILA/Documentation/Worker/PipelineProcessorWorker#Configuration|PipelineProcessorWorker]] to 1 if you use this pipeline with in asynchronous workflows. Then you can still access this record in conditions using index=1, for example:
 +
<source lang="xml">
 +
    <if name="Is PDF document?">
 +
      <condition>$request.records/rec:Record[1]/rec:Val[@key="MimeType"] = 'application/pdf'</condition>
 +
      ...
 +
</source>
  
 
=== Pipeline invocations ===
 
=== Pipeline invocations ===
Line 192: Line 213:
 
<source lang="xml">
 
<source lang="xml">
 
<invoke name="invokeSubPipeline" operation="process" portType="proc:ProcessorPortType"  
 
<invoke name="invokeSubPipeline" operation="process" portType="proc:ProcessorPortType"  
   partnerLink="$SUBPIPELINENAME" inputVariable="request" outputVariable="request"
+
   partnerLink="$SUBPIPELINENAME" inputVariable="request" outputVariable="request" />
/>
+
 
</source>
 
</source>
  

Latest revision as of 08:07, 27 June 2014

This page describes how to configure the SMILA BPEL workflow processor and how to call SMILA pipelets from BPEL processes. We do not assume any BPEL knowledge here, i.e. this page should contain everything to enable you to create at least simple BPEL processes for being used in SMILA.

Basic configuration

The BPEL WorkflowProcessor expects its configuration in configuration/org.eclipse.smila.processing.bpel. In this directory it expects a file named processor.properties that describes the main configuration. This file can contain the following SMILA specific properties:

  • pipeline.dir (default="pipelines"): The name of a folder below configuration/org.eclipse.smila.processing.bpel which contains the BPEL process files (together with all needed XSD and WSDL files) and the ODE specific deploy.xml file. See below for details.
  • pipeline.timeout (default="300"): Maximum time in seconds allowed for processing a pipeline. If a pipeline invocation takes longer, it is aborted with an error. You may want to increase this value in case you expect longer processing times in your application (e.g. when analyzing very large documents).
  • record.filter (default = none): A record filter defining the attributes and annotations that should be contained in BPEL workflow objects. If none is set, the workflow objects will contain only the record IDs to be processed. Add only those attributes and annotations to the filter that are actually used in any pipeline, because adding too many (and too huge) elements to the workflow object may decrease performance and use more memory. As the WorkflowProcessor uses the Blackboard to filter objects, you must define the filters in org.eclipse.smila.blackboard/RecordFilters.xml.

As the BPEL WorkflowProcessor is based on the Apache ODE BPEL engine, you can also add all ODE specific configuration properties to this file, just use the prefix ode. See ODE documentation for details. You have to add at least the configuration for a database connection which ODE needs for internal purposes (e.g. storing process definitions). For SMILA purposes usually an in-memory Apache Derby instance is completely sufficient, the required Derby library is includedn in SMILA. To use it, set the following properties:

ode.db.mode=internal
ode.db.int.driver=org.apache.derby.jdbc.EmbeddedDriver
ode.db.int.jdbcurl=jdbc:derby:memory:odedb;create=true
ode.db.int.username=sa
ode.db.int.password=

If you want to use a "real" database you will have to make the JDBC driver available to bundle "org.apache.ode", and check the ODE documentation on how to prepare the database schema for ODE.

Additional to the initial setup you may update existing pipelines if they are not predefined (system pipelines). You can do that with the REST API or internally with the WorkflowProcessor. Such pipelines are stored in the ObjectStore service in store bpel.

Pipeline definition using BPEL

The minimal BPEL process for SMILA pipelines looks like this:

<?xml version="1.0" encoding="utf-8" ?>
<process name="$PIPELINENAME" targetNamespace="http://www.eclipse.org/smila/processor" 
  xmlns="http://docs.oasis-open.org/wsbpel/2.0/process/executable"
  xmlns:bpel="http://docs.oasis-open.org/wsbpel/2.0/process/executable"
  xmlns:xsd="http://www.w3.org/2001/XMLSchema" 
  xmlns:proc="http://www.eclipse.org/smila/processor" 
  xmlns:rec="http://www.eclipse.org/smila/record">
 
    <import location="processor.wsdl" namespace="http://www.eclipse.org/smila/processor"
        importType="http://schemas.xmlsoap.org/wsdl/" />
 
    <partnerLinks>
        <partnerLink name="Pipeline" partnerLinkType="proc:ProcessorPartnerLinkType" 
            myRole="service" />
    </partnerLinks>
 
    <extensions>
        <extension namespace="http://www.eclipse.org/smila/processor" mustUnderstand="no" />
    </extensions>
 
    <variables>
        <variable name="request" messageType="proc:ProcessorMessage" />
    </variables>
 
    <sequence>
        <receive name="start" partnerLink="Pipeline" portType="proc:ProcessorPortType" 
            operation="process" variable="request" createInstance="yes" />
 
        <!-- pipelet invocations will be added here -->
 
        <reply name="end" partnerLink="Pipeline" portType="proc:ProcessorPortType" 
            operation="process" variable="request" />
    </sequence>
</process>

To create a new pipeline:

  1. Copy the above snippet to a new file with the suffix .bpel and saved it to the folder configuration/org.eclipse.smila.processing.bpel/$pipeline.dir.
  2. Then replace $PIPELINENAME by the desired name of your pipeline.
    1. Please note that the pipeline name must only contain characters from the following range: "a-zA-Z._-". If the pipeline does not conform to this naming restrictions, it will not be accessible and SMILA will print a warning in the log file that the pipeline name is invalid.
  3. Next, copy the files record.xsd, and processor.wsdl from the xml directory in bundle org.eclipse.smila.processing.bpel to the same folder next to your .bpel file (if not already there).
  4. Then, still in the same folder, create or edit a file named deploy.xml containing the following content but replace $PIPELINENAME by the name of the new pipeline:
<deploy xmlns="http://www.apache.org/ode/schemas/dd/2007/03" xmlns:proc="http://www.eclipse.org/smila/processor">
 
    <!-- other pipelines -->
 
    <process name="proc:$PIPELINENAME">
        <in-memory>true</in-memory>
        <provide partnerLink="Pipeline">
            <service name="proc:$PIPELINENAME" port="ProcessorPort" />
        </provide>
    </process>
</deploy>

You can now add pipelet invocations to your pipeline BPEL. To add another pipelet you have to add only another BPEL file and copy the <process> element in deploy.xml for the new pipeline.

Pipelet invocations

Pipelets are classes that implement interface org.eclipse.smila.processing.Pipelet (in bundle org.eclipse.smila.processing) and have a corresponding definition file (the name must end in ".json") in the SMILA-INF directory of the bundles that provides them. They are configured by the WorkflowProcessor on pipeline initialization. One instance is created for each time they occur in any pipeline, instances are not shared between multiple pipelines.

The json file must at least contain the class name of the class that implements the Pipelet interface. E.g.

{
  "class" : "org.eclipse.smila.processing.pipelets.AddValuesPipelet" 
}

You can add more stuff to this file, e.g. a description of the configuration parameter the pipelet expects and supports. Examples that come with the base SMILA distribution are

  • in bundle org.eclipse.smila.processing.pipelets:
    • org.eclipse.smila.processing.pipelets.AddValuesPipelet: add some attribute values to each record.
    • org.eclipse.smila.processing.pipelets.HtmlToTextPipelet: Extract plain text and metadata from an HTML document in an attribute or attachment of each record and writes it to configurable attributes or attachments.
  • in bundle org.eclipse.smila.processing.pipelets.xmlprocessing:
    • A collection of pipelets for XML processing (XSLT, XPath selection, ...) of documents.
  • in bundle org.eclipse.smila.solr:
    • Pipelets for adding records to an Solr index and searching it.

To use such a pipelet in your pipeline, use the SMILA specific BPEL extension activity <invokePipelet> somewhere between <receive> and <reply> in your BPEL process:

<extensionActivity>
  <proc:invokePipelet name="invokeSomePipelet">
    <proc:pipelet class="org.eclipse.smila.pipelet.SomePipelet" />
    <proc:variables input="request" output="request" />
    <proc:configuration>
      <rec:Val key="single-parameter">value</rec:Val>
      <rec:Seq key="multi-parameter">
        <rec:Val>value1</rec:Val>
        <rec:Val>value2</rec:Val>
      <rec:Seq>
      <rec:Map key="complex-parameter">
        <rec:Val key="sub-parameter1">sub-value1</rec:Val>
        <rec:Val key="sub-parameter2">sub-value2</rec:Val>
      </rec:Map>
      <!-- more configuration parameters -->
    </proc:configuration>
  </proc:invokePipelet>
</extensionActivity>

Replace the class name with the class name of the pipelet to use and add configuration parameters as needed - this should be documented by the pipelet provider. The configuration is a generic AnyMap object like the one used as record metadata, see SMILA/Documentation/Data Model and Serialization Formats for details. If the output variable is the same as the input variable (which is usually sufficient), you can omit the output attribute.

Pipelet invocations in BPEL loops (forEach)

In versions later than 0.9 the invokePipelet activity supports to give an additional "index" variable. This is to support advanced pipelines that can be invoked with multiple records at once (usually used in the PipelineProcessingWorker to reduce BPEL overhead) and still can use conditions to invoke pipelets based on attribute values. You can then do forEach loops on the record list and evaluate conditions on each record. For example the following loop would invoke the HtmlToTextPipelet only for records that have the mime type "text/html":

<forEach counterName="index" parallel="yes" name="iterateRecords">
  <startCounterValue>1</startCounterValue>
  <finalCounterValue>count($request.records/rec:Record)</finalCounterValue>
  <scope>
    <if name="is HTML document">
      <condition>$request.records/rec:Record[position()=$index]/rec:Val[@key="MimeType"]="text/html"</condition>
      <extensionActivity>
        <proc:invokePipelet name="extract text from HTML">
          <proc:pipelet class="org.eclipse.smila.processing.bpel.pipelets.HtmlToTextPipelet" />
          <proc:variables input="request" index="index" />
          <proc:configuration>
            ...
          </proc:configuration>
        </proc:invokePipelet>
      </extensionActivity>
    </if>
  </scope>
</forEach>

The invokePipelet activity assumes that the index variables takes the values 1 to (number of records). Note that the pipelet is invoked in parallel for each record in this example.

If an output variable would be specified it would be assigned a list with only the one processed record in it. This can be useful if more than one pipelets would be invoked. If no output variable is specified, the input variable is not modified in this case.

Hint: If it is necessary that the input variable reflects the values of attributes changed by the pipelet (e.g. because it is needed in a following condition and you cannot use the single output record of the pipelet itself for the test) you must copy the output record back to the input variable using a bit of BPEL code:

  ...
  <extensionActivity>
    <proc:invokePipelet name="some pipelet">
    <proc:pipelet class="org.eclipse.smila.example.Pipelet" />
    <proc:variables input="request" index="index" output="oneRecord"/>
  </extensionActivity>
 <assign name="copy result into original variable for next tests">
    <copy>
      <from>$oneRecord.records/rec:Record[1]</from>
      <to>$request.records/rec:Record[position()=$index]</to>
    </copy>
  </assign>
  ...

See the AddPipeline.bpel in the standard SMILA configuration for an example with complete context.

Changing the number of records in a pipeline

A special case are pipelines that change the number of records processed in a call. Examples would be:

  • filtering out records, i.e. removing records that should not be processed further.
  • document splitting, i.e. creating a set of records from a single one and replacing the source records with the "splits", e.g. using the DocumentSplitterPipelet.

This works fine only in pipelines that do not contain a loop over the records: You can use a single variable for all pipelets that always reflects the current set of records and can be used in the <reply> activity at the end to return the final set of processed records. In a loop it is not possible (or at least not simple) to maintain this single variable if the record set changes all the time (and even in parallel), so you would have to construct it somehow at the end of the loop (descriptions how to do this are highly appreciated ;-).

So, if you do record splitting or filtering, there are two feasible alternatives:

  • If all records are processed in the same way (i.e. no BPEL elements like <if> are necessary): no problem, just create the sequence of pipelets and use one variable all the time. You can invoke the pipeline with a set of records in one call and everything will be fine (assuming that all pipelets support processing multiple records, which usually is the case).
  • If you need conditions in the pipeline, you can process only a single record per invocation. Be sure to set the Parameter "pipelineRunBulkSize" of the PipelineProcessorWorker to 1 if you use this pipeline with in asynchronous workflows. Then you can still access this record in conditions using index=1, for example:
    <if name="Is PDF document?">
      <condition>$request.records/rec:Record[1]/rec:Val[@key="MimeType"] = 'application/pdf'</condition>
      ...

Pipeline invocations

You can also invoke one pipeline from another to group pipelet invocations that belong together. To do this you have to use the standard BPEL invoke activity to invoke a BPEL partner link for the sub pipeline:

  • define a partner link in the <partnerLinks> section of the BPEL file, replace $SUBPIPELINENAME with the name of pipeline to invoke as defined in its <process> element:
<partnerLinks>
  <partnerLink name="Pipeline" partnerLinkType="proc:ProcessorPartnerLinkType" myRole="service" />
  <partnerLink name="$SUBPIPELINENAME" partnerLinkType="proc:ProcessorPartnerLinkType" partnerRole="service" />
</partnerLinks>
  • add an BPEL <invoke> activity between <receive> and <reply>, replace $SUBPIPELINENAME with the pipeline name and adapt the inputVariable and outputVariable attributes if necessary (omitting outputVariable is not allowed here!):
<invoke name="invokeSubPipeline" operation="process" portType="proc:ProcessorPortType" 
  partnerLink="$SUBPIPELINENAME" inputVariable="request" outputVariable="request" />
  • add a declaration for the partner link in the deploy.xml entry of your pipeline:
<deploy xmlns="http://www.apache.org/ode/schemas/dd/2007/03" xmlns:proc="http://www.eclipse.org/smila/processor">
    <process name="proc:$PIPELINENAME">
        <in-memory>true</in-memory>
        <provide partnerLink="Pipeline">
            <service name="proc:$PIPELINENAME" port="ProcessorPort" />
        </provide>
        <invoke partnerLink="$SUBPIPELINENAME">
            <service name="proc:$SUBPIPELINENAME" port="ProcessorPort" />
        </invoke>
    </process>
</deploy>

Advanced process definition

You can of course use all other BPEL elements, too, to create your pipelines like conditions, iterations, parallel flows, invocation of external Web Services, etc. However, to describe them is beyond the scope of this introduction and requires "real" knowledge about BPEL and XPath (and WSDL, for invoking Web Services).

Back to the top