SMILA/Documentation/Importing/Crawler/JDBC
Revision as of 09:31, 17 December 2012

The JDBC Crawler and JDBC Fetcher workers are used for importing data from a database via JDBC. For the big picture and the workers' interaction, have a look at the Importing Concept.

JDBC Crawler

The JDBC Crawler executes an SQL statement and crawls the result set, producing a record for each row of the result set.

Configuration

The JDBC Crawler worker is usually the first worker in a workflow and the job is started in runOnce mode.

  • Worker name: jdbcCrawler
  • Parameters:
    • dataSource: (req.) value for attribute _source, needed e.g. by the delta service
    • dbUrl: (req.) database URL to connect to
    • dbProps: (opt.) properties used when connecting to the database.
      • user (opt.) user name
      • password (opt.) user password
      • ... (opt.) any property supported by the used JDBC driver
    • crawlSql: (req.) the SQL statement to execute to get the records
    • splitLimitsSql: (opt.) the SQL statement to determine limits for splitting the result set into smaller parts based on an integer column. See Splitting for details.
    • splitIncrement: (opt., >0) the increment to use for splitting the integer range determined by splitLimitsSql. See Splitting for details.
    • mapping (req.) specifies how to map database column names to record attributes or attachments.
      • COLUMN-1 (opt.) mapping of the first column to an attribute/attachment
      • COLUMN-2 (opt.) mapping of the second column to an attribute/attachment
      • COLUMN-N (opt.) mapping of the last column to an attribute/attachment
    • idColumns (req.) a list of database column names from which the record ID is generated
    • deltaColumns (opt.) a list of database column names (upper-case) used to generate the value for attribute _deltaHash
    • maxAttachmentSize (opt.) maximum accepted size of BLOB/CLOB column values (in bytes/characters) for attachment creation. Default is "1000000000" (1 billion). Column values exceeding this size are skipped and a warning is written to the log.
    • parameters to control size of output bulks, see below for details
      • maxRecordsPerBulk (opt., >0) maximum number of records in one bulk. (default: 1000)
  • Task generator: runOnceTrigger
  • Input slots:
    • splitsToCrawl: Descriptions of split-crawl tasks, see Splitting for details
  • Output slots:
    • crawledRecords: Bulks with crawled records to be processed.
    • splitsToCrawl: Descriptions for split-crawl tasks, see Splitting for details
Processing

The JDBC Crawler executes the crawlSql statement and produces one record per result row in the bucket connected to crawledRecords. Please note that database column names are internally normalized to upper-case; in the configuration, however, any casing can be used. The resulting records contain only the values of the columns configured in the mapping. Whether a column is represented as an attribute or as an attachment depends on the type of the database column. The following table summarizes the supported database types, how they are mapped to Java types, and whether they are represented as attributes or attachments:

Database Type    Java Type                    Represented as
BIT              Boolean                      attribute
BOOLEAN          Boolean                      attribute
BIGINT           Long                         attribute
INTEGER          Long                         attribute
SMALLINT         Long                         attribute
TINYINT          Long                         attribute
DOUBLE           Double                       attribute
FLOAT            Double                       attribute
REAL             Double                       attribute
DECIMAL          Double (scale > 0) or Long   attribute
NUMERIC          Double (scale > 0) or Long   attribute
DATE             Date                         attribute
TIME             DateTime                     attribute
TIMESTAMP        DateTime                     attribute
CHAR             String                       attribute
VARCHAR          String                       attribute
NCHAR            String                       attribute
NVARCHAR         String                       attribute
ROWID            String                       attribute
BINARY           byte[]                       attachment
VARBINARY        byte[]                       attachment
LONGVARBINARY    byte[]                       attachment
BLOB             byte[]                       attachment
CLOB             byte[]                       attachment
NCLOB            byte[]                       attachment
LONGNVARCHAR     byte[] (UTF-8 encoded)       attachment (CLOB-like types are often reported by JDBC drivers as this type)
LONGVARCHAR      byte[] (UTF-8 encoded)       attachment (CLOB-like types are often reported by JDBC drivers as this type)
NULL             -                            no entry is generated

The following types are not fully supported:

  • ARRAY
  • DATALINK
  • DISTINCT
  • JAVA_OBJECT
  • OTHER
  • REF
  • SQLXML
  • STRUCT

The crawler tries to automatically convert any value into an attribute of an appropriate data type. If this is not possible, an attachment with the raw bytes is generated.
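The column-to-record mapping described above can be sketched as follows. This is a simplified Python illustration, not SMILA's actual (Java) implementation; the record structure shown here is made up for clarity:

```python
def map_row(row, mapping):
    """Map one result row to a record: simple values become attributes,
    binary values become attachments, NULL columns produce no entry."""
    record = {"attributes": {}, "attachments": {}}
    for column, target in mapping.items():
        # column names are normalized to upper-case internally
        value = row.get(column.upper())
        if value is None:
            continue  # NULL: no entry is generated
        if isinstance(value, (bytes, bytearray)):
            record["attachments"][target] = bytes(value)
        else:
            record["attributes"][target] = value
    return record

rec = map_row({"SENDER": "alice@example.org", "BODY": b"raw bytes", "X": None},
              {"Sender": "From", "Body": "body", "X": "unused"})
# rec -> {"attributes": {"From": "alice@example.org"},
#         "attachments": {"body": b"raw bytes"}}
```

Note how the mapping keys may use any casing, while lookup happens on the upper-cased column name, and how the NULL column produces neither an attribute nor an attachment.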

The records are collected in bulks, whose size can be configured via the parameter maxRecordsPerBulk:

  • not configured: a new crawledRecords bulk is started after 1000 records (the default).
  • configured: a new crawledRecords bulk is started when the configured value is reached.

Please note that maxRecordsPerBulk must be > 0, otherwise your job will fail.
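The bulk-size behaviour can be sketched as a small Python function (an illustration of the described behaviour, not SMILA code):

```python
def chunk_into_bulks(records, max_records_per_bulk=1000):
    """Split a record sequence into bulks of at most max_records_per_bulk
    records, as the crawler does for its crawledRecords output."""
    if max_records_per_bulk <= 0:
        raise ValueError("maxRecordsPerBulk must be > 0")
    return [records[i:i + max_records_per_bulk]
            for i in range(0, len(records), max_records_per_bulk)]

bulks = chunk_into_bulks(list(range(2500)), 1000)
# 2500 records with maxRecordsPerBulk=1000 -> bulk sizes 1000, 1000, 500
```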

The attribute _source is set from the task parameter dataSource. It currently has no further meaning, but it is needed by the delta service.

Splitting

Reading data from very large tables in a single task can be problematic for two reasons:

  • Performance: Reading a large table sequentially can take very long, obviously. You will want to parallelize the process.
  • Memory: When accessing large result sets, the JDBC driver can cause excessive memory usage. It can be necessary to read the table in smaller portions.

The Splitting feature of the JDBC crawler worker can be used to do this. Basically, it works like this:

  • In the initial crawl task, the crawler only determines the size of the table instead of reading the rows. For this you need to provide an SQL statement as parameter "splitLimitsSql" that yields a single row with two integer values "min" and "max".
  • Then the crawler creates a series of smaller intervals that cover the complete [min,max] range. The bounds of each interval are written to a separate record bulk in the output slot "splitsToCrawl". The size of the intervals is determined by the "splitIncrement" parameter. No "crawledRecords" are created by this initial task.
  • Each "splitsToCrawl" bulk is then processed in a separate follow-up task using the "crawlSql" statement. It must contain two "?" as parameter placeholders, which will be filled by the crawler with the "min" (first "?") and "max" (second "?") value from the input record. All rows produced by the resulting statement will be mapped to records and written to "crawledRecords" just like in normal one-step crawling.

Of course, you must take care that the "splitLimitsSql" and "crawlSql" statements are consistent and that the "crawlSql" statement reads each row exactly once when executed repeatedly with the split-interval bounds.

Both parameters "splitLimitsSql" and "splitIncrement" need to be set to enable splitting, and the crawler's "splitsToCrawl" output slot must be connected via a transient bucket to the input slot of the same name. Invalid parameter values (e.g. "splitIncrement" <= 0) or invalid workflow configurations will cause the crawl job to fail. If you don't need splitting, you can omit the loopback connection from the crawler worker's "splitsToCrawl" output slot to its input slot in your workflow.

Example

As a simple example, we want to crawl a very large table that has an INTEGER key column named ID. The "splitLimitsSql" statement gets the minimum and maximum value of this ID column, and the "crawlSql" statement restricts the result set for each task to a given lower and upper bound on this column:

{
  ...
  "parameters": {
    ...
    "splitLimitsSql": "SELECT min(ID) as MIN, max(ID) as MAX FROM VERY_LARGE_TABLE",
    "crawlSql": "SELECT * FROM VERY_LARGE_TABLE WHERE ID >= ? and ID <= ?",
    "splitIncrement": 10000,
    ...
    "mapping": {
      // as usual
    }
  }
}

The "splitLimitsSql" statement must deliver the limits in a single row with column names MIN and MAX. Let's assume it yields MIN=1 and MAX=1000000. Then, using the "splitIncrement" value of 10,000, the first task creates 100 splitsToCrawl records with the following bounds:

  • MIN=1, MAX=10,000
  • MIN=10,001, MAX=20,000
  • ...
  • MIN=990,001, MAX=1,000,000

This means: for each split interval, MAX equals MIN+splitIncrement-1, and the MIN of each split interval is the MAX of the previous interval plus 1. This is repeated until the MAX value of a record is equal to or greater than the MAX determined by the "splitLimitsSql" statement.
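The interval arithmetic described above can be sketched as a small Python function (an illustration of the described behaviour, not SMILA code):

```python
def split_intervals(lo, hi, increment):
    """Generate (MIN, MAX) bounds covering [lo, hi], as the initial crawl
    task does when producing the splitsToCrawl records."""
    if increment <= 0:
        raise ValueError("splitIncrement must be > 0")
    intervals = []
    start = lo
    while start <= hi:
        intervals.append((start, start + increment - 1))
        start += increment
    return intervals

splits = split_intervals(1, 1000000, 10000)
# 100 intervals: (1, 10000), (10001, 20000), ..., (990001, 1000000)
```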

Each of these records is then used in a separate task to fetch a subset of the complete table and map it to "crawledRecords". This is done by using the MIN and MAX values of the input record as values for the first two parameters of a prepared statement created from "crawlSql", i.e. in place of the "?" characters in the "crawlSql" string. These tasks can be executed in parallel, as allowed by the scaleUp parameter, which should greatly improve crawl performance.

In general, the column used for splitting does not need to be a (primary) key column, i.e. there can be multiple rows with the same value in the split column. For example, the column can have a very limited number of distinct values (e.g. some category key). Then you can use "splitIncrement":1 to create a split for each distinct value, so that all rows of the same "category" are fetched in a separate split-crawl task.

Furthermore, it is of course not necessary that there is a row for each possible split column value. All of this means that the "splitIncrement" value does not necessarily relate to the number of rows fetched in a split-crawl task.

Also, the split ranges do not have to be read from "real" columns. You can do whatever is possible in SQL to generate those numbers, as long as you can provide a "crawlSql" statement that fetches the rows you want to have as actual records.

The configuration of the jdbcFetcher is not affected by splitting.

JDBC Fetcher

For each input record, the JDBC Fetcher executes the fetchSql statement and adds the result rows to the record.

Configuration
  • Worker name: jdbcFetcher
  • Parameters:
    • dbUrl: (req.) database URL to connect to
    • dbProps: (opt.) properties used when connecting to the database.
      • user (opt.) user name
      • password (opt.) user password
      • ... (opt.) any property supported by the used JDBC driver
    • fetchSql: (opt.) the SQL statement executed to fetch the data which is used to enrich the crawled input record. It may contain one or more '?' as parameter placeholders, see PreparedStatement. If the fetchSql parameter isn't set, the originally crawled record is written unchanged to the output.
    • fetchParameterAttributes: (opt.) a list of record attribute names whose values are used in the given order as parameters (substitutes for the '?') in the fetchSql statement. If fetchParameterAttributes isn't set, the record attributes which were mapped from the idColumns are used as fetchSql statement parameter substitutes.
    • mapping (req.) specifies how to map database column names to record attributes or attachments. Please note that database column names are normalized to upper-case.
      • COLUMN-1 (opt.) mapping of the first column to an attribute/attachment
      • COLUMN-2 (opt.) mapping of the second column to an attribute/attachment
      • COLUMN-N (opt.) mapping of the last column to an attribute/attachment
    • idColumns (opt.) If fetchParameterAttributes isn't set, the record attributes which were mapped from the idColumns DB columns are used as parameter (substitutes for the '?') in the fetchSql statement. In this case the id columns must also be specified in the mapping.
    • maxAttachmentSize (opt.) maximum accepted size of BLOB/CLOB column values (in bytes/characters) for attachment creation. Default is "1000000000" (1 billion). Larger values are skipped and a warning is written to the log.
  • Input slots:
    • recordsToFetch
  • Output slots:
    • fetchedRecords
Processing

The JDBC Fetcher is used to enrich the input records with the data selected by the fetchSql statement. The fetchSql is executed as PreparedStatement. So it can have parameters ('?') which will be replaced by either the values of the record attributes specified by the fetchParameterAttributes or (per default) by the values of the record attributes which were mapped from the idColumns.

All columns that are selected by the fetchSql query and are mapped in the mapping section will enrich the crawled record. That means, the mapped attributes are added to the record's metadata, binary data (BLOB, CLOB) will be added as attachments. Have a look at the table above for a description of the mapping from database types to Java types.

In general, the fetchSql query will return exactly one row. If it returns no row at all (i.e. an empty result), the originally crawled record is written unchanged to the output. If it returns more than one row, the values of the rows are merged (e.g. as lists) before being added to the input record.

If one of the attributes used as fetch parameters is not set, or does not have a simple value but a sequence or a map, it is skipped and no additional data will be fetched for this record. The record is just written unchanged to the output.
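The fetch-and-merge behaviour can be sketched with Python's sqlite3 module, whose prepared statements also use '?' placeholders. This is a simplified illustration borrowing the EmailBodies table from the sample job; SMILA's actual fetcher is a Java worker, and fetch_and_merge is a made-up helper:

```python
import sqlite3

# Miniature stand-in for the database: the EmailBodies table from the sample.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE EmailBodies (BodyId INTEGER, Body TEXT)")
conn.executemany("INSERT INTO EmailBodies VALUES (?, ?)",
                 [(1, "first part"), (1, "second part"), (2, "only part")])

def fetch_and_merge(record, fetch_sql, param_attr):
    """Run fetch_sql with the record's parameter attribute substituted for
    '?'; merge multi-row results into a list, keep a single row as a plain
    value, and pass the record through unchanged on an empty result."""
    rows = conn.execute(fetch_sql, (record[param_attr],)).fetchall()
    if not rows:
        return record  # no result: record is written unchanged
    values = [row[0] for row in rows]
    record["Body"] = values[0] if len(values) == 1 else values
    return record

FETCH_SQL = "SELECT Body FROM EmailBodies WHERE BodyId = ?"
rec = fetch_and_merge({"bodyReference": 1}, FETCH_SQL, "bodyReference")
# rec["Body"] == ["first part", "second part"]  (two rows merged into a list)
```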

Sample JDBC crawl job

This example uses the following fictitious tables of database EmailDB:

Emails
  Sender | Receiver | Subject | SendDate | BodyId

EmailBodies
  BodyId | Body


  {
   "name":"crawlJdbcJob",
   "workflow":"jdbcCrawling",
   "parameters":{
     "dataSource":"emails",
     "dbUrl":"jdbc:derby:memory:EmailDB",
     "dbProps":
     {
        "user": "admin",
        "password": "topsecret"
     },
     "crawlSql":"SELECT * FROM Emails",
     "fetchSql":"SELECT Body FROM EmailBodies WHERE BodyId=?",
     "fetchParameterAttributes": "bodyReference",
     "idColumns": ["Sender", "Receiver"],
     "deltaColumns": "SendDate",
     "mapping":{
       "Sender":"From",
       "Receiver":"To",       
       "Subject":"Title",       
       "SendDate":"lastModified",   
       "BodyId":"bodyReference"
     },
     "jobToPushTo":"indexUpdateJob",
     "tempStore": "temp"
   }
 }

Adding JDBC Drivers

By default, SMILA includes only JDBC drivers for Derby. If you want to access other databases, you have to provide the corresponding JDBC drivers. To add a JDBC driver, perform the following steps:

  • Copy your JDBC driver jars into folder SMILA/plugins/org.eclipse.smila.jdbc/lib
  • Edit SMILA/plugins/org.eclipse.smila.jdbc/META-INF/MANIFEST.MF and add the jars to the bundle classpath, e.g.
Bundle-ClassPath: .,
 lib/postgresql-9.1-902.jdbc4.jar,
 lib/mysql-connector-java-5.1.20-bin.jar
  • Do not forget the whitespace character in front of the path.
  • JDBC-4-compliant drivers should be found automatically after a restart. For other drivers you must add the driver class name to the SMILA-JDBC header in META-INF/MANIFEST.MF, e.g.
SMILA-JDBC: org.apache.derby.jdbc.EmbeddedDriver,
 org.apache.derby.jdbc.ClientDriver