Discussion about Processing Performance
This page contains some thoughts about issues with the current implementation and ideas to improve overall processing performance.
The Queue is the core component for processing. It is filled by Routers and its messages are consumed by Listeners, so everything depends on fast enqueuing and dequeuing of messages. The current implementation sends and receives single messages only. For each received message a BPEL pipeline is initialized and executed with a single record.
Send and Receive Queue messages in blocks
The components that provide the records to be sent (ConnectivityManager) and process the received records (WorkflowProcessor, Pipelets/ProcessingServices) already offer the possibility to be executed not only with single records but with arrays of records (or record Ids). Therefore we should send/receive more than a single message at a time. This can be achieved by sending/receiving multiple messages before committing the JMS session in the Router and Listener implementations. Neither should work with a fixed block size and wait for the block to be filled; both should use dynamic block sizes (with a maximum size). One drawback of this optimization is that we no longer get feedback (exceptions) on single records. If an error occurs, processing of all records of the current block is aborted.
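The dynamic block size described above can be sketched roughly as follows. This is only an illustration, not the prototype code: a java.util.concurrent.BlockingQueue stands in for the JMS queue, and the class name BlockReceiver and the method receiveBlock are hypothetical. In a real Listener the block would be read from a transacted JMS session, which is committed once after the whole block has been processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch of a dynamic block receive; a BlockingQueue stands in for the JMS queue (assumption). */
class BlockReceiver {

    /**
     * Waits up to timeoutMillis for a first message, then takes whatever else is
     * immediately available, up to maxBlockSize. It never waits for the block to fill.
     */
    static List<String> receiveBlock(BlockingQueue<String> queue, int maxBlockSize, long timeoutMillis) {
        List<String> block = new ArrayList<>();
        try {
            String first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (first == null) {
                return block; // nothing arrived within the timeout
            }
            block.add(first);
            // drainTo does not block: it only moves messages that are already queued
            queue.drainTo(block, maxBlockSize - 1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt, return what we have
        }
        return block;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 25; i++) {
            queue.add("record-" + i);
        }
        System.out.println(receiveBlock(queue, 20, 100).size()); // prints 20
        System.out.println(receiveBlock(queue, 20, 100).size()); // prints 5
        System.out.println(receiveBlock(queue, 20, 100).size()); // prints 0
    }
}
```

The second call returns only 5 messages because the block size adapts to what is available instead of waiting for 20.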
Daniel Stucky: I did a prototype implementation for the Listener and also tried to do it for the Router. Some interfaces (e.g. TaskExecutionService, AbstractTask) had to be improved to support arrays of Record, Id and Operation objects. The ConnectivityManagerImpl had to be modified so that the incoming array of records is not processed record by record but passed as a whole to the Router.
The Router implementation gets a little more complicated, as records from different data sources may be contained in the passed Record array (this currently does not happen when used by the CrawlerController). So each record of the array may be routed by a different RouterRule (e.g. routed to different pipelines or even to different Queues). So we need to rearrange the records of the incoming array into arrays per RouterRule and send those new smaller arrays to the queue in one session. How many messages are sent in one session depends solely on the size of the array.
A more general approach to increase messages per session (one session per rule) would be to buffer messages per rule in the Send Task. We would have to watch these buffers with separate threads in order to send the content of a buffer when it has reached its maximum size or when a timeout has elapsed. This logic is not trivial. I did a quick hack implementation (for just one rule) and did some tests. Sending blocks of messages in one session does improve import performance, but the gain is not in proportion to the effort needed to implement this behavior. There is already a component specified that could do the trick: the Buffer of the ConnectivityManager. It could be used to optimize the records passed to the Router (e.g. by combining all records of the same data source). So buffering in the Send Task should NOT be implemented!
In contrast, the Listener was easier to implement, as you can receive messages from the Queue directly or use a timeout to wait for messages to arrive. All received messages are processed by the same pipeline, so no extra checking is required. Here are the results of the tests I made for the Listener changes, importing 5078 html files:
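As a rough illustration of the rearrangement step, the following sketch groups an incoming list of records by the rule that matches each record. The class RuleGrouping, the method groupByRule and the use of plain strings as records and rule names are assumptions made for this example; the real Router works with Record objects and RouterRule instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Sketch: split one incoming block of records into one block per matching rule. */
class RuleGrouping {

    /**
     * Groups records by the name of the rule that applies to them.
     * ruleFor is a hypothetical lookup standing in for the Router's rule matching.
     */
    static <R> Map<String, List<R>> groupByRule(List<R> records, Function<R, String> ruleFor) {
        // LinkedHashMap keeps the rules in order of first appearance
        Map<String, List<R>> perRule = new LinkedHashMap<>();
        for (R record : records) {
            perRule.computeIfAbsent(ruleFor.apply(record), k -> new ArrayList<>()).add(record);
        }
        return perRule;
    }

    public static void main(String[] args) {
        // Records are written as "<data source>:<id>"; the data source selects the rule here.
        List<String> records = Arrays.asList("web:1", "file:1", "web:2");
        Map<String, List<String>> perRule =
            groupByRule(records, (String r) -> r.substring(0, r.indexOf(':')));
        System.out.println(perRule); // prints {web=[web:1, web:2], file=[file:1]}
    }
}
```

Each resulting list would then be sent to the Queue within one session, so the number of messages per session equals the size of that list.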
|#records||Listener Block Size||BPEL invokes||runtime||improvement|
As we can see, the number of BPEL invokes is drastically reduced. With a block size of 20 the minimal number of invokes would be 254 (5078 / 20, rounded up). The actual number is still twice as much. This leads to the conclusion that the Listener wants to receive more messages from the Queue than are available.
CrawlerController using blocks of Records
So the next option is to call ConnectivityManager.add(Record) with more than one record, rather than with a single record as is currently done. Therefore the CrawlerController (to be more precise, the class CrawlThread) has to buffer incoming records from the Crawler and send them in a bigger array to the ConnectivityManager. The drawback is that if one record produces an error, processing of the whole array is aborted. In addition, memory usage is slightly increased.
Daniel Stucky: I also implemented another prototype, running the same tests as before with an array size > 1.
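The buffering in the CrawlThread could look roughly like the sketch below. It is not the prototype code: the class RecordBlockBuffer and its methods are hypothetical, and the Consumer stands in for the call to the ConnectivityManager.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch: collects records and hands them on in blocks of at most maxBlockSize. */
class RecordBlockBuffer<R> {
    private final int maxBlockSize;
    private final Consumer<List<R>> sink; // stands in for the call to the ConnectivityManager
    private final List<R> buffer = new ArrayList<>();

    RecordBlockBuffer(int maxBlockSize, Consumer<List<R>> sink) {
        if (maxBlockSize < 1) {
            throw new IllegalArgumentException("maxBlockSize must be >= 1");
        }
        this.maxBlockSize = maxBlockSize;
        this.sink = sink;
    }

    /** Buffers one record and sends the block as soon as the maximum size is reached. */
    void add(R record) {
        buffer.add(record);
        if (buffer.size() >= maxBlockSize) {
            flush();
        }
    }

    /** Sends whatever is buffered, e.g. when the crawler has no more records to deliver. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer)); // pass a copy so the buffer can be reused
        buffer.clear();
    }

    public static void main(String[] args) {
        List<Integer> blockSizes = new ArrayList<>();
        RecordBlockBuffer<String> buffer =
            new RecordBlockBuffer<String>(3, block -> blockSizes.add(block.size()));
        for (int i = 0; i < 7; i++) {
            buffer.add("record-" + i);
        }
        buffer.flush(); // send the incomplete last block
        System.out.println(blockSizes); // prints [3, 3, 1]
    }
}
```

Calling flush() at the end of the crawl, or whenever the Crawler has nothing more to deliver at the moment, keeps the block size dynamic: records are never held back just to fill a block.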
|#records||Add Block Size||Listener Block Size||BPEL invokes||runtime||improvement|
Of course this improvement is only beneficial if the Listener works with a block size > 1!
In the Router the records' metadata and attachments are persisted in the stores and a JMS message is created and sent to the Queue. The calling CrawlerController blocks execution until the call to ConnectivityManager.add(Record) returns. It seems good practice to do the persisting and messaging in a separate thread, so that the ConnectivityManager does not block the CrawlerController and it can continue to send the next block of records.
A major drawback is that the CrawlerController does not get any feedback on whether the records have been successfully added to the Queue, and so cannot update the state in the DeltaIndexingManager (set the visited flag). The executing ConnectivityManager thread would have to use a callback on the CrawlerController or communicate directly with DeltaIndexing (using the CrawlerController's session) to update it.
Daniel Stucky: I did another prototype implementation and found that either the number of threads increases drastically (about half the number of records sent) or (if limited to a maximum count) you have to synchronize add() calls. Compared to the already optimized version using blocks it got even slower, though the number of BPEL invokes was significantly lower.
|#records||Listener Block Size||BPEL invokes||runtime||method call||improvement|
So this should NOT be implemented!
We should implement the following enhancements to gain a processing performance boost:
- make use of Record and Id arrays (prerequisite for all other enhancements)
- change the ConnectivityManager implementation to pass the array to the router
- change interfaces and implementations of TaskExecutionService, AbstractTask, etc.
- build arrays per RouterRule in the Router and send those arrays in one session
- use a dynamic block size in the Listener when receiving messages (receive multiple messages per session)
- use a dynamic block size in the CrawlThread
Of course some values should be configurable (maximum array/block sizes, timeouts).
If there are no objections, I will create a Bugzilla Id and implement these enhancements.