Overview
The java.util.concurrent package provides tools for creating concurrent applications. It contains too many features to cover at once; some of the most useful utilities from this package are:
- Executor
- ExecutorService
- ScheduledExecutorService
- Future
- CountDownLatch
- CyclicBarrier
- Semaphore
- ThreadFactory
- BlockingQueue
- DelayQueue
- Locks
- Phaser
Executor
Executor is an interface that represents an object that executes provided or submitted tasks.
This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc.
An Executor is normally used instead of explicitly creating threads. For example, rather than invoking
new Thread(new RunnableTask()).start();
for each of a set of tasks, you might use:
Executor executor = anExecutor();
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
However, the Executor interface does not strictly require that execution be asynchronous. In the simplest case, an executor can run the submitted task immediately in the caller's thread:
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
More typically, tasks are executed in some thread other than the caller’s thread. The executor below spawns a new thread for each task.
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
Many Executor implementations impose some sort of limitation on how and when tasks are scheduled. The executor below serializes the submission of tasks to a second executor, illustrating a composite executor.
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(Runnable r) {
tasks.add(() -> {
try {
r.run();
} finally {
scheduleNext();
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
The Executor implementations provided in this package implement ExecutorService, which is a more extensive interface. The ThreadPoolExecutor class provides an extensible thread pool implementation. The Executors class provides convenient factory methods for these Executors.
Memory consistency effects: Actions in a thread prior to submitting a Runnable object to an Executor happen-before its execution begins, perhaps in another thread.
ExecutorService
An ExecutorService is a complete solution for asynchronous processing. It manages an in-memory queue and schedules submitted tasks based on thread availability.
To use ExecutorService, we need to create one Runnable class.
public class Task implements Runnable {
@Override
public void run() {
// task details
}
}
Now we can create the ExecutorService instance and assign this task. At the time of creation, we need to specify the thread-pool size.
ExecutorService executor = Executors.newFixedThreadPool(10);
If we want to create a single-threaded ExecutorService instance, we can use Executors.newSingleThreadExecutor() or its overload newSingleThreadExecutor(ThreadFactory threadFactory).
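As a minimal sketch of the ThreadFactory overload, the example below names the single worker thread and reads that name back from inside a task. The class name, method name, and the thread name "single-worker" are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class NamedSingleThreadExample {
    // Returns the name of the thread that actually ran the task.
    static String workerThreadName() throws Exception {
        // Hypothetical factory: every thread it creates is called "single-worker"
        ThreadFactory factory = r -> new Thread(r, "single-worker");
        ExecutorService executor = Executors.newSingleThreadExecutor(factory);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return executor.submit(task).get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(workerThreadName());
    }
}
```

A custom factory like this is mostly useful for diagnostics, since named threads are easier to spot in logs and thread dumps.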
Once the executor is created, we can use it to submit the task:
public void execute() {
executor.submit(new Task());
}
We can also create the Runnable inline, as a lambda, while submitting the task:
executor.submit(() -> {
new Task().run();
});
Method submit extends base method Executor#execute(Runnable) by creating and returning a Future that can be used to cancel execution and/or wait for completion.
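To make the relationship between submit and Future concrete, here is a small sketch that submits a Callable and blocks on the returned Future for its result. The class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitExample {
    // Computes a + b on a pool thread and waits for the answer.
    static int sumOnPool(int a, int b) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> a + b;
            Future<Integer> future = executor.submit(task);
            return future.get(); // blocks until the task has finished
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOnPool(2, 3));
    }
}
```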
Methods invokeAny and invokeAll perform the most commonly useful forms of bulk execution, executing a collection of tasks and then waiting for at least one, or all, to complete. (Class ExecutorCompletionService can be used to write customized variants of these methods.)
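The two bulk methods can be sketched as follows. The class and method names are hypothetical, and the pool size of 4 is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BulkExecutionExample {
    // invokeAll: runs every task and returns the results in submission order.
    static List<Integer> squares(List<Integer> inputs) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : inputs) {
                tasks.add(() -> n * n);
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until all tasks have completed
            for (Future<Integer> f : executor.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    // invokeAny: returns the result of one task that completed successfully.
    static String firstOf(List<Callable<String>> tasks) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            return executor.invokeAny(tasks);
        } finally {
            executor.shutdown();
        }
    }
}
```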
ExecutorService also provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.
An ExecutorService can be shut down, which will cause it to reject new tasks. Two different methods are provided for shutting down an ExecutorService. The shutdown method will allow previously submitted tasks to execute before terminating, while the shutdownNow method prevents waiting tasks from starting and attempts to stop currently executing tasks. Upon termination, an executor has no tasks actively executing, no tasks awaiting execution, and no new tasks can be submitted. An unused ExecutorService should be shut down to allow reclamation of its resources.
The Executors class provides factory methods for the executor services provided in this package.
Usage Examples
Here is a sketch of a network service in which threads in a thread pool service incoming requests. It uses the preconfigured Executors#newFixedThreadPool factory method:
class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize) throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run() { // run the service
try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// read and service request on socket
}
}
The following method shuts down an ExecutorService in two phases, first by calling shutdown to reject incoming tasks, and then calling shutdownNow, if necessary, to cancel any lingering tasks:
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
Memory consistency effects: Actions in a thread prior to the submission of a Runnable or Callable task to an ExecutorService happen-before any actions taken by that task, which in turn happen-before the result is retrieved via Future.get().
ScheduledExecutorService
ScheduledExecutorService is an ExecutorService that can schedule commands to run after a given delay, or to execute periodically.
The schedule methods create tasks with various delays and return a task object that can be used to cancel or check execution. The scheduleAtFixedRate and scheduleWithFixedDelay methods create and execute tasks that run periodically until cancelled.
Commands submitted using the Executor#execute(Runnable) and ExecutorService submit methods are scheduled with a requested delay of zero. Zero and negative delays (but not periods) are also allowed in schedule methods, and are treated as requests for immediate execution.
All schedule methods accept relative delays and periods as arguments, not absolute times or dates. It is a simple matter to transform an absolute time represented as a java.util.Date to the required form. For example, to schedule at a certain future date, you can use: schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS). Beware however that expiration of a relative delay need not coincide with the current Date at which the task is enabled due to network time synchronization protocols, clock drift, or other factors.
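The date-to-delay conversion described above can be sketched as a small helper. The class and method names are hypothetical; the subtraction itself is the one given in the text.

```java
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleAtDateExample {
    // Converts an absolute Date into the relative delay that schedule() expects.
    static long delayMillisUntil(Date target, long nowMillis) {
        return target.getTime() - nowMillis;
    }

    // Schedules a trivial task for the given date and waits for its result.
    static String runAt(Date target) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            long delay = delayMillisUntil(target, System.currentTimeMillis());
            // A zero or negative delay is treated as a request for immediate execution
            ScheduledFuture<String> handle =
                    scheduler.schedule(() -> "fired", delay, TimeUnit.MILLISECONDS);
            return handle.get();
        } finally {
            scheduler.shutdown();
        }
    }
}
```

Passing a date that already lies in the past simply yields a negative delay, so the task runs right away.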
The Executors class provides convenient factory methods for the ScheduledExecutorService implementations provided in this package.
Usage Example
We can use either the Runnable or the Callable interface to define the task.
public void execute() {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Future<String> future = executorService.schedule(() -> {
// ...
return "Hello world";
}, 1, TimeUnit.SECONDS);
ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
// ...
}, 1, TimeUnit.SECONDS);
executorService.shutdown();
}
Here is a class with a method that sets up a ScheduledExecutorService to beep every ten seconds for an hour:
import static java.util.concurrent.TimeUnit.*;
class BeeperControl {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public void beepForAnHour() {
Runnable beeper = () -> System.out.println("beep");
ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
Runnable canceller = () -> beeperHandle.cancel(false);
scheduler.schedule(canceller, 1, HOURS);
}
}
ScheduledExecutorService can also run a task periodically, either at a fixed rate or with a fixed delay between runs:
executorService.scheduleAtFixedRate(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);
executorService.scheduleWithFixedDelay(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);
Here, the scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) method creates and executes a periodic action that is first invoked after the provided initial delay, and subsequently with the given period, until the task is cancelled or the service is shut down.
The scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) method creates and executes a periodic action that is first invoked after the provided initial delay, and then repeatedly with the given delay between the termination of one execution and the start of the next.
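As a rough sketch of how a periodic task is started and later cancelled, the example below runs a counter task in either mode until it has fired a given number of times. The class and method names, the 10 ms interval, and the use of a CountDownLatch to wait are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicExample {
    // Runs a periodic task until it has fired at least `runs` times, then cancels it.
    // Returns the number of executions observed (may exceed `runs` slightly).
    static int runPeriodically(boolean fixedRate, int runs) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        Runnable task = () -> {
            counter.incrementAndGet();
            done.countDown();
        };
        // Fixed rate: period is measured start to start.
        // Fixed delay: delay is measured from the end of one run to the start of the next.
        ScheduledFuture<?> handle = fixedRate
                ? scheduler.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS)
                : scheduler.scheduleWithFixedDelay(task, 0, 10, TimeUnit.MILLISECONDS);
        try {
            done.await();         // wait for the requested number of runs
            handle.cancel(false); // stop future runs without interrupting a running one
            return counter.get();
        } finally {
            scheduler.shutdown();
        }
    }
}
```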
Future
Future is used to represent the result of an asynchronous operation. It comes with methods for checking if the asynchronous operation is completed or not, and to get the computed result, etc.
The result can only be retrieved using the get method once the computation has completed; get blocks if necessary until it is ready.
Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled.
We can use the following code snippet to check whether the future result is ready and fetch the data if the computation is done:
if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
What’s more, the cancel(boolean mayInterruptIfRunning) API attempts to cancel the operation. If the task has not started yet, it will never run. If it is already running and mayInterruptIfRunning is true, the thread executing the task is interrupted in an attempt to stop it (the task has to respond to interruption for this to have any effect); otherwise, in-progress tasks are allowed to complete.
public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> {
// ...
Thread.sleep(10000L);
return "Hello world";
});
}
We can also specify a timeout for a given operation. If the task takes more than this time, a TimeoutException is thrown:
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.
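A minimal sketch of this pattern: a Runnable is submitted, the resulting Future<?> is kept only as a cancellation handle, and a long sleep stands in for real work. The class and method names and the 60-second sleep are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CancelExample {
    // Starts a long-running task, cancels it, and reports whether the
    // Future ended up in the cancelled state.
    static boolean cancelSleepingTask() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        try {
            Future<?> handle = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // stands in for real work
                } catch (InterruptedException e) {
                    // cancel(true) interrupts the thread; restore the flag and exit
                    Thread.currentThread().interrupt();
                }
            });
            started.await(); // make sure the task is actually running
            boolean cancelled = handle.cancel(true);
            return cancelled && handle.isCancelled() && handle.isDone();
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Note that the task only stops early because it reacts to the interrupt; a task that ignores interruption would keep running even though the Future reports itself as cancelled.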
FutureTask
FutureTask is a cancellable asynchronous computation. This class provides a base implementation of Future that implements Runnable, and so may be executed by an Executor.
FutureTask contains methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation.
For example, the above construction with submit could be replaced by:
FutureTask<String> future = new FutureTask<>(task); // task is a Callable<String>
executor.execute(future);
The result can only be retrieved when the computation has completed. The get method will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using runAndReset).
Memory consistency effects: Actions taken by the asynchronous computation happen-before actions following the corresponding Future.get() in another thread.
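Here is a short, self-contained sketch of a FutureTask being handed to a plain Executor. The class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class FutureTaskExample {
    // Wraps a Callable in a FutureTask, runs it via execute(), and reads the result.
    static String compute() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> "Hello world";
            FutureTask<String> future = new FutureTask<>(task);
            executor.execute(future); // FutureTask is a Runnable, so execute() accepts it
            return future.get();      // blocks until the computation has completed
        } finally {
            executor.shutdown();
        }
    }
}
```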
CountDownLatch
CountDownLatch (introduced in JDK 5) is a utility class that blocks a set of threads until some operations being performed in other threads complete.
A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown method, after which all waiting threads are released and any subsequent invocations of await return immediately.
This is a one-shot phenomenon: the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.
A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes.
A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown. A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
A useful property of a CountDownLatch is that it doesn't require that threads calling countDown wait for the count to reach zero before proceeding; it simply prevents any thread from proceeding past an await until all threads could pass.
Sample usage: Here is a pair of classes in which a group of worker threads use two countdown latches:
- The first is a start signal that prevents any worker from proceeding until the driver is ready for them to proceed;
- The second is a completion signal that allows the driver to wait until all workers have completed.
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
Another typical usage would be to divide a problem into N parts, describe each part with a Runnable that executes that portion and counts down on the latch, and queue all the Runnables to an Executor. When all sub-parts are complete, the coordinating thread will be able to pass through await. (When threads must repeatedly count down in this way, instead use a CyclicBarrier.)
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork(int i) throws InterruptedException { ... }
}
Memory consistency effects: Until the count reaches zero, actions in a thread prior to calling countDown() happen-before actions following a successful return from a corresponding await() in another thread.
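The "divide a problem into N parts" usage can be sketched in runnable form as follows. The class and method names, the pool size, and the trivial per-part work (adding the part number to a shared total) are assumptions made only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

class LatchExample {
    // Splits the sum 1 + 2 + ... + parts into one task per term and
    // waits on a latch until every task has contributed.
    static long sumInParts(int parts) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        CountDownLatch doneSignal = new CountDownLatch(parts);
        AtomicLong total = new AtomicLong();
        try {
            for (int i = 1; i <= parts; i++) {
                final int part = i;
                executor.execute(() -> {
                    try {
                        total.addAndGet(part); // stands in for real work
                    } finally {
                        doneSignal.countDown(); // always count down, even on failure
                    }
                });
            }
            doneSignal.await(); // wait for all parts to finish
            return total.get();
        } finally {
            executor.shutdown();
        }
    }
}
```

Counting down in a finally block keeps the coordinating thread from waiting forever if one part throws.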
CyclicBarrier
CyclicBarrier works almost the same as CountDownLatch, except that we can reuse it. Unlike CountDownLatch, it lets a fixed number of threads wait for each other by calling the await() method (known as the barrier condition) before the final task is invoked.
We need to create a Runnable task instance to initiate the barrier condition:
public class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() + " is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() + " is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
Now we can invoke some threads to race for the barrier condition:
public void start() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});
Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");
if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}
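Here, the isBroken() method checks whether the barrier is in a broken state, that is, whether any waiting thread left the barrier early because of an interruption or timeout, or the barrier action failed with an exception. A freshly constructed barrier is never broken, so the check matters mainly when a barrier is reused.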
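Since reuse is the main difference from CountDownLatch, here is a small sketch in which the same barrier is tripped several times in a row. The class and method names are hypothetical, and the barrier action simply counts how often the barrier has tripped.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseExample {
    // Lets `parties` threads meet at the same barrier `rounds` times and
    // returns how many times the barrier action ran.
    static int runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int t = 0; t < parties; t++) {
            workers[t] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the barrier resets itself after each trip
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return trips.get();
    }
}
```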
Semaphore
A Semaphore is used to restrict thread-level access to some part of a physical or logical resource. A semaphore contains a set of permits; whenever a thread tries to enter the critical section, it asks the semaphore whether a permit is available.
If a permit is not available (checked via tryAcquire()), the thread is not allowed to enter the critical section; if a permit is available, access is granted and the permit counter decreases.
Once the executing thread leaves the critical section, it calls the release() method and the permit counter increases again.
We can specify a timeout for acquiring access by using the tryAcquire(long timeout, TimeUnit unit) method.
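A minimal sketch of the timed variant: with a single permit, the first timed acquire succeeds and a second attempt gives up once the timeout elapses. The class and method names and the 100 ms timeout are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TimedAcquireExample {
    // Returns { firstAttempt, secondAttempt } for a semaphore with one permit.
    static boolean[] demo() throws InterruptedException {
        Semaphore semaphore = new Semaphore(1);
        // The only permit is free, so this returns true right away
        boolean first = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
        // No permit is left, so this waits up to 100 ms and then returns false
        boolean second = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
        if (first) {
            semaphore.release();
        }
        return new boolean[] { first, second };
    }
}
```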
We can also check the number of available permits or the number of threads waiting to acquire the semaphore.
The following code snippet shows a semaphore in use:
static Semaphore semaphore = new Semaphore(10);
public void execute() throws InterruptedException {
LOG.info("Available permit : " + semaphore.availablePermits());
LOG.info("Number of threads waiting to acquire: " + semaphore.getQueueLength());
if (semaphore.tryAcquire()) {
try {
// ...
}
finally {
semaphore.release();
}
}
}
We can implement a mutex-like data structure using a Semaphore with a single permit.
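One way to sketch this is a semaphore with a single permit guarding a plain int counter. The class, field, and method names are hypothetical; unlike a real lock, a semaphore has no notion of an owner, so any thread could call release().

```java
import java.util.concurrent.Semaphore;

class SemaphoreMutexExample {
    private final Semaphore mutex = new Semaphore(1); // one permit acts as the lock
    private int counter = 0;

    // Only one thread at a time can be between acquire() and release().
    void increment() throws InterruptedException {
        mutex.acquire();
        try {
            counter++;
        } finally {
            mutex.release();
        }
    }

    // Increments the shared counter from several threads and returns the final value.
    static int run(int threads, int perThread) throws InterruptedException {
        SemaphoreMutexExample shared = new SemaphoreMutexExample();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                try {
                    for (int j = 0; j < perThread; j++) {
                        shared.increment();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return shared.counter;
    }
}
```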
In a subsequent post, we will look at ThreadFactory, BlockingQueue, DelayQueue, Locks, Phaser, etc.