Coursera: parallel programming

Task-level Parallelism

finish {
  async S1; // asynchronously compute sum of the lower half of the array
  S2;       // compute sum of the upper half of the array in parallel with S1
}
S3; // combine the two partial sums after both S1 and S2 have finished

We learned the async notation for task creation: “async ⟨stmt1⟩” causes the parent task (i.e., the task executing the async statement) to create a new child task to execute the body of the async, ⟨stmt1⟩, asynchronously (i.e., before, after, or in parallel) with the remainder of the parent task. We also learned the finish notation for task termination: “finish ⟨stmt2⟩” causes the parent task to execute ⟨stmt2⟩ and then wait until ⟨stmt2⟩ and all async tasks created within ⟨stmt2⟩ have completed. Async and finish constructs may be arbitrarily nested.
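As a concrete illustration, here is one way the finish/async example above could be written with Java's ForkJoinPool; the class and method names are mine, and this mapping is an assumption rather than the lecture's code. Submitting a task plays the role of async, and join() provides the finish guarantee before S3 combines the results.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class TwoWaySum {
    public static double sum(double[] arr) {
        int mid = arr.length / 2;
        // S1: async task computes the sum of the lower half
        ForkJoinTask<Double> s1 =
                ForkJoinPool.commonPool().submit(() -> partialSum(arr, 0, mid));
        // S2: the parent computes the upper half in parallel with S1
        double s2 = partialSum(arr, mid, arr.length);
        // finish: wait for S1, then S3 combines the two partial sums
        return s1.join() + s2;
    }

    private static double partialSum(double[] arr, int lo, int hi) {
        double sum = 0;
        for (int i = lo; i < hi; i++) {
            sum += arr[i];
        }
        return sum;
    }
}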

💡 Great article about Fork/Join

  • Applying the divide-and-conquer principle, the framework recursively divides the task into smaller subtasks until a given threshold is reached. This is the fork part.

    Then, the subtasks are processed independently and if they return a result, all the results are recursively combined into a single result. This is the join part.

  • To execute the tasks in parallel, the framework uses a pool of threads, with a number of threads equal to the number of processors available to the Java Virtual Machine (JVM) by default.

  • Each thread has its own double-ended queue (deque) to store the tasks that it will execute.

    A deque is a type of queue that supports adding or removing elements from either the front (head) or the back (tail). This allows two things:

    • A thread can execute only one task at a time (the task at the head of its deque).

    • A work-stealing algorithm is implemented to balance the threads’ workload.

    With the work-stealing algorithm, threads that run out of tasks to process can steal tasks from other threads that are still busy (by removing tasks from the tail of their deque).

Computation Graph

A simple observation made by Gene Amdahl in 1967: if q ≤ 1 is the fraction of WORK in a parallel program that must be executed sequentially, then the best speedup that can be obtained for that program for any number of processors, P, is Speedup(P) ≤ 1/q.
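A sketch of why the bound holds (my own restatement of the standard derivation): if the total sequential execution time is T, the sequential fraction still takes q × T on P processors, while the remaining work takes at least (1 − q) × T / P, so Speedup(P) ≤ 1 / (q + (1 − q)/P), which is at most 1/q for every P. For example, if q = 0.25, no number of processors can yield a speedup greater than 4.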

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public final class ReciprocalArraySum {

    public static final int SEQUENTIAL_THRESHOLD = 100000;
    
    private static class ReciprocalArraySumTask extends RecursiveAction {
        private final int startIndexInclusive;
        private final int endIndexExclusive;
        private final double[] input;
        private double value;

        ReciprocalArraySumTask(final int setStartIndexInclusive,
                final int setEndIndexExclusive, final double[] setInput) {
            this.startIndexInclusive = setStartIndexInclusive;
            this.endIndexExclusive = setEndIndexExclusive;
            this.input = setInput;
        }

        public double getValue() {
            return value;
        }

        @Override
        protected void compute() {
            if (endIndexExclusive - startIndexInclusive <= SEQUENTIAL_THRESHOLD) {
                // Base case: the range is small enough to sum sequentially
                for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
                    value += 1 / input[i];
                }
            } else {
                // Recursive case: split the range in half
                final int mid = (startIndexInclusive + endIndexExclusive) / 2;
                ReciprocalArraySumTask left = new ReciprocalArraySumTask(
                        startIndexInclusive, mid, input);
                ReciprocalArraySumTask right = new ReciprocalArraySumTask(
                        mid, endIndexExclusive, input);
                left.fork();     // run the left half asynchronously
                right.compute(); // run the right half in the current thread
                left.join();     // wait for the left half to finish
                value = left.getValue() + right.getValue();
            }
        }
    }
    
    private static double parArraySum(final double[] input) {
        // Create the root task over the whole array, run it on the common
        // ForkJoinPool, and read off the accumulated result.
        ReciprocalArraySumTask t = new ReciprocalArraySumTask(0, input.length, input);
        ForkJoinPool.commonPool().invoke(t);
        return t.getValue();
    }
}
RecursiveTask vs. RecursiveAction

  • RecursiveTask<V>: V compute(); the task returns a value, and join() waits for the task to complete and returns its result.

  • RecursiveAction: void compute(); the action returns no value, and join() simply waits for the action to complete.
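For contrast with the RecursiveAction above, here is a RecursiveTask sketch of the same reciprocal sum (the class name and structure are mine, not the assignment's reference solution); compute() now returns the partial sum instead of storing it in a field:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ReciprocalSumTask extends RecursiveTask<Double> {
    private static final int SEQUENTIAL_THRESHOLD = 100000;
    private final int startInclusive;
    private final int endExclusive;
    private final double[] input;

    ReciprocalSumTask(final int startInclusive, final int endExclusive,
            final double[] input) {
        this.startInclusive = startInclusive;
        this.endExclusive = endExclusive;
        this.input = input;
    }

    @Override
    protected Double compute() {
        if (endExclusive - startInclusive <= SEQUENTIAL_THRESHOLD) {
            double sum = 0;
            for (int i = startInclusive; i < endExclusive; i++) {
                sum += 1 / input[i];
            }
            return sum;
        }
        final int mid = (startInclusive + endExclusive) / 2;
        ReciprocalSumTask left = new ReciprocalSumTask(startInclusive, mid, input);
        ReciprocalSumTask right = new ReciprocalSumTask(mid, endExclusive, input);
        left.fork();                       // run the left half asynchronously
        double rightSum = right.compute(); // run the right half in this thread
        return left.join() + rightSum;     // join() returns the left result
    }
}

// Usage: double sum = ForkJoinPool.commonPool()
//         .invoke(new ReciprocalSumTask(0, input.length, input));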

Memoization

The basic idea of “memoization” is to remember results of function calls f(x) as follows:

  1. Create a data structure that stores the set {(x₁, y₁ = f(x₁)), (x₂, y₂ = f(x₂)), . . .} for each call f(xᵢ) that returns yᵢ.

  2. Perform lookups in that data structure when processing calls of the form f(x′) when x′ equals one of the xᵢ inputs for which f(xᵢ) has already been computed.

Memoization can be especially helpful for algorithms based on dynamic programming. In the lecture, we used Pascal’s triangle as an illustrative example to motivate memoization.

The memoization pattern lends itself easily to parallelisation using futures by modifying the memoized data structure to store {(x₁, y₁ = future(f(x₁))), (x₂, y₂ = future(f(x₂))), . . .}. The lookup operation can then be replaced by a get() operation on the future value, if a future has already been created for the result of a given input.
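The sketch below is one possible Java realization of this pattern (the Memoizer name and API are mine, assuming only java.util.concurrent): the table stores futures, so a caller that arrives while f(x) is still being computed simply waits on the same future.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public final class Memoizer<X, Y> {
    private final Map<X, CompletableFuture<Y>> cache = new ConcurrentHashMap<>();
    private final Function<X, Y> f;

    public Memoizer(Function<X, Y> f) {
        this.f = f;
    }

    public Y get(X x) {
        // computeIfAbsent guarantees at most one future is created per input;
        // join() is the "get() operation on the future value" from the text.
        return cache.computeIfAbsent(x,
                key -> CompletableFuture.supplyAsync(() -> f.apply(key)))
            .join();
    }
}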

Determinism

  • Functional

    • same input -> same output

  • Structural

    • same input -> same computational graph

The presence of data races often leads to functional and/or structural nondeterminism because a parallel program with data races may exhibit different behaviors for the same input, depending on the relative scheduling and timing of memory accesses involved in a data race.
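As a small demonstration (this program is mine, not from the lecture), the following code has a data race on a shared counter and therefore shows functional nondeterminism: the same input can print different values on different runs.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class RaceDemo {
    static int counter = 0; // shared and unsynchronized: a data race

    public static void main(String[] args) {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        Runnable inc = () -> {
            for (int i = 0; i < 1000000; i++) {
                counter++; // read-modify-write is not atomic
            }
        };
        ForkJoinTask<?> t1 = pool.submit(inc);
        ForkJoinTask<?> t2 = pool.submit(inc);
        t1.join();
        t2.join();
        // Expected 2000000, but lost updates can make the result smaller,
        // and it can change from run to run.
        System.out.println(counter);
    }
}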

Parallel Loops

The most general way is to think of each iteration of a parallel loop as an async task, with a finish construct encompassing all iterations. This approach can support general cases such as parallelization of the following pointer-chasing while loop (in pseudocode):

finish {for (p = head; p != null ; p = p.next) async compute(p);}
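One way to realize this in plain Java (the Node class and task bookkeeping are my own sketch): each list node gets its own asynchronous task, and joining all of the collected tasks plays the role of the enclosing finish.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class PointerChasing {
    static class Node {
        Node next;
        int data;
    }

    static void computeAll(Node head) {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        List<ForkJoinTask<?>> tasks = new ArrayList<>();
        for (Node p = head; p != null; p = p.next) {
            final Node node = p;
            tasks.add(pool.submit(() -> compute(node))); // async compute(p)
        }
        for (ForkJoinTask<?> t : tasks) {
            t.join(); // finish: wait for every spawned task
        }
    }

    static void compute(Node p) {
        p.data += 1; // placeholder for the real per-node work
    }
}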

However, further efficiencies can be gained by paying attention to counted-for loops for which the number of iterations is known on entry to the loop (before the loop executes its first iteration). We then learned the forall notation for expressing parallel counted-for loops, such as in the following vector addition statement (in pseudocode):

forall (i : [0:n-1]) a[i] = b[i] + c[i]

We also discussed the fact that Java streams can be an elegant way of specifying parallel loop computations that produce a single output array, e.g., by rewriting the vector addition statement as follows:

a = IntStream.rangeClosed(0, n-1).parallel().map(i -> b[i] + c[i]).toArray();

In summary, streams are a convenient notation for parallel loops with at most one output array, but the forall notation is more convenient for loops that create/update multiple output arrays, as is the case in many scientific computations. For generality, we will use the forall notation for parallel loops in the remainder of this module.
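For completeness, here is a sketch (my own mapping, not from the lecture) of a multiple-output parallel loop in Java: a parallel forEach over the index range can update several output arrays in one pass, much like forall, because each iteration writes only its own index.

import java.util.stream.IntStream;

public class MultiOutputLoop {
    public static void addAndSub(double[] a, double[] d,
                                 double[] b, double[] c, int n) {
        IntStream.range(0, n).parallel().forEach(i -> {
            a[i] = b[i] + c[i]; // first output array
            d[i] = b[i] - c[i]; // second output array
        });
    }
}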

Iteration Grouping: Chunking of Parallel Loops

In this lecture, we revisited the vector addition example:

forall (i : [0:n-1]) a[i] = b[i] + c[i]

We observed that this approach creates n tasks, one per forall iteration, which is wasteful when (as is common in practice) n is much larger than the number of available processor cores.

To address this problem, we learned a common tactic used in practice, referred to as loop chunking or iteration grouping, which focuses on reducing the number of tasks created so that it is closer to the number of processor cores, thereby reducing the overhead of parallel execution:

With iteration grouping/chunking, the parallel vector addition example above can be rewritten as follows:

forall (g:[0:ng-1]) for (i : mygroup(g, ng, [0:n-1])) a[i] = b[i] + c[i]

Note that we have reduced the degree of parallelism from n to the number of groups, ng, which now equals the number of iterations/tasks in the forall construct.

There are two well-known approaches for iteration grouping: block and cyclic. The former approach (block) maps consecutive iterations to the same group, whereas the latter approach (cyclic) maps iterations in the same congruence class (mod ng) to the same group. With these concepts, you should now have a better understanding of how to execute forall loops in practice with lower overhead.
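The helpers below illustrate the two strategies (the method names are mine, and mygroup() itself remains pseudocode): each group g of the forall would execute one of these bodies.

public class Chunking {
    // Block grouping: group g gets the consecutive range [g*n/ng, (g+1)*n/ng)
    static void blockGroup(int g, int ng, int n,
                           double[] a, double[] b, double[] c) {
        int start = (g * n) / ng;
        int end = ((g + 1) * n) / ng;
        for (int i = start; i < end; i++) {
            a[i] = b[i] + c[i];
        }
    }

    // Cyclic grouping: group g gets every iteration i with i mod ng == g
    static void cyclicGroup(int g, int ng, int n,
                            double[] a, double[] b, double[] c) {
        for (int i = g; i < n; i += ng) {
            a[i] = b[i] + c[i];
        }
    }
}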

Split-phase Barriers with Java Phasers

In this lecture, we examined a variant of the barrier example that we studied earlier:

forall (i : [0:n-1]) {   
    print HELLO, i;  
    myId = lookup(i); // convert int to a string   
    print BYE, myId;
}

We learned about Java’s Phaser class and the fact that the operation ph.arriveAndAwaitAdvance() can be used to implement a barrier through a phaser object ph. We also observed that there are two possible positions for inserting a barrier between the two print statements above: before or after the call to lookup(i). However, upon closer examination, we can see that the call to lookup(i) is local to iteration i and that there is no specific need to either complete it before the barrier or to complete it after the barrier. In fact, the call to lookup(i) can be performed in parallel with the barrier. To facilitate this split-phase barrier (also known as a fuzzy barrier), we use two separate APIs from the Java Phaser class, ph.arrive() and ph.awaitAdvance(). Together these two APIs form a barrier, but we now have the freedom to insert a computation such as lookup(i) between the two calls as follows:

// initialize phaser ph for use by n tasks ("parties")
Phaser ph = new Phaser(n);
// Create forall loop with n iterations that operate on ph 
forall (i : [0:n-1]) {
  print HELLO, i;
  int phase = ph.arrive();
  
  myId = lookup(i); // convert int to a string

  ph.awaitAdvance(phase);
  print BYE, myId;
}

Doing so enables the barrier processing to occur in parallel with the call to lookup(i), which was our desired outcome.
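For reference, a runnable version of the split-phase barrier (the thread-per-iteration setup and the lookup stub are mine; the forall above is pseudocode):

import java.util.concurrent.Phaser;

public class SplitPhaseBarrier {
    public static void main(String[] args) {
        final int n = 4;
        final Phaser ph = new Phaser(n); // n parties
        for (int i = 0; i < n; i++) {
            final int id = i;
            new Thread(() -> {
                System.out.println("HELLO " + id);
                int phase = ph.arrive();   // signal arrival without blocking
                String myId = lookup(id);  // local work overlaps the barrier
                ph.awaitAdvance(phase);    // block until all n have arrived
                System.out.println("BYE " + myId);
            }).start();
        }
    }

    private static String lookup(int i) {
        return Integer.toString(i); // stub: convert int to a string
    }
}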
