Note: Available since SMILA 0.9.0!

PipelineProcessorWorker (bundle org.eclipse.smila.processing.worker)

The PipelineProcessorWorker is a worker designed to process synchronous pipelines inside an asynchronous workflow. The worker is in principle independent of any particular pipeline processing implementation. However, SMILA uses BPEL pipelines for synchronous workflows, so the worker is commonly also called the BPEL worker.

The BPEL pipelines available for execution are those already defined in SMILA for BPEL processing, so there is no need to copy or configure them separately in order to use them with the PipelineProcessorWorker.

BPEL pipelines and their pipelets are able to process multiple records in parallel. The PipelineProcessorWorker can therefore divide the records of the input bulk into bunches that are processed in a single call, which reduces the overhead caused by invoking a BPEL pipeline. The bunch size is configured via the pipelineRunBulkSize parameter (see below).

JavaDoc

This page gives only a rough overview of the service. Please refer to the JavaDoc for detailed information about the Java components.

Configuration

The PipelineProcessorWorker is configured via incoming task parameters. These parameters can be set, for example, in a job definition.

Parameter: pipelineName
Description: Default name of the synchronous (BPEL) pipeline to execute.
Additionally, each input record can contain an attribute "_pipeline" with a single string value to choose a different pipeline for processing this record (see the sketch after this table). If this attribute does not specify an existing pipeline (or the value is not a single string), the default pipeline is used for this record and a warning is written to the log file.
Default value: --- (no default, must be set)

Parameter: pipelineRunBulkSize
Description: Number of records to be processed in a single call of the synchronous workflow (a value <= 0 means the default value is used).
If records use the "_pipeline" attribute to select a different pipeline, or if multiple records in the bulk have the same record ID, fewer records than specified by pipelineRunBulkSize may be processed in a single call.
Default value: 1

Parameter: keepAttachmentsInMemory
Description: By default, attachments on processed records are kept in memory. If memory is limited or attachments are very large, it may be useful to set this parameter to false and activate a BinaryStorage service: attachments are then stored in BinaryStorage during processing and less memory is used, although processing will probably be slower. It may be better to reduce pipelineRunBulkSize instead. After processing, the attachments are removed from BinaryStorage again. If no BinaryStorage service is active, all attachments are kept in memory and this parameter has no effect.
Default value: true

Parameter: writeAttachmentsToOutput
Description: By default, attachments on incoming records are also added to the output records (if any are written). If this parameter is set to false, only record metadata is written to the output bulk. This can save a lot of I/O if attachments are no longer needed in the workflow after this worker.
Default value: true
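To make the per-record pipeline selection more concrete, here is a minimal Java sketch of how a preceding component (e.g. another pipelet or a test) might set the "_pipeline" attribute on a record before it reaches the PipelineProcessorWorker. The org.eclipse.smila.datamodel method names and the pipeline name "MyOtherPipeline" are assumptions for illustration, not taken from this page; check the SMILA datamodel JavaDoc for the exact API.

import org.eclipse.smila.datamodel.AnyMap;
import org.eclipse.smila.datamodel.Record;

// Sketch only: route one record to a non-default pipeline by setting "_pipeline".
// The datamodel calls (getMetadata, put) are assumptions about the API.
public final class PipelineSelector {

  private PipelineSelector() {
  }

  public static void selectPipeline(final Record record, final String pipelineName) {
    final AnyMap metadata = record.getMetadata();
    // must be a single string value, otherwise the worker falls back to the
    // default pipeline from the pipelineName task parameter and logs a warning
    metadata.put("_pipeline", pipelineName);
  }
}

A caller would then invoke something like PipelineSelector.selectPipeline(record, "MyOtherPipeline") for records that should not go through the default pipeline.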

Sample job definition that sets the parameters:

{
  "name":"myJob",
  "parameters":{
    "pipelineName": "myBpelPipeline",
    "pipelineRunBulkSize": "10",
    ...
  },
  "workflow":"myWorkflow"
}

PipelineProcessorWorker definition in workers.json

GET /smila/jobmanager/workers/pipelineProcessor/

HTTP/1.x 200 OK

{
  "name" : "pipelineProcessor",
  "readOnly" : true,
  "parameters" : [ 
    {
    "name" : "pipelineName" 
    }, 
    {
    "name" : "<pipeletParameters>",
    "optional" : true,
    "type" : "any"
    },
    {
    "name" : "pipelineRunBulkSize",
    "optional" : true
  } ],
  "input" : [ {
    "name" : "input",
    "type" : "recordBulks"
  } ],
  "output" : [ {
    "name" : "output",
    "type" : "recordBulks",
    "modes" : [ "optional" ]
  } ]
}

The output bucket of the worker is optional, so in an asynchronous workflow the worker does not need to have a successor. If no output bucket is defined, the result records of the pipeline processing are not persisted to a bulk but discarded. This makes sense if the pipeline itself stores the records somewhere, e.g. by adding them to an index.

Access task parameters in pipelets

The worker adds all task parameters to a map in the attribute _parameters in each record before passing it to the workflow processor, so each pipelet can access them. The helper class org.eclipse.smila.processing.parameters.ParameterAccessor supports this by looking for requested parameters first in this _parameters map, then at the top level of the record, and finally in the pipelet configuration. It is therefore possible to override properties from the pipelet configuration by setting them as task parameters, provided the pipelet uses the ParameterAccessor to access parameters in records and configuration. This is done, for example, by the Sesame pipelets.
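As a rough illustration of this lookup order, a pipelet might resolve one of its properties as sketched below. The ParameterAccessor constructor and method names shown here are assumptions from memory, not verified signatures, and the property "indexName" with its fallback value is made up; refer to the class's JavaDoc and to How to write a Pipelet for the real API.

import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.processing.parameters.ParameterAccessor;

// Sketch only: resolve the (made-up) property "indexName" so that a task
// parameter overrides the record value, which overrides the pipelet configuration.
public final class IndexNameResolver {

  private IndexNameResolver() {
  }

  public static String resolveIndexName(final Blackboard blackboard, final String recordId) {
    final ParameterAccessor parameters = new ParameterAccessor(blackboard); // assumed constructor
    parameters.setCurrentRecord(recordId); // assumed method
    // checks the record's _parameters map, then the record top level,
    // then the pipelet configuration; "default-index" is a made-up fallback
    return parameters.getParameter("indexName", "default-index");
  }
}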

If the internal parameter _failOnError was not set before, the worker sets it to "false". This means that the called pipelets should continue processing records and not stop when they encounter defective records. The pipelets themselves must implement this behavior; how to achieve this is explained in How to write a Pipelet.
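For pipelet authors, the consequence is roughly the loop sketched below: with _failOnError set to "false", a failure on one record is logged and the loop continues instead of aborting the whole batch. The process() signature and the helper methods are assumptions modelled on the usual SMILA pipelet pattern, not the definitive interface; see How to write a Pipelet for the authoritative contract.

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.smila.blackboard.Blackboard;
import org.eclipse.smila.processing.ProcessingException;

// Sketch only: a pipelet record loop that honors _failOnError.
public class TolerantPipeletSketch {

  private final Log _log = LogFactory.getLog(getClass());

  public String[] process(final Blackboard blackboard, final String[] recordIds) throws ProcessingException {
    final List<String> processed = new ArrayList<String>();
    for (final String id : recordIds) {
      try {
        processSingleRecord(blackboard, id); // the pipelet's actual work (placeholder)
        processed.add(id);
      } catch (final Exception e) {
        if (isFailOnError(blackboard, id)) {
          // _failOnError=true: let the whole pipeline run fail
          throw new ProcessingException("Failed to process record " + id, e);
        }
        // _failOnError=false (the PipelineProcessorWorker's default): log and continue
        _log.warn("Skipping defective record " + id, e);
      }
    }
    return processed.toArray(new String[processed.size()]);
  }

  private void processSingleRecord(final Blackboard blackboard, final String id) throws Exception {
    // placeholder for the real per-record processing logic
  }

  private boolean isFailOnError(final Blackboard blackboard, final String id) {
    // placeholder: a real pipelet would read _failOnError, e.g. via ParameterAccessor
    return false;
  }
}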

Error handling

The following errors may occur when a task for the PipelineProcessorWorker is processed:

  • Invalid "pipelineName" parameter:
    • If the pipeline parameter is not set (or is invalid), the task will fail with a non-recoverable error.
  • ProcessingException while processing a bunch of records:
    • Recoverable ProcessingException: The current task will fail with a recoverable error, so the whole task (with all its records) will be repeated.
    • Non-recoverable ProcessingException: An error will be logged and the worker will continue with the next bunch of records. The records of the current bunch will be lost. (This is implemented this way so that a single defective record does not fail the whole task with all of its input records.)
