Difference between revisions of "SMILA/Discussion/MapReduce"


Latest revision as of 07:35, 15 February 2013

Discussion - SMILA and MapReduce

Thinking about Big Data, a lot of people have MapReduce in mind. SMILA offers great flexibility in the job management / asynchronous workflows, so let's think about the possibilities we have in SMILA to provide some - at least basic - support for MapReduce algorithms.

MapReduce

(in very simplified form ;)

First, we have some kind of input data, let's say a record bulk in SMILA. For each record the Map function produces Key/Value pairs (K,V)* as intermediate results. After all input data has passed the Map phase, the intermediate results are processed by the Reduce function which creates an output result for each unique intermediate key and all values of this key.

  • Map: record -> (K,V)*
  • Reduce: K,V* -> result
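As a concrete illustration, the two signatures above could be written as plain functions. This is a Python sketch using word count as the map/reduce pair, not SMILA API; all names are illustrative:

```python
def map_record(record):
    """Map: one input record -> zero or more (key, value) pairs.
    Word-count flavor: emit (word, 1) for every word in the record."""
    for word in record["content"].split():
        yield (word, 1)

def reduce_values(key, values):
    """Reduce: one key plus all its values -> one result.
    Word-count flavor: sum all counts for one word."""
    return sum(values)

pairs = list(map_record({"_recordid": "r1", "content": "aaa ccc ccc"}))
# pairs == [("aaa", 1), ("ccc", 1), ("ccc", 1)]
result = reduce_values("ccc", [2, 1, 1])
# result == 4
```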

To integrate this easily into SMILA's data model, intermediate and output results should also be records or record bulks, respectively.

Worker

As basic MapReduce support in SMILA, we could provide an abstract MapWorker and an abstract ReduceWorker. To implement a dedicated MapReduce solution, subclasses of the Map- and ReduceWorker have to be implemented. (In a second step, we could also think about having only one MapWorker and one ReduceWorker, each with a kind of script parameter to provide the map and reduce functions.)

MapWorker:

 /** the map function which has to be implemented. */
 protected abstract void map(record)
 /** the resulting key/value pairs are delivered to the superclass. */
 protected void emit(key, value)
 /** this will be called after the input has been processed. */
 protected abstract void finish() 
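A minimal sketch of how such an abstract MapWorker could look, transcribed to Python; class and method names are assumptions, not the actual SMILA API:

```python
from abc import ABC, abstractmethod

class AbstractMapWorker(ABC):
    """Hypothetical base class: subclasses implement map() and call emit()."""

    def __init__(self):
        self._output = {}  # keyed intermediate results, sorted on write-out

    @abstractmethod
    def map(self, record):
        """To be implemented: call self.emit() for each (key, value) pair."""

    def emit(self, key, value):
        """Collect an intermediate (key, value) pair in the superclass."""
        self._output.setdefault(key, []).append(value)

    def finish(self):
        """Called after the input has been processed; no-op by default."""

    def process_bulk(self, records):
        """Driver: run map() over a record bulk, return the key-sorted result."""
        for record in records:
            self.map(record)
        self.finish()
        return {key: self._output[key] for key in sorted(self._output)}

class WordCountMapWorker(AbstractMapWorker):
    def map(self, record):
        for word in record["content"].split():
            self.emit(word, 1)

worker = WordCountMapWorker()
result = worker.process_bulk([{"content": "aaa ccc ccc"}, {"content": "ccc bbb"}])
# result == {"aaa": [1], "bbb": [1], "ccc": [1, 1, 1]}
```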

To keep things simple we should (in a first step) only allow strings as keys for the map result. (Nevertheless, one could use complex keys by serializing them to an appropriate string). The result of a MapWorker task is a record bulk, with records having a "_mapReduce" parameter:

{"_recordid": "...", 
 "_mapReduce": {
    "key1": [<values>], 
    "key2": [<values>], 
    ...
  } 
}
...
  • "_mapReduce" is a hardcoded parameter name, so that the following ReduceWorker can find its input data. This also allows MapReduce data to be transported in records in addition to other parameters and attributes.
  • The results of a MapWorker are already ordered by their keys, because these are strings and we use ordered maps in our Record/AnyMap implementation.
  • The MapWorker implementation should, if possible, use the so-called in-mapper combining pattern, which means that - during the processing of an input task - it already accumulates the emitted values belonging to the same key. The values can then be emitted in finish(). This reduces the number of output values.
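A sketch of the in-mapper combining pattern for word count (illustrative Python, not SMILA code): instead of emitting ("ccc", 1) for every occurrence, the mapper keeps a per-task counter and emits each key exactly once in finish():

```python
from collections import Counter

class CombiningWordCountMapper:
    """Illustrative mapper using in-mapper combining; names are hypothetical."""

    def __init__(self):
        self._counts = Counter()  # per-task accumulation of all counts
        self.emitted = {}

    def map(self, record):
        # accumulate instead of emitting immediately
        self._counts.update(record["content"].split())

    def emit(self, key, values):
        self.emitted[key] = values

    def finish(self):
        # emit each key once, with the already-combined value
        for key in sorted(self._counts):
            self.emit(key, [self._counts[key]])

mapper = CombiningWordCountMapper()
for record in [{"content": "aaa ccc ccc"}, {"content": "ccc bbb"}]:
    mapper.map(record)
mapper.finish()
# mapper.emitted == {"aaa": [1], "bbb": [1], "ccc": [3]}
```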

Alternative output format: The Map worker produces a record bulk with one record per key:

{
  "_key": "<key1>",
  "_values": [<values>],
  ...
}
{
  "_key": "<key2>",
  "_values": [<values>],
  ...
}
...

This is easier to read via streaming in the Reduce worker. Further, it allows the keys to have arbitrary data types (even complex types). The map worker must still ensure that the records are sorted by key in the output bulk.

ReduceWorker:

 /** the reduce function which has to be implemented. */
 protected abstract void reduce(key, values)
 /** the result for each key is delivered to the superclass. */
 protected void emit(key, result)  
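The ReduceWorker contract could be sketched analogously; again a hypothetical Python transcription, not the actual API:

```python
from abc import ABC, abstractmethod

class AbstractReduceWorker(ABC):
    """Hypothetical base class: subclasses implement reduce() and call emit()."""

    def __init__(self):
        self.results = {}

    @abstractmethod
    def reduce(self, key, values):
        """To be implemented: compute a result for one key and call self.emit()."""

    def emit(self, key, result):
        """The result for each key is delivered to the superclass."""
        self.results[key] = result

class WordCountReduceWorker(AbstractReduceWorker):
    def reduce(self, key, values):
        self.emit(key, sum(values))

reducer = WordCountReduceWorker()
reducer.reduce("ccc", [2, 1, 1])
# reducer.results == {"ccc": 4}
```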

The (abstract) ReduceWorker could store its result in one (or more?) record(s), also below the "_mapReduce" parameter.

Workflow

The workflow consists of (at least) a MapWorker and a ReduceWorker which processes the result of the MapWorker. The main problem here is that the Reduce phase must not start before all MapWorker tasks have finished.

We can solve this with a little trick: As long as the job is in RUNNING mode, the ReduceWorker's TaskGenerator doesn't produce tasks. So the MapReduce job run will only process MapWorker tasks until they are all finished. Afterwards the job run will go to its COMPLETING phase, and here we can interfere and let the ReduceWorker's TaskGenerator produce one task for the Reduce phase.

Why only one task for the ReduceWorker? Well, we have to provide all values for the same key as Reduce input, and these values are distributed across the different intermediate record bulks. The (abstract) ReduceWorker can read the input streams of all (sorted!) intermediate results from the Map phase, do a kind of merge sort, and so provide all input values for the reduce function without reading the whole input beforehand.
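The streaming merge described above can be sketched as a k-way merge over the already key-sorted intermediate bulks (illustrative Python, using word-count data; not SMILA code). Because every bulk is sorted, heapq.merge yields a globally key-sorted stream, and groupby hands each key with all its values to the reduce function without loading the whole input first:

```python
import heapq
from itertools import groupby

# two intermediate bulks, each already sorted by key
bulk1 = [("aaa", [1]), ("bbb", [1]), ("ccc", [2, 1])]
bulk2 = [("bbb", [1]), ("ccc", [1])]

# lazily merge the sorted streams into one key-sorted stream
merged = heapq.merge(bulk1, bulk2, key=lambda pair: pair[0])

result = {}
for key, group in groupby(merged, key=lambda pair: pair[0]):
    # flatten all value lists that belong to this key
    values = [v for _, vals in group for v in vals]
    result[key] = sum(values)  # the word-count reduce function
# result == {"aaa": 1, "bbb": 2, "ccc": 4}
```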

The disadvantage of this approach is that the Reduce phase doesn't scale, because we have only one task for the whole Reduce phase. And we may need a good amount of memory, although with the streaming merge sort we should not have to hold many intermediate results in memory at once. (Do we have to hold the complete reduce result in memory?)

MergeWorker

A different approach is to use something like a "MergeWorker" in the workflow between the Map- and ReduceWorker. The MergeWorker could process the Map results and create output objects that contain all merged values for each key. In principle this would work with the streaming merge sort described before. During this step the following ReduceWorker could already process the created objects. The disadvantage here is worse performance, due to the additional workflow step that reads and writes another intermediate result.

Questions

  • Is there a better idea besides using this "trick" with the COMPLETING step for starting the Reduce phase?
    • Yes, we introduced the "barrier" worker mode to support this. Now we just need a special task generator for the reduce workers.
  • Is it really possible for the ReduceWorker to process the intermediate record bulks as a stream so that it doesn't have to hold the whole intermediate result in memory? Or do we need a special kind/format of intermediate result to allow such streamed processing?
  • Does it make sense to have a Combine phase, i.e. a CombineWorker?
  • Should the result of the ReduceWorker have a fixed format (e.g. storing the results below a _mapReduce parameter)? What makes sense here? Or should we just leave that to each ReduceWorker implementation?

Examples

word count

Workflow: Crawler -> Fetcher -> WordCountMapWorker -> WordCountReduceWorker

Assuming the crawling produces three records in two record bulks as input for the MapWorker:

{"_recordid": "r1", "content": "aaa ccc ccc" }
{"_recordid": "r2", "content": "ccc bbb" }

{"_recordid": "r3", "content": "bbb ccc" }

This will produce two tasks for the WordCountMapWorker which produces two intermediate record bulks:

{"_recordid": "intermediate1", 
 "_mapReduce": {
    "aaa": [1], 
    "bbb": [1], 
    "ccc": [2,1]     // could also be [3] when using in-mapper combining
  } 
}

{"_recordid": "intermediate2", 
 "_mapReduce": { 
    "bbb": [1], 
    "ccc": [1]
  } 
}

Or, using the alternative bulk format:

  • Intermediate bulk 1:
{ "_key": "aaa", "_values": [1] }
{ "_key": "bbb", "_values": [1] }
{ "_key": "ccc", "_values": [2, 1] }
  • Intermediate bulk 2:
{ "_key": "bbb", "_values": [1] }
{ "_key": "ccc", "_values": [1] }

The WordCountReduceWorker reads those two intermediate results and produces the final result:

{"_recordid": "result", 
 "_mapReduce": {
    "aaa": [1], 
    "bbb": [2], 
    "ccc": [4]
  } 
}
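The numbers of this example can be reproduced with a small end-to-end sketch (illustrative Python, not SMILA code): each input bulk is mapped to a key-sorted intermediate dict, then the intermediates are merged and summed per key:

```python
from collections import Counter

def word_count_map(bulk):
    """Map one record bulk to a key-sorted intermediate result.
    Counts are combined per record, so "ccc" in r1 contributes a single 2."""
    counts = {}
    for record in bulk:
        per_record = Counter(record["content"].split())
        for word, n in sorted(per_record.items()):
            counts.setdefault(word, []).append(n)
    return dict(sorted(counts.items()))

bulk1 = [{"_recordid": "r1", "content": "aaa ccc ccc"},
         {"_recordid": "r2", "content": "ccc bbb"}]
bulk2 = [{"_recordid": "r3", "content": "bbb ccc"}]

inter1 = word_count_map(bulk1)  # {"aaa": [1], "bbb": [1], "ccc": [2, 1]}
inter2 = word_count_map(bulk2)  # {"bbb": [1], "ccc": [1]}

# reduce: sum all values per key across the intermediate bulks
final = {}
for intermediate in (inter1, inter2):
    for key, values in intermediate.items():
        final[key] = final.get(key, 0) + sum(values)
# final == {"aaa": 1, "bbb": 2, "ccc": 4}
```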

average value calculation

e.g. crawl some files and calculate the average file size for each file extension

Workflow: Crawler -> AvgFileSizeMapWorker -> AvgFileSizeReduceWorker

Assuming the crawling produces five records in two record bulks as input for the MapWorker:

{"_recordid": "r1", "FileSize": "100", "FileExtension": "html" }
{"_recordid": "r2", "FileSize": "100", "FileExtension": "txt" }
{"_recordid": "r3", "FileSize": "400", "FileExtension": "html" }

{"_recordid": "r4", "FileSize": "300", "FileExtension": "txt" }
{"_recordid": "r5", "FileSize": "400", "FileExtension": "html" }

This will produce two tasks for the AvgFileSizeMapWorker which produces two intermediate record bulks:

{"_recordid": "intermediate1", 
 "_mapReduce": {
    "html": [100, 400], 
    "txt": [100]
  } 
}

{"_recordid": "intermediate2", 
 "_mapReduce": { 
    "html": [400], 
    "txt": [300]
  } 
}

Or, again with the alternative format:

{ "_key": "html", "_values": [100, 400] }
{ "_key": "txt", "_values": [100] }

{ "_key": "html", "_values": [400] }
{ "_key": "txt", "_values": [300] }

The AvgFileSizeReduceWorker reads those two intermediate results and produces the final result:

{"_recordid": "result", 
 "_mapReduce": {
    "html": [300], 
    "txt": [200]
  } 
}
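The averaging step can be sketched as follows (illustrative Python, not SMILA code; the intermediate bulks are the ones from above): collect all sizes per extension, then average them.

```python
inter1 = {"html": [100, 400], "txt": [100]}
inter2 = {"html": [400], "txt": [300]}

# merge: gather all file sizes per extension across the intermediate bulks
sizes = {}
for intermediate in (inter1, inter2):
    for ext, values in intermediate.items():
        sizes.setdefault(ext, []).extend(values)

# reduce: integer average per extension
averages = {ext: sum(v) // len(v) for ext, v in sizes.items()}
# averages == {"html": 300, "txt": 200}
```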

inverted indexing of document terms

tbd (key: term, value: document which contains the term)
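A possible sketch of this example (illustrative Python, not SMILA code; all names are hypothetical): the map function emits (term, document id) pairs, and the reduce function collects the posting list for each term.

```python
def inverted_index_map(record):
    """Emit (term, document id) for each distinct term in the record."""
    for term in set(record["content"].split()):
        yield (term, record["_recordid"])

def inverted_index_reduce(term, doc_ids):
    """Collect the sorted posting list for one term."""
    return sorted(doc_ids)

records = [{"_recordid": "r1", "content": "aaa ccc ccc"},
           {"_recordid": "r2", "content": "ccc bbb"}]

postings = {}
for record in records:
    for term, doc in inverted_index_map(record):
        postings.setdefault(term, []).append(doc)

index = {term: inverted_index_reduce(term, docs) for term, docs in postings.items()}
# index == {"aaa": ["r1"], "bbb": ["r2"], "ccc": ["r1", "r2"]}
```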