ReentrantLock 实现

AQS 与 ReentrantLock 的实现

ReentrantLock 分为公平锁和非公平锁,可以通过构造方法来指定具体类型:

// 默认非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}

// 公平锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

我们首先概览下 ReentrantLock 加解锁的过程,公平锁具体是由其子类(FairSync)来实现的,其 tryAcquire 方法如下:

protected final boolean tryAcquire(int acquires) {
	final Thread current = Thread.currentThread();
	int c = getState();
	if (c == 0) {
		if (!hasQueuedPredecessors() &&
			compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0)
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}

首先会判断 AQS 中的 state 是否等于 0,0 表示目前没有其他线程获得锁,当前线程就可以尝试获取锁。尝试之前会利用 hasQueuedPredecessors() 方法来判断 AQS 的队列中中是否有其他线程,如果有则不会尝试获取锁(这是公平锁特有的情况)。如果队列中没有线程就利用 CAS 来将 AQS 中的 state 修改为 1,也就是获取锁,获取成功则将当前线程置为获得锁的独占线程(setExclusiveOwnerThread(current))。如果 state 大于 0 时,说明锁已经被获取了,则需要判断获取锁的线程是否为当前线程(ReentrantLock 支持重入),是则需要将 state + 1,并将值更新。

如果 tryAcquire(arg) 获取锁失败,则需要用 addWaiter(Node.EXCLUSIVE) 将当前线程写入队列中。写入之前需要将当前线程包装为一个 Node 对象(addWaiter(Node.EXCLUSIVE))。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

首先判断队列是否为空,不为空时则将封装好的 Node 利用 CAS 写入队尾,如果出现并发写入失败就需要调用 enq(node); 来写入了。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

写入队列之后需要将当前线程挂起(利用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)):

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

首先会根据 node.predecessor() 获取到上一个节点是否为头节点,如果是则尝试获取一次锁,获取成功就万事大吉了。如果不是头节点,或者获取锁失败,则会根据上一个节点的 waitStatus 状态来处理(shouldParkAfterFailedAcquire(p, node))。waitStatus 用于记录当前节点的状态,如节点取消、节点等待等。shouldParkAfterFailedAcquire(p, node) 返回当前线程是否需要挂起,如果需要则调用 parkAndCheckInterrupt():

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

他是利用 LockSupport 的 part 方法来挂起当前线程的,直到被唤醒。公平锁与非公平锁的差异主要在获取锁:公平锁就相当于买票,后来的人需要排到队尾依次买票,不能插队。而非公平锁则没有这些规则,是抢占模式,每来一个人不会去管队列如何,直接尝试获取锁。

final void lock() {
    //直接尝试获取锁
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

还要一个重要的区别是在尝试获取锁时 tryAcquire(arg),非公平锁是不需要判断队列中是否还有其他线程,也是直接尝试获取锁:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //没有 !hasQueuedPredecessors() 判断
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

加锁流程概述

公平锁和非公平锁的释放流程都是一样的:

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
        	   // 唤醒被挂起的线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// 尝试释放锁
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

首先会判断当前线程是否为获得锁的线程,由于是重入锁所以需要将 state 减到 0 才认为完全释放锁。释放之后需要调用 unparkSuccessor(h) 来唤醒被挂起的线程。

线程加入等待队列

当执行 Acquire(1)时,会通过 tryAcquire 获取锁。在这种情况下,如果获取锁失败,就会调用 addWaiter 加入到等待队列中去。获取锁失败后,会执行 addWaiter(Node.EXCLUSIVE)加入等待队列,具体实现方法如下:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	Node pred = tail;
	if (pred != null) {
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}
private final boolean compareAndSetTail(Node expect, Node update) {
	return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

主要的流程如下:

  • 通过当前的线程和锁模式新建一个节点。

  • Pred 指针指向尾节点 Tail。

  • 将 New 中 Node 的 Prev 指针指向 Pred。

  • 通过 compareAndSetTail 方法,完成尾节点的设置。这个方法主要是对 tailOffset 和 Expect 进行比较,如果 tailOffset 的 Node 和 Expect 的 Node 地址是相同的,那么设置 Tail 的值为 Update 的值。

    // java.util.concurrent.locks.AbstractQueuedSynchronizer
    
    static {
    	try {
    		stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
    		headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
    		tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
    		waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
    		nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
    	} catch (Exception ex) {
    	throw new Error(ex);
    }
    }
    

    从 AQS 的静态代码块可以看出,都是获取一个对象的属性相对于该对象在内存当中的偏移量,这样我们就可以根据这个偏移量在对象内存当中找到这个属性。tailOffset 指的是 tail 对应的偏移量,所以这个时候会将 new 出来的 Node 置为当前队列的尾节点。同时,由于是双向链表,也需要将前一个节点指向尾节点。

  • 如果 Pred 指针是 Null(说明等待队列中没有元素),或者当前 Pred 指针和 Tail 指向的位置不同(说明被别的线程已经修改),就需要看一下 Enq 的方法。

    // java.util.concurrent.locks.AbstractQueuedSynchronizer
    
    private Node enq(final Node node) {
    	for (;;) {
    		Node t = tail;
    		if (t == null) { // Must initialize
    			if (compareAndSetHead(new Node()))
    				tail = head;
    		} else {
    			node.prev = t;
    			if (compareAndSetTail(t, node)) {
    				t.next = node;
    				return t;
    			}
    		}
    	}
    }
    

    如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter 就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。

总结一下,线程获取锁的时候,过程大体如下:

  • 当没有线程获取到锁时,线程 1 获取锁成功。
  • 线程 2 申请锁,但是锁被线程 1 占有。
  • 如果再有线程要获取锁,依次在队列中往后排队即可。

等待队列

回到上边的代码,hasQueuedPredecessors 是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回 False,说明当前线程可以争取共享资源;如果返回 True,说明队列中存在有效节点,当前线程必须加入到等待队列中。

// java.util.concurrent.locks.ReentrantLock

public final boolean hasQueuedPredecessors() {
	// The correctness of this depends on head being initialized
	// before tail and on head.next being accurate if the current
	// thread is first in queue.
	Node t = tail; // Read fields in reverse initialization order
	Node h = head;
	Node s;
	return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。当 h != t 时:如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了 Tail 指向 Head,没有将 Head 指向 Tail,此时队列中有元素,需要返回 True(这块具体见下边代码分析)。如果(s = h.next) != null,说明此时队列中至少有一个有效节点。如果此时 s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的;如果 s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。

// java.util.concurrent.locks.AbstractQueuedSynchronizer#enq

if (t == null) { // Must initialize
	if (compareAndSetHead(new Node()))
		tail = head;
} else {
	node.prev = t;
	if (compareAndSetTail(t, node)) {
		t.next = node;
		return t;
	}
}

节点入队不是原子操作,所以会出现短暂的 head != tail,此时 Tail 指向最后一个节点,而且 Tail 指向 Head。如果 Head 没有指向 Tail(可见 5、6、7 行),这种情况下也需要将相关线程加入队列中。所以这块代码是为了解决极端情况下的并发问题。

等待队列中线程出队列时机

// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final void acquire(int arg) {
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}

上文解释了 addWaiter 方法,这个方法其实就是把对应的线程以 Node 的数据结构形式加入到双端队列里,返回的是一个包含该线程的 Node。而这个 Node 会作为参数,进入到 acquireQueued 方法中。acquireQueued 方法可以对排队中的线程进行“获锁”操作。总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued 会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
	// 标记是否成功拿到资源
	boolean failed = true;
	try {
		// 标记等待过程中是否中断过
		boolean interrupted = false;
		// 开始自旋,要么获取锁,要么中断
		for (;;) {
			// 获取当前节点的前驱节点
			final Node p = node.predecessor();
			// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
			if (p == head && tryAcquire(arg)) {
				// 获取锁成功,头指针移动到当前node
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
			if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

注:setHead 方法是把当前节点置为虚节点,但并没有修改 waitStatus,因为它是一直需要用的数据。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void setHead(Node node) {
	head = node;
	node.thread = null;
	node.prev = null;
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer

// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	// 获取头结点的节点状态
	int ws = pred.waitStatus;
	// 说明头结点处于唤醒状态
	if (ws == Node.SIGNAL)
		return true;
	// 通过枚举值我们知道waitStatus>0是取消状态
	if (ws > 0) {
		do {
			// 循环向前查找取消节点,把取消节点从队列中剔除
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		// 设置前任节点等待状态为SIGNAL
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}

parkAndCheckInterrupt 主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

上述方法的流程图如下:

线程获取流程

从上图可以看出,跳出当前循环的条件是当“前置节点是头结点,且当前线程获取锁成功”。为了防止因死循环导致 CPU 资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire 流程):

挂起流程

CANCELLED 状态节点生成

acquireQueued 方法中的 Finally 代码:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
    ...
		for (;;) {
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				...
				failed = false;
        ...
			}
			...
	} finally {
		if (failed)
			cancelAcquire(node);
		}
}

通过 cancelAcquire 方法,将 Node 的状态标记为 CANCELLED。接下来,我们逐行来分析这个方法的原理:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void cancelAcquire(Node node) {
  // 将无效节点过滤
	if (node == null)
		return;
  // 设置该节点不关联任何线程,也就是虚节点
	node.thread = null;
	Node pred = node.prev;
  // 通过前驱节点,跳过取消状态的node
	while (pred.waitStatus > 0)
		node.prev = pred = pred.prev;
  // 获取过滤后的前驱节点的后继节点
	Node predNext = pred.next;
  // 把当前node的状态设置为CANCELLED
	node.waitStatus = Node.CANCELLED;
  // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
  // 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
	if (node == tail && compareAndSetTail(node, pred)) {
		compareAndSetNext(pred, predNext, null);
	} else {
		int ws;
    // 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
    // 如果1和2中有一个为true,再判断当前节点的线程是否为null
    // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
		if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
			Node next = node.next;
			if (next != null && next.waitStatus <= 0)
				compareAndSetNext(pred, predNext, next);
		} else {
      // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
			unparkSuccessor(node);
		}
		node.next = node; // help GC
	}
}

当前的流程:

  • 获取当前节点的前驱节点,如果前驱节点的状态是 CANCELLED,那就一直往前遍历,找到第一个 waitStatus <= 0 的节点,将找到的 Pred 节点和当前 Node 关联,将当前 Node 设置为 CANCELLED。

  • 根据当前节点的位置,考虑以下三种情况:

    • 当前节点是尾节点。
    • 当前节点是 Head 的后继节点。
    • 当前节点不是 Head 的后继节点,也不是尾节点。

当前节点是尾节点。

尾节点

当前节点是 Head 的后继节点。

Head 后继节点

当前节点不是 Head 的后继节点,也不是尾节点。

中间节点

通过上面的流程,我们对于 CANCELLED 节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对 Next 指针进行了操作,而没有对 Prev 指针进行操作呢?什么情况下会对 Prev 指针进行操作?执行 cancelAcquire 的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过 Try 代码块中的 shouldParkAfterFailedAcquire 方法了),如果此时修改 Prev 指针,有可能会导致 Prev 指向另一个已经移除队列的 Node,因此这块变化 Prev 指针不安全。shouldParkAfterFailedAcquire 方法中,会执行下面的代码,其实就是在处理 Prev 指针。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更 Prev 指针比较安全。

do {
	node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

如何解锁

我们已经剖析了加锁过程中的基本流程,接下来再对解锁的基本流程进行分析。由于 ReentrantLock 在解锁的时候,并不区分公平锁和非公平锁,所以我们直接看解锁的源码:

// java.util.concurrent.locks.ReentrantLock

public void unlock() {
	sync.release(1);
}

可以看到,本质释放锁的地方,是通过框架来完成的。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final boolean release(int arg) {
	if (tryRelease(arg)) {
		Node h = head;
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h);
		return true;
	}
	return false;
}

在 ReentrantLock 里面的公平锁和非公平锁的父类 Sync 定义了可重入锁的释放锁机制。

// java.util.concurrent.locks.ReentrantLock.Sync

// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
	// 减少可重入次数
	int c = getState() - releases;
	// 当前线程不是持有锁的线程,抛出异常
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	// 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
	if (c == 0) {
		free = true;
		setExclusiveOwnerThread(null);
	}
	setState(c);
	return free;
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final boolean release(int arg) {
	// 上边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有
	if (tryRelease(arg)) {
		// 获取头结点
		Node h = head;
		// 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h);
		return true;
	}
	return false;
}
  • h == null Head 还没初始化。初始情况下,head == null,第一个节点入队,Head 会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现 head == null 的情况。
  • h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
  • h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。

再看一下 unparkSuccessor 方法:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void unparkSuccessor(Node node) {
	// 获取头结点waitStatus
	int ws = node.waitStatus;
	if (ws < 0)
		compareAndSetWaitStatus(node, ws, 0);
	// 获取当前节点的下一个节点
	Node s = node.next;
	// 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
	if (s == null || s.waitStatus > 0) {
		s = null;
		// 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	// 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark
	if (s != null)
		LockSupport.unpark(s.thread);
}

为什么要从后往前找第一个非 Cancelled 的节点呢?原因如下。之前的 addWaiter 方法:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	Node pred = tail;
	if (pred != null) {
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}

我们从这里可以看到,节点入队并不是原子操作,也就是说,node.prev = pred; compareAndSetTail(pred, node) 这两个地方可以看作 Tail 入队的原子操作,但是此时 pred.next = node;还没执行,如果这个时候执行了 unparkSuccessor 方法,就没办法从前往后找了,所以需要从后往前找。还有一点原因,在产生 CANCELLED 状态节点的时候,先断开的是 Next 指针,Prev 指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的 Node。综上所述,如果是从前往后找,由于极端情况下入队的非原子操作和 CANCELLED 节点产生过程中断开 Next 指针的操作,可能会导致无法遍历所有的节点。所以,唤醒对应的线程后,对应的线程就会继续往下执行。继续执行 acquireQueued 方法以后,中断如何处理?

中断恢复后的执行流程

唤醒后,会执行 return Thread.interrupted();,这个函数返回的是当前执行线程的中断状态,并清除。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this);
	return Thread.interrupted();
}

再回到 acquireQueued 代码,当 parkAndCheckInterrupt 返回 True 或者 False 的时候,interrupted 的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前 interrupted 返回。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		boolean interrupted = false;
		for (;;) {
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
				interrupted = true;
			}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

如果 acquireQueued 为 True,就会执行 selfInterrupt 方法。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

static void selfInterrupt() {
	Thread.currentThread().interrupt();
}

该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这部分属于 Java 提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下:

  • 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过 Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为 False),并记录下来,如果发现该线程被中断过,就再中断一次。
  • 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。

这里的处理方式主要是运用线程池中基本运作单元 Worder 中的 runWorker,通过 Thread.interrupted()进行额外的判断处理。

其他

超时机制

在 ReetrantLock 的 tryLock(long timeout, TimeUnit unit) 提供了超时获取锁的功能。它的语义是在指定的时间内如果获取到锁就返回 true,获取不到则返回 false。这种机制避免了线程无限期的等待锁释放。那么超时的功能是怎么实现的呢?我们还是用非公平锁为例来一探究竟。

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

还是调用了内部类里面的方法。我们继续向前探究:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

这里的语义是:如果线程被中断了,那么直接抛出 InterruptedException。如果未中断,先尝试获取锁,获取成功就直接返回,获取失败则进入 doAcquireNanos。tryAcquire 我们已经看过,这里重点看一下 doAcquireNanos 做了什么。

/**
 * 在有限的时间内去竞争锁
 * @return 是否获取成功
 */
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 起始时间
    long lastTime = System.nanoTime();
    // 线程入队
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 又是自旋!
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果前驱是头节点并且占用锁成功,则将当前节点变成头结点
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 如果已经超时,返回false
            if (nanosTimeout <= 0)
                return false;
            // 超时时间未到,且需要挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                // 阻塞当前线程直到超时时间到期
                LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            // 更新nanosTimeout
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                //相应中断
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireNanos 的流程简述为:线程先入等待队列,然后开始自旋,尝试获取锁,获取成功就返回,失败则在队列里找一个安全点把自己挂起直到超时时间过期。这里为什么还需要循环呢?因为当前线程节点的前驱状态可能不是 SIGNAL,那么在当前这一轮循环中线程不会被挂起,然后更新超时时间,开始新一轮的尝试。

上一页