Using JobManager to run imports
The idea is to apply the jobmanagement framework for doing crawl jobs, too. The advantages are:
- we don't need a separate execution framework for crawling anymore
- integrators can use same programming model for creating crawler components than for processing workers.
- same control and monitoring APIs for crawling and processing
- better performance through inherent asynchronicity
- better error tolerance through inherent failsafety
- Parallelization of crawling process possible
We can reach this goal by splitting up the crawl process into several workers. Basically, a crawling workflow always looks like this:
Workers with names starting with "(DS)" are specific for the crawled data source type. E.g. to crawl a file system you apparently need a different crawler worker than for a web server. Not each component may be necessary for each data source type, and it is possible to adapt components to add or remove functionality.
The crawling job is separated from the processing (e.g. indexing) workflow. A final worker in the crawl workflow pushes all records to the other workflow. This makes it possible to have several datasources being crawled into a single index. Also, in update crawl runs it is easier to detect when the actual crawling is done so that it can be determined which records have to be deleted because they were not visited in this run. We assume that the processing job is just running all the time.
- Crawler: (data source specific, see File Crawler and Web Crawler)
- two output slots:
- one for crawled resources (files, web pages), that match configurable filters (name patterns, extensions, size, mimetype, ...)
- one for resources still to crawl (outgoing links, sub-directories). This one leads to follow-up tasks for the same worker.
- The worker can create multiple output bulks for each slot per task so that the following workers can parallelize better.
- In general, it doesn't get content of a resource, but only the path or URL (or whatever identifies it) and metadata of the resources, to minimize IO load especially during "update runs" where most of the resources have not changed and therefore need not need to be fetched.
- If it has to fetch the content anyway (e.g. Web Crawler has to parse HTML to find follow-up links), it may add it to the crawled records to prevent additional fetching.
- The crawler can use or contain a "VisitedLinks" service if the data source can have cycles or multiple paths to the same resource (typical: web crawler) so that it does not produce multiple records that represent the same resource.
- Identifies compound objects: Set a special attribute on compound objects so that they can be routed for extraction later in the workflow. See below for details on compound handling.
- two output slots:
- Checks with DeltaService whether a resource is new in the data source or has been changed since the last run, depending on some of the metadata produced by the crawler (modification date from file system or HTTP headers).
- If resource has not changed since the last run, this is marked in DeltaService and the record is not written to the output bulk.
- Else the record is written to the output bulk (an additional attribute describes if it's a new record or one to update) and must be pushed to the processing workflow in the end.
- The DeltaChecker can write identified compound objects to a separate output slot to route them to a extractor worker.
- Fetcher: (data source specific, see File Fetcher and Web Fetcher)
- Worker that gets the content of the resource, if the record does not contain it already.
- Detect compounds (like archive files (zip, tgz), for example) and does not fetch the content, but just copy the records containing the IDs (URL/file name wtc.) to a compound output bulk for later extraction, as we do not want to put extremely large compound objects into bulks.
- Compound extractor: (data source specific)
- for handling compounds: fetch the compound data to a local temp filesystem, extract it and add the records to output bulks, just like the ones written by the fetcher.
- The compound extractor is data source specific for two reason:
- it must access the data source itself to fetch the compound content because we want to avoid the overhead of putting really large objects in the workflow bulks.
- it must produce records that look as if they were produced by the corresponding data source crawler and fetcher workers, so it must understand the same configuration.
- Update Pusher:
- Push resulting records to BulkBuilder and mark them as updated in the DeltaService. To prevent duplicates and to skip unchanged objects extracted from compounds, it checks the DeltaService again if this check is enabled. In the completion phase of the job run, it can performs the "delta-delete" operation (see below).
The crawl job is started as a runOnce job, i.e. the jobmanager creates an initial task that causes the crawler to start crawling (data source configuration and start links would be given as job parameters, we may need some additional component to manage data source configurations), which creates follow-up crawl and record tasks. When all tasks have been processed the job run finishes automatically (because it was started in runOnce mode).
Optionally, a final "completion run" can be triggered that causes the UpdatePusher to examine the DeltaService for records that have not been visisted in this job run, because they have been removed from the data source and therefore should be removed from the target of the import process, too. For each of these records one "to-delete" record is pushed to the BulkBuilder. To support an efficient delta-delete on large data sources it will be necessary to parallelize this "delta-delete" operation, because it could take rather long to scan the complete DeltaService and create the to-delete records sequentially. To support this the DeltaService should put the state entries for one source in different partitions or shards that can be scanned independently. Then one task can be created for each shard to scan it for deleted records, and they can be processed in parallel by multiple instances of the UpdatePusher.
The "delta-delete" run is not triggered if there have been tasks with fatal errors before in the crawl phase of the job run. Therefore it cannot happen that the complete import target is cleared after nothing could be imported from the data source due to errors.
The usage of the DeltaService is determined by a job parameter named deltaImportStrategy. It can have one of four values:
- disabled: the DeltaService is disabled, no entries are written for imported records, and no delta-delete is performed. This reduces the import time if the delta information is not needed for the application. However, as no initial delta information is recorded in this mode, switching to another mode for incremental updates does usually not make sense.
- initial: the delta information for each imported record is written to the DeltaService, but neither a check of the already existing information nor the delta-delete is performed. This is useful to make an initial import faster. Afterwards incremental updates can be done using one of the following modes.
- additive: existing delta information is checked to prevent unchanged objects from being re-imported unnecessarily, and the delta information for all imported records is written. Only the final delta-delete step is ommitted. This is useful if objects removed from the data source do not need to be removed from the import target, too.
- full: This is the default mode with the aim to keep the import target in sync with the data source: delta information is checked and recorded, and the delta-delete is done.
If one of the first two values (disabled and initial) is used, the DeltaChecker worker just writes every records from the input bulk to the output bulk without really doing anything. Thus, to improve the performance even more, for jobs using these modes the DeltaChecker worker can be removed from the workflow completely and the output of the crawler worker can be processed by the fetcher worker immediately.
Compounds are objects that consists of many smaller objects that should be imported as individual objects. A typical examples are archive files like ZIP or TAR.GZ files that contain a lot of documents that should be indexed, another type would be an email with attachments. To import these compound elements they must be extracted from the compound objects. This can be a recursive process as a compound can contain compound objects again. To prevent that the extraction process thwarts the import of "ordinary" objects we decided to handle them in specialized workers, not in the crawler or fetcher workers themselves. These workers are called extractor workers.
Extractor workers are data source specific for two reasons:
- They must access the data source to fetch the content. Because compounds can be very large, we want to prevent the overhead of writing the to the workflow bulks before extracting them.
- They must produce records that look like records procuced by the corresponding crawler so that they can be processed in the same way. So the extractor must understand the same configuration as the crawler and fetcher workers.
However, the actual extraction process is usually generic, so we provide a generic Compound Extractor Service that can be shared by the specific extraction workers. There is also a generic base class for such workers that should make implementing a extractor worker for new data source types easy.
Therefore the compound handling consists of two parts:
- Identification: to be able to handle compounds seperately in the workflow they must be marked as being compounds. We expect this to be done in the crawler worker that can use metadata of the object to classify it as a compound, e.g. the filename (especially the extension) or the mimetype, if reported by the data source. The crawler worker can use methods of the Compound Extractor Service to decide if an object is a compound that is also supported by the extractor service. To mark an object as a compound the crawler sets the system attribute _isCompound to true. This is also necessary to handle compounds correctly in the delta-check-and-delete logic:
- if a compound has been removed from the data source, the import process must also delete all previously imported compound elements. Therefore compound elements must be stored with individual entries in the delta service.
- if a compound it self was not changed all previously imported elements must be marked as visited, too, so that they are not deleted afterwards: We do not want to fetch and extract the complete unchanged compound in this case just to update the element entries, of course.
- Extraction: Fetch the compound content and create an additional record for each compound element (make sure that the record ID is still unique for each element wrt. the data source). Contained compounds are extracted immediately, too. To support the delta logic, in each element record of the compound the system attribute _compoundId must be set to the record ID of the top-level compund object. Note that the DeltaService does currently not support the individual management of sub-compounds, so to handle the delta logic for contained compounds correctly they must all be extracted and checked in the UpdatePusher individually.
The branching of the workflow for compounds is currently done by the DeltaChecker: It has a second output slot named "updatedCompounds". If this slot is connected to a bucket, it will write all new and updated objects marked as being a compound object by the crawler to this slot instead of the original "updatedRecords" slot. However, this means that correct compound handling is currently only possible if the DeltaCheckerWorker is part of the workflow, even if is not needed to implement the chosen deltaImportStrategy.