How to add a new Data Source to the importing framework


This how-to shows how you can add a new data source (e.g. a database, a connector, etc.) to the new SMILA importing framework (see Importing Concept for more information about the framework).

The steps necessary to include the bundles and workers in the builds or launchers won't be covered here, as they are described in detail in other how-tos (see preconditions).

Preconditions

  • Set up your development environment, see How to set up the development environment.
  • You should have read and understood the documentation about the JobManager, especially the configuration of workers and workflows if you want to create new workers.
  • You should have at least an idea about the OSGi framework and OSGi services. For links to introductory articles and tutorials see [1]. For a comprehensive overview of OSGi see [2]. In particular, SMILA makes intensive use of OSGi's Declarative Services facility, so you may want to have at least a quick look at that.
  • You should already have gone through the How to write a Worker tutorial, since you need a Crawler and a Fetcher worker in order to be able to crawl a new Data Source.

Prepare the bundle

Please follow the How to create a bundle (plug-in) manual to create a new bundle.

Add the following packages to the Imported Packages list of the bundle manifest (a sketch of the resulting manifest entry follows the list):

  • org.eclipse.smila.datamodel: For the Record class.
  • org.eclipse.smila.objectstore: Possible exceptions when accessing input/output streams.
  • org.eclipse.smila.taskmanager: To access the Task.
  • org.eclipse.smila.taskworker: The TaskWorker bundle containing the Worker and TaskContext interfaces.
  • org.eclipse.smila.taskworker.input: Input streams of the TaskWorker bundle.
  • org.eclipse.smila.taskworker.output: Output streams of the TaskWorker bundle.
  • org.eclipse.smila.importing: The importing framework bundle.
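
In the bundle's MANIFEST.MF these show up as an Import-Package header roughly like the following sketch (version constraints are omitted here; add them as required by your target platform):

Import-Package: org.eclipse.smila.datamodel,
 org.eclipse.smila.importing,
 org.eclipse.smila.objectstore,
 org.eclipse.smila.taskmanager,
 org.eclipse.smila.taskworker,
 org.eclipse.smila.taskworker.input,
 org.eclipse.smila.taskworker.output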

You should also add a test bundle (see How to create a test bundle (plug-in)).

Writing the workers

You can also have a look at the two existing crawlers in SMILA, org.eclipse.smila.importing.crawler.file and org.eclipse.smila.importing.crawler.web.

The Crawler

The crawler worker is responsible for retrieving or producing the IDs (e.g. URLs) that address or identify the data in the data source.

The only interface the worker has to implement is org.eclipse.smila.taskworker.Worker.

What is the crawler worker supposed to do?

The crawler worker is supposed to do the following:

  • be invoked by the task generator when the crawl job is started (as a run-once job!)
  • optionally get some information about what to crawl (a seed ID, base URL, SQL query, or whatever)
  • iterate over the data source according to that information
  • and generate an output record for each entry
    • with the data source property set
    • with the ID set (e.g. to the ID of the data source's data record, to make things easier)
    • optionally with the attribute _deltaHash (ImportingConstants.ATTRIBUTE_DELTA_HASH) set to some information that indicates whether the data has changed in the meantime (a hash over the content, a timestamp of the last modification, etc.), so the delta checker can determine whether the record has to be processed or whether the data in the index is up to date.
    • optionally with any other information it can easily gather for the data (e.g. for a file crawler this would be the file's metadata that is quickly available without actually reading the file).

So the worker could look something like the following:

public class WhatsoeverCrawlerWorker implements Worker {
 
	private static final String NAME = "whatsoeverCrawler";
 
	private static final String OUTPUT_SLOT = "output";
 
	private static final String PROPERTY_SEED = "seed";
 
	private static final int MAX_IDS_PER_BULK = 1024;
 
	private Log _log = LogFactory.getLog(getClass());
 
	/** {@inheritDoc} */
	@Override
	public void perform(TaskContext taskContext) throws Exception {
		final AnyMap taskParams = taskContext.getTaskParameters();
		final String dataSource = taskParams
				.getStringValue(ImportingConstants.TASK_PARAM_DATA_SOURCE);
		if (dataSource == null || dataSource.trim().length() == 0) {
			throw new IllegalArgumentException("Parameter '"
					+ ImportingConstants.TASK_PARAM_DATA_SOURCE + "' of task "
					+ taskContext.getTask().getTaskId() + " is null or empty");
		}
		final String seedId = taskParams.getStringValue(PROPERTY_SEED);
		if (seedId == null || seedId.trim().length() == 0) {
			throw new IllegalArgumentException("Parameter '" + PROPERTY_SEED
					+ "' of task " + taskContext.getTask().getTaskId()
					+ " is null or empty");
		}
 
		int recordCount = 0;
		int recordOutputIndex = 0;
		RecordOutput recordOutput = taskContext.getOutputs().getAsRecordOutput(
				OUTPUT_SLOT, recordOutputIndex);
		for (Record record : getRecordsBySeed(seedId, dataSource)) {
			recordOutput.writeRecord(record);
			recordCount++;
			if (_log.isDebugEnabled()) {
				_log.debug("added id " + record.getId());
			}
			if (recordCount % MAX_IDS_PER_BULK == 0) {
				recordOutput.commit();
				recordOutputIndex++;
				recordOutput = taskContext.getOutputs().getAsRecordOutput(
						OUTPUT_SLOT, recordOutputIndex);
			}
		}
		_log.info("Found " + recordCount + " records for seed id " + seedId + ".");
	}
 
	/**
	 * gets records from the data source, if possible fills the
	 * {@link ImportingConstants#ATTRIBUTE_DELTA_HASH} attribute for the delta
	 * checker to be able to determine if the record has to be updated/inserted
	 * at all.
	 * 
	 * @param seedId
	 *            the seed id to know where/what to crawl.
	 * @param dataSource
	 *            the data source to crawl.
	 * @return a list of records containing the ID of the data source's data and
	 *         optionally a delta hash.
	 */
	private List<Record> getRecordsBySeed(final String seedId, final String dataSource) {
		final List<Record> recordsToCrawl = new ArrayList<Record>();
 
		// iterate over the entries in the data source determined by the seed id
		while (...) {
			// id: the id of the data
			// lastModified: the last modified date of the record (omit if it cannot be determined)
			final Record record = DataFactory.DEFAULT.createRecord(id, dataSource);
			record.getMetadata().put(ImportingConstants.ATTRIBUTE_DELTA_HASH, lastModified);
			recordsToCrawl.add(record);
		}
		return recordsToCrawl;
	}
 
	/** {@inheritDoc} */
	@Override
	public String getName() {
		return NAME;
	}
}

The Fetcher

Now that we've created bulks of records pointing to the data to be imported into SMILA, we need a worker that actually fetches the data from the data source using the IDs the crawler provided.

The only interface the worker has to implement is org.eclipse.smila.taskworker.Worker.

What is the fetcher worker supposed to do?

  • Read the records sent from the crawler and filtered by the delta checker
  • get the data to be processed by SMILA out of the data source for each ID in the record bulk
  • compile records out of that data
  • hand that data over to the update pusher, which in turn hands it over to the import workflow (i.e. the bulk builder)
  • optionally (if supported) extract compounds or send them to a compound extractor worker to do so.

So the fetcher worker would look something like the following, with the magic happening in the fetch(...) method, which has to access the data source, retrieve the data, add it as an attachment, and fill other metadata as needed (you might have a look at the FileFetcherWorker or the web crawler's SimpleFetcher implementation for inspiration).

public class WhatsoeverFetcherWorker implements Worker {
 
	private static final String NAME = "whatsoeverFetcher";
	private static final String INPUT_SLOT = "input";
	private static final String OUTPUT_SLOT = "output";
	protected final Log _log = LogFactory.getLog(getClass());
 
	/** {@inheritDoc} */
	@Override
	public void perform(TaskContext taskContext) throws Exception {
		final RecordInput recordInput = taskContext.getInputs()
				.getAsRecordInput(INPUT_SLOT);
		final RecordOutput recordOutput = taskContext.getOutputs()
				.getAsRecordOutput(OUTPUT_SLOT);
		Record record;
		do {
			record = recordInput.getRecord();
			if (record != null) {
				if (_log.isDebugEnabled()) {
					_log.debug("fetching content for record " + record.getId());
				}
				fetch(record, taskContext);
				recordOutput.writeRecord(record);
				if (_log.isDebugEnabled()) {
					_log.debug("added record " + record.getId());
				}
			}
		} while (record != null);
	}
 
	/**
	 * Actually retrieves the data from the source based on the ID of the record
	 * and fills in the record's meta data and/or attachments.
	 * 
	 * @param record
	 *            the record to be completed with information from the data
	 *            source
	 * @param taskContext
	 *            the tasks context.
	 */
	private void fetch(Record record, TaskContext taskContext) {
		final long time = taskContext.getTimestamp();
 
		// go and fetch the content and fill the record's content, metadata and/or
		// attachments with it.
		record.getMetadata().put(..., ...);
		...
 
		taskContext.measureTime("fetchContent", time);
	}
 
	/** {@inheritDoc} */
	@Override
	public String getName() {
		return NAME;
	}
}
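
For illustration only, here is a minimal sketch of what fetch(...) could look like for a hypothetical data source whose record IDs are URLs. The attachment name "Content" and the attribute "fetchedFrom" are assumptions of this example, not requirements of the importing framework; you would also need imports for java.io.ByteArrayOutputStream, java.io.IOException, java.io.InputStream and java.net.URL, and this variant declares throws IOException.

	private void fetch(final Record record, final TaskContext taskContext) throws IOException {
		final long time = taskContext.getTimestamp();
		// hypothetical: interpret the record ID as a URL and download its content.
		final URL url = new URL(record.getId());
		final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		final InputStream in = url.openStream();
		try {
			final byte[] chunk = new byte[8192];
			int length;
			while ((length = in.read(chunk)) != -1) {
				buffer.write(chunk, 0, length);
			}
		} finally {
			in.close();
		}
		// attach the downloaded content to the record and remember where it came from.
		record.setAttachment("Content", buffer.toByteArray());
		record.getMetadata().put("fetchedFrom", url.toString());
		taskContext.measureTime("fetchContent", time);
	}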

Plugging it up

So, now we have to plug it all together.

  • Write component definitions for your workers (and also for your service, if one is needed to access your data source); a sketch of such a component definition is shown at the end of this section.
  • Add the bundle to the launcher and the config.ini file.
  • Add worker descriptions for your workers to the workers.json file; these could look something like the following code snippet.
    • Note: our worker would not need an input slot, since it is only run in runOnce mode and it is not short-circuited (as it could be for hierarchical data sources), but there is still an input slot defined here. Why? Currently the job manager requires that runOnce jobs have an input bucket for their start actions, but that might change in the future.
    • Also note: we need the task generator here for the runOnce triggering!
{ 
  "name": "whatsoeverCrawler",
  "taskGenerator":"runOnceTrigger",
  "parameters":[
    {
      "name":"dataSource"
    },
    {
      "name":"seed"
    }
  ],
  "input": [ 
    {   "name": "input",
        "type": "recordBulks"
    } ],
  "output": [ 
    {   "name": "output",
        "type": "recordBulks",
        "modes":[
            "maybeEmpty",
            "multiple"
        ]
    } ]
},
{ 
  "name": "whatsoeverFetcher",
  "input": [ 
    {	"name": "input",
	"type": "recordBulks"
    } ],
  "output": [ 
    {	"name": "output",
	"type": "recordBulks"
    } ]
}
  • Add the workers to a sensible workflow, e.g.:
    {
      "name":"whatsoeverCrawling",
      "startAction":{
        "worker":"whatsoeverCrawler",
        "input": {
          "input": "dummyInput"
        },
        "output":{
          "output":"somethingToCrawlBucket"
        }
      },
      "actions":[
        {
          "worker":"deltaChecker",
          "input":{
            "recordsToCheck":"somethingToCrawlBucket"
          },
          "output":{
            "updatedRecords":"somethingToFetchBucket"
          }
        },
        {
          "worker":"whatsoeverFetcher",
          "input":{
            "input":"somethingToFetchBucket"
          },
          "output":{
            "output":"somethingToPushBucket"
          }
        },
        {
          "worker":"updatePusher",
          "input":{
            "recordsToPush":"somethingToPushBucket"
          }
        }
      ]
    }
  • For your convenience you can also create a predefined job in the jobs.json, like the following snippet (note that the seed parameter is fixed if you choose to use a predefined job).
    • Note: here, too, the dummy input slot from above needs to be provided with a dummy bucket.
    {
      "name": "crawlWhatsoever",
      "workflow": "whatsoeverCrawling",
      "parameters": {
        "tempStore": "temp",
        "dataSource": "whatsoever",
        "seed": "your seed data",
        "jobToPushTo": "indexUpdate"
      }
    }
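
  • Concerning the component definitions mentioned above: a worker is registered as an OSGi Declarative Services component that provides the org.eclipse.smila.taskworker.Worker interface. A minimal sketch of such a definition for the crawler worker is shown below; the file name OSGI-INF/whatsoeverCrawlerWorker.xml and the package org.example.whatsoever are just assumptions for this example. Reference the definition file from the Service-Component header of your MANIFEST.MF and create an analogous one for the fetcher worker.
    <?xml version="1.0" encoding="UTF-8"?>
    <scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" name="WhatsoeverCrawlerWorker">
      <!-- hypothetical package name; use the package of your own worker class -->
      <implementation class="org.example.whatsoever.WhatsoeverCrawlerWorker" />
      <!-- register the worker as a service under the Worker interface so the framework can find it -->
      <service>
        <provide interface="org.eclipse.smila.taskworker.Worker" />
      </service>
    </scr:component>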

Mapping to Solr

You may have to map your record's attributes to Solr. See AdaptFileCrawlerWorkerOutput.bpel as well as AddPipeline.bpel in SMILA.application/configuration/org.eclipse.smila.processing.bpel/pipelines/.

If you have used attributes that do not match your Solr schema, you have to adapt them in order to index them. If your attributes already match the Solr schema, no adaptation is needed.

In SMILA.application you can find existing mappings for file and web crawling in the configuration folder configuration/org.eclipse.smila.processing.bpel/pipelines/.

Just add your own mapping pipeline, e.g. by copying, renaming and adapting AdaptFileCrawlerWorkerOutput.bpel, or by building it from scratch (the key component to use within it is the org.eclipse.smila.processing.pipelets.CopyPipelet).

Then extend the AddPipeline by inserting your new mapping pipeline.

Don't forget to add your adaption pipeline to the deployment descriptor deploy.xml (as a process and as a partner link in the AddPipeline as well!).

Now the attributes of the records your fetcher fetched will be mapped according to your Solr configuration.

And ....Action!

So now it's time to check if everything went right.

  • Start SMILA
  • Check whether you can access your worker definitions, workflow and job via the REST API. If not, check for errors (syntax errors in the JSON files, other errors in the SMILA log).
  • Check in SMILA's log whether your workers were added.
  • Check whether your adaption pipeline is visible in the pipeline list (http://localhost:8080/smila/pipeline/).
  • Start the indexing job: POST http://localhost:8080/smila/jobmanager/jobs/indexUpdate/
  • Start your crawling job (remember: it has to be started as a run-once job!):
POST http://localhost:8080/smila/jobmanager/jobs/crawlWhatsoever/
{
  "mode": "runOnce"
}
  • Check your jobs: after your crawl job has succeeded, you can finish the indexing job. After the indexing job has succeeded (if you finished it), you should wait a few seconds (up to 60, because the autocommit takes some time) before checking whether your data was indexed (see http://localhost:8080/SMILA/search).

So now you should be able to search your content.

If you can find your records, you have just successfully added a new data source to your SMILA application. Congratulations!
