
std.parallelism

std.parallelism implements high-level primitives for SMP parallelism. These include parallel foreach, parallel reduce, parallel eager map, pipelining and future/promise parallelism. std.parallelism is recommended when the same operation is to be executed in parallel on different data, or when a function is to be executed in a background thread and its result returned to a well-defined main thread. For communication between arbitrary threads, see std.concurrency.

std.parallelism is based on the concept of a Task. A Task is an object that represents the fundamental unit of work in this library and may be executed in parallel with any other Task. Using Task directly allows programming with a future/promise paradigm. All other supported parallelism paradigms (parallel foreach, map, reduce, pipelining) represent an additional level of abstraction over Task. They automatically create one or more Task objects, or closely related types that are conceptually identical but not part of the public API.

After creation, a Task may be executed in a new thread, or submitted to a TaskPool for execution. A TaskPool encapsulates a task queue and its worker threads. Its purpose is to efficiently map a large number of Tasks onto a smaller number of threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that is associated with exactly one task queue. It executes the Task at the front of its queue when the queue has work available, or sleeps when no work is available. Each task queue is associated with zero or more worker threads. If the result of a Task is needed before execution by a worker thread has begun, the Task can be removed from the task queue and executed immediately in the thread where the result is needed.

Warning:
Unless marked as @trusted or @safe, artifacts in this module allow implicit data sharing between threads and cannot guarantee that client code is free from low-level data races.
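As a minimal sketch of what this warning means in practice, the following program counts loop iterations from a parallel foreach. It assumes a module-level `shared` counter updated with `core.atomic.atomicOp`; the helper name `countInParallel` is hypothetical. Incrementing an ordinary local variable captured by the loop body instead would be exactly the kind of implicit, unsynchronized sharing described above, because the body runs on several threads at once.

```d
import core.atomic : atomicLoad, atomicOp, atomicStore;
import std.parallelism : parallel;
import std.range : iota;

// One instance visible to all threads (plain module-level variables
// are thread-local in D unless declared shared or __gshared).
shared int counter;

// Hypothetical helper: increments `counter` once per element, in parallel.
int countInParallel(int n)
{
    atomicStore(counter, 0);
    foreach (i; parallel(iota(n)))
    {
        // A plain `++` on a local captured by this body would race;
        // atomicOp performs the read-modify-write atomically.
        atomicOp!"+="(counter, 1);
    }
    return atomicLoad(counter);
}

void main()
{
    assert(countInParallel(10_000) == 10_000);
}
```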

Synopsis:
import std.algorithm, std.parallelism, std.range, std.stdio;

void main() {
    // Parallel reduce can be combined with
    // std.algorithm.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // TaskPool.reduce:       12.170 s
    // std.algorithm.reduce:  24.065 s

    immutable n = 1_000_000_000;
    immutable delta = 1.0 / n;

    // Midpoint rule: iota(n) yields 0 .. n - 1, so the
    // midpoint of interval i is (i + 0.5) * delta.
    real getTerm(int i)
    {
        immutable x = ( i + 0.5 ) * delta;
        return delta / ( 1.0 + x * x ) ;
    }

    immutable pi = 4.0 * taskPool.reduce!"a + b"(
        std.algorithm.map!getTerm(iota(n))
    );
    writeln(pi);
}

Source:
std/parallelism.d

Author:
David Simcha

License:
Boost License 1.0

struct Task(alias fun, Args...);
Task represents the fundamental unit of work. A Task may be executed in parallel with any other Task. Using this struct directly allows future/promise parallelism. In this paradigm, a function (or delegate or other callable) is executed in a thread other than the one it was called from. The calling thread does not block while the function is being executed. A call to workForce, yieldForce, or spinForce is used to ensure that the Task has finished executing and to obtain the return value, if any. These functions and done also act as full memory barriers, meaning that any memory writes made in the thread that executed the Task are guaranteed to be visible in the calling thread after one of these functions returns.

The std.parallelism.task and std.parallelism.scopedTask functions can be used to create an instance of this struct. See task for usage examples.

Function results are returned from yieldForce, spinForce and workForce by ref. If fun returns by ref, the returned reference refers to the same location as the reference returned by fun. Otherwise, it refers to a field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics. If you want to pass this struct around, you should do so by reference or pointer.

BUGS:
Changes to ref and out arguments are not propagated to the call site, only to args in this struct.
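The following sketch, built around a hypothetical `increment` function, illustrates this limitation: the Task operates on its own copy of the argument, so the change is visible through `args` but not in the caller's variable.

```d
import std.parallelism : task;

// Hypothetical function that modifies its argument through `ref`.
void increment(ref int x)
{
    x++;
}

// Returns [value at the call site, value stored in the Task].
int[2] runIncrement()
{
    int value = 1;
    auto t = task!increment(value);
    t.executeInNewThread();
    t.yieldForce;
    return [value, t.args[0]];
}

void main()
{
    auto result = runIncrement();
    assert(result[0] == 1); // the caller's `value` is unchanged
    assert(result[1] == 2); // the copy held by the Task was incremented
}
```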

alias args = _args[1 .. $];
The arguments the function was called with. Changes to out and ref arguments will be visible here.

alias ReturnType = typeof(fun(_args));
The return type of the function called by this Task. This can be void.

@trusted ReturnType spinForce();
If the Task isn't started yet, execute it in the current thread. If it's done, return its return value, if any. If it's in progress, busy spin until it's done, then return the return value. If it threw an exception, rethrow that exception.

This function should be used when you expect the result of the Task to be available on a timescale shorter than that of an OS context switch.

@trusted ReturnType yieldForce();
If the Task isn't started yet, execute it in the current thread. If it's done, return its return value, if any. If it's in progress, wait on a condition variable. If it threw an exception, rethrow that exception.

This function should be used for expensive functions, as waiting on a condition variable introduces latency, but avoids wasted CPU cycles.

@trusted ReturnType workForce();
If this Task was not started yet, execute it in the current thread. If it is finished, return its result. If it is in progress, execute any other Task from the TaskPool instance that this Task was submitted to until this one is finished. If it threw an exception, rethrow that exception. If no other tasks are available or this Task was executed using executeInNewThread, wait on a condition variable.
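A minimal sketch contrasting the three ways of forcing a Task, using a hypothetical `sumTo` workload submitted to the global `taskPool`. The choice between them trades latency against CPU usage; all three return the same result.

```d
import std.parallelism : task, taskPool;

// Hypothetical CPU-bound workload: the sum of 0 .. n - 1.
ulong sumTo(ulong n)
{
    ulong total = 0;
    foreach (i; 0 .. n)
        total += i;
    return total;
}

ulong[3] forceAll()
{
    // Cheap Task: the result is expected almost immediately, so busy-spin.
    auto cheap = task!sumTo(1_000UL);
    taskPool.put(cheap);
    ulong a = cheap.spinForce;

    // Expensive Task: wait on a condition variable instead of burning CPU.
    auto expensive = task!sumTo(10_000_000UL);
    taskPool.put(expensive);
    ulong b = expensive.yieldForce;

    // workForce: while waiting, the calling thread runs other queued Tasks.
    auto helper = task!sumTo(100UL);
    taskPool.put(helper);
    ulong c = helper.workForce;

    return [a, b, c];
}

void main()
{
    assert(forceAll() == [499_500UL, 49_999_995_000_000UL, 4_950UL]);
}
```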

@trusted bool done();
Returns true if the Task is finished executing.

Throws:
Rethrows any exception thrown during the execution of the Task.
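As a sketch of exception propagation (the function `doubleNonNegative` is hypothetical), an exception thrown inside the Task resurfaces in the thread that forces it:

```d
import std.parallelism : task;

// Hypothetical function that fails on negative input.
int doubleNonNegative(int x)
{
    if (x < 0)
        throw new Exception("negative input");
    return 2 * x;
}

// Returns true if the exception thrown inside the Task reaches this thread.
bool exceptionIsRethrown()
{
    auto bad = task!doubleNonNegative(-1);
    bad.executeInNewThread();
    try
    {
        cast(void) bad.yieldForce;
    }
    catch (Exception e)
    {
        return e.msg == "negative input";
    }
    return false;
}

void main()
{
    auto ok = task!doubleNonNegative(21);
    ok.executeInNewThread();
    assert(ok.yieldForce == 42);
    assert(exceptionIsRethrown());
}
```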

@trusted void executeInNewThread();
@trusted void executeInNewThread(int priority);
Create a new thread for executing this Task, execute it in the newly created thread, then terminate the thread. This can be used for future/promise parallelism. An explicit priority may be given to the Task. If one is provided, its value is forwarded to core.thread.Thread.priority. See std.parallelism.task for usage example.

auto task(alias fun, Args...)(Args args);
Creates a Task on the GC heap that calls an alias. This may be executed via Task.executeInNewThread or by submitting to a std.parallelism.TaskPool. A globally accessible instance of TaskPool is provided by std.parallelism.taskPool.

Returns:
A pointer to the Task.

Examples:
// Read two files into memory at the same time.
import std.file, std.parallelism;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}

// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially.  Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation:               176 milliseconds.
// Equivalent serial implementation:  280 milliseconds.
import std.algorithm, std.parallelism;

void parallelSort(T)(T[] data)
{
    // Sort small subarrays serially.
    if(data.length < 100)
    {
        std.algorithm.sort(data);
        return;
    }

    // Partition the array around the middle element,
    // which is first moved to the end to act as the pivot.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    // Move the pivot between the two partitions.
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}

auto task(F, Args...)(F delegateOrFp, Args args) if (is(typeof(delegateOrFp(args))) && !isSafeTask!F);
Creates a Task on the GC heap that calls a function pointer, delegate, or class/struct with overloaded opCall.

Examples:
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file, std.parallelism;

void main()
{
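    // Create and execute a Task for reading
    // foo.txt. std.file.read is a template with a
    // defaulted upTo parameter; function pointers do
    // not carry default arguments, so a specific
    // instantiation is taken and upTo is passed
    // explicitly.
    auto file1Task = task(&read!string, "foo.txt", size_t.max);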
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}

Notes:
This function takes a non-scope delegate, meaning it can be used with closures. If you can't allocate a closure due to objects on the stack that have scoped destruction, see scopedTask, which takes a scope delegate.
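A sketch of the two other callable kinds this overload accepts: a delegate literal that captures a local variable (which makes the compiler allocate a closure on the GC heap) and a struct with an overloaded opCall. The names `Scale`, `viaClosure`, and `viaOpCall` are hypothetical.

```d
import std.parallelism : task;

// Hypothetical callable struct with an overloaded opCall.
struct Scale
{
    int factor;
    // Explicit constructor so that Scale(3) unambiguously builds a value.
    this(int factor) { this.factor = factor; }
    int opCall(int x) { return factor * x; }
}

// The delegate literal captures `offset`, so a GC-allocated closure is
// created; task accepts it because it takes a non-scope delegate.
int viaClosure(int offset)
{
    auto t = task((int x) => x + offset, 5);
    t.executeInNewThread();
    return t.yieldForce;
}

// The struct instance is copied into the Task and its opCall is invoked.
int viaOpCall(int x)
{
    auto t = task(Scale(3), x);
    t.executeInNewThread();
    return t.yieldForce;
}

void main()
{
    assert(viaClosure(10) == 15);
    assert(viaOpCall(14) == 42);
}
```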

@trusted auto task(F, Args...)(F fun, Args args) if (is(typeof(fun(args))) && isSafeTask!F);
Version of task usable from @safe code. Usage mechanics are identical to the non-@safe case, but safety introduces some restrictions:

1. fun must be @safe or @trusted.

2. F must not have any unshared aliasing as defined by std.traits.hasUnsharedAliasing. This means it may not be an unshared delegate or a non-shared class or struct with overloaded opCall. This also precludes accepting template alias parameters.

3. Args must not have unshared aliasing.

4. fun must not return by reference.

5. The return type must not have unshared aliasing unless fun is pure or the Task is executed via executeInNewThread instead of using a TaskPool.
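A minimal @safe sketch, assuming a hypothetical `square` function. The callable is passed as a runtime function pointer rather than a template alias parameter, it is itself @safe, and both the argument and the return type are plain `int` values, so the restrictions listed above are satisfied.

```d
import std.parallelism : task;

// Hypothetical @safe function with no aliasing in its parameters or result.
int square(int x) @safe pure nothrow
{
    return x * x;
}

int squareInTask(int x) @safe
{
    // A function pointer to a @safe function selects the @safe overload.
    auto t = task(&square, x);
    t.executeInNewThread();
    return t.yieldForce;
}

void main() @safe
{
    assert(squareInTask(7) == 49);
}
```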

auto scopedTask(alias fun, Args...)(Args args);
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args) if (is(typeof(delegateOrFp(args))) && !isSafeTask!F);
@trusted auto scopedTask(F, Args...)(F fun, Args args) if (is(typeof(fun(args))) && isSafeTask!F);
These functions allow the creation of Task objects on the stack rather than the GC heap. The lifetime of a Task created by scopedTask cannot exceed the lifetime of the scope it was created in.

scopedTask might be preferred over task:

1. When a Task that calls a delegate is being created and a closure cannot be allocated due to objects on the stack that have scoped destruction. The delegate overload of scopedTask takes a scope delegate.

2. As a micro-optimization, to avoid the heap allocation associated with task or with the creation of a closure.

Usage is otherwise identical to task.

Notes:
Task objects created using scopedTask will automatically call Task.yieldForce in their destructor if necessary to ensure the Task is complete before the stack frame they reside on is destroyed.
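A minimal sketch of scopedTask with a hypothetical `triple` function, submitting the stack-allocated Task to the global pool by reference:

```d
import std.parallelism : scopedTask, taskPool;

// Hypothetical workload.
int triple(int x)
{
    return 3 * x;
}

int tripleOnStack(int x)
{
    // The Task struct lives in this stack frame, so no GC allocation is
    // made for it. Had we returned before forcing it, its destructor
    // would call yieldForce.
    auto t = scopedTask!triple(x);
    taskPool.put(t);
    return t.yieldForce;
}

void main()
{
    assert(tripleOnStack(14) == 42);
}
```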

immutable uint totalCPUs;
The total number of CPU cores available on the current machine, as reported by the operating system.

class TaskPool;
This class encapsulates a task queue and a set of worker threads. Its purpose is to efficiently map a large number of Tasks onto a smaller number of threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that executes the Task at the front of the queue when one is available and sleeps when the queue is empty.

This class is usually used through the global instance returned by the std.parallelism.taskPool property. Occasionally it is useful to explicitly instantiate a TaskPool:

1. When you want TaskPool instances with multiple priorities, for example a low priority pool and a high priority pool.

2. When the threads in the global task pool are waiting on a synchronization primitive (for example a mutex), and you want to parallelize the code that needs to run before these threads can be resumed.
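A sketch of an explicitly created pool, here with two worker threads and a hypothetical `cube` workload. Unlike those of the global pool, the worker threads of a manually instantiated pool are not daemon threads by default, so the sketch shuts the pool down with `finish(true)`; a low-priority pool could additionally set the pool's `priority` property.

```d
import std.parallelism : TaskPool;

// Hypothetical workload.
int cube(int x)
{
    return x * x * x;
}

int[] cubesOnPrivatePool(int n)
{
    // A private pool with two worker threads, separate from taskPool.
    auto pool = new TaskPool(2);
    // Non-daemon workers would keep the program alive, so always shut down;
    // finish(true) waits for queued work and then joins the threads.
    scope (exit) pool.finish(true);

    auto results = new int[n];
    foreach (i, ref r; pool.parallel(results))
        r = cube(cast(int) i);
    return results;
}

void main()
{
    assert(cubesOnPrivatePool(5) == [0, 1, 8, 27, 64]);
}
```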

@property @trusted TaskPool taskPool();
Returns a lazily initialized global instance of TaskPool. This function can safely be called concurrently from multiple non-worker threads. The worker threads in this pool are daemon threads, meaning that it is not necessary to call TaskPool.stop or TaskPool.finish before terminating the main thread.

@property @trusted uint defaultPoolThreads();
@property @trusted void defaultPoolThreads(uint newVal);
These properties get and set the number of worker threads in the TaskPool instance returned by taskPool. The default value is totalCPUs - 1. Calling the setter after the first call to taskPool does not change the number of worker threads in the instance returned by taskPool.
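A sketch of sizing the global pool; the helper `configureGlobalPool` is hypothetical. The setter only has an effect if it runs before the first access to `taskPool`, and `TaskPool.size` reports the resulting number of worker threads.

```d
import std.parallelism : defaultPoolThreads, taskPool, totalCPUs;

// Hypothetical helper; must run before anything first touches taskPool.
size_t configureGlobalPool(uint nThreads)
{
    defaultPoolThreads = nThreads;
    return taskPool.size;
}

void main()
{
    assert(totalCPUs >= 1);
    assert(configureGlobalPool(3) == 3);

    // The global pool already exists, so this no longer resizes it.
    defaultPoolThreads = 1;
    assert(taskPool.size == 3);
}
```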

ParallelForeach!R parallel(R)(R range);
ParallelForeach!R parallel(R)(R range, size_t workUnitSize);
Convenience functions that forward to taskPool.parallel. Their purpose is to make parallel foreach less verbose and more readable.

Example:
// Find the logarithm of every number from
// 1 to 1_000_000 in parallel, using the
// default TaskPool instance.
import std.math : log;
import std.parallelism : parallel;

void main()
{
    auto logs = new double[1_000_000];

    foreach(i, ref elem; parallel(logs)) {
        elem = log(i + 1.0);
    }
}
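The two-argument overload fixes the work unit size, i.e. how many consecutive elements a worker thread processes per unit of work. A sketch with an arbitrarily chosen work unit size of 500 and a hypothetical `rootsInParallel` helper:

```d
import std.math : sqrt;
import std.parallelism : parallel;

// Hypothetical helper: square roots of 0 .. n - 1, computed in parallel.
double[] rootsInParallel(size_t n)
{
    auto roots = new double[n];
    // Each work unit covers 500 consecutive elements instead of the
    // size that parallel would choose by default.
    foreach (i, ref r; parallel(roots, 500))
        r = sqrt(cast(double) i);
    return roots;
}

void main()
{
    auto roots = rootsInParallel(10_000);
    assert(roots[4] == 2.0);
    assert(roots[100] == 10.0);
}
```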