Discussion about Processing Performance
This page contains some thoughts about issues with the current implementation and ideas for improving overall processing performance.
The Queue is the core component with regard to processing. It is filled by Routers, its messages are consumed by Listeners, and everything depends on fast enqueuing and dequeuing of messages. The current implementation sends and receives single messages only: for each received message, a BPEL pipeline is initialized and executed with a single record.
Send and Receive Queue messages in blocks
The components that provide the records to be sent (ConnectivityManager) and process the received records (WorkflowProcessor, Pipelets/ProcessingServices) can already be executed not only with single records but also with arrays of records (or record Ids). Therefore we should send and receive more than one message at a time. The Router and Listener implementations can achieve this by sending or receiving multiple messages before committing the JMS session. Neither should use a fixed block size and wait for the block to fill up. Instead, both should use dynamic block sizes bounded by a maximum size. One drawback of this optimization is that we no longer get feedback (exceptions) for single records: if an error occurs, processing of all records in the current block is aborted.
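A minimal sketch of the dynamic block size idea, assuming a `java.util.concurrent.BlockingQueue` as a stand-in for the JMS Queue so that the example runs without a JMS provider. In a real Listener, the blocking wait would roughly correspond to `MessageConsumer.receive(timeout)`, the non-blocking take to repeated `MessageConsumer.receiveNoWait()` calls, and the end of a block to `Session.commit()` on a transacted session. The class `DynamicBlockReceiver` and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of a Listener-style receive loop with a dynamic block size.
 * A BlockingQueue stands in for the JMS Queue (assumption for illustration only).
 */
final class DynamicBlockReceiver {

    /**
     * Waits up to timeoutMillis for the first message, then adds only the messages
     * that are already available, up to maxBlockSize in total. Returns an empty
     * list if nothing arrived within the timeout.
     */
    static <T> List<T> receiveBlock(BlockingQueue<T> queue, int maxBlockSize, long timeoutMillis) {
        if (maxBlockSize < 1) {
            throw new IllegalArgumentException("maxBlockSize must be >= 1");
        }
        List<T> block = new ArrayList<>(maxBlockSize);
        T first;
        try {
            first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status for the caller
            return block;
        }
        if (first == null) {
            return block;
        }
        block.add(first);
        // Do not wait for the block to fill up: take only what is queued right now.
        queue.drainTo(block, maxBlockSize - 1);
        return block;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 45; i++) {
            queue.add("record-" + i);
        }
        while (true) {
            List<String> block = receiveBlock(queue, 20, 100);
            if (block.isEmpty()) {
                break;
            }
            // A real Listener would execute one BPEL pipeline with the whole block here
            // and then commit the JMS session.
            System.out.println("processing block of " + block.size());
        }
    }
}
```

With 45 queued messages and a maximum of 20, `main()` processes blocks of 20, 20 and 5 records. If messages trickle in slowly, the blocks simply become smaller instead of the Listener waiting for a full block.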
Daniel Stucky: I did a prototype implementation for the Listener and also tried to do one for the Router. Some interfaces (e.g. TaskExecutionService, AbstractTask) had to be adapted to support arrays of Record, Id and Operation objects. The ConnectivityManagerImpl had to be modified so that the incoming array of records is not processed record by record but passed to the Router as a whole. The Router is very hard to implement, because records from different data sources may get mixed in a single message block, yet each record may be routed by a different rule (e.g. to different Queues). So we would need to manage one buffer per rule and watch these buffers with separate threads, in order to send the content of a buffer when it has reached its maximum size or when a timeout has elapsed. This logic is not trivial. I did a quick hack implementation (for just one rule) and ran some tests. Sending blocks of messages in one session does improve import performance, but the gain is not in proportion to the effort needed to implement this behavior. So this should NOT be implemented!!!
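To illustrate why the per-rule buffering was judged too complex, here is a simplified sketch of just its core bookkeeping. `RuleBuffers`, its methods and the `sender` callback are hypothetical. Time is passed in explicitly instead of being read from a clock, and the separate watcher thread that would call `flushExpired()` periodically is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Hypothetical per-rule buffering for a block-sending Router: one buffer per routing
 * rule, sent when it is full or when its oldest record has waited timeoutMillis.
 */
final class RuleBuffers<T> {
    private final int maxSize;
    private final long timeoutMillis;
    private final BiConsumer<String, List<T>> sender; // stand-in for "send block to the rule's Queue"
    private final Map<String, List<T>> buffers = new HashMap<>();
    private final Map<String, Long> oldestAddedAt = new HashMap<>();

    RuleBuffers(int maxSize, long timeoutMillis, BiConsumer<String, List<T>> sender) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be >= 1");
        }
        this.maxSize = maxSize;
        this.timeoutMillis = timeoutMillis;
        this.sender = sender;
    }

    /** Called by the routing thread for every record after its rule has been determined. */
    synchronized void add(String rule, T record, long nowMillis) {
        List<T> buffer = buffers.computeIfAbsent(rule, r -> new ArrayList<>());
        if (buffer.isEmpty()) {
            oldestAddedAt.put(rule, nowMillis);
        }
        buffer.add(record);
        if (buffer.size() >= maxSize) {
            flush(rule, buffer);
        }
    }

    /** Would be called periodically by a separate watcher thread. */
    synchronized void flushExpired(long nowMillis) {
        for (Map.Entry<String, List<T>> entry : buffers.entrySet()) {
            List<T> buffer = entry.getValue();
            if (!buffer.isEmpty() && nowMillis - oldestAddedAt.get(entry.getKey()) >= timeoutMillis) {
                flush(entry.getKey(), buffer);
            }
        }
    }

    // Sends a copy of the buffer, then empties it; note that the lock is held while sending.
    private void flush(String rule, List<T> buffer) {
        sender.accept(rule, new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

Even this reduced version needs locking, because the routing thread and the watcher thread share the buffers. Because the lock is held while sending, slow sends for one rule also delay the other rules.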
In contrast, the Listener was easy to implement. Here are the results of my tests of the Listener changes, importing 5078 HTML files:
|#records||Listener Block Size||BPEL invokes||runtime||improvement|
As we can see, the number of BPEL invokes is drastically reduced. With a block size of 20, the minimal number of invokes would be 254; the actual number is still about twice as high. This suggests that the Listener often tries to receive more messages from the Queue than are currently available, so many blocks are processed with fewer than 20 records.
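The minimum quoted above is the number of records divided by the block size, rounded up, because the last block may be only partially filled:

```latex
\text{minimal BPEL invokes} = \left\lceil \frac{\#\text{records}}{\text{block size}} \right\rceil
  = \left\lceil \frac{5078}{20} \right\rceil = \lceil 253.9 \rceil = 254
```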
CrawlerController using blocks of Records
Another option is to call ConnectivityManager.add(Record) with more than one record at a time, instead of one record per call as in the current implementation. To do so, the CrawlerController (more precisely, the class CrawlThread) has to buffer incoming records from the Crawler and pass them to the ConnectivityManager in a larger array. The drawback is that if one record produces an error, processing of the whole array is aborted. In addition, memory usage increases slightly.
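A minimal sketch of the buffering this would require in CrawlThread. `RecordBlockBuffer` is a hypothetical helper, and the `sink` consumer is an assumed stand-in for calling the ConnectivityManager with an array of records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper for CrawlThread: collects records delivered by the Crawler
 * and hands them on in blocks of at most maxBlockSize records.
 */
final class RecordBlockBuffer<T> implements AutoCloseable {
    private final int maxBlockSize;
    private final Consumer<List<T>> sink; // stand-in for passing an array to the ConnectivityManager
    private final List<T> buffer = new ArrayList<>();

    RecordBlockBuffer(int maxBlockSize, Consumer<List<T>> sink) {
        if (maxBlockSize < 1) {
            throw new IllegalArgumentException("maxBlockSize must be >= 1");
        }
        this.maxBlockSize = maxBlockSize;
        this.sink = sink;
    }

    /** Buffers one record and passes the block on as soon as it is full. */
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= maxBlockSize) {
            flush();
        }
    }

    /**
     * Passes on whatever is buffered, e.g. when the crawl ends. The buffer is cleared
     * first, so an exception from the sink aborts the whole block.
     */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> block = new ArrayList<>(buffer);
        buffer.clear();
        sink.accept(block);
    }

    @Override
    public void close() {
        flush();
    }
}
```

Using try-with-resources ensures that the last, partially filled block is also passed on when the crawl ends. At most `maxBlockSize` records are held in memory at any time, which corresponds to the slight memory increase mentioned above.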
Daniel Stucky: I also implemented another prototype and ran the same tests as before with an array size > 1.
|#records||Add Block Size||Listener Block Size||BPEL invokes||runtime||improvement|
Of course, this improvement only pays off if the Listener works with a block size > 1!
In the Router, the records' metadata and attachments are persisted in the stores, and a JMS message is created and sent to the Queue. The calling CrawlerController blocks until the call to ConnectivityManager.add(Record) returns. It seems reasonable to do the persisting and messaging in a separate thread, so that the ConnectivityManager does not block the CrawlerController and the CrawlerController can continue with the next block of records.
A major drawback is that the CrawlerController then gets no feedback on whether the records have been successfully added to the Queue, so it cannot update their state in the DeltaIndexingManager (set the visited flag). The executing ConnectivityManager thread would have to use a callback on the CrawlerController, or communicate directly with DeltaIndexing (using the CrawlerController's session), to update it.
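A minimal sketch of the asynchronous hand-off with a callback, assuming a bounded thread pool. `AsyncBlockAdder` is hypothetical. The `adder` consumer stands in for persisting and sending a block, and the `onDone` callback stands in for updating the visited flags in the DeltaIndexingManager. It shows the trade-off directly: once the pool and its queue are saturated, `CallerRunsPolicy` makes the calling crawler thread do the work itself, so it blocks again.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
 * Hypothetical asynchronous hand-off of record blocks with a completion callback.
 */
final class AsyncBlockAdder<T> {
    private final ThreadPoolExecutor executor;
    private final Consumer<List<T>> adder;               // stand-in for persisting + sending to the Queue
    private final BiConsumer<List<T>, Throwable> onDone; // stand-in for updating the visited flags

    AsyncBlockAdder(int threads, int queueCapacity,
                    Consumer<List<T>> adder, BiConsumer<List<T>, Throwable> onDone) {
        // Bounded pool and queue: when both are full, CallerRunsPolicy makes the
        // calling crawler thread run the add itself, so it blocks again.
        this.executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), new ThreadPoolExecutor.CallerRunsPolicy());
        this.adder = adder;
        this.onDone = onDone;
    }

    /**
     * Returns immediately unless the pool is saturated. The callback receives a null
     * error on success, otherwise the failure wrapped in a CompletionException.
     */
    void addAsync(List<T> block) {
        CompletableFuture.runAsync(() -> adder.accept(block), executor)
                .whenComplete((ignored, error) -> onDone.accept(block, error));
    }

    /** Stops accepting work and waits for pending adds and their callbacks. */
    boolean shutdownAndWait(long timeoutMillis) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Without the bound, each block would need its own thread. With the bound, callers are throttled. This mirrors the two outcomes observed in the prototype.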
Daniel Stucky: I did another prototype implementation and found that either the number of threads increases drastically (about half the number of records sent) or, if the number of threads is limited to a maximum, the add() calls have to be synchronized. In my tests I could not observe any performance gain; it actually got even slower.
This should NOT be implemented!!!
We should implement the following enhancements to gain a processing performance boost:
- make use of Record and Id arrays (prerequisite for all other enhancements)
- change the ConnectivityManager implementation to pass the array to the router
- change interfaces and implementations of TaskExecutionService, AbstractTask, etc.
- use a dynamic block size in the Listener when receiving messages
- use a dynamic block size in the CrawlThread
Of course, some values should be configurable (maximum array/block sizes, timeouts).
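A minimal sketch of reading these settings, assuming a `java.util.Properties`-based configuration. The class `BlockProcessingConfig`, the property names and the default values (block size 20, timeout 1000 ms) are hypothetical and only illustrate validating the values at startup.

```java
import java.util.Properties;

/** Hypothetical holder for the tunable values; property names and defaults are illustrative only. */
final class BlockProcessingConfig {
    final int listenerMaxBlockSize;
    final long listenerReceiveTimeoutMillis;
    final int crawlThreadMaxBlockSize;

    private BlockProcessingConfig(int listenerMaxBlockSize, long listenerReceiveTimeoutMillis,
                                  int crawlThreadMaxBlockSize) {
        this.listenerMaxBlockSize = listenerMaxBlockSize;
        this.listenerReceiveTimeoutMillis = listenerReceiveTimeoutMillis;
        this.crawlThreadMaxBlockSize = crawlThreadMaxBlockSize;
    }

    static BlockProcessingConfig from(Properties props) {
        return new BlockProcessingConfig(
                positiveInt(props, "listener.maxBlockSize", 20),
                positiveLong(props, "listener.receiveTimeoutMillis", 1000L),
                positiveInt(props, "crawlThread.maxBlockSize", 20));
    }

    private static int positiveInt(Properties props, String key, int defaultValue) {
        long value = positiveLong(props, key, defaultValue);
        if (value > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(key + " is too large: " + value);
        }
        return (int) value;
    }

    // Missing keys fall back to the default; present values must parse and be >= 1.
    private static long positiveLong(Properties props, String key, long defaultValue) {
        String raw = props.getProperty(key);
        if (raw == null) {
            return defaultValue;
        }
        long value = Long.parseLong(raw.trim());
        if (value < 1) {
            throw new IllegalArgumentException(key + " must be >= 1, was " + value);
        }
        return value;
    }
}
```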
If there are no objections, I will create a Bugzilla entry and implement these enhancements.