Semaphore源码解析

从结构上看,Semaphore 和 ReentrantLock 很相似,ReentrantLock 是基于 AQS 的独占模式,而 Semaphore 是基于共享模式。

一样是内部实现了AQS 的一个 Sync ,然后又衍生了两个子类,FairSync 和 NonfairSync 分别实现了获取锁的公平和非公平策略。

那么不同的地方,应该就在于各自重写的 tryAcquire 和 tryRelease 方法了。这几个方法里的具体实现,决定了他和 ReentrantLock 用途上的差异。

1、构造方法

从构造器可以看出,他和 ReentrantLock 一样,默认都是非公平锁的获取策略,可以通过boolean 参数来指定使用公平锁。

那这个 permits 又是什么呢?

跟下去可以看到最终调用的是 AQS 的 setState,因此这里的 permits 就是 AQS 中的 state 字段,这里可以理解为信号量资源。

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

        NonfairSync(int permits) {
            super(permits);
        }

        Sync(int permits) {
            setState(permits);
        }

2、非公平锁 Acquire

一系列调用链的最后,核心方法是nonfairTryAcquireShared

从源码可以看出,这个方法将总的资源数减去需要获取的资源数(1),意思就是是否有1 一个信号量可用,然后通过 CAS 更新到 state,返回剩余的资源数。

然后通过 doAcquireShardInterruptibly ,当节点中的线程获取资源之后,如果资源还足够的话,他会接着唤醒后续节点去获取

 public void acquire() throws InterruptedException {
        //这边通过方法名可以看到是共享模式的方法
        sync.acquireSharedInterruptibly(1);
    }

//AQS中的方法
 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            //如果 tryAcquire 返回小于 0,则
            //doAcquireShared先在队尾插入新节点,然后和 acquiredQueued 相同逻辑进行自旋获取锁,注意这是可中断模式
            doAcquireSharedInterruptibly(arg);
    }

    protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }

     final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

3、公平锁的 Acquire

tryAcquireShared通过 hasQueuedPredecessors 来保证公平。

 protected int tryAcquireShared(int acquires) {
            for (;;) {
                //当头结点后有节点在排队时,返回 true,返回 -1,则调用doAcquireSharedInterruptibly插入同步队列
                if (hasQueuedPredecessors())
                    return -1;
                //否则的话,那就是当前线程去获取资源,将资源state-1 并返回
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }


//返回 true 表示头结点后有节点在排队
 public final boolean hasQueuedPredecessors() { 
        Node t = tail;  
        Node h = head;
        Node s;
     //当队列不为空,或者h.next为空或者当前线程不是头结点next的线程,返回 true
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

所以和 ReentrantLock 一样,通过hasQueuedPredecessors来实现公平。

4、释放锁

关键在于tryReleaseShared

将释放的资源加上现有的资源,然后通过 CAS 更新 state,返回 true

当 tryReleaseShared 返回 true 时,调用 doReleaseShared,在 AQS 的分析中我们得知,该方法可以唤醒后面的节点。

 public void release() {
        sync.releaseShared(1);
    }
  public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
 protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }