Jump to: navigation, search

Difference between revisions of "SMILA/Documentation/QueueWorker"

Line 103: Line 103:
 
</QueueWorkerConnectionsConfig>
 
</QueueWorkerConnectionsConfig>
 
</source>
 
</source>
 +
 +
 +
== JMS Message Properties ==
 +
Each JMS message may contain properties. In SMILA the following properties are predefined and are automatically included with every message:
 +
* <b>DataSourceID</b> - the data source id
 +
* <b>Operation</b> - the kind of operation (<tt>ADD</tt>, <tt>DELETE</tt> or <tt>NONE</tt>
 +
 +
SMILA also supports user defined message properties. These are simply added to each record by named values of the predefined Annotation <tt>MessageProperties</tt>. If a record contains the annotation <tt>MessageProperties</tt> each named value of it is converted to a String message property.
 +
 +
It's also possible to add additional properties with static values in the <tt>Send</tt> task of your Router or Listener configuration (see [[#Send|Send task]]below).
  
  
Line 407: Line 417:
 
|}
 
|}
  
The Send element may contain any number of SetProperty elements. Each SetProperty element will add a same named and valued JMS Property to the MQ Entry of the record being put into the queue.
+
The Send element may contain any number of SetProperty elements. Each SetProperty element will add a same named and valued JMS Property to the MQ Entry of the record being put into the queue. These are in addition to any predefined or dynamically set JMS properties (see [[#JMS Message Properties|JMS Message Properties]]).
Additionally there are two JMS properties that are added by the SMILA Framework to each record, namely: DataSourceID and Operation. The first will contain the DataSourceID value and the second either ADD or DELETE.
+
  
 
Example:
 
Example:

Revision as of 11:12, 14 July 2009

Introduction

The QueueWorker components provide a bridge between Connectivity and OSGi-Runtime Processes that are responsible for processing the crawled content. The Connectivity is responsible for interacting with crawlers and agents, which in turn gather raw data information from various data sources and pack them into records. Data Flow Processes (DFP) allow manipulating or post processing the crawled information in those records. In most cases the processed data is stored in some form of a store or an index at the end of a DFP. Because not all data is equal (e.g. post processed the same way, except in the case of most simple of setups), the QueueWorker components are used to determine which record needs to be feed into which DFP. It thus acts as a router between the raw record and its post processing. To decide which DFP is the right one for a records, the QW investigates the message properties and assigns the DFP based on configured rules. The QueueWorker components are developed for the usage with JMS (Java Message Service). SMILA is distributed with an embedded JMS provider: Apache ActiveMQ.

Workflow

QueueWorker Bundle

The QueueWorker contains different components/features to take records in and out of queues. They are located in bundle org.eclipse.smila.connectivity.queue.worker.

The Router is needed to receive records from the Connectivity Framework, filter them and push them into queues. The Listener component of the QueueWorker is used as part of an OSGi runtime that processes the crawled information. It takes entries from the queue and calls one or more processing pipelines. Optionally, the processed records may be resend to a queue again. The QueueWorker contains also a Recycler component that can take records (which usually contain crawled or manipulated information) from storages (usually a RecordStorage) and send them back to queues. All components are configured with “Rules” which execute “Tasks”.

JMS Broker

JMS

JMS stands for Java Massage Service. The API is a messaging standard which allows applications written in Java to create, send, receive and read messages. An implementation of the JMS API is called a JMS Provider. SMILA embeds ActiveMQ as default JMS provider.

JMS Queues and Topics

A JMS Queue is a container for messages that should be received by exactly one consumer. Messages are delivered in the order they were sent. A message is removed from the queue once it has been read successfully.

In opposite to JMS Queues, so called JMS Topics implement a publish and subscribe semantic: When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message.

Note: JMS Topics are not used/supported in SMILA at the current state of implementation

Note on ActiveMQ: In ActiveMQ you do not have to create/configure destinations (= queues/topics) up front before you can use them. The ActiveMQ broker auto-creates the physical resources associated with a destination on demand (i.e. when messages are sent to a new destination on a broker).

JMS Broker

The JMS Message Broker is the main component on the server side of a JMS provider. It maintains queues and connections, routes messages to queues, acknowledges messages and handles transactions. In SMILA, an ActiveMQ Broker is embedded in the application per default.

Broker Configuration

In SMILA there is a jms.properties file to configure the embedded JMS broker. It is located in configuration/org.eclipse.smila.connectivity.queue.broker.main

Default Configuration:

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=vm:(broker:(tcp://localhost:61616?jms.useAsyncSend=true)?persistent=true)?marshal=false

The java.naming.provider.url parameter in this example defines a broker to which a JMS client can connect via TCP transport protocol.

Note on ActiveMQ: If you deactivate the org.eclipse.smila.connectivity.queue.broker.main bundle on SMILA startup, an internal broker will be created automatically by ActiveMQ. This broker can only be connected from a client running in the same SMILA instance by using the VM protocol (vm://localhost). So, if you need to connect to a broker (resp. its queues) via TCP from a different machine, the org.eclipse.smila.connectivity.queue.broker.main bundle should be activated (which is default in SMILA) and jms.properties allowing a TCP connection have to be set - like it is done in the default jms.properties shown above.


Broker Connection

A Broker Connection provides a cached connection pool to JMS brokers. The QueueWorker components use Broker Connections to communicate with different Queues in different JMS brokers.

Configuration

The different components of the QueueWorker can use connections to different brokers. The connections are defined in a special configuration file. A configuration has a unique Id and contains a list of specific JMS properties like:

  1. URL to broker
  2. user and password for login
  3. the connection type (factory) of specific broker

Below is an example of the default configuration file. It is located in configuration/org.eclipse.smila.queue.worker/QueueWorkerConnectionConfig.xml:

<QueueWorkerConnectionsConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>
 
  <ConnectionConfig Id="broker1">
    <URL>vm://localhost</URL>
    <User>any</User>
    <Password>any</Password>
    <ConnectionFactory>org.apache.activemq.ActiveMQConnectionFactory</ConnectionFactory>
  </ConnectionConfig>
 
</QueueWorkerConnectionsConfig>

The schema for this configuration file is located at org.eclipse.smila.connectivity.queue.worker/schemas/QueueWorkerConfig.xsd.

Example

This example uses two completely independent brokers. Keep in mind that there should be a unique Id for every connection.

<QueueWorkerConnectionsConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>
 
  <ConnectionConfig Id="broker1">
    <URL>tcp://localhost:61616</URL>
    <User>any</User>
    <Password>any</Password>
    <ConnectionFactory>org.apache.activemq.ActiveMQConnectionFactory</ConnectionFactory>
  </ConnectionConfig>
 
  <ConnectionConfig Id="broker2">
    <URL>tcp://10.0.0.1:61617</URL>
    <User>any1</User>
    <Password>any1</Password>
    <ConnectionFactory>com.ibm.mq.jms.MQQueueConnectionFactory</ConnectionFactory>
    <ConnectionProperty Name="useExponentialBackOff">true</ConnectionProperty>
  </ConnectionConfig>
 
</QueueWorkerConnectionsConfig>


JMS Message Properties

Each JMS message may contain properties. In SMILA the following properties are predefined and are automatically included with every message:

  • DataSourceID - the data source id
  • Operation - the kind of operation (ADD, DELETE or NONE

SMILA also supports user defined message properties. These are simply added to each record by named values of the predefined Annotation MessageProperties. If a record contains the annotation MessageProperties each named value of it is converted to a String message property.

It's also possible to add additional properties with static values in the Send task of your Router or Listener configuration (see Send taskbelow).


Router

The Router is one of the QueueWorker services. The main goal is to put record into one or more JMS queues (which are hosted/provided by a JMS broker). This happens within the Send task. The Router looks for every record that was crawled by a rule that fits with its condition. But it is also possible to configure the Router only with a Process task, which will effectively make the MQ obsolete which can be useful in small and resource-limited environments. In that case the router will directly put the record into the pipeline.

Configuration

The configuration file is located in configuration/org.eclipse.smila.connectivity.queue.worker/QueueWorkerRouterConfig.xml. It simply consists of a list of routing rules.

Default configuration:

<QueueWorkerRouterConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>
 
  <Rule Name="Default Route Rule">
    <Condition></Condition>
    <Task> 
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing"/>
    </Task>
  </Rule>
 
</QueueWorkerRouterConfig>

The schema is located in org.eclipse.smila.connectivity.queue.worker/schemas/QueueWorkerConfig.xsd.

Example

This example routes each record to more than one queue by defining several Send tasks:

<QueueWorkerRouterConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>
 
  <Rule Name="Double Send Route Rule">
    <Condition></Condition>
    <Task> 
      <Send BrokerId="broker1" Queue="queue1" RecordFilter="nothing"/>
      <Send BrokerId="broker2" Queue="queue2" RecordFilter="nothing"/>
    </Task>
  </Rule>
 
</QueueWorkerRouterConfig>

A detailed description of the Routing Rules can be found here.


Listener

The Listener is another QueueWorker component. This component can be used to “listen” for specific records on specific queues. If a record matching the condition of a rule exists the Listener takes the record from the queue and pushed it to BPEL pipelines. Generally the Listener works with the Process task, but it’s also possible to use Send tasks maybe for resending a record into a new JMS queue after the pipeline has finished.

Configuration

The configuration is nearly the same as the Router Configuration. There are only these following add-ons:

  • Source element: a source reference to the used broker (BrokerId should be specified in Broker Connection configuration)
  • WaitMassageTime attribute: timeout (in seconds) for attempts to pull JMS message from queue, default value is 1 second.
  • MaxMessageBlockSize attribute: maximum number of messages to receive from the queue in one session. If less messages are available those are consumed without waiting for more messages. Default value is 1 message.
  • Threads attribute: startup number of threads which listen to the queue under this rule, default value is 1 thread.

Default configuration:

<QueueWorkerListenerConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>
 
  <Rule Name="ADD Rule" WaitMessageTimeout="10" Threads="4" MaxMessageBlockSize="20">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>Operation='ADD' and NOT(DataSourceID LIKE '%feeds%')</Condition>
    <Task>
      <Process Workflow="AddPipeline"/>
    </Task>
  </Rule>
 
  <Rule Name="Delete Rule" WaitMessageTimeout="10" Threads="2" MaxMessageBlockSize="20">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>Operation='DELETE'</Condition>
    <Task>
      <Process Workflow="DeletePipeline"/>
    </Task>
  </Rule>
 
  <Rule Name="ADD Feed Rule" WaitMessageTimeout="10" Threads="2" MaxMessageBlockSize="20">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>Operation='ADD' and DataSourceID LIKE '%feeds%'</Condition>
    <Task>
      <Process Workflow="AddFeedPipeline"/>
    </Task>
  </Rule>
 
</QueueWorkerListenerConfig>

There is a schema located in org.eclipse.smila.connectivity.queue.worker/schemas/QueueWorkerConfig.xsd.

Example

An example for a preprocessing workflow using more than one queue can be found in the Tips section.

A detailed description of the Listener Rules can be found here.


RecordRecycler

A RecordRecycler is a special QueueWorker component which won't be needed in a SMILA standard scenario. You can use it if you need to reprocess (re-index) your data without accessing the original data sources again.

The current RecordRecycler is identical to the Router with the exception that RecordRecycler gets records from RecordStorage while the router gets them from the Connectivity. Thus it can be used to take records from the RecordStore and processes them (again) without having to recrawl the data sources.

Configuration

The RecordRecycler is high configurable with the xml configuration files in configuration/org.eclipse.smila.connectivity.queue.worker/recyclers. The RecordRecycler is able to execute any Router/Listener specific task.

Default Configuration (recyclers1.xml):

<QueueWorkerRecordRecyclerJob xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="../schemas/QueueWorkerConfig.xsd"
>
 
  <Rule Name="First recycle Rule">
    <Condition>DataSourceID='myDataSource'</Condition>
    <Task>
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing"/>
    </Task>
  </Rule>
 
  <Rule Name="Second recycle Rule">
    <Condition>NOT(DataSourceID='myDataSource')</Condition>
    <Task>
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing">
        <SetProperty Name="doAdd">no</SetProperty>
      </Send>
    </Task>
  </Rule>
 
</QueueWorkerRecordRecyclerJob>

There is a schema located at org.eclipse.smila.connectivity.queue.worker/schemas/QueueWorkerConfig.xsd.

A detailed description of the Rules can be found here.

Usage of JMX Management Agent

The JMX Agent name is org.eclipse.smila.connectivity.queue.worker.RecordRecycler. The following commands are available to be executed:

Signature Description Example
startRecycle (String configurationID, String dataSourceID) starts the recycle process startRecycle (recycler1, file)
stopRecycle (String dataSource ID) stops the recycle process stopRecycle (file)
getRecordsRecycled (String dataSourceID) gets the number of recycled records getRecrodsRecycled (dataSourceID)
getStatus (String dataSourceID) shows the current status of recycle process getStatus (file)
getConfigurations() lists all available recycler configurations


In Detail: QueueWorker Rules, Conditions and Tasks

QueueWorker Rules (used by Router-, Listener- and Recycler configuration) consist of:

  • Rule attributes
  • Source (Listener configuration only)
  • Condition
  • Tasks (Send/Process)

Here's an example of a Listener Rule:

<Rule Name="ADD Rule" WaitMessageTimeout="10" Threads="2">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>Operation='ADD'</Condition>
    <Task>
      <Process Workflow="AddPipeline"/>
    </Task>
</Rule>

Its elements will be explained in detail in the following sections.

Rule attributes

In the Rule element you can specify the following attributes:

Attribute Description
Name (unique) name of the rule
WaitMassageTime (Listener only) timeout (in seconds) for attempts to pull JMS message from queue, default value is 1 second.
Threads (Listener only) startup number of threads which listen to the queue under this rule, default value is 2 threads.

Source

The Source element is only valid for the Listener configuration. It specifies a reference to the used broker (queue) connection. The given BrokerId must exist in the JMS Broker Connection configuration.

Condition

Conditions are elements of rules which are used by Router, Listener and RecordRecycler. A condition is a string which syntax is based on a subset of the SQL92 conditional expression syntax as ref'ed in the "Message Selector Syntax" section of [specification], and is applied against the JMS properties of an MQ entry. Within SMILA basically the two JMS properties, Operation and DataSourceID are supported, but generally every JMS property can be tested, including your own. An empty condition element represents an condition that is always true.

Router and Listener use the rules and their conditions differently:

Router / RecordRecycler
The primary role of the router is to put JMS records into the desired MQ. The target MQ is defines in the Send-Task, which record is defined by the conditions of the rule. Other than the Listener the router will consult all rules sequentially from top bottom for any given record and execute the task for the first matching rule. It is therefore a good idea to define as a last rule with an empty condition and send records that did not match any rules this far into a special queue, such as the dead letter queue.

Listener
The primary role of the Listener is to take records from the MQ and carry out the associated task. Hence the conditions are used as a "select clause" on all records of the queue. The task is then executed on one arbitrary record that matches the condition. Note that a listener creates at least one thread for each rule as defined by the Threads attribute at the rule. Hence the rules are executed concurrently. If therefore the conditions for diff. rules (and thus tasks) are not disjunctive, it is not deterministic which task is executed for which record. This is usually not desired so care must be taken when designing the rules.

Examples

Condition sample Description
<empty> empty conditions (<Condition></Condition>) are always true
Operation='ADD' if Operation equals to ADD
DataSourceID LIKE 'web%' if dataSourceId begins with web
NOT(DataSourceID LIKE 'web%' if dataSourceId does not start with web

Tasks

There are two pre-defined tasks <Send> and <Process>. The QueueWorker components Router and Listener use these tasks to do their job. The Router usually uses the <Send> task to send records (which contain one crawled entry) to a queue. The Listener uses the <Process> task to take a Record from a queue and feed it into a BPEL workflow for processing this record. At the end of the process it may create a <Send> task again for putting the record result (back) into a queue. The Queue Worker provides all tasks and every component has the possibility to execute each of these tasks.

Process

The Process task is only a plain tag with the attribute “Workflow”. This task takes a record from JMS broker and pushes it to a BPEL pipeline.

Attribute Description
Workflow the BPEL pipeline/workflow to execute.

Example:

<Process Workflow="AddPipeline"/>

Send

The Send task is used to send a Record to a JMS queue. The <Send> element may have the following attributes:

Attribute Requirement Default value Description
BrokerId required Id of the broker specified in JMS Broker Connection
Queue required target JMS queue
RecordFilter optional attribute to set the record filter used to downsize the record (see description below)
PersistentDelivery optional true attribute correspondent to JMS delivery mode. If it is set to true (by default), message delivery will be persistent, otherwise not
WithAttachments optional false used to get attachments from binary storage back into record after filtering them out. This might be needed in clustering scenarios where the MQ is used to transport the records to diff. nodes. By default this att. it's false (means that attachments are not attached to the Record)

The Send element may contain any number of SetProperty elements. Each SetProperty element will add a same named and valued JMS Property to the MQ Entry of the record being put into the queue. These are in addition to any predefined or dynamically set JMS properties (see JMS Message Properties).

Example:

<Send BrokerId="broker1" Queue="SMILA.connectivity"  RecordFilter="filterName" PersistentDelivery="false" WithAttachments="true">
  <SetProperty Name="doSomething">some value</SetProperty>
  <SetProperty Name="doSomethingElse">some other value</SetProperty>
</Send>
RecordFilter

A Record filter, as the name suggest, filters attributes of a record when sending it to a queue. Often a record is complicated object with many attributes holding the metadata but also the binary content of the crawled data. As a consequence the serialized binary size of the record can be very large and would need to be handled (stored!) by the MQ. Further, most attributes are only needed once or twice during processing which makes it less desirable to always carrying them around in the MQ. To lighten the record of such less needed data the records can be stripped of these, containing only the relevant information. However, keep in mind that stripped data must be fetched from the record or binary store when not held in memory which incurs some performance penalty.

The filters are defined in an xml config file. Each filter is identified by its name attribute. Every filter specifies copy-rules for attributes/annotations. Because the RecordFilter is invoked via Blackboard service the configuration file is located in configuration/org.eclipse.smila.blackboard/RecordFilter.xml

Default Configuration:

<RecordFilters
  xmlns:xsi=http://www.w3.org/2001/XMLSchema-instance
  xsi:noNamespaceSchemaLocation="../org.eclipse.smila.datamodel.tools/schemas/RecordFilters.xsd">
  <Filter name="workflow-object">    
    <Attribute name="MimeType" keepAnnotations="false"/>              
  </Filter>
  <Filter name="only-attributes">
    <Attribute name="*"/>
  </Filter>
  <Filter name="no-filter">
    <Attribute name="*" keepAnnotations="true"/>
    <Annotation name="*"/>
  </Filter>
  <Filter name="nothing"/>
</RecordFilters>

Only the Send-task may use a RecordFilter and is selected by its name. The given attribute/annotation tags decide which attributes and annotation will be copied to the record pushed into the MQ. The name attribute, is a selector for the attribute or annotation and supports '*' as a wildcard. The keepAnnotations attribute is a flag to copy internal attribute annotations or not. A RecordFilterNotFoundException will be thrown if no filter is chosen or a given filter is not found.

Example:

<RecordFilters>
  <Filter name="only-attributes">
    <Attribute name="*"/>
  </Filter>
<!-- all attributes like MimeType, Size and so on but without sub-annotations. -->
  <Filter name="no-filter">
    <Attribute name="*" keepAnnotations="true"/>
    <Annotation name="*"/>
  </Filter>
<!-- a full copy of the record will be created. -->
  <Filter name="nothing"/>
<!-- a record only containing the id is copied. -->
  <Filter name="only-attribute1">
    <Attribute name="attribute1"/>
  </Filter>
<!-- only a specified attributes is copied. -->
  <Filter name="filter-single-and-datetime">
    <Attribute name="single value"/>
    <Attribute name="datetime value"/>
  </Filter>
<!-- only single and datetime values are copied. -->
</RecordFilters>

There is a schema available at org.eclipse.smila.datamodel.tools/schemas/RecordFilters.xsd


Tips

Persistent Queues

Queue persistence can be configured in one of the following ways:

1. persistent parameter in java.naming.provider.url (configuration/org.eclipse.smila.connectivity.queue.broker.main/jms.properties)

java.naming.provider.url=vm:(broker:(tcp://localhost:61616?jms.useAsyncSend=true)?persistent=true)?marshal=false

This is a global setting which will be applied to all queues.

2. PersistentDelivery attribute in Send Task (Router/Recycler/Listener configuration)

<Send BrokerId="broker1" Queue="SMILA.connectivity"  PersistentDelivery="false"/>

By setting PersistentDelivery=true/false you can define persistence handling on a single message granularity.

Hint on ActiveMQ: If persistence is disabled in the jms.properties, this cannot be overwritten by PersistentDelivery=true in the Send task. On the other hand, an enabled persistence in the jms.properties may be overwritten by setting PersistentDelivery=false.

Redelivery and Dead Letter Queue

You can define how a broker (connection) should handle failures during message delivery, e.g. when processing a record causes an exception in a BPEL pipeline.

<QueueWorkerConnectionsConfig xmlns="http://www.eclipse.org/smila/queue">
...
<URL>vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=2</URL>
...
</QueueWorkerConnectionsConfig>

maximumRedeliveries=2 means, that in case of a failure the record will be delivered to the pipeline two more times. If that fails again, the record will be sent to the Dead Letter Queue (DLQ).

Hints on ActiveMQ:

  • In ActiveMQ the DLQ will be created dynamically (if needed) and named "ActiveMQ.DLQ" per default. You can access this queue like any other queue, e.g. to define a special processing for failed records.
  • As defined in ActiveMQ DLQ default settings (DeadLetterStrategy), non-persistent messages will not be sent to the DLQ.

Automatic reconnect

When using ActiveMQ as JMS provider, you can define that connections should automatically be reconnected after being interrupted, e.g. because of a network failure. This can be done by using the failover protocol in the connection configuration:

<QueueWorkerConnectionsConfig xmlns="http://www.eclipse.org/smila/queue">
...
<URL>failover:(tcp://10.0.0.1:61617)</URL>
...
</QueueWorkerConnectionsConfig>

Hint: The failover protocol can also be used to provide alternative connections that may substitute the current connection if it fails. See ActiveMQ documentation for details.

Preprocessing by using JMS properties

As described before, you can set with the Send-Task your own JMS Properties on a record. This can be used in conjunction with cascaded DFPs, where a preprocessing has to be done, i.e. the result of one DFP is to be fed into another. Such a scenario is the case when diff. types of documents are crawled but processed basically the same after they have been converted to a common format. Here the records need to be marked with a JMS property according to their document type and be directed to the appropriate conversion DFP to do the conversion into the common format. The listener of these conversion DFPs then sends the JMS record back to the queue adding a new JMS property, with the name 'converted' and the value 'true'. Another DFP can select those records via its listener, whose conditions tests for this new JMS Property.

As described before, you can set with the Send-Task your own JMS Properties on a record. This can be used in conjunction with cascaded DFPs, where the result of one DFP is to be fed into another. Such a scenario is the case when diff. types of documents are crawled but processed basically the same after they have been converted to a common format. Here the records need to be marked with a JMS property according to their document type and be directed to the appropriate conversion DFP to do the conversion into the common format. The listener of these conversion DFPs then sends the JMS record back to the queue adding a new JMS property, with the name 'converted' and the value 'true'. Another DFP can select those records via its listener, whose conditions tests for this new JMS Property.

A complete use case of own parameters and different listeners in multiple steps:

<QueueWorkerListenerConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>
 
  <!--  detects document mime type and send back to queue -->
  <Rule Name="Detect MIME type Rule">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>Operation='ADD' and NOT(detected='true')</Condition>
    <Task>
      <Process Workflow="DetectMimeTypePipeline"/>
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing">
        <SetProperty name="detected" value="true"/>
      </Send>
    </Task>
  </Rule>
 
  <!--  converts documents and send back to queue -->
  <Rule Name="Convert Rule">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>Operation='ADD' and detected='true' and NOT(converted='true')</Condition>
    <Task>
      <Process Workflow="ConvertPipeline"/>
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing">
        <SetProperty name="detected" value="true"/>
        <SetProperty name="converted" value="true"/>
      </Send>
    </Task>
  </Rule>
 
  <!--  add documents to index -->
  <Rule Name="ADD Rule">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>Operation='ADD' and converted='true'</Condition>
    <Task>
      <Process Workflow="AddPipeline"/>
    </Task>
  </Rule>
 
</QueueWorkerListenerConfig>

Preprocessing by using several queues

A similar preprocessing scenario like the one described before can be established without JMS parameters by using more than one queue:

<QueueWorkerListenerConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>
 
  <!--  detects document mime type and sends to next queue -->
  <Rule Name="Detect MIME type Rule">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>Operation='ADD'</Condition>
    <Task>
      <Process Workflow="DetectMimeTypePipeline"/>
      <Send BrokerId="broker1" Queue="SMILA.connectivity-2" RecordFilter="nothing"/>
    </Task>
  </Rule>
 
  <!--  converts documents and sends to next queue -->
  <Rule Name="Convert Rule">
    <Source BrokerId="broker1" Queue="SMILA.connectivity-2"/>
    <Condition>Operation='ADD'</Condition>
    <Task>
      <Process Workflow="ConvertPipeline"/>
      <Send BrokerId="broker1" Queue="SMILA.connectivity-3" RecordFilter="nothing"/>
    </Task>
  </Rule>
 
  <!--  add documents to index -->
  <Rule Name="ADD Rule">
    <Source BrokerId="broker1" Queue="SMILA.connectivity-3"/>
    <Condition>Operation='ADD'</Condition>
    <Task>
      <Process Workflow="AddPipeline"/>
    </Task>
  </Rule>
 
</QueueWorkerListenerConfig>


Useful Links

  1. [JMS specification (JSR 914)]
  2. [JMS API SDK documentation]
  3. [Active MQ]
  4. [Active MQ – JNDI Support]
  5. [Active MQ – VM Transport Reference]