Monday 13 February 2017

Introduction to AbstractQueuedSynchronizer in Java

To manage multi-threaded applications, Java follows the monitor construct. Under this construct, if a thread is executing a critical section while holding the mutex (intrinsic lock) of an object, no other thread can execute a critical section guarded by the lock of the same object. The synchronized method and the synchronized block are the simplest implementations of this technique. Synchronizers such as semaphores and latches build on the same idea of mutual exclusion plus waiting, and so do the concurrent classes introduced in Java 1.5, such as ReentrantLock and ThreadPoolExecutor, although they use their own locking machinery rather than intrinsic locks. There are times when a thread must wait, or may start running, depending on the state of an object. Classes whose operations block or proceed based on such conditions are known as state-dependent classes.

Java 1.5 introduced a framework for building such state-dependent classes: AbstractQueuedSynchronizer, also known as AQS. Many classes, such as ReentrantLock, ThreadPoolExecutor, Semaphore, CountDownLatch and ReentrantReadWriteLock, use AQS. AQS itself uses the LockSupport class to park (block) and unpark (wake) threads. LockSupport in turn relies on a native API (sun.misc.Unsafe, in Java 8 and earlier) to do the actual blocking and waking.
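To make the park/unpark idea concrete, here is a minimal sketch. The class name ParkUnparkDemo and its method runOnce() are made up for this illustration; only LockSupport.park() and LockSupport.unpark() are real JDK calls. It is not how AQS itself is written, just the primitive it relies on.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class; only LockSupport.park/unpark are JDK API.
class ParkUnparkDemo {
    // Returns true once the worker has seen the flag and finished.
    static boolean runOnce() throws InterruptedException {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            // park() may return without a matching unpark(), so it is
            // called in a loop that rechecks the awaited condition.
            while (!ready.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        worker.start();

        ready.set(true);             // publish the condition first
        LockSupport.unpark(worker);  // then wake the worker (also fine if it has not parked yet)
        worker.join();
        return resumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker resumed: " + runOnce());
    }
}
```

Note that no lock is held around park(): blocking and waking are separate from mutual exclusion, which is what lets AQS build its own queueing on top.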


That was the introduction. Now let's raise some questions:

(Q) Why was this framework needed?
(Q) What problems in Java led to the creation of this framework?

Let's try to answer these questions.

Below is pseudocode for a blocking, state-dependent action:
       
acquire lock on object state
while (precondition does not hold)
{
  release lock
  wait until precondition might hold
  optionally fail if interrupted or timeout expires
  reacquire lock
}
perform action of CRITICAL SECTION
release lock

Let's take a fixed-size queue (buffer) in a producer-consumer problem as an example of the above pseudocode.


  • Producer will push the data in queue and wait if it is full
  • Consumer will pull the data from queue and wait if it is empty

Note that whether the queue is empty or full is the precondition that the threads check (see line 2 of the pseudocode).
It is the state of the queue object, which every thread needs to check before doing any operation in
the critical section (see line 9). A single queue object is shared between the producer and consumer threads.
Also note that the precondition must be checked only while the thread holds the lock,
i.e. the monitor of the queue object (see lines 1 and 7).
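The pseudocode, including its optional timeout step, can be rendered in plain Java roughly as follows. This is only a sketch: TimedGate, awaitOpen() and open() are hypothetical names chosen for illustration, and the "state" is a single boolean instead of a queue.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical class, used only to illustrate the pseudocode.
class TimedGate {
    private boolean open;   // the state that the precondition depends on

    // Returns true if the gate opened in time, false if the timeout expired.
    synchronized boolean awaitOpen(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!open) {                          // precondition is checked under the lock
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;                    // fail: timeout expired
            }
            // wait() releases the monitor, blocks, and reacquires it before returning.
            // wait(0) would block forever, so the remaining time is rounded up to >= 1 ms.
            wait(Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
        }
        return true;                             // the critical-section action would go here
    }

    synchronized void open() {
        open = true;
        notifyAll();                             // wake waiters so they recheck the precondition
    }
}
```

The synchronized keyword covers "acquire lock" and "release lock", and wait() covers the release, block and reacquire steps inside the loop.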

Let's try to solve the above problem without using any concurrent API.

Solution 1 :
       
public abstract class BaseBoundedBuffer<V> {
    private final V[] buf;
    private int tail;
    private int head;
    private int count;
    @SuppressWarnings("unchecked") // generic array creation needs an unchecked cast
    protected BaseBoundedBuffer(int capacity) {
        this.buf = (V[]) new Object[capacity];
    }
    protected synchronized final void doPut(V v) {
        buf[tail] = v;
        if (++tail == buf.length)
            tail = 0;
        ++count;
    }
    protected synchronized final V doTake() {
        V v = buf[head];
        buf[head] = null;
        if (++head == buf.length)
            head = 0;
        --count;
        return v;
    }
    public synchronized final boolean isFull() {
        return count == buf.length;
    }
    public synchronized final boolean isEmpty() {
        return count == 0;
    }
}
BaseBoundedBuffer is the base class for the queue in our producer-consumer problem.
The solutions below all extend this abstract class.
       
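// BufferFullException and BufferEmptyException are assumed to be checked exceptions defined elsewhere.
public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {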
    public GrumpyBoundedBuffer(int size) { super(size); }
    public synchronized void put(V v) throws BufferFullException {
        if (isFull())
            throw new BufferFullException();
        doPut(v);
    }
    public synchronized V take() throws BufferEmptyException {
       if (isEmpty())
           throw new BufferEmptyException();
       return doTake();
    }
}
GrumpyBoundedBuffer is the first solution to our problem. It is a rather short-tempered one:

  • if data is pushed when the queue is full, it throws an exception
  • if data is pulled when the queue is empty, it throws an exception
This may be good enough in some cases, but not always: the caller has to catch the exception and retry.
Sometimes we would rather put the current thread to sleep and let other threads run.

Solution 2:

Sleep the thread if the precondition fails.
       
public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
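    // Polling interval in milliseconds; the value is an assumption, tune as needed.
    private static final long SLEEP_DURATION = 100;
    public SleepyBoundedBuffer(int size) { super(size); }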
 
    public void put(V v) throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isFull()) {
                    doPut(v);
                    return;
                }
            }
            Thread.sleep(SLEEP_DURATION);
        }
    }
 
    public V take() throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isEmpty())
                    return doTake();
            }
            Thread.sleep(SLEEP_DURATION);
        }
    }
}

The problems with this solution are:

  1. With a short sleep duration, CPU cycles are wasted on repeated polling.
  2. With a long sleep duration, responsiveness suffers, because a thread may keep sleeping long after its precondition has become true.

Solution 3:
Using wait() and notifyAll()
       
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
    // CONDITION PREDICATE: not-full (!isFull())
    // CONDITION PREDICATE: not-empty (!isEmpty())
    public BoundedBuffer(int size) { super(size); }
 
    public synchronized void put(V v) throws InterruptedException {
        while (isFull())
            wait();
        doPut(v);
        notifyAll();
    }
 
    public synchronized V take() throws InterruptedException {
        while (isEmpty())
            wait();
        V v = doTake();
        notifyAll();
        return v;
    }
}


The problems with this solution are:


  1. notifyAll() wakes every waiting thread, even those waiting for a different precondition; BoundedBuffer has two distinct preconditions, not-full and not-empty. After notifyAll() each woken thread must reacquire the monitor, one at a time, and recheck its precondition. Only a few will find it true; the rest go back to waiting. This causes extra context switches and lock contention, and hence degrades performance.
  2. If we add more preconditions to this class, the code becomes messier.
  3. If such classes can be subclassed, the protocol for guarding the critical section and checking the preconditions must be documented carefully.

Solution 4:

Use notify() instead of notifyAll()

notify() can be used only when both of the following conditions hold:


  1. Uniform waiters requirement
  2. One-in-One-out requirement
The uniform waiters requirement means that all waiting threads are "uniform": they are all waiting for the same condition to become true.
In our producer-consumer buffer problem, there can be two types of waiting threads:


  • producers waiting to push into the queue until there is some space
  • consumers waiting to pull from the queue until there is an element

Since the waiters are not uniform, we cannot use notify() in our problem.

The one-in-one-out requirement means that a notification on the condition allows at most one thread to proceed.

This requirement is fulfilled here: one put() makes room for at most one take(), and one take() makes room for at most one put().

Please note that:

wait(), notify() and notifyAll() must be called on the object whose monitor is held by the current thread.
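The rule about holding the monitor is enforced at run time. The small sketch below shows both cases; MonitorOwnershipDemo and its two methods are hypothetical names, while notify() and IllegalMonitorStateException are standard java.lang API.

```java
// Hypothetical demo class showing the rule with plain java.lang.Object methods.
class MonitorOwnershipDemo {
    // Calls notify() without holding the monitor; returns true if the JVM rejects the call.
    static boolean notifyWithoutLockFails(Object monitor) {
        try {
            monitor.notify();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Calls notify() while holding the monitor; returns true if the call is accepted.
    static boolean notifyWithLockSucceeds(Object monitor) {
        synchronized (monitor) {
            monitor.notify();   // legal: the current thread owns the monitor
            return true;
        }
    }

    public static void main(String[] args) {
        Object monitor = new Object();
        System.out.println("rejected without lock: " + notifyWithoutLockFails(monitor));
        System.out.println("accepted with lock: " + notifyWithLockSucceeds(monitor));
    }
}
```

The same rule applies to wait() and notifyAll().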

As you can see, every solution that avoids the concurrent API has its problems.


Here comes AbstractQueuedSynchronizer to the rescue.

AQS provides an API for acquiring and releasing a synchronizer, which plays the role that the monitor plays for intrinsic locks. It declares hook methods that clients override, so clients can create their own state-dependent classes (synchronizers). It also implements ConditionObject, which lets a single lock have more than one condition queue, one per condition predicate; BoundedBuffer, for example, has two condition predicates. Each ConditionObject keeps its own queue of waiting threads. This solves the uniform waiters problem.

Let's try to solve the producer-consumer problem with one of the client implementations of AQS:
ReentrantLock.
       
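import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBoundedBuffer<T> {
    // Capacity of the buffer; the value is an assumption for illustration.
    private static final int BUFFER_SIZE = 16;
    protected final Lock lock = new ReentrantLock();
    // CONDITION PREDICATE: notFull (count < items.length)
    private final Condition notFull = lock.newCondition();
    // CONDITION PREDICATE: notEmpty (count > 0)
    private final Condition notEmpty = lock.newCondition();
 
    @SuppressWarnings("unchecked") // generic array creation needs an unchecked cast
    private final T[] items = (T[]) new Object[BUFFER_SIZE];
    private int tail, head, count;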
 
    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[tail] = x;
            if (++tail == items.length)
                tail = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
 
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            T x = items[head];
            items[head] = null;
            if (++head == items.length)
                head = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

ConditionBoundedBuffer creates a ReentrantLock object.

A lock can have one or more Condition objects.
Each Condition object represents one condition queue, i.e. a queue of threads waiting for that condition to become true.
The Lock interface provides the lock() and unlock() methods.
The Condition interface provides the await(), signal() and signalAll() methods.

The advantages of ConditionBoundedBuffer are:


  1. Each precondition (condition predicate) has its own condition queue, so await() and signal() operate on the wait set of the same Condition object. A signal wakes only a thread that is waiting for that condition, which reduces context switches and wasted CPU cycles.
  2. The logic of the critical section and its preconditions is easy to follow even without documentation.
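In everyday code you rarely need to write such a buffer yourself. The JDK's ArrayBlockingQueue is a bounded buffer whose put() blocks while the queue is full and whose take() blocks while it is empty; in the OpenJDK sources it is built on a ReentrantLock with notEmpty and notFull conditions, much like the class above. The sketch below (ProducerConsumerDemo and transfer() are hypothetical names) runs one producer and one consumer through a small queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; ArrayBlockingQueue is the real JDK bounded buffer.
class ProducerConsumerDemo {
    // Sends the numbers 1..items through a queue of the given capacity
    // and returns the sum seen by the consumer.
    static long transfer(int items, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        long[] sum = {0};

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i);               // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    sum[0] += queue.take();     // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();                        // join makes the consumer's sum visible here
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(100, 4));
    }
}
```

With a capacity much smaller than the number of items, both threads are forced to block repeatedly, which exercises both conditions.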

That covers the producer-consumer problem. ConditionBoundedBuffer uses the ReentrantLock and ConditionObject classes, both of which are backed by AbstractQueuedSynchronizer. So let's try to understand some basics of the AQS framework:


Clients of AQS include:


  • ReentrantLock
  • ThreadPoolExecutor (through its internal Worker class)
  • Semaphore
  • CountDownLatch
and many more.

Below are the public methods of AQS that the framework implements and that clients can call:

       
public final void acquire(int arg)
public final boolean release(int arg)
public final void acquireShared(int arg)
public final boolean releaseShared(int arg)
public final void acquireInterruptibly(int arg) throws InterruptedException
public final void acquireSharedInterruptibly(int arg) throws InterruptedException
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException

Below are the methods of AQS that clients should implement, depending on the mode they support:

       
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
protected boolean isHeldExclusively()
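As a sketch of how these pieces fit together, here is a small non-reentrant mutex built on AQS, modeled on the pattern shown in the AbstractQueuedSynchronizer Javadoc. SimpleMutex and SimpleMutexDemo are hypothetical names; the state encoding (0 = unlocked, 1 = locked) is a choice made for this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class: a non-reentrant mutex where state 0 = unlocked, 1 = locked.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {               // atomically flip 0 -> 1
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;                                 // AQS will queue and park the caller
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);                                  // volatile write publishes the release
            return true;                                  // tells AQS to wake a queued thread
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean locked() { return getState() == 1; }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }               // framework method, calls tryAcquire
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }               // framework method, calls tryRelease
    boolean isLocked() { return sync.locked(); }
}

class SimpleMutexDemo {
    // Two threads increment a shared counter under the mutex.
    static int countWithTwoThreads(int perThread) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                mutex.lock();
                try { counter[0]++; } finally { mutex.unlock(); }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start(); b.start();
        a.join(); b.join();
        return counter[0];
    }
}
```

The subclass only decides whether an acquire or release may succeed; queueing, parking and waking are all handled by acquire() and release() in AQS.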

A state-dependent class must do the following:

  1. Define the meaning of acquire and release
  2. Define the state that its conditions depend on
  3. Optionally hold extra information, such as the thread that currently owns it
  4. Define the type of acquisition: exclusive or shared
Exclusive means that only one thread at a time may execute the critical section. Shared means that one or more threads may execute it at the same time, as with Semaphore. The state of a synchronizer decides the precondition for the critical section.

You will probably never use AQS directly, but a basic understanding of it makes it clearer how the standard synchronizers work. The basic operations that an AQS-based synchronizer performs are variants of acquire and release.


acquire and release :

Acquisition is the state-dependent operation and can always block.
With a ReentrantLock or Semaphore, the meaning of acquire is straightforward:
acquire the lock or a permit, and the caller may have to wait
until the synchronizer is in a state where that can happen.
For CountDownLatch, acquire means "wait until the latch has reached its terminal state".
Release is not a blocking operation. A release may allow threads blocked in acquire to proceed.

State :

For a class to be state-dependent, it must have some state. AQS takes on the task of managing some of the state for the synchronizer class: it manages a single integer of state information that can be manipulated through the protected getState, setState, and compareAndSetState methods.
For example, ReentrantLock uses the state as the count of times the owning thread has acquired the lock. Semaphore uses the state as the number of permits remaining.
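The AQS state field itself is not public, but both classes expose a view of it through their own API. The sketch below uses ReentrantLock.getHoldCount() and Semaphore.availablePermits() to observe it; StateDemo and its methods are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; it observes the state only through public JDK methods.
class StateDemo {
    // Locks the same ReentrantLock several times and reports the hold count.
    static int holdCountAfter(int acquisitions) {
        ReentrantLock lock = new ReentrantLock();
        for (int i = 0; i < acquisitions; i++) {
            lock.lock();                    // each reentrant lock() raises the count by one
        }
        int held = lock.getHoldCount();
        for (int i = 0; i < acquisitions; i++) {
            lock.unlock();                  // each unlock() lowers it again
        }
        return held;
    }

    // Takes some permits from a Semaphore and reports how many remain.
    static int permitsLeft(int initial, int taken) {
        Semaphore semaphore = new Semaphore(initial);
        semaphore.acquireUninterruptibly(taken);
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println("hold count: " + holdCountAfter(3));
        System.out.println("permits left: " + permitsLeft(5, 2));
    }
}
```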

Exclusive or Shared

Acquisition might be exclusive (ReentrantLock), or nonexclusive/Shared (Semaphore and CountDownLatch).

Important note: A synchronizer supporting exclusive acquisition (like ReentrantLock) should implement the protected methods tryAcquire, tryRelease, and isHeldExclusively. A synchronizer supporting shared acquisition (like Semaphore or CountDownLatch) should implement tryAcquireShared and tryReleaseShared. The acquire, acquireShared, release, and releaseShared methods in AQS call the try forms of these methods in the synchronizer subclass to determine whether the operation can proceed.
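For the shared mode, a one-shot binary latch is probably the smallest useful example. The sketch below follows the well-known latch pattern for AQS; OneShotLatch and OneShotLatchDemo are hypothetical names, and the state encoding (0 = closed, 1 = open) is an assumption of this example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class: state 0 = closed, 1 = open. Once opened it stays open.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result means success, a negative result means "must wait".
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);     // open the latch
            return true;     // waiting threads may now be able to acquire
        }
    }

    private final Sync sync = new Sync();

    void signal() { sync.releaseShared(0); }
    void await() throws InterruptedException { sync.acquireSharedInterruptibly(0); }
}

class OneShotLatchDemo {
    // Starts n waiters, opens the latch, and returns how many got through.
    static int releaseWaiters(int n) throws InterruptedException {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] waiters = new Thread[n];
        for (int i = 0; i < n; i++) {
            waiters[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiters[i].start();
        }
        latch.signal();                     // a single release lets every waiter proceed
        for (Thread t : waiters) {
            t.join();
        }
        return passed.get();
    }
}
```

Unlike the exclusive case, one release here lets all waiting threads through, which is exactly the difference between the two modes.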


Let's analyse the data structure of AQS:

       
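// Simplified sketch of the fields of AbstractQueuedSynchronizer (layout as in Java 8), not the full source.
class AQS  {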
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;
 
    static final class Node {
        static final int CANCELLED =  1;
        //Non-negative values mean that a node doesn't need to signal
        static final int SIGNAL    = -1;
        static final int PROPAGATE = -3;
        //Only used for Condition queues
        static final int CONDITION = -2;
  
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
  
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;

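        // Link to the next node waiting on a condition, or the special value SHARED.
        // Condition queues are accessed only while holding the lock in exclusive mode,
        // so in exclusive mode this field chains the threads of a condition queue.
        // In shared mode the empty SHARED marker node is stored here instead.
        Node nextWaiter;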
    }

    public class ConditionObject implements Condition {

        private transient Node firstWaiter;
        private transient Node lastWaiter;
        //nextWaiter is used to hold next node of condition queue.
    }
}

As you can see, AQS maintains a doubly linked list of nodes, where each node holds a waiting thread and its waitStatus. AQS also has its own state, whose meaning is decided by the client class, and it provides an implementation of the Condition interface.
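The wait queue is private, but ReentrantLock exposes a few monitoring methods that are backed by it, such as hasQueuedThread() and getQueueLength(). The sketch below uses them to watch a second thread being enqueued; WaitQueueDemo and queuedWhileHeld() are hypothetical names, and the JDK documents getQueueLength() as an estimate intended for monitoring rather than for synchronization control.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; it only calls public ReentrantLock methods.
class WaitQueueDemo {
    // Holds the lock while one other thread contends for it and
    // returns the queue length observed at that moment.
    static int queuedWhileHeld() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Thread contender = new Thread(() -> {
            lock.lock();        // blocks: the lock is held by the calling thread
            lock.unlock();
        });

        int observed;
        lock.lock();
        try {
            contender.start();
            // Wait until the contender's node has been linked into the AQS queue.
            while (!lock.hasQueuedThread(contender)) {
                Thread.sleep(1);
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();      // release wakes the queued contender
        }
        contender.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("queued threads while held: " + queuedWhileHeld());
    }
}
```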


This was an introduction to AQS. We discussed how Java uses it to solve the producer-consumer problem, and how AQS helps us create our own state-dependent classes or synchronizers. I hope this post helps programmers understand the basics of multi-threading.


Thanks
