Difference between revisions of "SMILA/Discussion/MapReduce"

Revision as of 17:24, 31 October 2012

Discussion - SMILA and MapReduce

When thinking about Big Data, a lot of people have MapReduce in mind. Because SMILA's job management and asynchronous workflows are flexible, let's think about the possibilities we have in SMILA to provide some - at least basic - support for MapReduce algorithms.

MapReduce

(in very simplified form ;)

First, we have some kind of input data, let's say a record bulk in SMILA. For each record the Map function produces Key/Value pairs (K,V)* as intermediate results. After all input data has passed the Map phase, the intermediate results are processed by the Reduce function which creates an output result for each unique intermediate key and all values of this key.

  • Map: record -> (K,V)*
  • Reduce: K,V* -> result

To integrate this easily into SMILA's data model, intermediate and output results should also be records or record bulks.

Worker

As a basic MapReduce support in SMILA we could provide an abstract MapWorker and an abstract ReduceWorker. To implement a dedicated MapReduce solution, subclasses of Map- and ReduceWorker have to be implemented. (In a second step, we could also think about having only one MapWorker and one ReduceWorker and these could have a kind of script parameter to provide the map and reduce function.)

MapWorker:

 /** the map function which has to be implemented. */
 protected abstract void map(record)
 /** the resulting key/value pairs are delivered to the superclass. */
 protected void emit(key, value) 

To keep things simple we should (in a first step) only allow strings as keys for the map result. (Nevertheless, one could use complex keys by serializing them to an appropriate string.) The result of a MapWorker task is a record bulk, with records having a "_mapReduce" parameter:

{"_recordid": "...", 
 "_mapReduce": {
    "key1": [<values>], 
    "key2": [<values>], 
    ...
  } 
...
  • "_mapReduce" is a hardcoded parameter name, so that the following ReduceWorker can find its input data. This also allows MapReduce data to be transported in records alongside other parameters and attributes.
  • The results of a MapWorker are already ordered by key, because the keys are strings and we use ordered maps in our Record/AnyMap implementation.
  • The abstract MapWorker will use the so-called in-mapper combining pattern, which means that - during the processing of an input task - it already accumulates the emitted values belonging to the same key.
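The in-mapper combining described above can be sketched in plain Java as follows. This is only a minimal sketch under assumptions: the class names MapWorkerSketch and WordCountMapWorker, the processTask method, the "text" attribute and the placeholder record id are made up for illustration, and records are modelled as simple maps instead of SMILA's Record/AnyMap types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the abstract MapWorker; not SMILA's real API. */
abstract class MapWorkerSketch {
  /** Sorted map, so the collected keys come out in string order. */
  private final Map<String, List<Object>> combined = new TreeMap<>();

  /** The map function which has to be implemented. */
  protected abstract void map(Map<String, Object> record);

  /** Collects the value under its key instead of writing one pair per call. */
  protected void emit(String key, Object value) {
    combined.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  /** Processes one input task (a record bulk) and builds one output record. */
  Map<String, Object> processTask(List<Map<String, Object>> bulk) {
    combined.clear();
    for (Map<String, Object> record : bulk) {
      map(record);
    }
    Map<String, Object> out = new TreeMap<>();
    out.put("_recordid", "map-result"); // placeholder id, assumption
    out.put("_mapReduce", new TreeMap<>(combined));
    return out;
  }
}

/** Example subclass: emits (word, 1) for each word of an assumed "text" attribute. */
class WordCountMapWorker extends MapWorkerSketch {
  @Override
  protected void map(Map<String, Object> record) {
    String text = String.valueOf(record.getOrDefault("text", ""));
    for (String word : text.toLowerCase().split("\\s+")) {
      if (!word.isEmpty()) {
        emit(word, 1);
      }
    }
  }
}
```

Because the values are only collected per key and not interpreted, the superclass needs no knowledge of the concrete map function.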

ReduceWorker:

 /** the reduce function which has to be implemented. */
 protected abstract void reduce(key, values)
 /** the result for each key is delivered to the superclass. */
 protected void emit(key, result)  

The (abstract) ReduceWorker could store its result in one (or more?) record(s), also below the "_mapReduce" parameter(?)
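One possible shape of such an abstract ReduceWorker is sketched below. It assumes that the results are stored in a single record below "_mapReduce", which is still an open question in this discussion; the class names, the processTask method and the placeholder record id are hypothetical and not part of SMILA.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the abstract ReduceWorker; not SMILA's real API. */
abstract class ReduceWorkerSketch {
  private final Map<String, Object> results = new TreeMap<>();

  /** The reduce function which has to be implemented. */
  protected abstract void reduce(String key, List<Object> values);

  /** The result for each key is delivered to the superclass. */
  protected void emit(String key, Object result) {
    results.put(key, result);
  }

  /** Feeds every key with all of its values to reduce() and builds one output record. */
  Map<String, Object> processTask(Map<String, List<Object>> valuesByKey) {
    results.clear();
    valuesByKey.forEach(this::reduce);
    Map<String, Object> out = new TreeMap<>();
    out.put("_recordid", "reduce-result"); // placeholder id, assumption
    out.put("_mapReduce", new TreeMap<>(results));
    return out;
  }
}

/** Example subclass: sums numeric values, e.g. the 1s emitted by a word count mapper. */
class SumReduceWorker extends ReduceWorkerSketch {
  @Override
  protected void reduce(String key, List<Object> values) {
    long sum = 0;
    for (Object value : values) {
      sum += ((Number) value).longValue();
    }
    emit(key, sum);
  }
}
```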

Workflow

The workflow consists of (at least) a MapWorker and a ReduceWorker which processes the result of the MapWorker. The main problem here is that the Reduce phase mustn't start before _all_ MapWorker tasks have been finished.

We can solve this with a little trick: As long as the job is in RUNNING mode, the ReduceWorker's TaskGenerator doesn't produce tasks. So the MapReduce job run will only process MapWorker tasks until they are all finished. Afterwards the job run will go to its COMPLETION phase, and here we can intervene and let the ReduceWorker's TaskGenerator produce one task for the Reduce phase.

Why only one task for the ReduceWorker? Well, we have to provide all values for the same key as Reduce input. These values are distributed across the different intermediate record bulks. The (abstract) ReduceWorker can read the input (streams) of all (sorted!) intermediate results from the Map phase, do a kind of merge sort, and so provide all input values for the reduce function without reading the whole input first.

The disadvantage of this approach is that the Reduce phase doesn't scale, because we have only one task for the whole Reduce phase. And we may need a good amount of memory, although with the streaming merge sort we should not have many intermediate results in memory at once. (Do we have to hold the complete reduce result in memory?)
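The streaming merge over the sorted intermediate results could look roughly like the following sketch. It assumes that each intermediate result can be read as an iterator of (key, values) entries in ascending key order; StreamingMergeSketch, Head and mergeAndReduce are hypothetical names, not SMILA API. A priority queue holds only the current entry of each input stream, so the whole input never has to be in memory at once.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

/** Hypothetical helper; names and types are illustrative, not SMILA API. */
final class StreamingMergeSketch {
  /** One open input stream together with the entry it currently points to. */
  private static final class Head {
    final Map.Entry<String, List<Object>> entry;
    final Iterator<Map.Entry<String, List<Object>>> rest;

    Head(Map.Entry<String, List<Object>> entry,
        Iterator<Map.Entry<String, List<Object>>> rest) {
      this.entry = entry;
      this.rest = rest;
    }
  }

  /** Calls reduce(key, values) once per key, in key order; inputs must be sorted by key. */
  static void mergeAndReduce(List<Iterator<Map.Entry<String, List<Object>>>> inputs,
      BiConsumer<String, List<Object>> reduce) {
    PriorityQueue<Head> heads =
        new PriorityQueue<>((a, b) -> a.entry.getKey().compareTo(b.entry.getKey()));
    for (Iterator<Map.Entry<String, List<Object>>> in : inputs) {
      if (in.hasNext()) {
        heads.add(new Head(in.next(), in));
      }
    }
    while (!heads.isEmpty()) {
      String key = heads.peek().entry.getKey();
      List<Object> values = new ArrayList<>();
      // Drain every stream whose current entry carries this key.
      while (!heads.isEmpty() && heads.peek().entry.getKey().equals(key)) {
        Head head = heads.poll();
        values.addAll(head.entry.getValue());
        if (head.rest.hasNext()) {
          heads.add(new Head(head.rest.next(), head.rest));
        }
      }
      reduce.accept(key, values);
    }
  }
}
```

Memory use in this sketch is bounded by the number of input streams plus the values of the key currently being reduced; whether the reduce results themselves must be held in memory depends on how they are written out.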

MergeWorker

A different approach is the usage of something like a "MergeWorker" in the workflow between the Map- and ReduceWorker. The MergeWorker could process the Map results and create output objects that contain all merged values for each key. In principle this would work with a streaming merge sort as already described for the Reduce phase before. During this step the following ReduceWorker could already process the objects created so far. The disadvantage here is the worse performance due to the additional workflow step, which reads and writes another intermediate result.

Open issues

  • Does it make sense to have a Combine phase, or a CombineWorker?
  • Should the result of the ReduceWorker have a fixed format (e.g. storing the results below a _mapReduce parameter)? What makes sense here? Or should we just leave that to each ReduceWorker implementation?


Examples

word count

tbd

average value calculation

tbd (e.g. crawl filesystem, calculate average file size for each file extension)
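As a starting point, the two functions for this example might look like the sketch below. It is an assumption-laden illustration only: AverageFileSizeSketch, extensionKey and averageSize are hypothetical names, and the crawler is assumed to deliver a file name and a size for each record. The map side would emit (extensionKey(fileName), size), and the reduce side would emit (extension, averageSize(values)).

```java
import java.util.List;

/** Hypothetical helper functions; the inputs are assumed crawler attributes. */
final class AverageFileSizeSketch {
  /** Map side: derives the key (lower-cased extension) from a file name. */
  static String extensionKey(String fileName) {
    int dot = fileName.lastIndexOf('.');
    return dot < 0 || dot == fileName.length() - 1
        ? ""
        : fileName.substring(dot + 1).toLowerCase();
  }

  /** Reduce side: averages all sizes collected for one extension. */
  static double averageSize(List<? extends Number> sizes) {
    if (sizes.isEmpty()) {
      return 0.0;
    }
    double sum = 0;
    for (Number size : sizes) {
      sum += size.doubleValue();
    }
    return sum / sizes.size();
  }
}
```

An average of averages is in general not the overall average, so the map side in this sketch keeps the raw sizes as a value list per key and leaves the division to the reduce function.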

inverted indexing of document terms

tbd (key: term, value: document which contains the term)