SMILA/Documentation/TaskManager

From Eclipsepedia

TaskManager

The TaskManager is a component for administering and delivering tasks during a job run. Tasks are stored in internal queues. They are consumed by workers that pull them for processing, and they are produced as initial tasks or as follow-up tasks when workers finish processing their current task.

The TaskManager guarantees that tasks are delivered at least once, but tasks may be delivered more than once (e.g. when IOErrors, process crashes or timeouts occur). Clients must therefore be able to handle the same task multiple times. However, redelivery is the exception to the rule: errors are not expected during normal task processing. In some rare cases a task is processed successfully (i.e. output data and a possible follow-up task are created successfully) but is redelivered nonetheless. This can happen if:

  • the worker process crashes right after that step, or
  • the TTL of the task expires exactly when the worker is finishing, in which case the task is rolled back inside the TaskManager automatically.

Fairness

The TaskManager implements fair delivery of (job-)tasks at the worker level. That means that if tasks of multiple jobs for the same worker are waiting to be processed ('todo'), the TaskManager delivers these tasks alternately for each job using a “round robin” mechanism. Task age is not considered here, so tasks are distributed fairly among the jobs even if the tasks of one job were produced earlier than those of the others.

In a cluster environment the information needed for fairness is not shared among the cluster. Fairness is only implemented locally in the TaskManager on each node. (In practice this should have no negative impact.)
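The round-robin delivery described above can be illustrated with a small in-memory sketch for a single worker on a single node. It is not SMILA's implementation; the class and method names are hypothetical. Each job keeps its own FIFO of todo tasks, and after a job has been served it moves to the end of the rotation, so task age across jobs plays no role.

```python
from collections import OrderedDict, deque


class FairTaskQueue:
    """Hypothetical per-worker todo queue that alternates between jobs.

    Task age is deliberately ignored: the next task always comes from the
    job following the one that was served last.
    """

    def __init__(self):
        # job name -> FIFO of that job's todo tasks; insertion order is the rotation order
        self._jobs = OrderedDict()

    def add(self, job, task):
        self._jobs.setdefault(job, deque()).append(task)

    def next_task(self):
        """Return (job, task) in round-robin order, or None if nothing is waiting."""
        if not self._jobs:
            return None
        job, tasks = self._jobs.popitem(last=False)  # job at the head of the rotation
        task = tasks.popleft()
        if tasks:
            self._jobs[job] = tasks  # job still has work: move it to the end of the rotation
        return job, task
```

If job A has three waiting tasks and job B two, the delivery order is a1, b1, a2, b2, a3, even though all of A's tasks were queued first. Like the TaskManager's own fairness data, this state lives on one node only.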

Configuration

The TaskManager is configured via the ClusterConfig service. With the "simple" ClusterConfig service, it uses the properties in the "taskmanager" section of clusterconfig.json:

{
  ...
  "taskmanager": {
    "maxScaleUp" : 16,
    "maxRetries": 10,
    "timeToLive": 300,
    "resumeJobs": false
  },
  ...
}
  • maxScaleUp: Maximum number of scale-up tasks allowed on a single node, i.e. tasks for workers that set the "workerHost" property in the Get-Task request (see below). Usually these are tasks for workers that are not marked as "runAlways". If a worker tries to get a task when the limit is reached, the taskmanager answers as if no task were currently available. Consequently, if the taskmanager's maxScaleUp value is less than or equal to the sum of the maxScaleUp values of all workers and pipelets, no asynchronous work will be done. See also WorkerManager.
  • timeToLive: Time in seconds after which an in-progress task that is not kept alive by the worker is finished with a recoverable error result. Default is 300 seconds. Whether the task is then repeated, committed or fails completely depends on the worker description and the JobManager configuration.
  • maxRetries: Used by the JobManager to decide how often a task that has failed with a RECOVERABLE_ERROR should be retried, whether the error was caused by exceeding "timeToLive" or reported by the worker itself. If the retry limit is reached, the task finally fails with a FATAL_ERROR. (Technically, this property is used by the JobManager, not by the TaskManager itself.)
  • resumeJobs: By default, all information about running jobs and current tasks is removed when the SMILA process is stopped or killed and restarted: afterwards no jobs are running and no tasks are todo or in progress. Setting the "resumeJobs" flag to true causes SMILA to resume all jobs and continue processing the tasks left over from the previous process. See more details on resuming jobs below.
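To make the interplay of these settings concrete, here is a sketch of the decisions they drive. All function names are hypothetical and none of this is SMILA code. Treating -1 as "no limit" follows the state overview described further down; the exact boundaries (strict ">" for the TTL check, retrying while fewer than maxRetries retries have happened) are assumptions.

```python
import time

# Hypothetical helpers mirroring the documented meaning of the
# "taskmanager" settings; they are not part of SMILA's API.


def scale_up_allowed(running_scale_up_tasks, max_scale_up):
    """A scale-up task is delivered only while the node is below maxScaleUp.

    -1 is treated as "no limit". When this returns False, the taskmanager
    answers as if no task were available.
    """
    return max_scale_up == -1 or running_scale_up_tasks < max_scale_up


def is_expired(last_keep_alive, time_to_live=300, now=None):
    """True if an in-progress task was not kept alive within timeToLive seconds."""
    now = time.time() if now is None else now
    return now - last_keep_alive > time_to_live


def after_recoverable_error(retries_so_far, max_retries):
    """What happens to a task that ended with RECOVERABLE_ERROR (assumed boundary)."""
    if retries_so_far >= max_retries:
        return "FATAL_ERROR"  # retry limit reached: the task finally fails
    return "RETRY"
```

An expired task is finished with a recoverable error, so `is_expired` and `after_recoverable_error` together describe why a worker that stops sending keep-alives eventually sees its task fail after maxRetries attempts.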

External REST API

The external ReST API provides a number of endpoints for use by the administrator to watch the state of the taskmanager. This includes getting numbers of tasks in the various task pipes and being able to look into the content of tasks.

TaskManager State and Configuration Overview

Use a GET request to get information about taskmanager state.

Supported operations:

  • GET: get information about taskmanager task state.

Usage:

  • URL: http://<hostname>:8080/smila/tasks.
  • Allowed methods:
    • GET
  • Response status codes:
    • 200 OK: Upon successful execution.
    • 500 INTERNAL SERVER ERROR: If the taskmanager is not operational anymore because too many nodes are already dead.

The result consists of three parts:

  • The failsafety setting is the maximum number of nodes that may fail before the taskmanager stops working. Nodes that are already down at this moment are not subtracted from this number.
  • The workers section follows, with counters for tasks todo and inprogress for each worker and links to more details about one type of task for a single worker. To retrieve the number of todo tasks per job, add ?returnDetails=true to the URL; this information is omitted if returnDetails is false or missing. The workers section always includes a _finishingTasks worker. This is an internal worker that picks up worker tasks that have been finished by the worker or by the taskmanager (in case of missing keep-alives or a canceled workflow run) and sends them to the JobManager to complete the finishing (aggregating counters, computing follow-up tasks, etc.).
  • Finally, there is an overview of the scale-up situation: "maxScaleUp" is the configured maximum number of tasks that scaling workers may process at the same time; -1 means that no limit is configured. "scaleUp" is an array of maps, each containing a host name (as in cluster.ini) and a counter of the scaling workers currently working on that host. "runAlways" workers do not contribute to this counter, and a host on which no scaling worker has started working yet does not appear in the list.
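A minimal Python client for this endpoint could look like the following sketch, using only the standard library. The function names are hypothetical, and the response is returned as a plain dict because the exact JSON layout of the worker counters is not specified here.

```python
import json
import urllib.parse
import urllib.request


def state_url(host, port=8080, return_details=False):
    """URL of the taskmanager state overview; returnDetails adds per-job todo counts."""
    url = f"http://{host}:{port}/smila/tasks"
    if return_details:
        url += "?" + urllib.parse.urlencode({"returnDetails": "true"})
    return url


def fetch_state(host, port=8080, return_details=False, timeout=10):
    """GET the overview and return the decoded JSON object.

    urlopen raises urllib.error.HTTPError for the documented 500 status.
    """
    with urllib.request.urlopen(state_url(host, port, return_details), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, `print(json.dumps(fetch_state("localhost", return_details=True), indent=2))` prints the whole overview, including the "maxScaleUp" and "scaleUp" entries.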

TaskManager Configuration Update

Use a POST request to change taskmanager configuration settings at runtime. Currently, only the "maxScaleUp" setting is supported.

Supported operations:

  • POST: post new configuration

Usage:

  • URL: http://<hostname>:8080/smila/tasks.
  • Allowed methods:
    • POST
  • Response status codes:
    • 200 OK: Upon successful execution.
    • 500 INTERNAL SERVER ERROR: Update operation failed

Sample Request:

POST http://<hostname>:8080/smila/tasks

{
  "maxScaleUp": 10
}
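The sample request can be sent from Python as sketched below. The helper name is hypothetical and the JSON Content-Type header is an assumption; the request is only built here and must be passed to `urllib.request.urlopen` to actually send it.

```python
import json
import urllib.request


def build_max_scale_up_update(host, max_scale_up, port=8080):
    """Build (but do not send) the POST that changes maxScaleUp at runtime."""
    body = json.dumps({"maxScaleUp": max_scale_up}).encode("utf-8")
    return urllib.request.Request(
        f"http://{host}:{port}/smila/tasks",
        data=body,
        headers={"Content-Type": "application/json"},  # assumed, not documented
        method="POST",
    )
```

Usage: `urllib.request.urlopen(build_max_scale_up_update("localhost", 10))` corresponds to the sample request; a 500 response surfaces as `urllib.error.HTTPError`.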


Worker Overview

Use a GET request to get information about a specific worker.

Supported operations:

  • GET: get information.

Usage:

  • URL: http://<hostname>:8080/smila/tasks/<worker>/<type>/.
  • Allowed methods:
    • GET: shows the number of tasks and links to the first 100 tasks for a given worker and type. The <type> can currently be:
      • todo: tasks not yet fetched by any worker (or retried)
      • inprogress: tasks currently worked on by a worker.
  • Response status codes:
    • 200 OK: Upon successful execution.
    • 404 NOT FOUND: bad URL path, e.g. bad worker or type
    • 500 INTERNAL SERVER ERROR: If the taskmanager is not available or is not operational anymore because too many nodes are already dead.
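Since a bad worker or type only shows up as a 404, a client can reject unsupported types before sending the request. The sketch below builds the overview URL and accepts only the two documented types; the function name is hypothetical.

```python
import urllib.parse

LIST_TYPES = {"todo", "inprogress"}


def worker_overview_url(host, worker, task_type, port=8080):
    """URL listing the task count and links to the first 100 tasks of one worker pipe."""
    if task_type not in LIST_TYPES:
        raise ValueError(f"unsupported type {task_type!r}; expected one of {sorted(LIST_TYPES)}")
    worker_part = urllib.parse.quote(worker, safe="")  # escape characters that are unsafe in a path segment
    return f"http://{host}:{port}/smila/tasks/{worker_part}/{task_type}/"
```

For instance, `worker_overview_url("localhost", "_finishingTasks", "todo")` addresses the todo pipe of the internal finishing worker.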

Task Content

Shows the content of a task in JSON format.

Supported operations:

  • GET: get information about specific task.

Usage:

  • URL: http://<hostname>:8080/smila/tasks/<worker>/<type>/<taskname>/.
  • Allowed methods:
    • GET: Shows the content of a task in JSON format. Additionally the creation and modification timestamps are given. For todo tasks they should be identical, for inprogress tasks the modification timestamp is the timestamp of the last keepAlive request that was successfully executed. The <taskname> is not necessarily the task ID, but an internal name for the task as created by the /smila/tasks/<worker>/<type>/ handler. In addition to todo and inprogress, <type> can be todo_part in partitioned pipes. Tasks that are inprogress have an additional property inProgressBy that indicates which thread currently processes the task.
  • Response status codes:
    • 200 OK: Upon successful execution.
    • 404 NOT FOUND: bad URL path, e.g. bad worker or type
    • 500 INTERNAL SERVER ERROR: If the taskmanager is not available or is not operational anymore because too many nodes are already dead.
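A sketch for addressing a single task follows. It also accepts todo_part for partitioned pipes. Because <taskname> is an internal name rather than the task ID, it is safest to take it from the links returned by the worker overview instead of constructing it. The function name and the example names in the usage note are hypothetical.

```python
import urllib.parse

CONTENT_TYPES = {"todo", "inprogress", "todo_part"}


def task_content_url(host, worker, task_type, task_name, port=8080):
    """URL showing one task as JSON.

    task_name is the internal name taken from the worker overview links,
    which is not necessarily the task ID.
    """
    if task_type not in CONTENT_TYPES:
        raise ValueError(f"unsupported type {task_type!r}")
    parts = [urllib.parse.quote(p, safe="") for p in (worker, task_type, task_name)]
    return f"http://{host}:{port}/smila/tasks/" + "/".join(parts) + "/"
```

Usage: `task_content_url("localhost", "myWorker", "inprogress", name)` where `name` was read from the overview; the response for an inprogress task additionally contains inProgressBy.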

Tasks

The rest of this page is only of interest if you want to create a worker without using the WorkerManager framework and need to implement the interaction with the TaskManager on your own. This is the case if you need a worker that cannot run inside SMILA for whatever reason, so it has to use the HTTP API of the TaskManager to get, keep alive and finish tasks. First we describe the JSON representation of tasks, then the REST URLs for communicating with the TaskManager.

JSON format of task (1)

{
  "taskId": "...",
  "workerName": "...",
  <"qualifier": "...">,
  "properties": { 
    "name": "value", ...
  },
  "parameters": {
    "name": "value", ...
  },
  "input": {
    "slot1": [ { "bucket": "...", "store": "...", "id": "..." } , ... ],
    "slot2": [ { "bucket": "...", "store": "...", "id": "..." } , ... ],
    ...
  },
  "output": {
    "slot1": [ { "bucket": "...", "store": "...", "id": "..." } , ... ],
    "slot2": [ { "bucket": "...", "store": "...", "id": "..." } , ... ],
    ...
  }
}

A task consists of

  • task ID, workerName, qualifier: needed by the taskmanager to manage the task.
  • properties: used by the jobmanager to associate the task to its job/workflow run. The taskmanager may also add properties.
    • "recoverable": if set to 'false' this task will not be retried after a timeout or after a recoverable worker error
    • "workerHost": the host where the worker that requested the task is running (only set if the worker passes the host as a request parameter).
    • "createdTime", "startTime", "taskAge": the time the task was created, the time it was retrieved by the worker, and the difference between the two in milliseconds. Apart from statistical purposes, a worker can use this, for example, to decide that a task must be processed instead of postponed because some age limit has been reached.
  • parameters: worker parameters as defined in the workflow and job definition.
  • input: lists of bulk infos associated with the worker's input slots, describing which data has to be processed to complete this task. May be empty for "initial tasks" (e.g. BulkBuilder).
  • output: lists of bulk infos associated with the worker's output slots, describing where to put the results of the task (currently always a single bulk info per slot). May be completely empty (e.g. for the HSSI record deleter worker).

The "bucket" name in the bulk info is usually irrelevant to the worker, it just needs to read the "store" and "id" to be able to find and create data objects.
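An external worker typically turns the task JSON (1) into typed objects before processing. The following dataclass sketch is one way to do that; the class and field names are hypothetical. It treats only an explicit 'false' in the "recoverable" property as disabling retries, and keeps "bucket" even though the worker usually only needs "store" and "id".

```python
import json
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class BulkInfo:
    bucket: str
    store: str
    id: str


@dataclass
class Task:
    task_id: str
    worker_name: str
    qualifier: Optional[str] = None
    properties: Dict[str, object] = field(default_factory=dict)
    parameters: Dict[str, object] = field(default_factory=dict)
    inputs: Dict[str, List[BulkInfo]] = field(default_factory=dict)
    outputs: Dict[str, List[BulkInfo]] = field(default_factory=dict)

    @property
    def recoverable(self) -> bool:
        # Only an explicit 'false' disables retries after timeouts or recoverable errors.
        return str(self.properties.get("recoverable", "true")).lower() != "false"


def _slots(raw):
    # Each slot maps to a list of bulk infos; input or output may be missing or empty.
    return {slot: [BulkInfo(b.get("bucket", ""), b["store"], b["id"]) for b in bulks]
            for slot, bulks in (raw or {}).items()}


def parse_task(text: str) -> Task:
    raw = json.loads(text)
    return Task(
        task_id=raw["taskId"],
        worker_name=raw["workerName"],
        qualifier=raw.get("qualifier"),
        properties=raw.get("properties", {}),
        parameters=raw.get("parameters", {}),
        inputs=_slots(raw.get("input")),
        outputs=_slots(raw.get("output")),
    )
```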

JSON format of qualifier condition (2)

{ 
  "qualifier": [ "parts/abcd", "parts/1234", ... ]
  <, "workerHost": "...">
}

The qualifier condition is used in POST requests to /taskmanager/<worker> to get only tasks whose "qualifier" field matches one of the given values. This is used, for example, by the HSSI record delete worker to receive only tasks for certain partitions. The qualifier can be set in a task by adding the mode "qualifier" to the input slot of a worker, in which case the "id" value of the bulk in this slot is used as the qualifier. Note that the qualifier is the complete object ID path, not only the UUID part. Scaling workers must set workerHost so that the taskmanager can control the maximum number of tasks delivered to such workers.
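Building the condition body (2) is straightforward. This sketch uses a hypothetical helper name and adds "workerHost" only when given, as scaling workers require. The resulting dict is serialized with `json.dumps` and used as the POST body.

```python
def qualifier_condition(qualifiers, worker_host=None):
    """Body (2) for POST /taskmanager/<worker>.

    qualifiers must be complete object ID paths such as "parts/abcd".
    """
    condition = {"qualifier": list(qualifiers)}
    if worker_host is not None:
        condition["workerHost"] = worker_host  # needed by scaling workers
    return condition
```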

JSON format of result description (3)

{
  "status": < "SUCCESSFUL", "FATAL_ERROR", "RECOVERABLE_ERROR", "POSTPONE" >,
  "errorCode": "...", 
  "errorMessage": "...",
  "counters": {
    "name": numbervalue,
    ...
  }
}

The result description is added to the "task finished" request so that the JobManager can decide on what to do next based on the result of the task. It consists of:

  • status: one of
    • SUCCESSFUL: the task has been processed completely and successfully. Follow-up tasks can be generated.
    • FATAL_ERROR: the task cannot be processed and should not be repeated, e.g. because the input data is corrupt. This cancels the complete associated workflow run as failed, so no further tasks can be created in this workflow run. (The job run as a whole continues, of course.)
    • RECOVERABLE_ERROR: the task could not be processed now, e.g. because input data was temporarily unavailable. Usually the JobManager repeats this task until a configured retry limit is reached. However, special workers may specify that the task should not be repeated in this case but that follow-up tasks should be created nevertheless.
    • POSTPONE: the worker cannot perform this task yet for some reason (no error, just waiting for a condition in the system), but it should be processed later. The task is re-added to the todo queue for this worker and redelivered later (usually very soon). There is no limit on how often a task can be postponed.
  • errorCode/errorMessage: will currently be logged only. Could be merged to the job run result data in later versions to give an overview about errors that happened during the run.
  • counters: Integer or floating point numbers giving statistical information about the task processing. Not evaluated by task/job manager currently, but may be aggregated in the job run data in a later version.
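A worker can validate its result description before sending it. The sketch below (hypothetical helper name) accepts only the four documented status values and numeric counters. Omitting errorCode, errorMessage and counters when they are not set is an assumption; the format above lists all fields.

```python
STATUSES = {"SUCCESSFUL", "FATAL_ERROR", "RECOVERABLE_ERROR", "POSTPONE"}


def result_description(status, error_code=None, error_message=None, counters=None):
    """Build result description (3) for the 'task finished' request."""
    if status not in STATUSES:
        raise ValueError(f"unknown status {status!r}")
    result = {"status": status}
    if error_code is not None:
        result["errorCode"] = error_code
    if error_message is not None:
        result["errorMessage"] = error_message
    if counters:
        for name, value in counters.items():
            # counters must be integer or floating point numbers (bool is rejected explicitly)
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                raise TypeError(f"counter {name!r} must be a number")
        result["counters"] = dict(counters)
    return result
```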

Internal REST API

Get task information

Use a GET request to get a task for the given worker. Use POST to get a task that matches a qualifier condition.

Supported operations:

  • GET: get a task.
  • POST: get a task that matches a qualifier condition. The qualifier condition is provided as JSON body (2).

Usage:

  • URL: http://<hostname>:8080/taskmanager/<workername>.
  • Allowed methods:
    • GET
    • POST
  • Response status codes:
    • 200 OK + JSON Body (1): Upon successful execution.
    • 204 NO CONTENT: No task available.
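The following sketch shows how an external worker might fetch a task: GET without a condition, POST with a qualifier condition (2). It maps 204 to None. Function names are hypothetical, and the JSON Content-Type header is an assumption. Status handling is kept in a separate function so it can be checked without a running server.

```python
import json
import urllib.request


def build_get_task_request(host, worker, condition=None, port=8080):
    """GET /taskmanager/<worker>, or POST with a qualifier condition (2) as JSON body."""
    url = f"http://{host}:{port}/taskmanager/{worker}"
    if condition is None:
        return urllib.request.Request(url, method="GET")
    return urllib.request.Request(
        url,
        data=json.dumps(condition).encode("utf-8"),
        headers={"Content-Type": "application/json"},  # assumed
        method="POST",
    )


def parse_get_task_response(status, body):
    """Map the documented status codes: 200 -> task dict (1), 204 -> None."""
    if status == 204:
        return None
    if status == 200:
        return json.loads(body)
    raise RuntimeError(f"unexpected status {status}")


def get_task(host, worker, condition=None, port=8080, timeout=30):
    req = build_get_task_request(host, worker, condition, port)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return parse_get_task_response(resp.status, resp.read())
```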

Get initial task information

Use a GET request to get the initial task for the given job.

Supported operations:

  • GET: get the initial task for the given job.

Usage:

  • URL: http://<hostname>:8080/taskmanager/<workername>/initialTask/<jobname>.
  • Allowed methods:
    • GET
  • Response status codes:
    • 200 OK + JSON Body (1): Upon successful execution.
    • 204 NO CONTENT: No task available.
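Because 204 only means "no task right now", workers usually ask again after a short pause. The sketch builds the initial-task URL and wraps any fetch function in a simple polling loop, where None stands for a 204 reply. The function names, the example names "myWorker" and "myJob", and the fixed wait interval are all assumptions for illustration.

```python
import time
import urllib.parse


def initial_task_url(host, worker, job, port=8080):
    """URL for the initial task of a job."""
    w = urllib.parse.quote(worker, safe="")
    j = urllib.parse.quote(job, safe="")
    return f"http://{host}:{port}/taskmanager/{w}/initialTask/{j}"


def poll(fetch, max_attempts, wait_seconds=1.0, sleep=time.sleep):
    """Call fetch() until it returns a task; None stands for a 204 NO CONTENT reply."""
    for attempt in range(max_attempts):
        task = fetch()
        if task is not None:
            return task
        if attempt + 1 < max_attempts:
            sleep(wait_seconds)  # no task available yet: wait before asking again
    return None
```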

Get task for scaling worker

Use a GET request to get a task for a scaling worker.

Supported operations:

  • GET: get a task for a scaling worker.

Usage:

  • URL: http://<hostname>:8080/taskmanager/<workername>?workerHost=<hostname>.
  • Allowed methods:
    • GET
  • Response status codes:
    • 200 OK + JSON Body (1): Upon successful execution.
    • 204 NO CONTENT: No task available.
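For a scaling worker, the host is passed as the workerHost query parameter, as sketched below with a hypothetical helper. Note that a 204 reply can also mean that the node's maxScaleUp limit has been reached. A scaling worker that also uses a qualifier condition puts "workerHost" into the POST body (2) instead.

```python
import urllib.parse


def scaling_task_url(host, worker, worker_host, port=8080):
    """GET URL for a scaling worker; workerHost lets the taskmanager enforce maxScaleUp."""
    w = urllib.parse.quote(worker, safe="")
    query = urllib.parse.urlencode({"workerHost": worker_host})
    return f"http://{host}:{port}/taskmanager/{w}?{query}"
```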

Finish or keep alive task

Use a POST request and JSON body (3) to finish a task or use POST and empty body to send a keep-alive signal for a task in progress.

Supported operations:

  • POST: with a result description body (3), finishes the task; with an empty body, sends a keep-alive.

Usage:

  • URL: http://<hostname>:8080/taskmanager/<workername>/<taskId>.
  • Allowed methods:
    • POST
  • Response status codes:
    • 200 OK: Upon successful execution for finishing a task with result description.
    • 202 ACCEPTED: task is kept alive during progress.
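Both calls go to the same URL and differ only in the body. The sketch below builds the two requests; the function names are hypothetical and the JSON Content-Type header is an assumption. A worker should send keep-alives noticeably more often than timeToLive (300 seconds by default), otherwise the task is finished with a recoverable error while it is still being processed.

```python
import json
import urllib.request


def keep_alive_request(host, worker, task_id, port=8080):
    """POST with an empty body: keep-alive for an in-progress task (expects 202)."""
    return urllib.request.Request(
        f"http://{host}:{port}/taskmanager/{worker}/{task_id}", data=b"", method="POST")


def finish_request(host, worker, task_id, result, port=8080):
    """POST with result description (3) as body: finishes the task (expects 200)."""
    return urllib.request.Request(
        f"http://{host}:{port}/taskmanager/{worker}/{task_id}",
        data=json.dumps(result).encode("utf-8"),
        headers={"Content-Type": "application/json"},  # assumed
        method="POST",
    )
```

Usage: `urllib.request.urlopen(finish_request("localhost", "myWorker", task_id, {"status": "SUCCESSFUL"}))` finishes a task, where "myWorker" is an illustrative name.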