# Coursera: parallel programming

## **Task-level Parallelism**

### [**Task Creation and Termination (Async, Finish)**](https://github.com/habanero-rice/pcdp)

```
finish {
  async S1; // asynchronously compute sum of the lower half of the array
  S2;       // compute sum of the upper half of the array in parallel with S1
}
S3; // combine the two partial sums after both S1 and S2 have finished
```

We learned the *async* notation for task creation: “*async ⟨stmt1⟩*” causes the parent task (*i.e.*, the task executing the async statement) to create a new child task that executes the body of the *async*, *⟨stmt1⟩*, asynchronously (*i.e.*, before, after, or in parallel with) the remainder of the parent task. We also learned the *finish* notation for task termination: “*finish ⟨stmt2⟩*” causes the parent task to execute *⟨stmt2⟩* and then wait until *⟨stmt2⟩* and all *async* tasks created within *⟨stmt2⟩* have completed. *Async* and *finish* constructs may be arbitrarily nested.
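As a minimal sketch of the array-sum example in the pseudocode above, using the PCDP library linked in the heading (assuming its static `finish`/`async` helpers from `edu.rice.pcdp.PCDP`; the class and array names are illustrative):

```java
import static edu.rice.pcdp.PCDP.async;
import static edu.rice.pcdp.PCDP.finish;

public final class TwoWaySum {
    public static double sum(final double[] x) {
        final double[] partial = new double[2]; // slots for the two partial sums
        finish(() -> {
            // S1: the child task sums the lower half asynchronously
            async(() -> {
                for (int i = 0; i < x.length / 2; i++) {
                    partial[0] += x[i];
                }
            });
            // S2: the parent task sums the upper half in parallel with S1
            for (int i = x.length / 2; i < x.length; i++) {
                partial[1] += x[i];
            }
        }); // finish: wait until both S1 and S2 have completed
        return partial[0] + partial[1]; // S3: combine the two partial sums
    }
}
```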

#### [**Fork/Join**](https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html)

:bulb: [Great article about Fork/Join](https://www.pluralsight.com/guides/introduction-to-the-fork-join-framework)

* Applying a *divide and conquer* principle, the framework recursively divides the task into smaller subtasks until a given threshold is reached. This is the *fork* part.

  Then, the subtasks are processed independently and if they return a result, all the results are recursively combined into a single result. This is the *join* part.
* To execute the tasks in parallel, the framework uses a pool of threads, with the number of threads equal (by default) to the number of processors available to the Java Virtual Machine (JVM).
* Each thread has its own double-ended queue (deque) to store the tasks that it will execute.

  A [deque](https://en.wikipedia.org/wiki/Double-ended_queue) is a type of queue that supports adding or removing elements from either the front (head) or the back (tail). This allows two things:

  * A thread can execute only one task at a time (the task at the head of its deque).
  * A **work-stealing algorithm** is implemented to balance the threads’ workload.

  With the work-stealing algorithm, threads that run out of tasks to process can steal tasks from other threads that are still busy (by removing tasks from the tail of their deque).

#### Computation Graph

![](/files/Un4YxGOkVcmsHeILWEzh)

![](/files/akGLRq02WXW0vl50q0bA)

![](/files/zasZFgbXsXD15wHZlemE)

A simple observation made by Gene Amdahl in 1967: if *q* ≤ 1 is the fraction of *WORK* in a parallel program that must be executed *sequentially*, then the best speedup that can be obtained for that program, for any number of processors *P*, is *Speedup(P) ≤ 1/q*.
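For a quick sanity check of this bound, here is the general form of Amdahl's law (the lecture's bound is its limit as *P → ∞*) together with a worked instance:

```latex
% General form for P processors with sequential fraction q:
%   Speedup(P) = 1 / (q + (1 - q)/P)  <=  1/q
\text{Speedup}(P) \;=\; \frac{1}{q + \frac{1-q}{P}} \;\le\; \frac{1}{q}

% Worked instance: q = 0.25 (25% of WORK is sequential) gives
%   Speedup(P) <= 1/0.25 = 4,
% so even with unlimited processors the program runs at most 4x faster.
```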

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public final class ReciprocalArraySum {

    // Below this size, a subrange is summed sequentially instead of being split.
    public static final int SEQUENTIAL_THRESHOLD = 100000;

    private static class ReciprocalArraySumTask extends RecursiveAction {
        private final int startIndexInclusive;
        private final int endIndexExclusive;
        private final double[] input;
        private double value; // partial sum computed by this task

        ReciprocalArraySumTask(final int setStartIndexInclusive,
                final int setEndIndexExclusive, final double[] setInput) {
            this.startIndexInclusive = setStartIndexInclusive;
            this.endIndexExclusive = setEndIndexExclusive;
            this.input = setInput;
        }

        public double getValue() {
            return value;
        }

        @Override
        protected void compute() {
            if (endIndexExclusive - startIndexInclusive <= SEQUENTIAL_THRESHOLD) {
                // Base case: sum the reciprocals of this subrange sequentially.
                for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
                    value += 1.0 / input[i];
                }
            } else {
                // Recursive case: split the range in half (the "fork" phase).
                final int mid = (startIndexInclusive + endIndexExclusive) / 2;
                ReciprocalArraySumTask left = new ReciprocalArraySumTask(
                        startIndexInclusive, mid, input);
                ReciprocalArraySumTask right = new ReciprocalArraySumTask(
                        mid, endIndexExclusive, input);
                left.fork();      // run the left half asynchronously
                right.compute();  // run the right half in the current thread
                left.join();      // wait for the left half (the "join" phase)
                value = left.getValue() + right.getValue();
            }
        }
    }

    private static double parArraySum(final double[] input) {
        ReciprocalArraySumTask t = new ReciprocalArraySumTask(0, input.length, input);
        ForkJoinPool.commonPool().invoke(t); // blocks until the whole task tree completes
        return t.getValue();
    }
}
```

| `RecursiveTask<V>`                                              | `RecursiveAction`                          |
| --------------------------------------------------------------- | ------------------------------------------ |
| `V compute();` (returns a result)                                | `void compute();` (returns no result)       |
| `join()` waits for the task to complete and returns its result   | `join()` waits for the action to complete   |
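For contrast with the `RecursiveAction` version above, here is a minimal sketch of the same reciprocal sum written as a `RecursiveTask<Double>` (the class and field names are illustrative), where `compute()` returns the partial result instead of storing it in a field:

```java
import java.util.concurrent.RecursiveTask;

class ReciprocalSumTask extends RecursiveTask<Double> {
    private static final int THRESHOLD = 100000;
    private final double[] input;
    private final int lo, hi; // half-open range [lo, hi)

    ReciprocalSumTask(final double[] input, final int lo, final int hi) {
        this.input = input;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Double compute() {
        if (hi - lo <= THRESHOLD) {
            double sum = 0;
            for (int i = lo; i < hi; i++) {
                sum += 1.0 / input[i];
            }
            return sum; // the result is returned, not stored in a field
        }
        final int mid = (lo + hi) / 2;
        ReciprocalSumTask left = new ReciprocalSumTask(input, lo, mid);
        ReciprocalSumTask right = new ReciprocalSumTask(input, mid, hi);
        left.fork();                             // run the left half asynchronously
        final double rightSum = right.compute(); // run the right half in this thread
        return left.join() + rightSum;           // join() returns the left result
    }
}
```

Usage would look like `double total = ForkJoinPool.commonPool().invoke(new ReciprocalSumTask(input, 0, input.length));`.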

**Memoization**

![](/files/B67kCz6O99eW3ThRDqTG)

The basic idea of “memoization” is to remember the results of function calls *f*(*x*) as follows:

1. Create a data structure that stores the set {(*x*\_1, *y*\_1 = *f*(*x*\_1)), (*x*\_2, *y*\_2 = *f*(*x*\_2)), . . .} for each call *f*(*x*\_i) that returns *y*\_i.
2. Perform lookups in that data structure when processing calls of the form *f*(*x′*), where *x′* equals one of the *x*\_i inputs for which *f*(*x*\_i) has already been computed.

Memoization can be especially helpful for algorithms based on [dynamic programming](https://en.wikipedia.org/wiki/Dynamic_programming). In the lecture, we used [Pascal’s triangle](https://en.wikipedia.org/wiki/Pascal%27s_triangle) as an illustrative example to motivate memoization.

The memoization pattern lends itself easily to parallelization using futures by modifying the memoized data structure to store {(*x*\_1, *y*\_1 = *future*(*f*(*x*\_1))), (*x*\_2, *y*\_2 = *future*(*f*(*x*\_2))), . . .}. The lookup operation can then be replaced by a *get()* operation on the future value, if a future has already been created for the result of a given input.
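A minimal Java sketch of this future-based memoization pattern (the `Memoizer` class is illustrative, not from the lecture), using `ConcurrentHashMap` to store the (input, future) pairs:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class Memoizer<X, Y> {
    // Stores (x, future(f(x))) pairs, as in the set described above.
    private final Map<X, CompletableFuture<Y>> cache = new ConcurrentHashMap<>();
    private final Function<X, Y> f;

    Memoizer(final Function<X, Y> f) {
        this.f = f;
    }

    Y apply(final X x) {
        // The first call for a given x creates a future for f(x); later calls
        // with the same x reuse that future instead of recomputing.
        return cache.computeIfAbsent(x,
                k -> CompletableFuture.supplyAsync(() -> f.apply(k)))
            .join(); // the get() on the future: blocks until the value is ready
    }
}
```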

## Determinism

* Functional determinism
  * same input -> same output
* Structural determinism
  * same input -> same computation graph

The presence of data races often leads to functional and/or structural nondeterminism because a parallel program with data races may exhibit different behaviors for the same input, depending on the relative scheduling and timing of memory accesses involved in a data race.
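A small illustrative example (not from the lectures) of a data race causing functional nondeterminism:

```java
public class RaceDemo {
    static int counter = 0; // shared and unsynchronized: a data race

    public static void main(String[] args) throws InterruptedException {
        Runnable increment = () -> {
            for (int i = 0; i < 100_000; i++) {
                counter++; // non-atomic read-modify-write
            }
        };
        Thread t1 = new Thread(increment);
        Thread t2 = new Thread(increment);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // Same input on every run, but the printed value varies from run to
        // run (and is usually less than 200000) because increments are lost.
        System.out.println(counter);
    }
}
```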

## **Parallel Loops**

The most general way is to think of each iteration of a parallel loop as an *async* task, with a *finish* construct encompassing all iterations. This approach can support general cases such as parallelization of the following pointer-chasing while loop (in pseudocode):

`finish {for (p = head; p != null ; p = p.next) async compute(p);}`

However, further efficiencies can be gained by paying attention to *counted-for* loops for which the number of iterations is known on entry to the loop (before the loop executes its first iteration). We then learned the *forall* notation for expressing parallel counted-for loops, such as in the following vector addition statement (in pseudocode):

`forall (i : [0:n-1]) a[i] = b[i] + c[i]`

We also discussed the fact that Java streams can be an elegant way of specifying parallel loop computations that produce a single output array, e.g., by rewriting the vector addition statement as follows:

`a = IntStream.rangeClosed(0, N-1).parallel().mapToDouble(i -> b[i] + c[i]).toArray();`
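A runnable sketch of the same computation, assuming `b` and `c` are `double[]` arrays of equal length (`IntStream.range(0, n)` is equivalent to `rangeClosed(0, n-1)`):

```java
import java.util.stream.IntStream;

public final class VectorAdd {
    public static double[] add(final double[] b, final double[] c) {
        return IntStream.range(0, b.length)    // iteration space [0, n)
                .parallel()                    // evaluate iterations in parallel
                .mapToDouble(i -> b[i] + c[i]) // per-iteration loop body
                .toArray();                    // collect into the output array
    }
}
```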

In summary, streams are a convenient notation for parallel loops with at most one output array, but the *forall* notation is more convenient for loops that create/update multiple output arrays, as is the case in many scientific computations. For generality, we will use the *forall* notation for parallel loops in the remainder of this module.

**Iteration Grouping: Chunking of Parallel Loops**

In this lecture, we revisited the vector addition example:

`forall (i : [0:n-1]) a[i] = b[i] + c[i]`

We observed that this approach creates *n* tasks, one per *forall* iteration, which is wasteful when (as is common in practice) *n* is much larger than the number of available processor cores.

To address this problem, we learned a common tactic used in practice that is referred to as *loop chunking* or *iteration grouping*: reduce the number of tasks created to be closer to the number of processor cores, so as to reduce the overhead of parallel execution.

With iteration grouping/chunking, the parallel vector addition example above can be rewritten as follows:

```
forall (g : [0:ng-1])
  for (i : mygroup(g, ng, [0:n-1])) a[i] = b[i] + c[i]
```

Note that we have reduced the degree of parallelism from *n* to the number of groups, *ng*, which now equals the number of iterations/tasks in the *forall* construct.

There are two well known approaches for iteration grouping: ***block*** and ***cyclic***. The former approach (*block*) maps consecutive iterations to the same group, whereas the latter approach (*cyclic*) maps iterations in the same congruence class (mod *ng*) to the same group. With these concepts, you should now have a better understanding of how to execute *forall* loops in practice with lower overhead.
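As an illustrative Java sketch (the helper names are hypothetical, mirroring the `mygroup(g, ng, [0:n-1])` pseudocode above), the two groupings can be computed as follows:

```java
final class IterationGrouping {
    // Block grouping: consecutive iterations map to the same group g.
    static int[] blockGroup(final int g, final int ng, final int n) {
        final int chunk = (n + ng - 1) / ng; // ceiling of n/ng
        final int start = Math.min(g * chunk, n);
        final int end = Math.min(start + chunk, n);
        final int[] iters = new int[end - start];
        for (int i = start; i < end; i++) {
            iters[i - start] = i;
        }
        return iters;
    }

    // Cyclic grouping: iterations congruent to g (mod ng) map to group g.
    static int[] cyclicGroup(final int g, final int ng, final int n) {
        final int count = g < n ? (n - g + ng - 1) / ng : 0;
        final int[] iters = new int[count];
        int idx = 0;
        for (int i = g; i < n; i += ng) {
            iters[idx++] = i;
        }
        return iters;
    }
}
```

For example, with `n = 10` and `ng = 3`, block grouping gives group 0 the iterations {0, 1, 2, 3}, whereas cyclic grouping gives it {0, 3, 6, 9}.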

## **Split-phase Barriers with Java** [**Phasers**](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Phaser.html)

In this lecture, we examined a variant of the *barrier* example that we studied earlier:

```
forall (i : [0:n-1]) {   
    print HELLO, i;  
    myId = lookup(i); // convert int to a string   
    print BYE, myId;
}
```

We learned about Java’s Phaser class, and that the operation **ph.arriveAndAwaitAdvance()** can be used to implement a barrier through a phaser object **ph**. We also observed that there are two possible positions for inserting a barrier between the two print statements above — before or after the call to **lookup(i)**. However, upon closer examination, we can see that the call to **lookup(i)** is local to iteration *i* and that there is no specific need to either complete it before the barrier or to complete it after the barrier. In fact, the call to **lookup(i)** can be performed in parallel with the barrier. To facilitate this *split-phase barrier* (also known as a *fuzzy barrier*) we use two separate APIs from the Java Phaser class — **ph.arrive()** and **ph.awaitAdvance()**. Together these two APIs form a barrier, but we now have the freedom to insert a computation such as **lookup(i)** between the two calls as follows:

```
// initialize phaser ph for use by n tasks ("parties")
Phaser ph = new Phaser(n);
// Create forall loop with n iterations that operate on ph 
forall (i : [0:n-1]) {
  print HELLO, i;
  int phase = ph.arrive();
  
  myId = lookup(i); // convert int to a string

  ph.awaitAdvance(phase);
  print BYE, myId;
}
```

Doing so enables the barrier processing to occur in parallel with the call to **lookup(i)**, which was our desired outcome.
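A runnable Java sketch of this split-phase pattern (the `lookup` body below is a stand-in for the iteration-local work in the pseudocode):

```java
import java.util.concurrent.Phaser;

public class SplitPhaseBarrierDemo {
    static String lookup(final int i) {
        return "task-" + i; // placeholder for the int-to-string conversion
    }

    public static void main(String[] args) {
        final int n = 4;
        final Phaser ph = new Phaser(n); // n parties registered up front
        for (int i = 0; i < n; i++) {
            final int id = i;
            new Thread(() -> {
                System.out.println("HELLO " + id);
                final int phase = ph.arrive();  // signal arrival without blocking
                final String myId = lookup(id); // overlaps with barrier processing
                ph.awaitAdvance(phase);         // block until all n have arrived
                System.out.println("BYE " + myId);
            }).start();
        }
    }
}
```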


![](/files/r7zVeuuIQeC7fNk3IXBU)

