news 2026/4/3 10:42:43

基于AQS实现的ReentrantLock

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
基于AQS实现的ReentrantLock

基于AQS实现的ReentrantLock

这里的源码用的Java8版本

lock方法

当ReentrantLock类的实例对象尝试获取锁的时候,调用lock方法,

会进入sync的lock方法,其中Sync是ReentrantLock的一个内部类,ReentrantLock构造方法会默认使用非公平锁NonfairSync,这个类是继承于Sync的

/* by 01022.hk - online tools website : 01022.hk/zh/post.html */ final void lock() { if (!initialTryLock()) acquire(1); } // 其中Sync的initialTryLock是抽象方法,需要看非公平锁实现方法

[!TIP]
在这里是第一次尝试获取锁

由于ReentrantLock是个可重入锁,判断里有重入的判断

/* by 01022.hk - online tools website : 01022.hk/zh/post.html */ final boolean initialTryLock() { Thread current = Thread.currentThread(); // 获取当前线程的对象 if (compareAndSetState(0, 1)) { // first attempt is unguarded // 用CAS比较state状态是否为0(无人持有锁),如果是,就转为1(获取到锁) setExclusiveOwnerThread(current); // 将当前进程设置为拥有锁的线程 return true; } else if (getExclusiveOwnerThread() == current) { // 当前线程为拥有锁的线程(重复获取),重入 int c = getState() + 1; if (c < 0) // overflow // 负数,state是个int类型数据,超出可能导致溢出变为负数 throw new Error("Maximum lock count exceeded"); setState(c); // 设置新的state return true; } else // 已有线程占锁,返回为false return false; }

然后开始调用acquire方法,传入1

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

调用tryAcquire()方法,其中tryAcquire()方法是一个只有抛出异常的方法,需要重写,我们看非公平锁的写法

[!TIP]
这是第二次获取锁

protected final boolean tryAcquire(int acquires) { if (getState() == 0 && !hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }

这里,如果state是0,即没有线程占用锁的情况下getState() == 0​这个为真!hasQueuedPredecessors()执行这个方法,这个方法会检查是否已经出现了等待队列

public final boolean hasQueuedPredecessors() { Thread first = null; Node h, s; if ((h = head) != null && ((s = h.next) == null || (first = s.waiter) == null || s.prev == null)) first = getFirstQueuedThread(); // retry via getFirstQueuedThread return first != null && first != Thread.currentThread(); }

当未出现 同步队列/阻塞队列 ,或者当前线程是队列的第一个时,执行compareAndSetState(0, acquires),第二次尝试获取锁,如果成功,返回真

否则返回假,执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // 尝试加入队尾 pred.next = node; return node; } } enq(node); return node; }

Node是双向队列:阻塞队列一个节点,是为了保证原子化所以包装起来的

如果tail尾指针指向的节点不为空,则设置新生成的为尾指针指向的

否则(阻塞队列为空),调用enq函数

private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) // 使用CAS,防止多线程同时创建头节点,所以本质上还是需要抢入队顺序 tail = head; // 初始化头节点,并将尾指针指向头节点 } else { node.prev = t; if (compareAndSetTail(t, node)) { // 判断t是否为尾节点,如果有线程更快的改掉尾节点,那么修改失败, // 重新进入for循环 t.next = node; return t; // 修改成功 } } } }

[!TIP]
这是第三次尝试获取锁

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 获取node的前一个节点,如果前一个节点是头节点(当前节点是第一个) // 执行tryAcquire(arg),执行第三次尝试获取锁 if (p == head && tryAcquire(arg)) { // 获取锁成功,出队 setHead(node);// 将node设为头节点 p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

如果第三次尝试获取锁失败了,会调用shouldParkAfterFailedAcquire()方法,将node的前一个节点传入(node一直都是加入的节点)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 确认前面的节点处于SIGNAL状态,即确认前面的节点会叫醒自己 /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // Node里面仅有一个大于零的状态,即1取消状态,也就是说当前任务被取消了 // 持续循环值找到不再取消的节点 pred.next = node; } else { // 将前一个节点用CAS转为Node.SIGNAL状态-1,返回为false /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

这里插一嘴,Node节点有一些状态,来体现其的任务状态,如前面传入的就是独占队列,addWaiter(Node.EXCLUSIVE)

static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); // 共享队列 /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; // 独占队列 /** waitStatus value to indicate thread has cancelled */// 取消 static final int CANCELLED = 1; // 已被取消 /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; // 表示next节点已经park,需要被唤醒 /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ // 共享状态 static final int PROPAGATE = -3;
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;

如果前一个节点的waitState是0,会被CAS转为-1,然后返回false,进而不会执行parkAndCheckInterrupt(),继续for的无限循环,这里有可能出现第四次尝试

如果前一个节点的waitState是-1,该函数返回一个true,也就可以继续执行parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

当前线程进入park状态

至此我们完成了这个的lock过程

unlock方法

unlock()也是公平锁以及非公平锁都有的方法,同样继承了Sync

public void unlock() { sync.release(1); }

Sync的release方法

public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

首先尝试tryRelease方法

protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }

如果成功醒过来,该线程依然处于一种park的位置上,即parkAndCheckInterrupt这个方法上,这个方法返回是否被中断ReentrantLock这个锁仅获取中断信息,而不会做出任何操作

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

苏醒过来之后,继续for循环,尝试获取锁,失败之后会接着park,成功就会获取锁,并返回中断状态,在acquire中决定自我中断

final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

并将setExclusiveOwnerThread传入当前线程,返回为真,因此在TryRelease方法里的Thread.currentThread() != getExclusiveOwnerThread()一定为假,不会抛出异常,并设置free为false,当c也就是资源的state如果是0

if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free;

c如果是0,即没有线程占用资源,setExclusiveOwnerThread将锁的线程设置为空,如果不为0,也就是重入锁仅仅解锁一次,c依然存在多个,设置c为新的state值,然会free值(资源锁的使用情况)

public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next;、 // 如果下一个节点的状态为取消或者为空,从后向前找最后一个满足条件的,赋值为s if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // s不为空的话作为下一个被唤醒的节点,尝试唤醒 if (s != null) LockSupport.unpark(s.thread); }

此时,当前节点为头节点,调用unparkSuccessor()方法,获取头节点的下一个节点

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/3 4:50:34

R语言:数据分析与可视化的利器

摘要 R语言作为一种开源的统计计算和图形编程语言&#xff0c;在数据科学、统计分析和可视化领域占据着核心地位。本文探讨了R语言的核心优势&#xff0c;并通过实际案例展示了其在数据整理、统计建模和可视化分析中的应用。 1. R语言的核心优势 1.1 生态系统完善 R语言拥有…

作者头像 李华
网站建设 2026/3/31 2:37:42

你应该先参加工作,还是去读研

你应该先参加工作&#xff0c;还是去读研站在人生的十字路口&#xff0c;很多应届生或职场新人都会陷入这样的迷茫与纠结&#xff1a;一边是走进职场&#xff0c;用实际工作积累经验、实现经济独立&#xff0c;在实践中摸清自己的职业方向&#xff0c;一步步褪去青涩、成长为独…

作者头像 李华
网站建设 2026/3/27 7:36:45

从斑马鱼到机器鱼:机器人实验重塑神经行为研究

美国杜克大学与葡萄牙高等理工大学的联合团队&#xff0c;已率先运用机器人部分替代动物开展生理学实验&#xff0c;旨在深入探究动物神经网络对各类智能行为的调控机制。当大多数人仍聚焦于让机器人承担端茶倒水等家务时&#xff0c;来自瑞士联邦理工学院&#xff08;洛桑&…

作者头像 李华
网站建设 2026/3/27 3:03:22

内部强化学习破解AI长程推理难题

某中心的“内部强化学习”如何解锁长视距AI智能体 研究人员开发了一种新技术&#xff0c;使得AI模型更容易学习通常会导致大语言模型产生幻觉或崩溃的复杂推理任务。他们的技术被称为内部强化学习&#xff0c;不是通过下一个令牌预测来训练大语言模型&#xff0c;而是引导模型的…

作者头像 李华
网站建设 2026/3/13 4:52:22

从数据成功到人工智能成功:极简人工智能治理

人工智能治理是指组织用来确保人工智能实例安全、公平且使用正确的一系列规则和检查措施。就数据成功而言&#xff0c;它确保人工智能使用优质、干净的数据&#xff0c;遵守法律法规和政策&#xff0c;并由人工对重要决策进行复核&#xff0c;也就是所谓的“人机协同设计”。一…

作者头像 李华