Java Concurrency

Srini
11 min readAug 28, 2021

Overview

The java.util.concurrent package provides tools for creating concurrent applications. The java.util.concurrent contains way too many features, and some of the most useful utilities from this package like:

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • Future
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Locks
  • Phaser

Executor

Executor is an interface that represents an object that executes provided or submitted tasks.

This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling,etc.

An Executor is used normally instead of explicitly creating threads. For example, rather than invoking

new Thread(new RunnableTask()).start();

for each of a set of tasks, you might use:

Executor executor = anExecutor();
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

However, the Executor interface does not strictly require that execution be asynchronous. In the simplest case, an executor can run the submitted task immediately in the caller's thread:

class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}

More typically, tasks are executed in some thread other than the caller’s thread. The executor below spawns a new thread for each task.

class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}

Many Executor implementations impose some sort of limitation on how and when tasks are scheduled. The executor below serializes the submission of tasks to a second executor, illustrating a composite executor.

class SerialExecutor implements Executor {
final Queue tasks = new ArrayDeque<>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(Runnable r) {
tasks.add(() -> {
try {
r.run();
} finally {
scheduleNext();
}
});
if (active == null) {
scheduleNext();
}
}

protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}

The Executor implementations provided in this package implement ExecutorService, which is a more extensive interface. The ThreadPoolExecutor class provides an extensible thread pool implementation. The Executorsclass provides convenient factory methods for these Executors.

Memory consistency effects: Actions in a thread prior to submitting a Runnable object to an Executor happen-before its execution begins, perhaps in another thread.

ExecutorService

An ExecutorService is a complete solution for asynchronous processing. It manages an in-memory queue and schedules submitted tasks based on thread availability.

To use ExecutorService, we need to create one Runnable class.

public class Task implements Runnable {
@Override
public void run() {
// task details
}
}

Now we can create the ExecutorService instance and assign this task. At the time of creation, we need to specify the thread-pool size.

ExecutorService executor = Executors.newFixedThreadPool(10);

If we want to create a single-threaded ExecutorService instance, we can use newSingleThreadExecutor(ThreadFactory threadFactory) to create the instance.

Once the executor is created, we can use it to submit the task:

public void execute() { 
executor.submit(new Task());
}

We can also create the Runnable instance while submitting the task.

executor.submit(() -> {
new Task();
});

Method submit extends base method Executor#execute(Runnable) by creating and returning a Future that can be used to cancel execution and/or wait for completion.

Methods invokeAny and invokeAll perform the most commonly useful forms of bulk execution, executing a collection of tasks and then waiting for at least one, or all, to complete. ExecutorCompletionService can be used to write customized variants of these methods.)

It also provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.

An ExecutorService can be shut down, which will cause it to reject new tasks. Two different methods are provided for shutting down an ExecutorService. The shutdown method will allow previously submitted tasks to execute before terminating, while the shutdownNow method prevents waiting tasks from starting and attempts to stop currently executing tasks. Upon termination, an executor has no tasks actively executing, no tasks awaiting execution, and no new tasks can be submitted. An unused ExecutorService should be shut down to allow reclamation of its resources.

The Executors class provides factory methods for the executor services provided in this package.

Usage Examples

Here is a sketch of a network service in which threads in a thread pool service incoming requests. It uses the preconfigured Executors#newFixedThreadPool factory method:

class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize) throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}

public void run() { // run the service
try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// read and service request on socket
}
}

The following method shuts down an ExecutorService in two phases, first by calling shutdown to reject incoming tasks, and then calling shutdownNow, if necessary, to cancel any lingering tasks:

void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

Memory consistency effects: Actions in a thread prior to the submission of a Runnable or Callable task to an ExecutorService happen-before any actions taken by that task, which in turn happen-before the result is retrieved via Future.get().

ScheduledExecutorService

ScheduledExecutorService is an ExecutorService that can schedule commands to run after a given delay, or to execute periodically.

The schedule methods create tasks with various delays and return a task object that can be used to cancel or check execution. The scheduleAtFixedRate and scheduleWithFixedDelay methods create and execute tasks that run periodically until cancelled.

Commands submitted using the Executor#execute(Runnable) and ExecutorService submit methods are scheduled with a requested delay of zero. Zero and negative delays (but not periods) are also allowed in schedulemethods, and are treated as requests for immediate execution.

All schedule methods accept relative delays and periods as arguments, not absolute times or dates. It is a simple matter to transform an absolute time represented as a java.util.Date to the required form. For example, to schedule at a certain future date, you can use: schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS). Beware however that expiration of a relative delay need not coincide with the current Date at which the task is enabled due to network time synchronization protocols, clock drift, or other factors.

The Executors class provides convenient factory methods for the ScheduledExecutorService implementations provided in this package.

Usage Example

We can use both Runnable and Callable interface to define the task.

public void execute() {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Future future = executorService.schedule(() -> {
// ...
return "Hello world";
}, 1, TimeUnit.SECONDS);

ScheduledFuture scheduledFuture = executorService.schedule(() -> {
// ...
}, 1, TimeUnit.SECONDS);

executorService.shutdown();
}

Here is a class with a method that sets up a ScheduledExecutorService to beep every ten seconds for an hour:

import static java.util.concurrent.TimeUnit.*;class BeeperControl {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

public void beepForAnHour() {
Runnable beeper = () -> System.out.println("beep");
ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
Runnable canceller = () -> beeperHandle.cancel(false);
scheduler.schedule(canceller, 1, HOURS);
}
}

ScheduledExecutorService can also schedule the task after some given fixed delay:

executorService.scheduleAtFixedRate(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);

Here, the scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit)method creates and executes a periodic action that is invoked firstly after the provided initial delay, and subsequently with the given period until the service instance shutdowns.

The scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit)method creates and executes a periodic action that is invoked firstly after the provided initial delay, and repeatedly with the given delay between the termination of the executing one and the invocation of the next one.

Future

Future is used to represent the result of an asynchronous operation. It comes with methods for checking if the asynchronous operation is completed or not, and to get the computed result, etc.

The result can only be retreived using the method get when the computation has completed, blocking if necessary until its ready.

Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled.

We can use following code snippet to check if the future result is ready and fetch the data if the computation is done:

if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

What’s more, the cancel(boolean mayInterruptIfRunning) API cancels the operation and releases the executing thread. If the value of mayInterruptIfRunning is true, the thread executing the task will be terminated instantly. Otherwise, in-progress tasks will be allowed to complete.

public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);

Future future = executorService.submit(() -> {
// ...
Thread.sleep(10000l);
return "Hello world";
});
}

We can also specify a timeout for a given operation. If the task takes more than this time, a TimeoutException is thrown:

try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}

If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.

FutureTask

The FutureTask is a cancellable computation. This class provides a base implementation of Future that implements Runnable, and so may be executed by an Executor.

FutureTask contains methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the compuation.

For example, the above construction with submit could be replaced by:

FutureTask future = new FutureTask<>(task);
executor.execute(future);

The result can only be retrieved when the computation has completed. The get method will block if the computation has not yet completed. Once the computation has completed, the computation can not be restarted or cancelled (unless the computation is invoked using runAndReset)

Memory consistency effects: Actions taken by the asynchronous computation happen-before actions following the corresponding Future.get() in another thread.

CountDownLatch

CountDownLatch (introduced in JDK 5) is a utility class which blocks a set of threads until some operations being performed in other threads completes.

A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown method, after which all waiting threads are released and any subsequent invocations of await return immediately.

This is a one-shot phenomenon — the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.

A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes.

A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking awaitwait at the gate until it is opened by a thread invoking countDown. A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.

A useful property of a CountDownLatch is that it doesn't require that threads calling countDown wait for the count to reach zero before proceeding, it simply prevents any thread from proceeding past an await until all threads could pass.

Sample usage: Here is a pair of classes in which a group of worker threads use two countdown latches:

  • The first is a start signal that prevents any worker from proceeding until the driver is ready for them to proceed;
  • The second is a completion signal that allows the driver to wait until all workers have completed.
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}

Another typical usage would be to divide a problem into N parts, describe each part with a Runnable that executes that portion and counts down on the latch, and queue all the Runnables to an Executor. When all sub-parts are complete, the coordinating thread will be able to pass through await. (When threads must repeatedly count down in this way, instead use a CyclicBarrier.)

class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}

Memory consistency effects: Until the count reaches zero, actions in a thread prior to calling countDown() happen-before actions following a successful return from a corresponding await() in another thread.

CyclicBarrier

CyclicBarrier works almost the same as CountDownLatch except that we can reuse it. Unlike CountDownLatch, it allows multiple threads to wait for each other using await() method (known as barrier condition) before invoking the final task.

We need to create a Runnable task instance to initiate the barrier condition:

public class Task implements Runnable {
private CyclicBarrier barrier;

public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}

@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() + " is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() + " is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}

Now we can invoke some threads to race for the barrier condition:

public void start() {

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});

Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");

if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}

Here, the isBroken() method checks if any of the threads got interrupted during the execution time. We should always perform this check before performing the actual process.

Semaphore

The Semaphore is used for blocking thread level access to some part of the physical or logical resource. A semaphore contains a set of permits; whenever a thread tries to enter the critical section, it needs to check the semaphore if a permit is available or not.

If a permit is not available (via tryAcquire()), the thread is not allowed to jump into the critical section; however, if the permit is available the access is granted, and the permit counter decreases.

Once the executing thread releases the critical section, again the permit counter increases (done by release()method).

We can specify a timeout for acquiring access by using the tryAcquire(long timeout, TimeUnit unit) method.

We can also check the number of available permits or the number of threads waiting to acquire the semaphore.

Following code snippet can be used to implement a semaphore:

static Semaphore semaphore = new Semaphore(10);public void execute() throws InterruptedException {
LOG.info("Available permit : " + semaphore.availablePermits());
LOG.info("Number of threads waiting to acquire: " + semaphore.getQueueLength());

if (semaphore.tryAcquire()) {
try {
// ...
}
finally {
semaphore.release();
}
}
}

We can implement a Mutex like data-structure using Semaphore.

In the subsequent post, we can learn about ThreadFactory, BlockingQueue, DelayQueue, Locks, Phaser, etc.

--

--