Discussion - SMILA and MapReduce

Thinking about Big Data, a lot of people have MapReduce in mind. SMILA offers great flexibility in its job management and asynchronous workflows, so let's think about the possibilities we have in SMILA to provide some - at least basic - support for MapReduce algorithms.

MapReduce

(in very simplified form ;)

First, we have some kind of input data, let's say a record bulk in SMILA. For each record, the Map function produces key/value pairs (K,V)* as intermediate results. After all input data has passed the Map phase, the intermediate results are processed by the Reduce function, which creates an output result for each unique intermediate key from all values of that key.

  • Map: record -> (K,V)*
  • Reduce: K,V* -> result

To integrate this easily into SMILA's data model, intermediate and output results should also be records, or rather record bulks.

Worker

As basic MapReduce support in SMILA, we could provide an abstract MapWorker and an abstract ReduceWorker. To implement a dedicated MapReduce solution, subclasses of MapWorker and ReduceWorker have to be implemented. (In a second step, we could also think about having only one MapWorker and one ReduceWorker which take a kind of script parameter to provide the map and reduce functions.)

MapWorker:

 /** the map function which has to be implemented. */
 protected abstract void map(record)
 /** the resulting key/value pairs are delivered to the superclass. */
 protected void emit(key, value)
 /** this will be called after the input has been processed. */
 protected abstract void finish() 

To keep things simple, we should (in a first step) only allow strings as keys for the map result. (Nevertheless, one could use complex keys by serializing them to an appropriate string.) The result of a MapWorker task is a record bulk, with records having a "_mapReduce" parameter:

{"_recordid": "...", 
 "_mapReduce": {
    "key1": [<values>], 
    "key2": [<values>], 
    ...
  } 
...
  • "_mapReduce" is a hardcoded parameter name, so that the following ReduceWorker can find its input data. This also allows to transport MapReduce data in Records additional to other parameters and attributes.
  • The results of a MapWorker are already ordered by the keys, cause these are strings and we use ordered maps in our Record/AnyMap implementation.
  • The MapWorker implementation should, if possible, use the so called in-mapper combining pattern which means that - during the processing of an input task - it already cumulates the emitted values belonging to the same key. The emitting of the values then could be done in finish(). This would reduce the number of output values.
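A minimal sketch of how in-mapper combining could look for a word-count map implementation (the WordCountMapper class, its whitespace tokenization and the TreeMap-based accumulation are illustrative assumptions, not existing SMILA API):

 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;

 /** Illustrative only: counts per key are accumulated locally during the task
  *  and emitted once at the end, instead of once per occurrence. */
 public class WordCountMapper {

   /** accumulated counts, kept sorted by key (string keys, as proposed above). */
   private final TreeMap<String, Long> counts = new TreeMap<>();

   /** "map" for one record: tokenize the content and combine counts locally. */
   public void map(String content) {
     for (String token : content.split("\\s+")) {
       if (!token.isEmpty()) {
         counts.merge(token, 1L, Long::sum);
       }
     }
   }

   /** "finish": emit one (key, [combined value]) entry per key, e.g. below "_mapReduce". */
   public Map<String, List<Long>> finish() {
     Map<String, List<Long>> mapReduce = new TreeMap<>();
     for (Map.Entry<String, Long> entry : counts.entrySet()) {
       mapReduce.put(entry.getKey(), List.of(entry.getValue()));
     }
     return mapReduce;
   }
 }

Applied to the first bulk of the word count example below, this would emit "ccc": [3] instead of "ccc": [2,1].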

Alternative output format: The Map worker produces a record bulk with one record per key:

{
  "_key": "<key1>",
  "_values": [<values>],
  ...
}
{
  "_key": "<key2>",
  "_values": [<values>],
  ...
}
...

This is easier to read via streaming in the Reduce worker. Further, it allows the keys to have arbitrary data types (even complex types). The map worker must still ensure that the records are sorted by key in the output bulk.
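A small sketch (with hypothetical names, not a fixed API) of how such an accumulated, sorted key/values map could be written as one record per key, matching the alternative format above:

 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;

 /** Illustrative only: one record per key, in key order,
  *  following the alternative "_key"/"_values" bulk format. */
 public class PerKeyRecordWriter {

   public static List<Map<String, Object>> toPerKeyRecords(SortedMap<String, List<Object>> keyValues) {
     List<Map<String, Object>> records = new ArrayList<>();
     for (Map.Entry<String, List<Object>> entry : keyValues.entrySet()) {
       Map<String, Object> record = new LinkedHashMap<>();
       record.put("_key", entry.getKey());       // the key becomes an ordinary attribute, so complex types would also work
       record.put("_values", entry.getValue());
       records.add(record);
     }
     return records;
   }
 }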

ReduceWorker:

 /** the reduce function which has to be implemented. */
 protected abstract void reduce(key, values)
 /** the result for each key is delivered to the superclass. */
 protected void emit(key, result)  

The (abstract) ReduceWorker could store its result in one (or more?) record(s), also below the "_mapReduce" parameter.
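For illustration, a concrete reduce function for the word-count case could look like this (WordCountReducer and its plain-Java types are assumptions, not a fixed SMILA interface):

 import java.util.List;

 /** Illustrative only: all values emitted for one key (partial counts
  *  from the Map phase) are summed into the final count for that word. */
 public class WordCountReducer {

   /** reduce(key, values): called once per unique key with all its values. */
   public long reduce(String word, List<Long> partialCounts) {
     long sum = 0;
     for (long count : partialCounts) {
       sum += count;
     }
     return sum;   // e.g. emitted as "<word>": [<sum>] below "_mapReduce"
   }
 }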

Workflow

The workflow consists of (at least) a MapWorker and a ReduceWorker which processes the result of the MapWorker. The main problem here is that the Reduce phase must not start before _all_ MapWorker tasks have been finished.

We can solve this with a little trick: As long as the job is in RUNNING mode, the ReduceWorker's TaskGenerator doesn't produce tasks. So the MapReduce job run will only process MapWorker tasks until they are all finished. Afterwards the job run will go to its COMPLETING phase, and here we can step in and let the ReduceWorker's TaskGenerator produce one task for the Reduce phase.

Why only one task for the ReduceWorker? Well, we have to provide all values for the same key as Reduce input. These values are distributed over the different intermediate record bulks. The (abstract) ReduceWorker can read the input (streams) of all (sorted!) intermediate results from the Map phase, do a kind of merge sort, and so provide all input values for the reduce function without reading the whole input beforehand.
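A sketch of this streaming merge, assuming the alternative one-record-per-key format and plain Java iterators standing in for the sorted record-bulk streams (all names are hypothetical):

 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.function.BiConsumer;

 /** Illustrative only: each input iterator delivers (key -> values) entries in key
  *  order; a priority queue always yields the smallest current key, so all values
  *  of one key can be handed to the reduce function without loading whole bulks. */
 public class SortedBulkMerger {

   private record Head(String key, List<Object> values,
                       Iterator<Map.Entry<String, List<Object>>> source) {}

   public static void merge(List<Iterator<Map.Entry<String, List<Object>>>> bulks,
                            BiConsumer<String, List<Object>> reduce) {
     PriorityQueue<Head> queue = new PriorityQueue<>(Comparator.comparing(Head::key));
     for (Iterator<Map.Entry<String, List<Object>>> bulk : bulks) {
       advance(bulk, queue);
     }
     while (!queue.isEmpty()) {
       String currentKey = queue.peek().key();
       List<Object> allValues = new ArrayList<>();
       // collect the values for the current key from every bulk that contains it
       while (!queue.isEmpty() && queue.peek().key().equals(currentKey)) {
         Head head = queue.poll();
         allValues.addAll(head.values());
         advance(head.source(), queue);
       }
       reduce.accept(currentKey, allValues);   // call the reduce function per key
     }
   }

   private static void advance(Iterator<Map.Entry<String, List<Object>>> source,
                               PriorityQueue<Head> queue) {
     if (source.hasNext()) {
       Map.Entry<String, List<Object>> next = source.next();
       queue.add(new Head(next.getKey(), next.getValue(), source));
     }
   }
 }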

The disadvantage of this approach is that the Reduce phase doesn't scale, because we have only one task for the whole Reduce phase. And we may need a good amount of memory, although with the streaming merge sort we should not have many intermediate results in memory at once. (Do we have to hold the complete reduce result in memory?)

MergeWorker

A different approach is to use something like a "MergeWorker" in the workflow between the MapWorker and the ReduceWorker. The MergeWorker could process the Map results and create output objects that contain all merged values for each key. In principle this would work with the streaming merge sort as already described before. While this step is running, the following ReduceWorker could already process the created objects. The disadvantage here is worse performance due to the additional workflow step that reads and writes another intermediate result.

Questions

  • Is there a better idea besides using this "trick" with the COMPLETING step for starting the Reduce phase?
    • Yes, we introduced the "barrier" worker mode to support this. Now we just need a special task generator for the reduce workers.
  • Is it really possible for the ReduceWorker to process the intermediate record bulks as streams so that it doesn't have to hold the whole intermediate results in memory? Or do we need a special kind/format of intermediate result to allow such streamed processing?
  • Does it make sense to have a Combine phase, i.e. a CombineWorker?
  • Should the result of the ReduceWorker have a fixed format (e.g. storing the results below a "_mapReduce" parameter)? What makes sense here? Or should we just leave that to each ReduceWorker implementation?

Examples

word count

Workflow: Crawler -> Fetcher -> WordCountMapWorker -> WordCountReduceWorker

Assuming the crawling produces three records in two record bulks as input for the MapWorker:

{"_recordid": "r1", "content": "aaa ccc ccc" }
{"_recordid": "r2", "content": "ccc bbb" }
{"_recordid": "r3", "content": "bbb ccc" }

This will produce two tasks for the WordCountMapWorker which produces two intermediate record bulks:

{"_recordid": "intermediate1", 
 "_mapReduce": {
    "aaa": [1], 
    "bbb": [1], 
    "ccc": [2,1]     // could also be [3] when using in-mapper combining
  } 
{"_recordid": "intermediate2", 
 "_mapReduce": { 
    "bbb": [1], 
    "ccc": [1]
  } 

Or, using the alternative bulk format:

  • Intermediate bulk 1:
{ "_key": "aaa","_values": [1] }
{ "_key": "bbb","_values": [1] }
{ "_key": "ccc","_values": [2, 1] }
  • Intermediate bulk 2:
{ "_key": "bbb","_values": [1] }
{ "_key": "ccc","_values": [1] }

The WordCountReduceWorker reads those two intermediate results and produces the final result:

{"_recordid": "result", 
 "_mapReduce": {
    "aaa": [1], 
    "bbb": [2], 
    "ccc": [4]
  } 

average value calculation

e.g. crawl some files and calculate the average file size for each file extension

Workflow: Crawler -> AvgFileSizeMapWorker -> AvgFileSizeReduceWorker

Assuming the crawling produces five records in two record bulks as input for the MapWorker:

{"_recordid": "r1", "FileSize": "100", "FileExtension":"html" }
{"_recordid": "r2", "FileSize": "100", "FileExtension":"txt" }
{"_recordid": "r2", "FileSize": "400", "FileExtension":"html" }
{"_recordid": "r3", "FileSize": "300", "FileExtension":"txt" }
{"_recordid": "r4", "FileSize": "400", "FileExtension":"html"}

This will produce two tasks for the AvgFileSizeMapWorker which produces two intermediate record bulks:

{"_recordid": "intermediate1", 
 "_mapReduce": {
    "html": [100, 400], 
    "txt": [100], 
  } 
{"_recordid": "intermediate2", 
 "_mapReduce": { 
    "html": [400], 
    "txt": [300]
  } 

Or, again with the alternative format:

  • Intermediate bulk 1:
{ "_key": "html", "_values": [100, 400] }
{ "_key": "txt", "_values": [100] }
  • Intermediate bulk 2:
{ "_key": "html", "_values": [400] }
{ "_key": "txt", "_values": [300] }

The AvgFileSizeReduceWorker reads those two intermediate results and produces the final result:

{"_recordid": "result", 
 "_mapReduce": {
    "html": [300], 
    "txt": [200]
 } 
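The corresponding reduce step could be as simple as averaging the collected sizes per extension (a hypothetical sketch, not an existing worker):

 import java.util.List;

 /** Illustrative only: all sizes collected for one file extension
  *  are averaged into a single result value. */
 public class AvgFileSizeReducer {

   public double reduce(String fileExtension, List<Long> fileSizes) {
     long sum = 0;
     for (long size : fileSizes) {
       sum += size;
     }
     // e.g. "html": [100, 400, 400] -> 300
     return fileSizes.isEmpty() ? 0.0 : (double) sum / fileSizes.size();
   }
 }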

inverted indexing of document terms

tbd (key: term, value: document which contains the term)
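A possible map/reduce pair for this case could look as follows (again only a sketch; class and parameter names are made up for illustration):

 import java.util.List;
 import java.util.TreeMap;
 import java.util.TreeSet;

 /** Illustrative only: map emits (term -> record id) pairs,
  *  reduce collects the ids of all records that contain a given term. */
 public class InvertedIndexFunctions {

   /** "map": one (term, recordId) pair per distinct term of the record. */
   public TreeMap<String, String> map(String recordId, String content) {
     TreeMap<String, String> pairs = new TreeMap<>();
     for (String term : content.split("\\s+")) {
       if (!term.isEmpty()) {
         pairs.put(term, recordId);
       }
     }
     return pairs;
   }

   /** "reduce": the posting list of a term is the set of record ids emitted for it. */
   public TreeSet<String> reduce(String term, List<String> recordIds) {
     return new TreeSet<>(recordIds);   // deduplicated, sorted posting list
   }
 }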
