SMILA/Documentation/QueueWorker

Revision as of 06:54, 24 March 2009

Introduction

The QueueWorker components provide a bridge between Connectivity and the OSGi runtime processes that are responsible for processing the crawled content. Connectivity is responsible for interacting with crawlers, which in turn gather raw data from various data sources and pack it into records. DFPs (Data Flow Processes) manipulate or post-process the crawled information in those records. In most cases the processed data is stored in some form of store or index at the end of a DFP. Because not all data is processed in the same way (except in the simplest setups), the QueueWorker components determine which record needs to be fed into which DFP. The QueueWorker (QW) thus acts as a router between the raw record and its post-processing. To decide which DFP is the right one for a record, the QW inspects the record's properties and assigns the DFP based on configured rules. The QueueWorker components are built on JMS (Java Message Service). SMILA is distributed with an embedded JMS broker: Apache Active MQ.

Workflow

QueueWorker Bundle

The QueueWorker contains different components/features to put records into queues and take them out again. They are located in the bundle org.eclipse.smila.connectivity.queue.worker.


Its main function is to manage a set of services that receive records from the Connectivity Framework, filter them and push them into queues. The Listener component of the QueueWorker is used as part of an OSGi runtime that processes the crawled information. It takes entries from the queue and, after processing, can put them back into a queue. The QueueWorker also contains a Recycler component that can take records (which usually contain crawled or manipulated information) from storages (usually a RecordStorage) and send them back to queues. All components are configured with “Rules” which execute “Tasks”.

JMS-Embedded Broker Configuration

JMS stands for Java Message Service. The API is a messaging standard which allows applications written in Java to create, send, receive and read messages. In SMILA it is used by the Router and the Listener to send messages to and get messages from Active MQ. The embedded Active MQ broker is configured in the file jms.properties, which is located in configuration/org.eclipse.smila.connectivity.queue.broker.main.

Default Configuration:

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=vm:(broker:(tcp://localhost:61616?jms.useAsyncSend=true)?persistent=true)?marshal=false
initialRedeliveryDelay=500L
redeliveryPolicy.maximumRedeliveries=2
redeliveryPolicy.backOffMultiplier=2
useExponentialBackOff=true

JMS Broker Connection

A Broker Connection provides a cached connection pool to JMS brokers. The QueueWorker components use Broker Connections to communicate with different Queues in different JMS brokers.

Configuration

The different components of the QueueWorker can use connections to different brokers. The connections are defined in a special configuration file. A configuration has a unique Id and contains a list of specific JMS properties like:

  1. URL to broker
  2. user and password for login
  3. the connection type (factory) of specific broker

Below is an example of the default configuration file. It is located in configuration/org.eclipse.smila.queue.worker/QueueWorkerConnectionConfig.xml

Default configuration (for use with the embedded broker):

<ConnectionsConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>
 
  <ConnectionConfig Id="broker1">
    <URL>tcp://localhost:61616</URL>
    <User>any</User>
    <Password>any</Password>
    <ConnectionFactory>org.apache.activemq.ActiveMQConnectionFactory</ConnectionFactory>
  </ConnectionConfig>
 
</ConnectionsConfig>

The schema for this configuration file is located at org.eclipse.smila.connectivity.queue.worker/schemas/QueueWorkerConfig.xsd.

Example

This example uses two completely independent brokers. Keep in mind that there should be a unique Id for every connection.

<ConnectionsConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>
 
  <ConnectionConfig Id="broker1">
    <URL>tcp://localhost:61616</URL>
    <User>any</User>
    <Password>any</Password>
    <ConnectionFactory>org.apache.activemq.ActiveMQConnectionFactory</ConnectionFactory>
  </ConnectionConfig>
 
  <ConnectionConfig Id="broker2">
    <URL>tcp://10.0.0.1:61617</URL>
    <User>any1</User>
    <Password>any1</Password>
    <ConnectionFactory>com.ibm.mq.jms.MQQueueConnectionFactory</ConnectionFactory>
    <ConnectionProperty Name="useExponentialBackOff">true</ConnectionProperty>
  </ConnectionConfig>
 
</ConnectionsConfig>

Router

The Router is one of the QueueWorker services. Its main goal is to put records into a JMS queue (which is hosted by a JMS broker). This happens within the <Send> task. For every crawled record, the Router looks for a rule whose condition matches (see QueueWorker Conditions). It is also possible to configure the Router with only a <Process> task, which effectively makes the MQ obsolete and can be useful in small, resource-limited environments. In that case the Router puts the record directly into the pipeline.

Configuration

The configuration file is located in configuration/org.eclipse.smila.connectivity.queue.worker/QueueWorkerRouterConfig.xml. It simply consists of a list of routing rules.

Default configuration:

<QueueWorkerRouterConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>
 
  <Rule Name="Default Route Rule">
    <Condition></Condition>
    <Task> 
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing"/>
    </Task>
  </Rule>
 
</QueueWorkerRouterConfig>

The schema is located in org.eclipse.smila.connectivity.queue.worker/schemas/QueueWorkerConfig.xsd.
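
The queue-less setup described for the Router could be sketched as follows. This is only a minimal sketch: it assumes that the Router accepts a <Process> task in place of <Send>, as the text states. The rule name is made up for illustration, and the workflow name AddPipeline is borrowed from the Listener default configuration.

<QueueWorkerRouterConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>

  <!-- Hypothetical rule: every record goes straight into a pipeline, no MQ involved. -->
  <Rule Name="Direct Process Rule">
    <Condition></Condition>
    <Task>
      <Process Workflow="AddPipeline"/>
    </Task>
  </Rule>

</QueueWorkerRouterConfig>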

<Condition> tag

The Router can send different records (e.g. records that were crawled by different crawlers and therefore have different DataSourceID values) to different queues. The <Condition> tag is used to apply rules to specific records.
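
Routing by data source might look like the sketch below. The queue names SMILA.web and SMILA.file and the rule names are assumptions chosen for illustration; only SMILA.connectivity appears in the default configuration, and the conditions assume data source IDs that start with "web" or "file".

<QueueWorkerRouterConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>

  <!-- Records from web data sources go to a (hypothetical) web queue. -->
  <Rule Name="Web Route Rule">
    <Condition>DataSourceID LIKE 'web%'</Condition>
    <Task>
      <Send BrokerId="broker1" Queue="SMILA.web" RecordFilter="nothing"/>
    </Task>
  </Rule>

  <!-- Records from file data sources go to a (hypothetical) file queue. -->
  <Rule Name="File Route Rule">
    <Condition>DataSourceID LIKE 'file%'</Condition>
    <Task>
      <Send BrokerId="broker1" Queue="SMILA.file" RecordFilter="nothing"/>
    </Task>
  </Rule>

</QueueWorkerRouterConfig>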

Listener

What is a Listener?

The Listener is another QueueWorker component. It can be used to “listen” for specific records on specific queues. If a record matching the condition of a rule exists, the Listener takes the record from the queue and pushes it to a BPEL pipeline. Generally the Listener works with the <Process> task, but it is also possible to use other QueueWorker tasks, for example to resend a record into a new JMS queue after the pipeline has finished.

Configuration

The configuration is nearly the same as the Router configuration. There are only the following three additions:

- <Source BrokerId="broker1" Queue="SMILA.connectivity"/> = a source reference to the broker and queue to use (the BrokerId should be specified in JMS Broker Connection).
- WaitMessageTimeout = timeout (in seconds) for attempts to pull a JMS message from the queue; the default value is 1 second.
- Threads = initial number of threads which listen to the queue under this rule; the default value is 2 threads.

Default configuration:

<QueueWorkerListenerConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>
  <Rule Name="ADD Rule" WaitMessageTimeout="10" Threads="2">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>Operation='ADD'</Condition>
    <Task>
      <Process Workflow="AddPipeline"/>
    </Task>
  </Rule>
 
  <Rule Name="Delete Rule" WaitMessageTimeout="10" Threads="2">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>Operation='DELETE'</Condition>
    <Task>
      <Process Workflow="DeletePipeline"/>
    </Task>
  </Rule>
 
</QueueWorkerListenerConfig>

There is a schema located in org.eclipse.smila.connectivity.queue.worker/schemas/QueueWorkerConfig.xsd.

<Condition> tag

For the Listener, the <Condition> tag means that the Listener only “listens” for records that match the condition. Usually the conditions are matched against JMS properties that were sent over the queue with the <Send> tag (see Send-task).

The condition is typically used to match the following JMS properties:

- Operation = the operation to perform.
- DataSourceID = a data source such as web or file.

For example, these conditions can be used to select different workflows for file system and web data (see 5 minutes to success -> changing workflow). More detailed information can be found on the conditions page.
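
Selecting a workflow per data source could be sketched like this. The workflow names FilePipeline and WebPipeline are hypothetical, as are the data source IDs 'file' and 'web'; the remaining attributes are taken from the default configuration.

<QueueWorkerListenerConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>

  <!-- Records of the (assumed) data source 'file' use their own workflow. -->
  <Rule Name="File Rule" WaitMessageTimeout="10" Threads="2">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>DataSourceID='file'</Condition>
    <Task>
      <Process Workflow="FilePipeline"/>
    </Task>
  </Rule>

  <!-- Records of the (assumed) data source 'web' use a different workflow. -->
  <Rule Name="Web Rule" WaitMessageTimeout="10" Threads="2">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>DataSourceID='web'</Condition>
    <Task>
      <Process Workflow="WebPipeline"/>
    </Task>
  </Rule>

</QueueWorkerListenerConfig>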

RecordFilter

A record filter, as the name suggests, filters a record. A record is often a complicated object with many attributes, holding not only the metadata but also the binary content of the crawled data. As a consequence the serialized size of the record can be very large, and all of it would need to be handled (and stored) by the MQ. Furthermore, most attributes are needed only once or twice during processing, which makes it undesirable to always carry them around in the MQ. To lighten the record, such rarely needed data can be stripped so that the record contains only the relevant information. However, keep in mind that stripped data must be fetched from the record or binary store when it is not held in memory, which incurs some performance penalty.

Configuration

The filters are defined in an XML configuration file. Each filter is identified by its name attribute and specifies copy rules for attributes and annotations. Because the RecordFilter is invoked by the BlackboardService, the configuration file is located in configuration/org.eclipse.smila.blackboard/RecordFilter.xml.

Default Configuration:

<RecordFilters
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="../org.eclipse.smila.datamodel.tools/schemas/RecordFilters.xsd">
  <Filter name="workflow-object">    
    <Attribute name="MimeType" keepAnnotations="false"/>              
  </Filter>
  <Filter name="only-attributes">
    <Attribute name="*"/>
  </Filter>
  <Filter name="no-filter">
    <Attribute name="*" keepAnnotations="true"/>
    <Annotation name="*"/>
  </Filter>
  <Filter name="nothing"/>
</RecordFilters>


Only the Send-task may use a RecordFilter; the filter is selected by its name. The <Attribute> and <Annotation> tags of the filter decide which attributes and annotations are copied to the record that is pushed into the MQ. The name attribute is a selector for the attribute or annotation and supports '*' as a wildcard. The keepAnnotations attribute is a flag that controls whether the annotations of an attribute are copied as well. A RecordFilterNotFoundException is thrown and reported in the log if no filter is chosen or the given filter is not found.

Example

<RecordFilters>
  <!-- All attributes like MimeType, Size and so on, but without sub-annotations. -->
  <Filter name="only-attributes">
    <Attribute name="*"/>
  </Filter>
  <!-- A full copy of the record is created. -->
  <Filter name="no-filter">
    <Attribute name="*" keepAnnotations="true"/>
    <Annotation name="*"/>
  </Filter>
  <!-- Only a record containing the id is copied. -->
  <Filter name="nothing"/>
  <!-- Only the specified attribute is copied. -->
  <Filter name="only-attribute1">
    <Attribute name="attribute1"/>
  </Filter>
  <!-- Only the single and datetime values are copied. -->
  <Filter name="filter-single-and-datetime">
    <Attribute name="single value"/>
    <Attribute name="datetime value"/>
  </Filter>
</RecordFilters>

There is a schema available at org.eclipse.smila.datamodel.tools/schemas/RecordFilters.xsd
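
To connect a filter definition to a rule, the Send-task refers to the filter by name. The fragment below is a sketch of a single rule as it might appear inside a Router configuration; the rule name is made up, and only-attributes is one of the filters from the default configuration.

<Rule Name="Attributes Only Rule">
  <Condition></Condition>
  <Task>
    <!-- RecordFilter must match the name of a Filter defined in RecordFilter.xml. -->
    <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="only-attributes"/>
  </Task>
</Rule>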

RecordRecycler

The current RecordRecycler is identical to the Router, except that the RecordRecycler gets records from the RecordStorage while the Router gets them from Connectivity. Thus it can be used to take records from the RecordStorage and process them (again) without having to recrawl the data sources.

Configuration

The RecordRecycler is highly configurable through the XML configuration files in configuration/org.eclipse.smila.connectivity.queue.worker/recyclers. The RecordRecycler is able to execute any Router/Listener-specific task.

Default Configuration (recyclers1.xml):

<QueueWorkerRecordRecyclerJob xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="../schemas/QueueWorkerConfig.xsd"
>
 
  <Rule Name="First recycle Rule">
    <Condition>DataSourceID='myDataSource'</Condition>
    <Task>
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing"/>
    </Task>
  </Rule>
 
  <Rule Name="Second recycle Rule">
    <Condition>NOT(DataSourceID='myDataSource')</Condition>
    <Task>
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing">
        <SetProperty Name="doAdd">no</SetProperty>
      </Send>
    </Task>
  </Rule>
 
</QueueWorkerRecordRecyclerJob>

The <Condition> tag can test the DataSourceID; this is used to find the processing rule that applies to a record. As with the Router/Listener <Condition> tag, detailed information can be found on the conditions page.

There is a schema located at org.eclipse.smila.connectivity.queue.worker/schemas/QueueWorkerConfig.xsd.

Usage of JMX Management Agent

The JMX Agent name is org.eclipse.smila.connectivity.queue.worker.RecordRecycler. The following commands can be executed:

- startRecycle (String configurationID, String dataSourceID)

            starts the recycle process
            example: startRecycle (recycler1, file)

- stopRecycle (String dataSourceID)

            stops the recycle process
            example: stopRecycle (file)

- getRecordsRecycled (String dataSourceID)

            gets the number of recycled records
            example: getRecordsRecycled (dataSourceID)

- getconfiguration

            lists all available configuration files in the configuration folder

- getStatus (String dataSourceID)

            shows the current status of the recycle process
            example: getStatus (file)


In Detail: QueueWorker Tasks

There are two pre-defined tasks, <Send> and <Process>. The QueueWorker components Router and Listener use these tasks to do their job. The Router usually uses the <Send> task to send records (each of which contains one crawled entry) to a queue. The Listener uses the <Process> task to take a record from a queue and feed it into a BPEL workflow for processing. At the end of the process it can use a <Send> task to put the resulting record (back) into a specific queue. For this reason the QueueWorker provides all tasks, and every component can execute each of them.

Send-task

This task is used to send a record to a JMS queue. The <Send> element may have the following attributes:

- BrokerId (required) = Id of the broker specified in JMS Broker Connection.
- Queue (required) = the queue to send to.
- RecordFilter = the record filter used to downsize the record.
- PersistentDelivery = corresponds to the JMS delivery mode. If it is set to true (the default), message delivery is persistent, otherwise not.
- WithAttachments = used to get attachments from the binary storage back into the record after they have been filtered out. This might be needed in clustering scenarios where the MQ is used to transport the records to different nodes. The default is false (attachments are not attached to the record).

The Send element may contain any number of SetProperty elements. Each SetProperty element adds a JMS property with the same name and value to the MQ entry of the record being put into the queue. Additionally, the SMILA framework adds two JMS properties to each record: DataSourceID and Operation. The first contains the DataSourceID value, the second either ADD or DELETE.

Send-tag example

<Send BrokerId="broker1" Queue="SMILA.connectivity"  RecordFilter="filterName" PersistentDelivery="false" WithAttachments="true">
  <SetProperty Name="doSomething">some value</SetProperty>
  <SetProperty Name="doSomethingElse">some other value</SetProperty>
</Send>

Process-tag

The <Process> task is a plain tag with a single attribute, “Workflow”. This task takes a record from the JMS broker and pushes it to a BPEL pipeline.

- Workflow = the BPEL pipeline/workflow to execute.

Process-tag example

<Process Workflow="AddPipeline"/>

QueueWorker Conditions

Conditions are part of the rules which are used by the Router, Listener and RecordRecycler. Each router, listener and recycler has its own configuration file, which may contain several rules. A condition is a string whose syntax is based on a subset of the SQL92 conditional expression syntax, as referenced in the "Message Selector Syntax" section of the [JMS specification (JSR 914)], and it is applied against the JMS properties of an MQ entry. Within SMILA, the two JMS properties Operation and DataSourceID are supported out of the box, but in general every JMS property can be tested, including your own. An empty condition element represents a condition that is always true.

Router and Listener use the rules and their conditions differently:

Router / RecordRecycler

The primary role of the Router is to put JMS records into the desired MQ. The target MQ is defined in the Send-task; which records are sent there is defined by the condition of the rule. Unlike the Listener, the Router consults all rules sequentially from top to bottom for any given record and executes the task of the first matching rule. It is therefore a good idea to define a last rule with an empty condition that sends records which did not match any earlier rule into a special queue, such as the dead letter queue.
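
The first-match behaviour with a final catch-all rule could be sketched as follows. The queue name SMILA.unmatched and the rule names are assumptions made for this example; any special queue of your broker could take that place.

<QueueWorkerRouterConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>

  <!-- Checked first: web records go to the regular queue. -->
  <Rule Name="Web Route Rule">
    <Condition>DataSourceID LIKE 'web%'</Condition>
    <Task>
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing"/>
    </Task>
  </Rule>

  <!-- Checked last: the empty condition is always true, so this rule
       catches every record that no earlier rule matched. -->
  <Rule Name="Catch-All Rule">
    <Condition></Condition>
    <Task>
      <Send BrokerId="broker1" Queue="SMILA.unmatched" RecordFilter="nothing"/>
    </Task>
  </Rule>

</QueueWorkerRouterConfig>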

Listener

The primary role of the Listener is to take records from the MQ and carry out the associated task. Hence the conditions are used as a "select clause" on all records of the queue. The task is then executed on one arbitrary record that matches the condition. Note that a Listener creates at least one thread for each rule, as defined by the Threads attribute of the rule. Hence the rules are executed concurrently. If the conditions of different rules (and thus tasks) are not mutually exclusive, it is not deterministic which task is executed for which record. This is usually not desired, so care must be taken when designing the rules.
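
One way to keep Listener conditions mutually exclusive is to make the second rule the explicit negation of the first. The sketch below assumes a hypothetical workflow WebAddPipeline next to the AddPipeline of the default configuration. Using a plain Operation='ADD' in the second rule would overlap with the first rule for web records.

<QueueWorkerListenerConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>

  <!-- ADD operations for web data sources only. -->
  <Rule Name="Web ADD Rule" WaitMessageTimeout="10" Threads="2">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>Operation='ADD' AND DataSourceID LIKE 'web%'</Condition>
    <Task>
      <Process Workflow="WebAddPipeline"/>
    </Task>
  </Rule>

  <!-- ADD operations for everything else; never matches a web record. -->
  <Rule Name="Other ADD Rule" WaitMessageTimeout="10" Threads="2">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>Operation='ADD' AND NOT(DataSourceID LIKE 'web%')</Condition>
    <Task>
      <Process Workflow="AddPipeline"/>
    </Task>
  </Rule>

</QueueWorkerListenerConfig>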

Examples

- Operation='ADD' (= if the operation is ADD)
- DataSourceID LIKE 'web%' (= if the DataSourceID begins with web…)
- NOT(DataSourceID LIKE 'web%') (= if the DataSourceID does not begin with web…)
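
Written as <Condition> elements, a few further variants might look like the fragment below. AND and IN are part of the JMS message selector syntax; the sketch assumes that SMILA hands the condition string to the selector unchanged. The doAdd property is the custom property set in the RecordRecycler default configuration.

<!-- Both parts must hold. -->
<Condition>Operation='ADD' AND DataSourceID LIKE 'web%'</Condition>

<!-- Matches any of the listed data sources (IDs are examples). -->
<Condition>DataSourceID IN ('web', 'file')</Condition>

<!-- Tests a custom JMS property set via SetProperty. -->
<Condition>doAdd='no'</Condition>

<!-- Empty condition: always true. -->
<Condition></Condition>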

Tips

As described before, you can use the Send-task to set your own JMS properties on a record. This can be used in conjunction with cascaded DFPs, where the result of one DFP is fed into another. Such a scenario arises when different types of documents are crawled but are processed in basically the same way once they have been converted to a common format. Here the records need to be marked with a JMS property according to their document type and be directed to the appropriate conversion DFP, which converts them into the common format. The listener of each conversion DFP then sends the JMS record back to the queue, adding a new JMS property with the name 'converted' and the value 'true'. Another DFP can select those records via its listener, whose condition tests for this new JMS property.
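
The cascade could be sketched with two Listener rules as follows. This is an illustration under several assumptions: that a <Task> may contain a <Send> after a <Process>, that the Router has marked the records with a custom property docType, and that the workflows PdfConversionPipeline and IndexPipeline exist. None of these names come from the default configuration. The first condition also checks converted IS NULL so that both rules stay mutually exclusive even if the docType property is still present on the resent record.

<QueueWorkerListenerConfig xmlns="http://www.eclipse.org/smila/queue"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="schemas/QueueWorkerConfig.xsd"
>

  <!-- Step 1: convert unconverted PDF records, then send them back
       to the queue marked with converted=true. -->
  <Rule Name="Convert PDF Rule" WaitMessageTimeout="10" Threads="2">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>docType='pdf' AND converted IS NULL</Condition>
    <Task>
      <Process Workflow="PdfConversionPipeline"/>
      <Send BrokerId="broker1" Queue="SMILA.connectivity" RecordFilter="nothing">
        <SetProperty Name="converted">true</SetProperty>
      </Send>
    </Task>
  </Rule>

  <!-- Step 2: pick up only records that have already been converted. -->
  <Rule Name="Index Converted Rule" WaitMessageTimeout="10" Threads="2">
    <Source BrokerId="broker1" Queue="SMILA.connectivity"/>
    <Condition>converted='true'</Condition>
    <Task>
      <Process Workflow="IndexPipeline"/>
    </Task>
  </Rule>

</QueueWorkerListenerConfig>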

TODO


A complete use case with custom properties and different listeners in multiple steps is given on the example of use page.

TODO

Useful Links

  1. [JMS specification (JSR 914)]
  2. [JMS API SDK documentation]
  3. [Active MQ]
  4. [Active MQ – JNDI Support]
  5. [Active MQ – VM Transport Reference]
