Skip to main content
Jump to: navigation, search

Difference between revisions of "SMILA/Documentation/QueueWorker"

(Send)
 
(11 intermediate revisions by 5 users not shown)
Line 1: Line 1:
== Introduction ==
+
{{note|This has been removed in SMILA 0.9 by the [[SMILA/Documentation/JobManager|JobManager framework]]}}
 
+
The QueueWorker components provide a bridge between Connectivity and OSGi-Runtime Processes that are responsible for processing the crawled content.
+
The Connectivity is responsible for interacting with crawlers and agents, which in turn gather raw data information from various data sources and pack them into records. Data Flow Processes (DFP) allow manipulating or post processing the crawled information in those records. In most cases the processed data is stored in some form of a store or an index at the end of a DFP.
+
Because not all data is equal (e.g. post processed the same way, except in the case of most simple of setups), the QueueWorker components are used to determine which record needs to be feed into which DFP. It thus acts as a router between the raw record and its post processing.
+
To decide which DFP is the right one for a records, the QW investigates the message properties and assigns the DFP based on configured rules.
+
The QueueWorker components are developed for the usage with JMS (Java Message Service). SMILA is distributed with an embedded JMS provider: Apache ActiveMQ. 
+
 
+
[[Image:workflow.jpg|Workflow]]
+
 
+
=== QueueWorker Bundle ===
+
The QueueWorker contains different components/features to take records in and out of queues. They are located in bundle org.eclipse.smila.connectivity.queue.worker.
+
 
+
The Router is needed to receive records from the Connectivity Framework, filter them and push them into queues. The Listener component of the QueueWorker is used as part of an OSGi runtime that processes the crawled information. It takes entries from the queue and calls one or more processing pipelines. Optionally, the processed records may be resend to a queue again. The QueueWorker contains also a Recycler component that can take records (which usually contain crawled or manipulated information) from storages (usually a [[SMILA/Documentation/Record_Storage|RecordStorage]]) and send them back to queues. All components are configured with “Rules” which execute “Tasks”.
+
 
+
== JMS Broker ==
+
 
+
=== JMS ===
+
JMS stands for Java Massage Service. The API is a messaging standard which allows applications written in Java to create, send, receive and read messages. An implementation of the JMS API is called a ''JMS Provider''. SMILA embeds [http://activemq.apache.org ActiveMQ] as default JMS provider.
+
 
+
=== JMS Queues and Topics ===
+
A ''JMS Queue'' is a container for messages that should be received by exactly one consumer. Messages are delivered in the order they were sent. A message is removed from the queue once it has been read successfully.
+
 
+
In opposite to JMS Queues, so called ''JMS Topics'' implement a publish and subscribe semantic: When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message.
+
 
+
'''Note''': JMS Topics are not used/supported in SMILA at the current state of implementation
+
 
+
'''Note on ActiveMQ''': In ActiveMQ you do not have to create/configure destinations (= queues/topics) up front before you can use them. The ActiveMQ broker auto-creates the physical resources associated with a destination on demand (i.e. when messages are sent to a new destination on a broker).
+
 
+
=== JMS Broker ===
+
The JMS Message Broker is the main component on the server side of a JMS provider. It maintains queues and connections, routes messages to queues, acknowledges messages and handles transactions. In SMILA, an ActiveMQ Broker is embedded in the application per default.
+
 
+
=== Broker Configuration ===
+
In SMILA there is a '''jms.properties''' file to configure the embedded JMS broker. It is located in configuration/org.eclipse.smila.connectivity.queue.broker.main
+
 
+
Default Configuration:
+
 
+
<source lang="java">
+
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
+
java.naming.provider.url=vm:(broker:(tcp://localhost:61616?jms.useAsyncSend=true)?persistent=true)?marshal=false
+
</source>
+
 
+
The ''java.naming.provider.url'' parameter in this example defines a broker to which a JMS client can connect via TCP transport protocol.
+
 
+
'''Note on ActiveMQ:''' If you deactivate the org.eclipse.smila.connectivity.queue.broker.main bundle on SMILA startup, an internal broker will be created automatically by ActiveMQ. This broker can only be connected from a client running in the same SMILA instance by using the VM protocol (vm://localhost). So, if you need to connect to a broker (resp. its queues) via TCP from a different machine, the org.eclipse.smila.connectivity.queue.broker.main bundle should be activated (which is default in SMILA) and jms.properties allowing a TCP connection have to be set - like it is done in the default jms.properties shown above.
+
 
+
 
+
== Broker Connection ==
+
A Broker Connection provides a cached connection pool to JMS brokers. The QueueWorker components use Broker Connections to communicate with different Queues in different JMS brokers.
+
 
+
=== Configuration ===
+
The different components of the QueueWorker can use connections to different brokers. The connections are defined in a special configuration file.
+
A configuration has a unique Id and contains a list of specific JMS properties like:
+
# URL to broker
+
# user and password for login
+
# the connection type (factory) of specific broker
+
 
+
Below is an example of the default configuration file. It is located in configuration/org.eclipse.smila.queue.worker/QueueWorkerConnectionConfig.xml:
+
 
+
<source lang="xml">
+
<QueueWorkerConnectionsConfig xmlns="http://www.eclipse.org/smila/queue"
+
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
+
>
+
 
+
  <ConnectionConfig Id="broker1">
+
    <URL>vm://localhost</URL>
+
    <User>any</User>
+
    <Password>any</Password>
+
    <ConnectionFactory>org.apache.activemq.ActiveMQConnectionFactory</ConnectionFactory>
+
  </ConnectionConfig>
+
 
+
</QueueWorkerConnectionsConfig>
+
</source>
+
 
+
The schema for this configuration file is located at org.eclipse.smila.connectivity.queue.worker/schemas/QueueWorkerConfig.xsd.
+
 
+
=== Example ===
+
 
+
This example uses two completely independent brokers. Keep in mind that there should be a unique Id for every connection.
+
 
+
<source lang="xml">
+
<QueueWorkerConnectionsConfig xmlns="http://www.eclipse.org/smila/queue"
+
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
+
>
+
 
+
  <ConnectionConfig Id="broker1">
+
    <URL>tcp://localhost:61616</URL>
+
    <User>any</User>
+
    <Password>any</Password>
+
    <ConnectionFactory>org.apache.activemq.ActiveMQConnectionFactory</ConnectionFactory>
+
  </ConnectionConfig>
+
 
+
  <ConnectionConfig Id="broker2">
+
    <URL>tcp://10.0.0.1:61617</URL>
+
    <User>any1</User>
+
    <Password>any1</Password>
+
    <ConnectionFactory>com.ibm.mq.jms.MQQueueConnectionFactory</ConnectionFactory>
+
    <ConnectionProperty Name="useExponentialBackOff">true</ConnectionProperty>
+
  </ConnectionConfig>
+
 
+
</QueueWorkerConnectionsConfig>
+
</source>
+
 
+
 
+
== JMS Message Properties ==
+
Each JMS message may contain properties. In SMILA the following properties are predefined and are automatically included with every message:
+
* <b>DataSourceID</b> - the data source id
+
* <b>Operation</b> - the kind of operation (<tt>ADD</tt>, <tt>DELETE</tt> or <tt>NONE</tt>
+
 
+
SMILA also supports user defined message properties. These are simply added to each record by named values of the predefined Annotation <tt>MessageProperties</tt>. If a record contains the annotation <tt>MessageProperties</tt> each named value of it is converted to a String message property.
+
 
+
It's also possible to add additional properties with static values in the <tt>Send</tt> task of your Router or Listener configuration (see [[#Send|Send task]]below).
+
 
+
 
+
== Router ==
+
 
+
The Router is one of the QueueWorker services. The main goal is to put record into one or more JMS queues (which are hosted/provided by a JMS broker). This happens within the [[SMILA/Documentation/QueueWorker#Send|Send task]]. The Router looks for every record that was crawled by a rule that fits with its [[SMILA/Documentation/QueueWorker#Condition|condition]].
+
But it is also possible to configure the Router only with a [[SMILA/Documentation/QueueWorker#Process|Process task]], which will effectively make the MQ obsolete which can be useful in small and resource-limited environments. In that case the router will directly put the record into the pipeline.
+
 
+
=== Configuration ===
+
The configuration file is located in configuration/org.eclipse.smila.connectivity.queue.worker/QueueWorkerRouterConfig.xml.
+
It simply consists of a list of routing rules.
+
 
+
Default configuration:
+
<source lang="xml">
+
<QueueWorkerRouterConfig xmlns="http://www.eclipse.org/smila/queue"
+
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
+
>
+
 
+
  <Rule Name="Default Route Rule">
+
    <Condition></Condition>
+
    <Task>
+
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing"/>
+
    </Task>
+
  </Rule>
+
 
+
</QueueWorkerRouterConfig>
+
</source>
+
 
+
The schema is located in org.eclipse.smila.connectivity.queue.worker/schemas/QueueWorkerConfig.xsd.
+
 
+
=== Example ===
+
This example routes each record to more than one queue by defining several Send tasks:
+
 
+
<source lang="xml">
+
<QueueWorkerRouterConfig xmlns="http://www.eclipse.org/smila/queue"
+
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
+
>
+
 
+
  <Rule Name="Double Send Route Rule">
+
    <Condition></Condition>
+
    <Task>
+
      <Send BrokerId="broker1" Queue="queue1" RecordFilter="nothing"/>
+
      <Send BrokerId="broker2" Queue="queue2" RecordFilter="nothing"/>
+
    </Task>
+
  </Rule>
+
 
+
</QueueWorkerRouterConfig>
+
</source>
+
 
+
A detailed description of the Routing Rules can be found [[SMILA/Documentation/QueueWorker#In_Detail:_QueueWorker_Rules,_Conditions_and_Tasks|here]].
+
 
+
 
+
== Listener ==
+
 
+
The Listener is another QueueWorker component. This component can be used to “listen” for specific records on specific queues. If a record matching the condition of a rule exists the Listener takes the record from the queue and pushed it to BPEL pipelines. Generally the Listener works with the [[SMILA/Documentation/QueueWorker#Process|Process task]], but it’s also possible to use [[SMILA/Documentation/QueueWorker#Send|Send tasks]] maybe for resending a record into a new JMS queue after the pipeline has finished.
+
 
+
=== Configuration ===
+
 
+
The configuration is nearly the same as the Router Configuration. There are only these following add-ons:
+
* ''Source'' element: a source reference to the used broker (BrokerId should be specified in Broker Connection configuration)
+
* ''WaitMassageTime'' attribute: timeout (in seconds) for attempts to pull JMS message from queue, default value is 1 second.
+
* ''MaxMessageBlockSize'' attribute: maximum number of messages to receive from the queue in one session. If less messages are available those are consumed without waiting for more messages. Default value is 1 message.
+
* ''Threads'' attribute: startup number of threads which listen to the queue under this rule, default value is 1 thread.
+
 
+
Default configuration:
+
<source lang="xml">
+
<QueueWorkerListenerConfig xmlns="http://www.eclipse.org/smila/queue"
+
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
+
>
+
 
+
  <Rule Name="ADD Rule" WaitMessageTimeout="10" Threads="4" MaxMessageBlockSize="20">
+
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
+
    <Condition>Operation='ADD' and NOT(DataSourceID LIKE '%feeds%')</Condition>
+
    <Task>
+
      <Process Workflow="AddPipeline"/>
+
    </Task>
+
  </Rule>
+
 
+
  <Rule Name="Delete Rule" WaitMessageTimeout="10" Threads="2" MaxMessageBlockSize="20">
+
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
+
    <Condition>Operation='DELETE'</Condition>
+
    <Task>
+
      <Process Workflow="DeletePipeline"/>
+
    </Task>
+
  </Rule>
+
 
+
  <Rule Name="ADD Feed Rule" WaitMessageTimeout="10" Threads="2" MaxMessageBlockSize="20">
+
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
+
    <Condition>Operation='ADD' and DataSourceID LIKE '%feeds%'</Condition>
+
    <Task>
+
      <Process Workflow="AddFeedPipeline"/>
+
    </Task>
+
  </Rule>
+
   
+
</QueueWorkerListenerConfig>
+
</source>
+
 
+
There is a schema located in org.eclipse.smila.connectivity.queue.worker/schemas/QueueWorkerConfig.xsd.
+
 
+
=== Example ===
+
An example for a preprocessing workflow using more than one queue can be found in the [[SMILA/Documentation/QueueWorker#Tips|Tips section]].
+
 
+
A detailed description of the Listener Rules can be found [[SMILA/Documentation/QueueWorker#In_Detail:_QueueWorker_Rules,_Conditions_and_Tasks|here]].
+
 
+
 
+
== RecordRecycler ==
+
A RecordRecycler is a special QueueWorker component which won't be needed in a SMILA standard scenario. You can use it if you need to reprocess (re-index) your data without accessing the original data sources again.
+
 
+
The current RecordRecycler is identical to the Router with the exception that RecordRecycler gets records from RecordStorage while the router gets them from the Connectivity. Thus it can be used to take records from the RecordStore and processes them (again) without having to recrawl the data sources.
+
 
+
=== Configuration ===
+
The RecordRecycler is high configurable with the xml configuration files in configuration/org.eclipse.smila.connectivity.queue.worker/recyclers. The RecordRecycler is able to execute any Router/Listener specific task.
+
 
+
Default Configuration (recyclers1.xml):
+
<source lang="xml">
+
<QueueWorkerRecordRecyclerJob xmlns="http://www.eclipse.org/smila/queue"
+
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
  xsi:noNamespaceSchemaLocation="../schemas/QueueWorkerConfig.xsd"
+
>
+
 
+
  <Rule Name="First recycle Rule">
+
    <Condition>DataSourceID='myDataSource'</Condition>
+
    <Task>
+
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing"/>
+
    </Task>
+
  </Rule>
+
 
+
  <Rule Name="Second recycle Rule">
+
    <Condition>NOT(DataSourceID='myDataSource')</Condition>
+
    <Task>
+
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing">
+
        <SetProperty Name="doAdd">no</SetProperty>
+
      </Send>
+
    </Task>
+
  </Rule>
+
 
+
</QueueWorkerRecordRecyclerJob>
+
</source>
+
 
+
There is a schema located at org.eclipse.smila.connectivity.queue.worker/schemas/QueueWorkerConfig.xsd.
+
 
+
A detailed description of the Rules can be found [[SMILA/Documentation/QueueWorker#In_Detail:_QueueWorker_Rules,_Conditions_and_Tasks|here]].
+
 
+
=== Usage of JMX Management Agent ===
+
The JMX Agent name is org.eclipse.smila.connectivity.queue.worker.RecordRecycler.
+
The following commands are available to be executed:
+
{| {{Greytable}}
+
! Signature
+
! Description
+
! Example
+
|-
+
| startRecycle (String configurationID, String dataSourceID)
+
| starts the recycle process
+
| startRecycle (recycler1, file)
+
|-
+
| stopRecycle (String dataSource ID)
+
| stops the recycle process
+
| stopRecycle (file)
+
|-
+
| getRecordsRecycled (String dataSourceID)
+
| gets the number of recycled records
+
| getRecrodsRecycled (dataSourceID)
+
|-
+
| getStatus (String dataSourceID)
+
| shows the current status of recycle process
+
| getStatus (file)
+
|-
+
| getConfigurations()
+
| lists all available recycler configurations
+
|
+
|}
+
 
+
 
+
== In Detail: QueueWorker Rules, Conditions and Tasks ==
+
 
+
QueueWorker Rules (used by Router-, Listener- and Recycler configuration) consist of:
+
* Rule attributes
+
* Source (Listener configuration only)
+
* Condition
+
* Tasks (Send/Process)
+
 
+
Here's an example of a Listener Rule:
+
<source lang="xml">
+
<Rule Name="ADD Rule" WaitMessageTimeout="10" Threads="2">
+
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
+
    <Condition>Operation='ADD'</Condition>
+
    <Task>
+
      <Process Workflow="AddPipeline"/>
+
    </Task>
+
</Rule>
+
</source>
+
 
+
Its elements will be explained in detail in the following sections.
+
 
+
=== Rule attributes ===
+
 
+
In the Rule element you can specify the following attributes:
+
 
+
{|{{Greytable}}
+
! Attribute
+
! Description
+
|-
+
| Name
+
| (unique) name of the rule
+
|-
+
| WaitMassageTime (Listener only)
+
| timeout (in seconds) for attempts to pull JMS message from queue, default value is 1 second.
+
|-
+
| Threads (Listener only)
+
| startup number of threads which listen to the queue under this rule, default value is 2 threads.
+
|}
+
 
+
=== Source ===
+
 
+
The Source element is only valid for the Listener configuration. It specifies a reference to the used broker (queue) connection. The given BrokerId must exist in the JMS Broker Connection configuration.
+
 
+
=== Condition ===
+
 
+
Conditions are elements of rules which are used by Router, Listener and RecordRecycler. A condition is a string which syntax is based on a subset of the SQL92 conditional expression syntax as ref'ed in the "Message Selector Syntax" section of [[http://java.sun.com/products/jms/docs.html|JMS specification]], and is applied against the JMS properties of an MQ entry. Within SMILA basically the two JMS properties, Operation and DataSourceID are supported, but generally every JMS property can be tested, including your own.
+
An empty condition element represents an condition that is always true.
+
 
+
Router and Listener use the rules and their conditions differently:
+
 
+
''' Router / RecordRecycler '''<br/>
+
The primary role of the router is to put JMS records into the desired MQ. The target MQ is defines in the Send-Task, which record is defined by the conditions of the rule. Other than the Listener the router will consult all rules sequentially from top bottom for any given record and execute the task for the first matching rule. It is therefore a good idea to define as a last rule with an empty condition and send records that did not match any rules this far into a special queue, such as the dead letter queue.
+
 
+
''' Listener '''<br/>
+
The primary role of the Listener is to take records from the MQ and carry out the associated task. Hence the conditions are used as a "select clause" on all records of the queue. The task is then executed on one arbitrary record that matches the condition. Note that a listener creates at least one thread for each rule as defined by the Threads attribute at the rule. Hence the rules are executed concurrently. If therefore the conditions for diff. rules (and thus tasks) are not disjunctive, it is not deterministic which task is executed for which record. This is usually not desired so care must be taken when designing the rules.
+
 
+
''' Examples '''<br/>
+
{| {{Greytable}}
+
! Condition sample
+
! Description
+
|-
+
| <empty>
+
| empty conditions (<Condition></Condition>) are always true
+
|-
+
| Operation='ADD'
+
| if Operation equals to ADD
+
|-
+
| DataSourceID LIKE 'web%'
+
| if dataSourceId begins with web
+
|-
+
| NOT(DataSourceID LIKE 'web%'
+
| if dataSourceId does not start with web
+
|}
+
 
+
=== Tasks ===
+
 
+
There are two pre-defined tasks <Send> and <Process>. The QueueWorker components Router and Listener use these tasks to do their job. The Router usually uses the <Send> task to send records (which contain one crawled entry) to a queue. The Listener uses the <Process> task to take a Record from a queue and feed it into a BPEL workflow for processing this record. At the end of the process it may create a <Send> task again for putting the record result (back) into a queue. The Queue Worker provides all tasks and every component has the possibility to execute each of these tasks.
+
 
+
==== Process ====
+
The Process task is only a plain tag with the attribute “Workflow”. This task takes a record from JMS broker and pushes it to a BPEL pipeline.
+
 
+
{| {{Greytable}}
+
! Attribute
+
! Description
+
|-
+
| Workflow
+
| the BPEL pipeline/workflow to execute.
+
|}
+
 
+
Example:
+
<source lang="xml">
+
<Process Workflow="AddPipeline"/>
+
</source>
+
 
+
==== Send ====
+
The Send task is used to send a Record to a JMS queue. The <Send> element may have the following attributes:
+
 
+
{| {{Greytable}}
+
! Attribute
+
! Requirement
+
! Default value
+
! Description
+
|-
+
| BrokerId
+
| required
+
|
+
| Id of the broker specified in JMS Broker Connection
+
|-
+
| Queue
+
| required
+
|
+
| target JMS queue
+
|-
+
| RecordFilter
+
| optional
+
|
+
| attribute to set the record filter used to downsize the record (see description below). If no RecordFilter attribute is set, the full record (including the attachments) will be send to the queue).
+
|-
+
| PersistentDelivery
+
| optional
+
| true
+
| attribute correspondent to JMS delivery mode. If it is set to true (by default), message delivery will be persistent, otherwise not
+
|-
+
| WithAttachments
+
| optional
+
| false
+
| used to get attachments from binary storage back into record after filtering them out.  This might be needed in clustering scenarios where the MQ is used to transport the records to diff. nodes. By default this att. it's false (means that attachments are not attached to the Record)
+
|}
+
 
+
The Send element may contain any number of SetProperty elements. Each SetProperty element will add a same named and valued JMS Property to the MQ Entry of the record being put into the queue. These are in addition to any predefined or dynamically set JMS properties (see [[#JMS Message Properties|JMS Message Properties]]).
+
 
+
Example:
+
 
+
<source lang="xml">
+
<Send BrokerId="broker1" Queue="SMILA.connectivity"  RecordFilter="filterName" PersistentDelivery="false" WithAttachments="true">
+
  <SetProperty Name="doSomething">some value</SetProperty>
+
  <SetProperty Name="doSomethingElse">some other value</SetProperty>
+
</Send>
+
</source>
+
 
+
===== RecordFilter =====
+
A Record filter, as the name suggest, filters attributes of a record when sending it to a queue.
+
Often a record is complicated object with many attributes holding the metadata but also the binary content of the crawled data. As a consequence the serialized binary size of the record can be very large and would need to be handled (stored!) by the MQ.  Further, most attributes are only needed once or twice during processing which makes it less desirable to always carrying them around in the MQ.
+
To lighten the record of such less needed data the records can be stripped of these, containing only the relevant information.  However, keep in mind that stripped data must be fetched from the record or binary store when not held in memory which incurs some performance penalty.
+
A Record Filter will always cut the attachment off from a record. (If you dont use a RecordFilter (see Router/Listener Configuration), the attachments will stay)
+
 
+
The filters are defined in an xml config file. Each filter is identified by its name attribute. Every filter specifies copy-rules for attributes/annotations. Because the RecordFilter is invoked via [[SMILA/Documentation/Usage_of_Blackboard_Service|Blackboard service]] the configuration file is located in configuration/org.eclipse.smila.blackboard/RecordFilter.xml
+
 
+
Default Configuration:
+
 
+
<source lang="xml">
+
<RecordFilters
+
  xmlns:xsi=http://www.w3.org/2001/XMLSchema-instance
+
  xsi:noNamespaceSchemaLocation="../org.eclipse.smila.datamodel.tools/schemas/RecordFilters.xsd">
+
  <Filter name="workflow-object">   
+
    <Attribute name="MimeType" keepAnnotations="false"/>             
+
  </Filter>
+
  <Filter name="only-attributes">
+
    <Attribute name="*"/>
+
  </Filter>
+
  <Filter name="no-filter">
+
    <Attribute name="*" keepAnnotations="true"/>
+
    <Annotation name="*"/>
+
  </Filter>
+
  <Filter name="nothing"/>
+
</RecordFilters>
+
</source>
+
 
+
Only the Send-task may use a RecordFilter and is selected by its name. The given attribute/annotation tags decide which attributes and annotation will be copied to the record pushed into the MQ. The name attribute, is a selector for the attribute or annotation and supports '*' as a wildcard. The keepAnnotations attribute is a flag to copy internal attribute annotations or not. A RecordFilterNotFoundException will be thrown if no filter is chosen or a given filter is not found.
+
 
+
Example:
+
 
+
<source lang="xml">
+
<RecordFilters>
+
  <Filter name="only-attributes">
+
    <Attribute name="*"/>
+
  </Filter>
+
<!-- all attributes like MimeType, Size and so on but without sub-annotations. -->
+
  <Filter name="no-filter">
+
    <Attribute name="*" keepAnnotations="true"/>
+
    <Annotation name="*"/>
+
  </Filter>
+
<!-- a full copy of the record will be created. -->
+
  <Filter name="nothing"/>
+
<!-- a record only containing the id is copied. -->
+
  <Filter name="only-attribute1">
+
    <Attribute name="attribute1"/>
+
  </Filter>
+
<!-- only a specified attributes is copied. -->
+
  <Filter name="filter-single-and-datetime">
+
    <Attribute name="single value"/>
+
    <Attribute name="datetime value"/>
+
  </Filter>
+
<!-- only single and datetime values are copied. -->
+
</RecordFilters>
+
</source>
+
 
+
There is a schema available at org.eclipse.smila.datamodel.tools/schemas/RecordFilters.xsd
+
 
+
== Tips ==
+
 
+
=== Persistent Queues ===
+
 
+
Queue persistence can be configured in one of the following ways:
+
 
+
1. ''persistent'' parameter in java.naming.provider.url (configuration/org.eclipse.smila.connectivity.queue.broker.main/jms.properties)
+
 
+
<source lang=text>java.naming.provider.url=vm:(broker:(tcp://localhost:61616?jms.useAsyncSend=true)?persistent=true)?marshal=false</source>
+
 
+
This is a global setting which will be applied to all queues.
+
 
+
2. ''PersistentDelivery'' attribute in Send Task (Router/Recycler/Listener configuration)
+
 
+
<source lang=xml><Send BrokerId="broker1" Queue="SMILA.connectivity"  PersistentDelivery="false"/></source>
+
 
+
By setting ''PersistentDelivery=true/false'' you can define persistence handling on a single message granularity.
+
 
+
'''Hint on ActiveMQ:''' If persistence is disabled in the jms.properties, this cannot be overwritten by PersistentDelivery=true in the Send task. On the other hand, an enabled persistence in the jms.properties may be overwritten by setting PersistentDelivery=false.
+
 
+
=== Redelivery and Dead Letter Queue ===
+
 
+
You can define how a broker (connection) should handle failures during message delivery, e.g. when processing a record causes an exception in a BPEL pipeline.
+
 
+
<source lang=xml>
+
<QueueWorkerConnectionsConfig xmlns="http://www.eclipse.org/smila/queue">
+
...
+
<URL>vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=2</URL>
+
...
+
</QueueWorkerConnectionsConfig>
+
</source>
+
 
+
''maximumRedeliveries=2'' means, that in case of a failure the record will be delivered to the pipeline two more times. If that fails again, the record will be sent to the Dead Letter Queue (DLQ).
+
 
+
'''Hints on ActiveMQ:'''
+
* In ActiveMQ the DLQ will be created dynamically (if needed) and named "ActiveMQ.DLQ" per default. You can access this queue like any other queue, e.g. to define a special processing for failed records.
+
* As defined in ActiveMQ DLQ default settings (DeadLetterStrategy), non-persistent messages will not be sent to the DLQ.
+
 
+
=== Automatic reconnect ===
+
 
+
When using ActiveMQ as JMS provider, you can define that connections should automatically be reconnected after being interrupted, e.g. because of a network failure. This can be done by using the '''failover''' protocol in the connection configuration:
+
 
+
<source lang=xml>
+
<QueueWorkerConnectionsConfig xmlns="http://www.eclipse.org/smila/queue">
+
...
+
<URL>failover:(tcp://10.0.0.1:61617)</URL>
+
...
+
</QueueWorkerConnectionsConfig>
+
</source>
+
 
+
'''Hint:''' The failover protocol can also be used to provide alternative connections that may substitute the current connection if it fails. See ActiveMQ documentation for details.
+
 
+
=== Preprocessing by using JMS properties ===
+
 
+
As described before, you can set with the Send-Task your own JMS Properties on a record. This can be used in conjunction with cascaded DFPs, where a preprocessing has to be done, i.e. the result of one DFP is to be fed into another. Such a scenario is the case when diff. types of documents are crawled but processed basically the same after they have been converted to a common format. Here the records need to be marked with a JMS property according to their document type and be directed to the appropriate conversion DFP to do the conversion into the common format. The listener of these conversion DFPs then sends the JMS record back to the queue adding a new JMS property, with the name 'converted' and the value 'true'. Another DFP can select those records via its listener, whose conditions tests for this new JMS Property.
+
 
+
As described before, you can set with the Send-Task your own JMS Properties on a record. This can be used in conjunction with cascaded DFPs, where the result of one DFP is to be fed into another. Such a scenario is the case when diff. types of documents are crawled but processed basically the same after they have been converted to a common format. Here the records need to be marked with a JMS property according to their document type and be directed to the appropriate conversion DFP to do the conversion into the common format. The listener of these conversion DFPs then sends the JMS record back to the queue adding a new JMS property, with the name 'converted' and the value 'true'. Another DFP can select those records via its listener, whose conditions tests for this new JMS Property.
+
 
+
A complete use case of own parameters and different listeners in multiple steps:
+
 
+
<source lang="xml">
+
<QueueWorkerListenerConfig xmlns="http://www.eclipse.org/smila/queue"
+
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
+
>
+
 
+
  <!--  detects document mime type and send back to queue -->
+
  <Rule Name="Detect MIME type Rule">
+
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
+
    <Condition>Operation='ADD' and NOT(detected='true')</Condition>
+
    <Task>
+
      <Process Workflow="DetectMimeTypePipeline"/>
+
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing">
+
        <SetProperty name="detected" value="true"/>
+
      </Send>
+
    </Task>
+
  </Rule>
+
 
+
  <!--  converts documents and send back to queue -->
+
  <Rule Name="Convert Rule">
+
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
+
    <Condition>Operation='ADD' and detected='true' and NOT(converted='true')</Condition>
+
    <Task>
+
      <Process Workflow="ConvertPipeline"/>
+
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing">
+
        <SetProperty name="detected" value="true"/>
+
        <SetProperty name="converted" value="true"/>
+
      </Send>
+
    </Task>
+
  </Rule>
+
 
+
  <!--  add documents to index -->
+
  <Rule Name="ADD Rule">
+
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
+
    <Condition>Operation='ADD' and converted='true'</Condition>
+
    <Task>
+
      <Process Workflow="AddPipeline"/>
+
    </Task>
+
  </Rule>
+
 
+
</QueueWorkerListenerConfig>
+
</source>
+
 
+
=== Preprocessing by using several queues ===
+
 
+
A similar preprocessing scenario like the one described before can be established without JMS parameters by using more than one queue:
+
 
+
<source lang="xml">
+
<QueueWorkerListenerConfig xmlns="http://www.eclipse.org/smila/queue"
+
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
+
>
+
 
+
  <!--  detects document mime type and sends to next queue -->
+
  <Rule Name="Detect MIME type Rule">
+
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
+
    <Condition>Operation='ADD'</Condition>
+
    <Task>
+
      <Process Workflow="DetectMimeTypePipeline"/>
+
      <Send BrokerId="broker1" Queue="SMILA.connectivity-2" RecordFilter="nothing"/>
+
    </Task>
+
  </Rule>
+
 
+
  <!--  converts documents and sends to next queue -->
+
  <Rule Name="Convert Rule">
+
    <Source BrokerId="broker1" Queue="SMILA.connectivity-2"/>
+
    <Condition>Operation='ADD'</Condition>
+
    <Task>
+
      <Process Workflow="ConvertPipeline"/>
+
      <Send BrokerId="broker1" Queue="SMILA.connectivity-3" RecordFilter="nothing"/>
+
    </Task>
+
  </Rule>
+
 
+
  <!--  add documents to index -->
+
  <Rule Name="ADD Rule">
+
    <Source BrokerId="broker1" Queue="SMILA.connectivity-3"/>
+
    <Condition>Operation='ADD'</Condition>
+
    <Task>
+
      <Process Workflow="AddPipeline"/>
+
    </Task>
+
  </Rule>
+
 
+
</QueueWorkerListenerConfig>
+
</source>
+
 
+
 
+
 
+
== Useful Links ==
+
 
+
# [[http://java.sun.com/products/jms/docs.html  JMS specification (JSR 914)]]
+
# [[http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/jms/Message.html  JMS API SDK documentation]]
+
# [[http://activemq.apache.org  Active MQ]]
+
# [[http://activemq.apache.org/jndi-support.html  Active MQ – JNDI Support]]
+
# [[http://activemq.apache.org/vm-transport-reference.html  Active MQ – VM Transport Reference]]
+
 
+
[[Category:SMILA]]
+

Latest revision as of 05:59, 24 January 2012

Note.png
This has been removed in SMILA 0.9 by the JobManager framework

Back to the top