Synchronizers


A synchronizer is any object that coordinates the control flow of threads based on its state. A simple example of such coordination is making a group of threads start at the same time.

BlockingQueue, a widely used thread-safe data structure in Java, is itself a synchronizer. Blocking queues are unique among the collection classes: not only do they act as containers for objects, but they can also coordinate the control flow of producer and consumer threads, because take and put block until the queue enters the desired state (not empty or not full). If a BlockingQueue is empty, any attempt to take an item from it blocks until the queue holds at least one item. Conversely, if a bounded BlockingQueue is full, any attempt to put another item blocks until at least one item has been removed, making room for more.
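The blocking behaviour of put and take can be sketched with a small producer/consumer pair. This is a minimal illustration, not code from the chapter: the class name BlockingQueueDemo, the helper transfer, and the choice of ArrayBlockingQueue as the bounded implementation are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {
    // Hypothetical helper: moves the integers 0..count-1 through a bounded queue
    // and returns them in the order the consumer received them.
    static List<Integer> transfer(int count, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> received = new ArrayList<>(); // touched only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join, the consumer's writes are visible here
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(5, 2)); // prints [0, 1, 2, 3, 4]
    }
}
```

With a capacity of 2 and five items, the producer has to wait for the consumer at least once, yet neither thread needs any explicit locking or signalling: the queue's state does the coordination.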

Other types of synchronizers include semaphores, future tasks, barriers, and latches. There are a number of synchronizer classes in the platform library; if these do not meet your needs, you can also create your own.

In this chapter, we will talk about latches and barriers, two very useful synchronizers.

Latch:


A latch is a synchronizer that can delay the progress of threads until it reaches its terminal state. A latch acts as a gate: until the latch reaches the terminal state the gate is closed and no thread can pass, and in the terminal state the gate opens, allowing all threads to pass. Once the latch reaches the terminal state, it cannot change state again, so it remains open forever. Latches can be used to ensure that certain activities do not proceed until other one-time activities complete, such as:
  • Ensuring that a computation does not proceed until resources it needs have been initialized. A simple binary (two-state) latch could be used to indicate “Resource R has been initialized”, and any activity that requires R would wait first on this latch.
  • Ensuring that a service does not start until other services on which it depends have started. Each service would have an associated binary latch; starting service S would involve first waiting on the latches for other services on which S depends, and then releasing the S latch after startup completes so any services that depend on S can then proceed.
  • Waiting until all the parties involved in an activity, for instance the players in a multi-player game, are ready to proceed. In this case, the latch reaches the terminal state after all the players are ready.

CountDownLatch is a flexible latch implementation that can be used in any of these situations; it allows one or more threads to wait for a set of events to occur. The latch state consists of a counter initialized to a positive number, representing the number of events to wait for.
  • The countDown method decrements the counter, indicating that an event has occurred.
  • The await methods wait for the counter to reach zero, which happens when all the events have occurred.
    If the counter is nonzero on entry, await blocks until the counter reaches zero, the waiting thread is interrupted, or the wait times out.
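The countDown and await behaviour described above can be sketched as follows. This is a minimal illustration under stated assumptions: the class name LatchDemo and the helper awaitEvents are invented for the example, and each "event" is simply a short-lived thread that counts down once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchDemo {
    // Hypothetical helper: starts one thread per event, each of which counts down once.
    // Returns true if the counter reached zero before the timeout elapsed.
    static boolean awaitEvents(int events, long timeoutMillis) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(events);
        for (int i = 0; i < events; i++) {
            new Thread(done::countDown).start(); // each thread reports one event
        }
        // The timed await returns false instead of blocking forever.
        return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitEvents(3, 1000));

        // A latch whose event never happens: the timed await gives up and returns false.
        CountDownLatch never = new CountDownLatch(1);
        System.out.println(never.await(50, TimeUnit.MILLISECONDS)); // prints false
        System.out.println(never.getCount());                       // prints 1

        // Once the counter reaches zero the latch stays open: await returns immediately.
        never.countDown();
        never.await();
        System.out.println(never.getCount());                       // prints 0
    }
}
```

The untimed await() is the usual choice when the events are certain to occur; the timed variant is a reasonable safeguard when an event source might fail.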

The code below illustrates two common uses for latches. It creates a number of threads that run a given task concurrently. It uses two latches, a “starting gate” and an “ending gate”. The starting gate is initialized with a count of one; the ending gate is initialized with a count equal to the number of worker threads.
  • The first thing each worker thread does is wait on the starting gate. This ensures that none of them starts working until they are all ready to start.
  • The last thing each thread does is count down on the ending gate. This allows the master thread to wait efficiently until the last of the worker threads has finished, so it can calculate the elapsed time.


Why did we bother with the latches in TestHarness instead of just starting the threads immediately after they are created? Presumably, we wanted to measure how long it takes to run a task n times concurrently. If we simply created and started the threads, the threads started earlier would have a “head start” on the later threads, and the degree of contention would vary over time as the number of active threads increased or decreased. Using a starting gate allows the master thread to release all the worker threads at once, and the ending gate allows the master thread to wait for the last thread to finish rather than waiting sequentially for each thread to finish.


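import java.util.concurrent.CountDownLatch;

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        // Starting gate: a binary latch that the master thread opens once.
        final CountDownLatch startGate = new CountDownLatch(1);
        // Ending gate: opens after every worker has counted down.
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        startGate.await();        // wait until the master opens the gate
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();  // report completion even if the task throws
                        }
                    } catch (InterruptedException ignored) {
                        // Interrupted while waiting at the starting gate: exit without running the task.
                    }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();   // release all workers at once
        endGate.await();         // wait for the last worker to finish
        long end = System.nanoTime();
        return end - start;
    }
}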



Barrier:


We have seen how latches can facilitate starting a group of related activities or waiting for a group of related activities to complete. Latches are single-use objects: once a latch enters the terminal state, it cannot be reset.

Barriers are similar to latches in that they block a group of threads until some event has occurred.

The key difference between latches and barriers is:
  • With a barrier, all the threads must come together at a barrier point at the same time in order to proceed. Latches are for waiting for events; barriers are for waiting for other threads. A barrier implements the protocol some families use to rendezvous during a day at the mall: “Everyone meet at Starbucks at 6:00 PM; once you get there, stay there until everyone shows up, and then we’ll figure out what we’re doing next.”


CyclicBarrier allows a fixed number of parties to rendezvous repeatedly at a barrier point and is useful in parallel iterative algorithms that break down a problem into a fixed number of independent subproblems.
  • Threads call await when they reach the barrier point, and await blocks until all the threads have reached the barrier point. If all threads meet at the barrier point, the barrier has been successfully passed, in which case all threads are released and the barrier is reset so it can be used again.

    If a call to await times out or a thread blocked in await is interrupted, then the barrier is considered broken and all outstanding calls to await terminate with BrokenBarrierException.

    If the barrier is successfully passed, await returns a unique arrival index for each thread, which can be used to “elect” a leader that takes some special action in the next iteration.

    CyclicBarrier also lets you pass a barrier action to the constructor; this is a Runnable that is executed (in one of the subtask threads) when the barrier is successfully passed but before the blocked threads are released.
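The points above (repeated rendezvous, the arrival index, and the barrier action) can be sketched in a few lines. This is an illustrative example only: the class name BarrierDemo and the helper runRounds are assumptions, and "electing a leader" is reduced to counting the thread whose arrival index is zero.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierDemo {
    // Hypothetical helper: runs 'parties' threads through 'rounds' barrier cycles.
    // Returns {number of barrier-action runs, number of leaders elected}.
    static int[] runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        AtomicInteger leaders = new AtomicInteger();

        // The barrier action runs once each time the barrier is passed,
        // before the waiting threads are released.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        // Blocks until all parties arrive; the barrier then resets itself.
                        int arrival = barrier.await();
                        if (arrival == 0) {            // index 0 is the last thread to arrive
                            leaders.incrementAndGet(); // exactly one leader per round
                        }
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    // The barrier is broken or this thread was interrupted: give up.
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return new int[] { trips.get(), leaders.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = runRounds(3, 4);
        System.out.println(result[0] + " trips, " + result[1] + " leaders"); // prints 4 trips, 4 leaders
    }
}
```

Because the same barrier object is reused for every round, no thread can start round k + 1 until all threads have finished round k.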

Barriers are often used in simulations, where the work to calculate one step can be done in parallel but all the work associated with a given step must complete before advancing to the next step. For example, in n-body particle simulations, each step calculates an update to the position of each particle based on the locations and other attributes of the other particles. Waiting on a barrier between each update ensures that all updates for step k have completed before moving on to step (k + 1).

The chapter How to Test Concurrent Program? shows a real-world application of barriers: they can be of immense importance when testing concurrent programs for safety. Please give it a read before looking at the example given below.

The code below demonstrates using a barrier to compute a cellular automaton simulation, such as Conway's Game of Life. When parallelizing a simulation, it is generally impractical to assign a separate thread to each element (in the case of Life, a cell); this would require too many threads, and the overhead of coordinating them would dwarf the computation. Instead, it makes sense to partition the problem into a number of subparts, let each thread solve a subpart, and then merge the results. CellularAutomata partitions the board into Ncpu parts, where Ncpu is the number of CPUs available, and assigns each part to a thread. At each step, the worker threads calculate new values for all the cells in their part of the board. When all worker threads have reached the barrier, the barrier action commits the new values to the data model. After the barrier action runs, the worker threads are released to compute the next step of the calculation, which includes consulting the board's hasConverged method to determine whether further iterations are required.


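import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Board is assumed to be defined elsewhere; it is not shown in this chapter.
public class CellularAutomata {
    private final Board mainBoard;
    private final CyclicBarrier barrier;
    private final Worker[] workers;

    public CellularAutomata(Board board) {
        this.mainBoard = board;
        // One worker (and one barrier party) per available CPU.
        int count = Runtime.getRuntime().availableProcessors();
        // The barrier action runs once per step, after all workers have arrived
        // and before any of them is released.
        this.barrier = new CyclicBarrier(count,
                new Runnable() {
                    public void run() {
                        mainBoard.commitNewValues();
                    }
                });

        this.workers = new Worker[count];

        for (int i = 0; i < count; i++) {
            workers[i] = new Worker(mainBoard.getSubBoard(count, i));
        }
    }

    private class Worker implements Runnable {
        private final Board board;

        public Worker(Board board) { this.board = board; }

        public void run() {
            while (!board.hasConverged()) {
                // Compute the next step for this worker's part of the board.
                for (int x = 0; x < board.getMaxX(); x++) {
                    for (int y = 0; y < board.getMaxY(); y++) {
                        board.setNewValue(x, y, computeValue(x, y));
                    }
                }

                try {
                    barrier.await();   // wait for the other workers to finish this step
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }

        // Placeholder: the listing does not show computeValue, and the int
        // return type is an assumption. Compute the new value for (x, y) here.
        private int computeValue(int x, int y) {
            return 0;
        }
    }

    public void start() {
        for (int i = 0; i < workers.length; i++) {
            new Thread(workers[i]).start();
        }
        mainBoard.waitForConvergence();
    }
}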



Another form of barrier is Exchanger, a two-party barrier in which the parties exchange data at the barrier point. Exchangers are useful when the parties perform asymmetric activities, for example when one thread fills a buffer with data and the other thread consumes the data from the buffer; these threads could use an Exchanger to meet and exchange a full buffer for an empty one. When two threads exchange objects via an Exchanger, the exchange constitutes a safe publication of both objects to the other party. The timing of the exchange depends on the responsiveness requirements of the application. The simplest approach is that the filling task exchanges when the buffer is full, and the emptying task exchanges when the buffer is empty; this minimizes the number of exchanges but can delay processing of some data if the arrival rate of new data is unpredictable. Another approach would be that the filler exchanges when the buffer is full, but also when the buffer is partially filled and a certain amount of time has elapsed.
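The simplest approach described above, where the filler exchanges when its buffer is full and the emptier exchanges when its buffer is empty, can be sketched as follows. The class name ExchangerDemo, the helper fillAndDrain, and the use of a list of integers as the buffer are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

class ExchangerDemo {
    // Hypothetical helper: the filler produces 'batches' full buffers of 'batchSize' ones;
    // the drainer sums everything it receives. Returns that sum.
    static int fillAndDrain(int batches, int batchSize) throws InterruptedException {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();
        int[] sum = new int[1]; // written only by the drainer, read after join

        Thread filler = new Thread(() -> {
            List<Integer> buffer = new ArrayList<>();
            try {
                for (int b = 0; b < batches; b++) {
                    for (int i = 0; i < batchSize; i++) {
                        buffer.add(1);
                    }
                    // Buffer is full: hand it over and receive an empty one.
                    buffer = exchanger.exchange(buffer);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread drainer = new Thread(() -> {
            List<Integer> buffer = new ArrayList<>();
            try {
                for (int b = 0; b < batches; b++) {
                    // Buffer is empty: hand it over and receive a full one.
                    buffer = exchanger.exchange(buffer);
                    for (int value : buffer) {
                        sum[0] += value;
                    }
                    buffer.clear(); // empty it before the next exchange
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        filler.start();
        drainer.start();
        filler.join();
        drainer.join();
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(fillAndDrain(3, 4)); // prints 12
    }
}
```

Only two buffers ever exist, and each is owned by exactly one thread at a time; the exchange itself safely publishes the buffer contents to the other party, so the lists need no further synchronization.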
