学习完Docker之后,发现了kubernetes这个容器云框架,于是就自己部署来玩玩。大家也可以按照这个和我一步步部署 kubernetes 集群文章来部署。最近在这里花费了大量的时间,之后希望整理一下相关的原理介绍。

kuber1.png

kube3.png

问题列表和解决方案

 Devops的概念已经火了很久了,我一直想对这方面进行一定的了解;再加上实验室项目环境依赖比较复杂,希望使用Docker来解决,所以最近就好好研究了一波Docker的相关实践和原理。这里整理一下,希望组成一个系列,从实践到原理详细讲解一下Docker的使用。
 第一篇就讲一下Jenkins+Docker的自动化部署实践。大致的流程如下:目前我有两个服务器,分别是阿里云和bandwagon,代码存储在github上,每次push都会触发阿里云上的jenkins的构建任务,jenkins将github上的代码fetch到本地,编译打包成war文件,生成docker image并上传到docker registry上,然后通过ssh来登录bandwagon服务器pull下来新生成的image并启动。由于篇幅问题,本篇文章不会介绍有关docker image的build和docker registry的搭建,但是我会在后续文章中再做详细讲解。
 学习Docker,我推荐先在网络上找说明指南,一步一步自己尝试的使用,然后如果觉得有必要可以看一下《Docker容器和容器云》这本书。
 本文内容都是docker和jenkins的基础知识,为了节约你的时间,本文的主要内容如下:

  • docker 基础命令
  • jenkins docker版本的搭建,构建任务的配置
  • Pubish Over SSH 安装和配置
  • 通过github的webhook来触发jenkins构建任务

Docker运行jenkins

 Docker如此火爆的一个原因是因为它形成了一个良好的生态圈,基本上主流的软件应用都有相应的Docker image。如果大家不清楚Docker image的含义,建议大家看一下Docker中文指南,我们可以通过docker pull命令来下载响应的image,然后运行。比如我们希望在阿里云服务器上部署一个jenkins应用,首先可以执行下列语句来获取一个jenkins的image。

1
docker pull jenkinsci/jenkins:lts

 这里我们使用pull从docker registry上拉取image,但是目前业界上有很多共有或在私有的docker registry,比如说docker hub和daoCloud。所以image的全称就由三部分组成:域名或在ip + / + 软件名称 + : + 版本号,所以上边的这条命令就是让docker去jenkinsci这个Jenkins机构自己部署的registry上下载jenkins的lts版本的image.你也可以直接使用docker pull jenkins来下载image,但docker会默认的从docker hub上下载jenkins的laster版本。

 下载成功之后,你可以使用docker images命令来查看当前下载的image信息

 你可以通过docker run命令来运行docker容器,请注意我这里的用词,在Docker中image和container是不同的概念,你可以将他们简单的理解成Java中类和对象的关系。我们使用下面的命令来启动这个jenkins容器。

1
sudo docker run -d --name jenkins -p 9090:8080 -v /var/jenkins_home:/var/jenkins_home jenkinsci/jenkins:lts

 我们来依次讲解一下run命令的几个参数把:

  • -d 后台运行docker容器并打印容器ID。如果不加-d参数,那么容器运行会和终端绑定,如果终端关闭,那么容器也会关闭,但是容器不会被删除。但是如果你只是想试一试某个容器,运行后自动进入命令行,那么可以使用-it参数;如果你想容器关闭之后自动删除,那么就使用-rm参数。

  • --name 给docker container起一个别名,后续可以通过别名来管理容器,否在会系统会默认分配一个随机的别名。

  • -p docker容器和外侧的端口映射,jenkins服务是运行在docker容器内部的,但是docker容器默认不对外暴露接口,所以通过这个参数将内部的8080端口映射到服务器本身的9090端口上。

  • -v 数据卷的挂载。这里涉及到docker container的一个特性,container如果停止运行了,那么再次启动时,之前所有运行相关的数据和文件就都不存在了,就类似于设置了自动还原的电脑一般,无论你做了多少的操作,一旦关机重启之后就又恢复到最初的状态。数据卷就是来解决上述问题的,通过Docker container外部的文件夹的挂载,将可持久化的文件存储到外部挂载的文件夹中。

 然后你就可以根据你自己的ip地址来键入下列地址http:ip:9090来访问jenkins的主页了。
 这里有一点需要注意的是,需要注意你阿里云服务器设置的网络安全协议,是否禁用掉了9090这个端口。

Publish over SSH配置

 Jenkins的初始化配置和SSH Over Publish的安装请大家自行百度,这里我主要讲解一下SSH Over Pushlish配置。
 首先我们要在jenkins服务器上生成密钥对,使用ssh-keygen -t rsa命令来生成秘密对,这样的话,在~/.ssh/下就会有私钥id_rsa和公钥id_rsa.pub。
 然后你需要上传公钥到目标服务器上,也就是我的bandwagon服务器上,可以使用ssh-copy-id来将文件上传到服务器上,类似于scp命令的使用方式。

1
ssh-copy-id -i ~/.ssh/id_rsa.pub <username>@<host>

 最后我们需要修改目标服务器的ssh配置文件,配置文件为/etc/ssh/sshd_config。设置ssh-server允许使用私钥和公钥对的方式登录,然后使用sudo /etc/init.d/ssh restart命令重启ssh服务。

1
2
3
RSAAuthentication yes
PubkeyAuthentication yes
#AuthorizedKeysFile %h/.ssh/authorized_keys

 上述步骤成功之后,大家在系统管理中配置Publish over SSH。相关的配置信息如下图所示。

jenkins1.png

 你还可以点击下方的高级选项,来配置ssh服务器的端口,超时时间等信息,还可以点击Test Configuration来检测是否配置成功。

构建任务配置

 我们先创建一个构建任务,该任务从github repo上将代码拉取下来,然后执行构建任务,然后通过Publish Over SSH在目标服务器上进行部署。
 我们首先配置源码管理模块,选择Git选项,然后配置Repository URL 并添加认证信息。可以将自己的github帐号和密码加入其中。

jenkins2.png

 不同的项目的构建命令不同,但是我们可以在构建后操作模块设置后续操作,通过ssh登录目标服务器,让目标服务器执行命令行操作来pull最新上传的image并且执行,这样就完成了部署。

jenkins3.png

Push触发构建任务

 完成上述配置,你就可以手动在jenkins上启动构架任务了,但是要做到自动化部署,还必须设置Push操作自动触发jenkins构建任务的机制。
 我们先到首页-用户管理界面打开自己的用户界面,然后点击左侧的设置按钮,并点击show API token按钮来获取API token.然后在构建任务设置页面的构建触发器模块勾选触发远程构建选项,并将token填到里边去。这是jenkins会提示你如何通过URL来触发构建任务。
jenkins5.png

 然后我们打开github上相应库的设置页面。点击左侧的Webhooks选项,然后添加hook.将上述的url填写到Payload URL栏中,点击添加。如果添加成功之后,每次你push一个新版本,那么jenkins就会自动进行部署了。
jenkins6.png

 如果你发现webhooks发送请求失败,那可能是因为你jenkins安全设置的问题,禁止掉了发送请求自动化构建。

后记

 本篇讲的都是十分基础性的内容,后一篇文章讲一下dockerfile的原理和注意事项与docker registry。

 最近在阅读《多处理器编程艺术》一书,掌握了很多Java多线程的底层知识,现在就做一下书中链表-锁的作用一章的总结。
 为了节约你的时间,本文主要内容如下:

  • 带锁的链表队列
  • 细粒度同步
  • 乐观同步
  • 惰性同步
  • 非阻塞同步

粗粒度同步

 所谓粗粒度同步其实很简单,就是在List的add,remove,contains函数的开始就直接使用Lock加锁,然后在函数结尾释放。
add函数的代码如下所示,函数的主体就是链表的遍历添加逻辑,只不过在开始和结束进行了锁的获取和释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private Node head;
private Lock lock = new ReentrantLock();
public boolean add(T item) {
Node pred, curr;
int key = item.hashCode();
lock.lock();
try {
pred = head;
curr = pred.next;
while(curr.key < key) {
pred = curr;
curr = pred.next;
}
if (key == curr.key) {
return false;
} else {
Node node = new Node(item);
node.next = curr;
pred.next = node;
return true;
}

} finally {
lock.unlock();
}
}

 大家看到这里就会想到,这不就是类似于Hashtable的实现方式吗?把可能出现多线程问题的函数都用一个重入锁锁住。但是这个方法的缺点很明显,如果竞争激烈的话,对链表的操作效率会很低,因为add,remove,contains三个函数都需要获取锁,也都需要等待锁的释放。至于如何优化,我们可以一步一步往下看

细粒度同步

我们可以通过锁定单个节点而不是整个链表来提高并发。给每个节点增加一个Lock变量以及相关的lock()和unlock()函数,当线程遍历链表的时候,若它是第一个访问节点的线程,则锁住被访问的节点,在随后的某个时刻释放锁。这种细粒度的锁机制允许并发线程以流水线的方式遍历链表。
 使用这种方式来遍历链表,必须同时获取两个相邻节点的锁,通过“交叉手”的方式来获取锁:除了初始的head哨兵节点外,只有在已经获取pred的锁时,才能获取curr的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//每个Node对象中都有一个Lock对象,可以进行lock()和unlock()操作
public boolean add(T item) {
int key = item.hashCode();
head.lock();
Node pred = head;
try {
Node curr = pred.next;
curr.lock();

try {
while (curr.key < key) {
pred.unlock();
pred = curr;
curr = pred.next;
curr.lock();
}

if (curr.key == key) {
return false;
}
Node newNode = new Node(item);
newNode.next = curr;
pred.next = newNode;
return true;
} finally {
curr.unlock();
}

} finally {
pred.unlock();
}
}

乐观同步

 虽然细粒度锁是对单一粒度锁的一种改进,但它仍然出现很长的获取锁和释放锁的序列。而且,访问链表中不同部分的线程仍然可能相互阻塞。例如,一个正在删除链表中第二个元素的线程将会阻塞所有试图查找后继节点的线程。
 减少同步代价的一种方式就是乐观:不需要获取锁就可以查找,对找到的节点进行加锁,然后确认锁住的节点是正确的;如果一个同步冲突导致节点被错误的锁定,则释放这些锁重新开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public boolean add(T item) {
int key = item.hashCode();

while (true) { //如果不成功,就进行重试
Node pred = head;
Node curr = pred.next;
while (curr.key < key) {
pred = curr;
curr = pred.next;
}
//找到目标相关的pred和curr之后再将二者锁住
pred.lock();
curr.lock();
try {
//锁住二者之后再进行判断,是否存在并发冲突
if (validate(pred, curr)) {
//如果不存在,那么就直接进行正常操作
if (curr.key == key) {
return false;
} else {
Node node = new Node(item);
node.next = curr;
pred.next = node;
}
}
} finally {
pred.unlock();
curr.unlock();
}
}
}
public boolean validate(Node pred, Node curr) {
//从队列头开始查找pred和curr,判断是否存在并发冲突
Node node = head;
while (node.key <= pred.key) {
if (node == pred) {
return pred.next == curr;
}
node = node.next;
}
return false;
}

 由于不再使用能保护并发修改的锁,所以每个方法调用都可能遍历那些已经被删除的节点,所以在进行添加,删除获取判断是否存在的之前必须再次进行验证。

惰性同步

 当不用锁遍历两次链表的代价比使用锁遍历一次链表的代价小很多时,乐观同步的实现效果非常好。但是这种算法的缺点之一就是contains()方法在遍历时需要锁,这一点并不令人满意,其原因在于对contains()的调用要比其他方法的调用频繁得多。
使用惰性同步的方法,使得contains()调用是无等待的,同时add()和remove()方法即使在被阻塞的情况下也只需要遍历一次链表
对每个节点增加一个布尔类型的marked域,用于说明该节点是否在节点集合中。现在,遍历不再需要锁定目标结点,也没有必须通过重新遍历整个链表来验证结点是否可达。所有未被标记的节点必然是可达的

1
2
3
4
5
//add方法和乐观同步的方法一致,只有检验方法做了修改。
//只需要检测节点的marked变量就可以,并且查看pred的next是否还是指向curr,需要注意的是marked变量一定是voliate的。
private boolean validate(Node pred, Node curr) {
return !pred.marked && !curr.marked && pred.next == curr;
}

 惰性同步的优点之一就是能够将类似于设置一个flag这样的逻辑操作与类似于删除结点的链接这种对结构的物理改变分开。通常情况下,延迟操作可以是批量处理方式进行,且在某个方便的时候再懒惰地进行处理,从而降低了对结构进行物理修改的整体破裂性。惰性同步的主要缺点是add()和remove()调用是阻塞的:如果一个线程延迟,那么其他线程也将延迟。

非阻塞同步

 使用惰性同步的思维是非常有益处的。我们可以进一步将add(),remove()和contains()这三个方法都变成非阻塞的。前两个方法是无锁的,最后一个方法是无等待的。我们无法直接使用compareAndSet()来改变next域来实现,因为这样会出现问题。但是我们可以将结点的next域和marked域看作是单个的原子单位:当marked域为true时,对next域的任何修改都将失败。
 我们可以使用AtomicMarkableReference对象将指向类型T的对象引用next和布尔值marked封装在一起。这些域可以一起或单个地原子更新。可以让每个结点的next域为一个AtomicMarkableReference。线程可以通过设置结点next域中的标记位来逻辑地删除curr,和其他正在执行add()和remove()的线程共享物理删除:当每个线程遍历链表时,通过物理删除所有被标记的节点来清理链表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

public Window find(Node head, int key) {
Node pred = null, curr = null, succ = null;
boolean[] marked = {false};
boolean snip;

retry: while(true) {
pred = head;
curr = curr.next.get(marked);
while(true) {
succ = curr.next.get(marked); //获取succ,并且查看是被被标记
while (marked[0]) {//如果被标记了,说明curr被逻辑删除了,需要继续物理删除
snip = pred.next.compareAndSet(curr, succ, false, false);//
if (!snip) continue retry;
curr = succ;
succ = curr.next.get(marked);
}
//当不需要删除后,才继续遍历
if (curr.key >= key) {
return new Window(pred, curr);
}
pred = curr;
curr = succ;
}
}
}

public boolean add(T item) {
int key = item.hashCode();
while(true) {
Window window = find(head, key);
Node pred = window.pred, curr = window.curr;
if (curr.key == key) {
return false;
} else {
Node node = new Node(item);
node.next = new AtomicMarkableReference<>(curr, false);
if (pred.next.compareAndSet(curr, node, false, false)) {
return true;
}
}
}
}

public boolean remove(T item) {
int key = item.hashCode();
boolean sinp;
while(true) {
Window window = find(head, key);
Node pred = window.pred, curr = window.curr;
if (curr.key != key) {
return false;
} else {
Node succ = curr.next.getReference();
//要进行删除了,那么就直接将curr.next设置为false,然后在进行真正的物理删除。
sinp = curr.next.compareAndSet(curr, succ, false, true);
if (!sinp) {
continue;
}
pred.next.compareAndSet(curr, succ, false, false);
return true;
}
}
}


class Node {
AtomicMarkableReference<Node> next;
}

后记

 文中的代码在我的github的这个repo中都可以找到。

 和16年计划一样,建立一个计划目录,记录一下17年的计划和实现情况,进行不定时的更新。

计划清单

英语学习

单词量

  • 口语小助手 2017.1.30~

口语

  • 每天一集《摩登家庭》的听力口语练习 2017.1.30~

Android开发

Android性能方面研究

GT开源项目研究

博客文章

Android富文本
Android Trasication动画

前端开发

要阅读的书籍

  • 《暗时间》2017.1.30~

    习惯养成

生活

  • 关灯之后不准再玩手机
  • 11点半左右睡觉

思维

  • 每日的思考,总结和回顾

Update 2017.1.1

 开始制定计划,一个星期时间。

Update 2017.1.30

 制订2月计划

Android

  • 两篇博文:Android富文本和Android Trasication动画
  • Gt开源项目研究

《暗时间》研读

英语口语练习

思维的习惯,总结,回顾,自醒!!!!!

Update 2017.4.6

书籍 正在进行,预计时间3个月

  • 多核编程的艺术
  • 高效能Mysql

中间件

RPC简单框架

Netty源码分析

Java并发知识

JUC源码分析

无锁算法

Update 2017.5.3

书籍 正在进行,预计时间3个月

  • 多核编程的艺术 读完第一章
  • 高效能Mysql

mit 分布式课程 预计2个月

主页:http://nil.csail.mit.edu/6.824/2015/index.html
github:https://github.com/ztelur/mit-distributed-systems
经典课程啊,使用go语言,坚持自己把lab都做完

中间件

RPC简单框架

 我在前段时间写了一篇关于AQS的文章,在文章里边我说几乎所有在JUC包中的所有多线程相关的类都和AQS相关,今天我就在这里总结一下另一个依赖于AQS来实现的同步工具类:BlockingQueue。我们主要以ArrayBlockingQueue为主来分析相关的源码。

阻塞队列

 相信大多数同学都是在学习线程池相关知识时了解到阻塞队列的概念的。知道各种类型的阻塞队列对线程池初始化时的影响。在java doc中这样定义阻塞队列。当从阻塞队列获取元素但是队列为空时,当前线程会阻塞直到另一个线程向阻塞队列中添加一个元素;类似的,当向一个阻塞队列加入元素时,如果队列已经满了,当前线程也会阻塞知道另外一个线程从队列中读取一个元素。阻塞队列一般都是FIFO,用来实现生产者和消费者模式。阻塞队列的方法通过四种不同的方式来处理操作无法被立即完成的情况,这四种情况分别为抛出异常,返回特殊值(null或在是false),阻塞当前线程直到执行结束,最后一种是只阻塞固定时间,然后还未执行成功就放弃操作。这些方法都总结在下边这种表中了。

BlockingQueue

 我们就只分析puttake方法。

put和take函数

 我们都知道,使用同步队列可以很轻松的实现生产者-消费者模式,其实,同步队列就是按照生产者-消费者的模式来实现的,我们可以将put函数看作生产者的操作,take是消费者的操作。
put函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。函数的源码如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //先获得锁
try {
while (count == items.length)
//如果队列满了,就NotFull这个condition对象上进行等待
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
//这里可以注意的是ArrayBlockingList实际上使用Array实现了一个环形数组,
//当putIndex达到最大时,就返回到起点,继续插入,
//当然,如果此时0位置的元素还没有被取走,
//下次put时,就会因为cout == item.length未被阻塞。
if (++putIndex == items.length)
putIndex = 0;
count++;
//因为插入了元素,通知等待notEmpty事件的线程。
notEmpty.signal();
}

 我们会发现put函数也是使用了wait/notify的机制。与一般生产者-消费者的实现方式不同,同步队列使用ReentrantLockCondition相结合的先获得锁,再等待的机制;而不是synchronizedObject.wait的机制。这里的区别我们下一节再详细讲解。
 看完了生产者相关的put函数,我们再来看一下消费者调用的take函数。take函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//如果队列为空,那么在notEmpty对象上等待,
//当put函数调用时,会调用notEmpty的notify进行通知。
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
E x = (E) items[takeIndex];
items[takeIndex] = null; //取出takeIndex位置的元素
if (++takeIndex == items.length)
//如果到了尾部,将指针重新调整到头部
takeIndex = 0;
count--;
....
//通知notFull对象上等待的线程
notFull.signal();
return x;
}

Condition.await和Object.wait

 我们发现ArrayBlockingList并没有使用Object.wait,而是使用的Condition.await,这是为什么呢?其中又有哪些原因呢?
Condition对象可以提供和Objectwaitnotify一样的行为,但是后者必须使用synchronized这个内置的monitor锁,而Condition使用的是RenentranceLock。这两种方式在阻塞等待时都会将相应的锁释放掉,但是Condition的等待可以中断,这是二者唯一的区别。
&emsp;Condition的流程大致如下边两张图所示.

await

notify

 我们首先来看一下await函数的实现,详细的讲解都在代码中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//在condition wait队列上添加新的节点
Node node = addConditionWaiter();
//释放当前持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//由于node在之前是添加到condition wait queue上的,现在判断这个node
//是否被添加到Sync的获得锁的等待队列上。
//node在condition queue上说明还在等待事件的notify,
//notify函数会将condition queue 上的node转化到Sync的队列上。
while (!isOnSyncQueue(node)) {
//node还没有被添加到Sync Queue上,说明还在等待事件通知
//所以调用park函数来停止线程执行
LockSupport.park(this);
//判断是否被中断,线程从park函数返回有两种情况,一种是
//其他线程调用了unpark,另外一种是线程被中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//代码执行到这里,已经有其他线程调用notify函数,或则被中断,该线程可以继续执行,但是必须先
//再次获得调用await函数时的锁.acquireQueued函数在AQS文章中做了介绍.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
 ....
}

final int fullyRelease(Node node) {
//AQS的方法,当前已经在锁中了,所以直接操作
boolean failed = true;
try {
int savedState = getState();
//获取state当前的值,然后保存,以待以后恢复
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/

private int checkInterruptWhileWaiting(Node node) {
//中断可能发生在两个阶段中,一是在等待singla,另外一个是在获得signal之后
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

final boolean transferAfterCancelledWait(Node node) {
//这里要和下边的transferForSignal对应着看,这是线程中断进入的逻辑.那边是signal的逻辑
//两边可能有并发冲突,但是成功的一方必须调用enq来进入acquire lock queue中.
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//如果失败了,说明transferForSignal那边成功了,等待node 进入acquire lock queue
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

signal函数将等待事件最长时间的线程节点从等待condition的队列移动到获得lock的等待队列中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public final void signal() {
//
if (!isHeldExclusively())
//如果当前线程没有获得锁,抛出异常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//将Condition wait queue中的第一个node转移到acquire lock queue中.
doSignal(first);
}

private void doSignal(Node first) {
do {
   //由于生产者的signal在有消费者等待的情况下,必须要通知
//一个消费者,所以这里有一个循环,直到队列为空
//把first 这个node从condition queue中删除掉
//condition queue的头指针指向node的后继节点,如果node后续节点为null,那么也将尾指针也置为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
//transferForSignal将node转而添加到Sync的acquire lock 队列
}

final boolean transferForSignal(Node node) {
//如果设置失败,说明该node已经被取消了,所以返回false,让doSignal继续向下通知其他未被取消的node
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将node添加到acquire lock queue中.
Node p = enq(node);
int ws = p.waitStatus;
//需要注意的是这里的node进行了转化
//ws>0代表canceled的含义所以直接unpark线程
//如果compareAndSetWaitStatus失败,所以直接unpark,让线程继续执行await中的
//进行isOnSyncQueue判断的while循环,然后进入acquireQueue函数.
//这里失败的原因可能是Lock其他线程释放掉了锁,同步设置p的waitStatus
//如果compareAndSetWaitStatus成功了呢?那么该node就一直在acquire lock queue中
//等待锁被释放掉再次抢夺锁,然后再unpark
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

后记

 后边一篇文章主要讲解如何自己使用AQS来创建符合自己业务需求的锁,请大家继续关注我的文章啦.一起进步偶.

&emsp;对LongAdder的最初了解是从Coolshell上的一篇文章中获得的,但是一直都没有深入的了解过其实现,只知道它相较于AtomicLong来说,更加适合读多写少的并发情景。今天,我们就研究一下LongAdder的原理,探究一下它如此高效的原因。

基本原理和思想

 我们都知道AtomicLong是通过无限循环不停的采取CAS的方法去设置value,直到成功为止。那么当并发数比较多或出现更新热点时,就会导致CAS的失败机率变高,重试次数更多,越多的线程重试,CAS失败的机率越高,形成恶性循环,从而降低了效率。而LongAdder的原理就是降低对value更新的并发数,也就是将对单一value的变更压力分散到多个value值上,降低单个value的“热度”
 我们知道LongAdder的大致原理之后,再来详细的了解一下它的具体实现,其中也有很多值得借鉴的并发编程的技巧。

Add操作

1
2
3
4
5
6
7
8
9
10
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) { //step1
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || //step2
(a = as[getProbe() & m]) == null || //step3
!(uncontended = a.cas(v = a.value, v + x))) //step4
longAccumulate(x, null, uncontended); // step5
}
}

cellsLongAdder的父类Striped64中的Cell数组类型的成员变量。每个Cell对象中都包含一个value值,并提供对这个value值的CAS操作。
 我们来看一下casBase函数相关的源码吧。我们可以认为变量base就是第一个value值,也是基础value变量。先调用casBase函数来cas一下base变量,如果成功了,就不需要在进行下面比较复杂的算法,

1
2
3
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

 然后我们继续看step2第二层条件语句中执行的逻辑。如果cells数组为null或为空,就直接调用longAccumulate方法。因为cells为null或在为空,说明cells未完全初始化,所以调用longAccumulate进行初始化。否则继续判断。
 如果cells中已经有对象了,那么执行step3。我们先来理解一下getProbe() & m的这个操作吧。我们可以首先将这个操作当作一次计算”hash”值,然后将cells中这个位置的Cell对象赋值给变量a。然后判断a是否为null,如果不为null,那么就调用Cell对象自己的cas方法去设置value值。如果a为null,或在cas赋值发生冲突,那么也是开始调用longAccumulate方法。

LongAccumulate方法

longAccumulate函数比较复杂,带有我的注释的代码已经贴在了文章后边,这里我们就只讲一下其中比较关键的一些技巧和思想.
 首先,我们都知道只有当对base的cas操作失败之后,LongAdder才引入Cell数组.所以在longAccumulate中就是对Cell数组进行操作.分别涉及了数组的初始化,扩容和设置某个位置的Cell对象等操作.
 在这段代码中,关于cellBusy的cas操作构成了一个SpinLock,这就是经典的SpinLock的编程技巧,大家可以学习一下.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended)
{

int h;
if ((h = getProbe()) == 0) { //获取PROBE变量,探针变量,与当前运行的线程相关,不同线程不同
ThreadLocalRandom.current(); //初始化PROBE变量,和getProbe都使用Unsafe类提供的原子性操作。
h = getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) { //cas经典无限循环,不断尝试
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // cells不为null,并且数组size大于0
//表示cells已经初始化了
if ((a = as[(n - 1) & h]) == null) { //通过与操作计算出来需要操作的Cell对象的坐标
if (cellsBusy == 0) { //volatile 变量,用来实现spinLock,来在初始化和resize cells数组时使用。
//当cellsBusy为0时,表示当前可以对cells数组进行操作。
Cell r = new Cell(x);//将x值直接赋值给Cell对象
if (cellsBusy == 0 && casCellsBusy()) {//如果这个时候cellsBusy还是0
//就cas将其设置为非0,如果成功了就是获得了spinLock的锁.可以对cells数组进行操作.
//如果失败了,就会再次执行一次循环
boolean created = false;
try {
Cell[] rs; int m, j;
//判断cells是否已经初始化,并且要操作的位置上没有cell对象.
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r; //将之前创建的值为x的cell对象赋值到cells数组的响应位置.
created = true;
}
} finally {
//经典的spinLock编程技巧,先获得锁,然后try finally将锁释放掉
//将cellBusy设置为0就是释放锁.
cellsBusy = 0;
}
if (created)
break; //如果创建成功了,就是使用x创建了新的cell对象,也就是新创建了一个分担热点的value
continue;
}
}
collide = false; //未发生碰撞
}
else if (!wasUncontended)//是否已经发生过一次cas操作失败
wasUncontended = true; //设置成true,以便第二次进入下一个else if 判断
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
 //fn是操作类型,如果是空,就是相加,所以让a这个cell对象中的value值和x相加,然后在cas设置,如果成果
//就直接返回
break;
else if (n >= NCPU || cells != as)
  //如果cells数组的大小大于系统的可获得处理器数量或在as不再和cells相等.
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
  //再次获得cellsBusy这个spinLock,对数组进行resize
try {
if (cells == as) {//要再次检测as是否等于cells以免其他线程已经对cells进行了操作.
Cell[] rs = new Cell[n << 1]; //扩容一倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;//赋予cells一个新的数组对象
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = advanceProbe(h);//由于使用当前探针变量无法操作成功,所以重新设置一个,再次尝试
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//cells数组未初始化,获得cellsBusy lock,来初始化
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x); //设置x的值为cell对象的value值
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}//如果初始化数组失败了,那就再次尝试一下直接cas base变量,如果成功了就直接返回
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

后记

 本篇文章写的不是很好,我写完之后又看了一遍coolshell上的这篇关于LongAdder文章,感觉自己没有人家写的那么简介明了。我对代码细节的注释和投入太多了。其实很多代码大家都可以看懂,并不需要大量的代码片段加注释。以后要注意一下。之后会接着研究一下JUC包中的其他类,希望大家多多关注。

 最近在学习zookeeper原理的时候了解到了paxos算法,看了几篇文章之后还是感觉有些迷糊,后来看了知行学社的paxos视频才对这个算法有了一定的了解,这里就做一下总结.

Paxos简介

 Paxos是Lamport于1990年提出的一种基于消息传递而具有高度容错特性的分布式一致性算法.这个算法是分布式中最为重要的算法,Google Chubby的作者Mike Burrows说过这个世界上只有一种一致性算法,那就是Paxos,其他算法都是残次品.具体Paxos算法的详细内涵和故事背景大家可以参考知乎上的回答;

Paxos的使用场景和假设

 我们都知道基于消息传递通信模型的分布式系列,不可避免的会发生以下错误:进程可能会慢,被杀死或在重启,消息可能会有延迟,丢失和重复.Paxos算法解决的问题就是在一个可能发生上述异常的分布式系统中如何就某个值达成一致,保证无论发生以上任何异常,都不会破坏决议的一致性。但是Paxos算法也有一定的使用假设。一个假设是在消息传递的过程中不会出现拜占庭将军问题:即虽然有可能一个消息被传递两次,但是绝对不会出现错误的消息。另一个假设是提议不会被反对,只能被同意或在被更新的提议替换。
 Paxos协议中有三种角色,每个节点可以扮演多个角色:

  • 倡议者(Proposer):提议者可以提出提议(数值或在操作命令)以供投票表决。
  • 接受者(Acceptor):接受者可以对提议者提出的提议进行投票表决,提议有超过半数的接收者投票即被选中。
  • 学习者(Learner):学习者无投票者,只是从接收者那里获取哪个提议被选中。

 在Paxos算法中,一个或在多个Proposer都可以并发的提出提议;系统必须针对所有提议中的某个提议达成一致(超过半数大的接受者选中);最多只能对一个确定的提议达成一致;只要超过半数的节点存活且可以互相通信,整个系统一定可以达成一致,即选择一个确定的提议。
 如果直接讲解Paxos算法,大家可能会有些难以理解,这里我们就按着视频里的顺序,先从简单的分布式一致性算法开始,然后不断进行优化,最后将其演变成Paxos算法。

图解Paxos主要流程

 Paxoso算法分为两个的阶段,我们就将其分别记为Phase1和Phase2.每个Proposer都持有一个独有的变量epoch,每个Acceptor都保存三个状态:lastest_prepared__epoch,accepted_epoch和accepted_value.lastest_prepared_epoch是指Acceptor授予访问权的Proposer的epoch值,accepted_epoch是Acceptor接受提议的Proposer的epoch值,而accepted_value就是Acceptor接受的提议值喽,他们的初始值都为null。

阶段一

 Phase1过程中,Proposer向Acceptor发起Prepare(epoch)请求来获取访问权。将自己的epoch发送给Acceptor.而Acceptor只会接受比lastest_prepared_epoch更大的epoch,并给予访问权,并将epoch记录到lastest_prepared_epoch的值中,返回当前的accepted_epoch和accepted_value的值。在初始化状态下,二者都是null,所以返回的是。如果epoch小于lastest_prepared_epoch则不授予访问权,并返回
phase1
 如上图所示,Proposer1向5个Acceptor发送了Prepare(#1)的请求,其中前三个请求顺利到达,Acceptor授予访问权,返回,并修改lastest_prepared_epoch为1。而后两个请求发生了网络延迟,一直未到达相应的Acceptor。
 在阶段一中,Proposer需要获得半数以上的Acceptor的访问权和对应的一组value的取值才会进行第二阶段,这样才会确保,一个Proposer提出的确定的议案会被另外一个Proposer发现,从而在阶段二中会进行正确的操作。

阶段二

 第二阶段采取“后者认同前者”的原则进行。在肯定旧epoch无法生成确定性取值时,新的epoch会提交自己的取值,不会冲突;一旦旧epoch形成了确定性取值,那么该proposer一定可以获得该取值,并且会认同该取值,不会破坏。
 如果Proposer在第一阶段获取的value值都是null,则旧epoch无法形成确定性取值,此时让自己的成为确定性取值:

  • 向epoch对应的所有acceptor提交取值
  • 如果收到半数以上的成功应答,则返回
  • 否则返回

 如果value的取值不为null,则认同最大accepted_epoch对应的取值f,使成为确定性取值,其中epoch是自己的epoch.

  • 如果f出现半数以上,则说明f已经是确定性取值了,直接返回
  • 否则,向epoch所对应的acceptor提交取值

 Acceptor在接收到accept(epoch,V)的请求之后,先查看epoch是不是自己记录的lastest_prepared_epoch,如果是则设置 = 。否在则会返回error

paxos2

 如上图所示,由于在阶段一中Proposer1接受到的值都为null,所以,决定将自己的值设置为确定值,于是发送accept(1,V1)请求。Acceptor1接受到了这个请求,检查lastest_prepared_epoch也等于1,所以将自己存储的设置为<1,v1>。而Proposer1的另外两个accept请求发生了网络延迟。
 如果此时,Proposer2向Acceptor进行propose会怎么样呢?我们来模拟propose来分析一下。

paxos3

 如上图所示,Proposer2向Acceptor发送了prepare(#2)的请求,Acceptor1先检测一下发现2大于现在的lastest_prepared_epoch,所以同意发送访问权,将lastest_prepared_epoch修改为2,并将自己保存的accepted_epoch和acceped_value返回给Proposer2;Acceptor3的操作也是类似,只不过因为Proposer1发送的accept请求发生了延迟,所以Acceptor3返回的是;而Acceptor5的操作和我们在文章第一张图中的Acceptor1的操作相同,他们都是第一次接收到prepare请求。
 然后Proposer2进行第二阶段的操作,从所有的返回数据中,找到accepted_epoch最大的那个accepted_value.这里就是Acceptor返回的<1,v1>,所以,Proposer2会尽力让V1成为确定值,所以它向Acceptor发送accept(2,V1)的请求。然后Acceptor1,Acceptor3,Acceptor5三个Acceptor接受了这个accept请求,更新自己的。此时,已经有三个acceptor形成了一致性的值,所以V1就成了整个系统的确定性取值。
paxos7.png

 那么Proposer1对Acceptor3发送的accept请求在此时达到Acceptor3会怎么样呢?Acceptor3发现当前lastest_prepared_epoch是2,所以直接拒绝了这个请求。

后记

 不清楚大家现在对Paxos算法的过程是否已经有了清楚的了解啊?那么我来问几个问题,大家可以考虑一下:

  • 在本文的情景下,假如Proposer2向Acceptor2,3,4发送了prepare请求,而不是向Acceptor1,3,5发送的请求,会怎么样呢?
  • 为什么强调prepare阶段时必须接受到一般以上Acceptor的返回,才能进行第二阶段?
     后续希望能够分析一下Zookeeper关于Paxos的具体使用场景和算法,希望大家多多关注。

 今天我们来研究学习一下AbstractQueuedSynchronizer类的相关原理,java.util.concurrent包中很多类都依赖于这个类所提供的队列式的同步器,比如说常用的ReentranLock,SemaphoreCountDownLatch等.
 为了方便理解,我们以一段使用ReentranLock的代码为例,讲解ReentranLock每个方法中有关AQS的使用.

ReentranLock示例

 我们都知道ReentranLock的加锁行为和Synchronized类似,都是可重入的锁,但是二者的实现方式确实完全不同的,我们之后也会讲解Synchronized的原理.除此之外,Synchronized的阻塞无法被中断,而ReentrantLock则提供了可中断的阻塞下面的代码是ReentranLock的相关API,我们就以此为顺序,依次讲解.

1
2
3
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();

公平锁和非公平锁

 ReentranLock分为公平锁和非公平锁.二者的区别就在获取锁是否和排队顺序相关.我们都知道,如果当前锁被另一个线程持有,那么当前申请锁的线程会被挂起等待,然后加入一个等待队列里.理论上,先调用lock函数被挂起等待的线程应该排在等待队列的前端,后调用的就排在后边.如果此时,锁被释放,需要通知等待线程再次尝试获取锁,公平锁会让最先进入队列的线程获得锁,而非公平锁则会唤醒所有线程,让它们再次尝试获取锁,所以可能会导致后来的线程先获得了锁,则就是非公平.

1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

 我们会发现FairSyncNonfairSync都继承了Sync类,而Sync的父类就是AbstractQueuedSynchronizer.但是AQS的构造函数是空的,并没有任何操作.
 之后的源码分析,如果没有特别说明,就是指公平锁.

lock操作

ReentranLocklock函数如下所示,直接调用了synclock函数.也就是调用了FairSynclock函数.

1
2
3
4
5
6
7
8
//ReentranLock
public void lock() {
sync.lock();
}
//FairSync
final void lock() {
acquire(1);//调用了AQS的acquire函数,这是关键函数之一
}

 好,我们接下来就正式开始AQS相关的源码分析了,acquire函数你可以将其理解为获取一个同一时间只能有一个函数获取的量,这个量就是锁概念的抽象化.我们先分析代码,你慢慢就会明白其中的含义.

1
2
3
4
5
6
7
public final void acquire(int arg) {
if (!tryAcquire(arg) && //tryAcquire先尝试获取"锁",//如果成功,直接返回,失败继续执行后续代码
//addWaiter是给当前线程创建一个节点,并将其加入等待队列
//acquireQueued是当线程已经加入等待队列之后的行为.
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire,addWaiteracquireQueued都是十分重要的函数,我们接下来依次学习一下这些函数,理解它们的作用.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//AQS类中的变量.
private volatile int state;
//这是FairSync的实现,AQS中未实现,子类按照自己的需要实现该类
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取AQS中的state变量,代表抽象概念的锁.
int c = getState();
if (c == 0) { //值为0,那么当前独占性变量还未被线程占有
if (!hasQueuedPredecessors() && //如果当前阻塞队列上没有先来的线程在等待,UnfairSync这里的实现就不一致
compareAndSetState(0, acquires)) {
//成功cas,那么代表当前线程获得该变量的所有权,也就是说成功获得锁
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//如果该线程已经获取了独占性变量的所有权,那么根据重入性
//原理,将state值进行加1,表示多次lock
//由于已经获得锁,该段代码只会被一个线程同时执行,所以不需要
//进行任何并行处理
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//上述情况都不符合,说明获取锁失败
return false;
}

 由上述代码我们可以发现,tryAcquire就是尝试获取那个线程独占的变量state.state的值表示其状态:如果是0,那么当前还没有线程独占此变量;否在就是已经有线程独占了这个变量,也就是代表已经有线程获得了锁.但是这个时候要再进行一次判断,看是否是当前线程自己获得的这个锁,如果是,那么就增加state的值.

ReentranLock获得锁

 这里有几点需要说明一下,首先是compareAndSetState函数,这是使用CAS操作来设置state的值,而且state值设置了volatile修饰符,通过这两点来确保修改state的值不会出现多线程问题.然后是公平锁和非公平锁的区别问题,在UnfairSyncnonfairTryAcquire函数中不会在相同的位置上调用hasQueuedPredecessors来判断当前是否已经有线程在排队等待获得锁.
 如果tryAcquire返回true,那么就是获取锁成功,如果返回false,那么就是未获得锁.需要加入阻塞等待队列.我们下面就来看一下addWaiter的相关操作

等待锁的阻塞队列

 将保存当前线程信息的节点加入到等待队列的相关函数中涉及到了无锁队列的相关算法,由于在AQS中只是将节点添加到队尾,使用到的无锁算法也相对简单.真正的无锁队列的算法我们等到分析ConcurrentSkippedListMap时在进行讲解.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//先使用快速如列法来尝试一下,如果失败,则进行更加完备的如列算法.
Node pred = tail;//列尾指针
if (pred != null) {
node.prev = pred; //步骤1:该节点的前趋指针指向tail
if (compareAndSetTail(pred, node)){ //步骤二:cas将尾指针指向该节点
pred.next = node;//步骤三:如果成果,让旧列尾节点的next指针指向该节点.
return node;
}
}
//cas失败,或在pred == null时调用enq
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) { //cas无锁算法的标准for循环,不停的尝试
Node t = tail;
if (t == null) { //初始化
if (compareAndSetHead(new Node())) //需要注意的是head是一个哨兵的作用,并不代表某个要获取锁的线程节点
tail = head;
} else {
//和addWaiter中一致,不过有了外侧的无限循环,不停的尝试,自旋锁
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

 通过调用addWaiter函数,AQS将当前线程加入到了等待队列,但是还没有阻塞当前线程的执行,接下来我们就来分析一下acquireQueued函数.

等待队列节点的操作

 由于进入阻塞状态的操作会降低执行效率,所以,AQS会尽力避免试图获取独占性变量的线程进入阻塞状态.所以,当线程加入等待队列之后,acquireQueued会执行一个for循环,每次都判断当前节点是否应该获得这个变量(在队首了),如果不应该获取或在再次尝试获取失败,那么就调用shouldParkAfterFailedAcquire判断是否应该进入阻塞状态,如果当前节点之前的节点已经进入阻塞状态了,那么就可以判定当前节点不可能获取到锁,为了防止CPU不停的执行for循环,消耗CPU资源,调用parkAndCheckInterrupt函数来进入阻塞状态.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //一直执行,知道获取锁,返回.
final Node p = node.predecessor();
//node的前驱是head,就说明,node是将要获取锁的下一个节点.
if (p == head && tryAcquire(arg)) { //所以再次尝试获取独占性变量
setHead(node); //如果成果,那么就将自己设置为head
p.next = null; // help GC
failed = false;
return interrupted;//此时,还没有进入阻塞状态,所以直接返回false,表示不需要中断
}
//判断是否要进入阻塞状态.如果`shouldParkAfterFailedAcquire`返回true,表示需要进入阻塞
//调用parkAndCheckInterrupt,否在表示还可以再次尝试获取锁,继续进行for循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//调用parkAndCheckInterrupt进行阻塞,然后返回是否为中断状态
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //前一个节点在等待独占性变量释放的通知,所以,当前节点可以阻塞
return true;
if (ws > 0) { //前一个节点处于取消获取独占性变量的状态,所以,可以跳过去
//返回false
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//将上一个节点的状态设置为signal,返回false,
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //将AQS对象自己传入
return Thread.interrupted();
}

阻塞和中断

 由上述分析,我们知道了AQS通过调用LockSupportpark方法来执行阻塞当前进程的操作.其实,这里的阻塞就是线程不再执行的含义.通过调用这个函数,线程进入阻塞状态,上述的lock操作也就阻塞了.等待中断或在独占性变量被释放.

1
2
3
4
5
6
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);//设置阻塞对象,用来记录线程被谁阻塞的,用于线程监控和分析工具来定位
UNSAFE.park(false, 0L);//让当前线程不再被线程调度,就是当前线程不再执行.
setBlocker(t, null);
}

 关于中断的相关知识,我们以后再说,就继续沿着AQS的主线,看一下释放独占性变量的相关操作吧.

ReentrantLock未获得阻塞,加入队列

unlock操作

 与lock操作类似,unlock操作调用了AQSrelase方法,参数和调用acquire时一样,都是1.

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) { //释放独占性变量,起始就是将status的值减1,因为acquire时是加1
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒head的后继节点
return true;
}
return false;
}

 由上述代码可知,release就是先调用tryRelease来释放独占性变量,如果成果,那么就看一下是否有等待锁的阻塞线程,如果有,就调用unparkSuccessor来唤醒他们.

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryRelease(int releases) {
//由于只有一个线程可以获得独占先变量,所以,所有操作不需要考虑多线程
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //如果等于0,那么说明锁应该被释放了,否在表示当前线程有多次lock操作.
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

 我们可以看到tryRelease中的逻辑也体现了可重入锁的概念,只有等到state的值为1时,才代表锁真正被释放了.所以独占性变量state的值就代表锁的有无.当state=0时,表示锁未被占有,否在表示当前锁已经被占有.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void unparkSuccessor(Node node) {
.....
//一般来说,需要唤醒的线程就是head的下一个节点,但是如果它获取锁的操作被取消,或在节点为null时
//就直接继续往后遍历,找到第一个未取消的后继节点.
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

 调用了unpark方法后,进行lock操作被阻塞的线程就恢复到运行状态,就会再次执行acquireQueued中的无限for循环中的操作,再次尝试获取锁.

ReentrantLock释放锁并通知阻塞线程恢复执行

后记

 有关AQSReentrantLock的分析就差不多结束了,不得不说,我第一次看到AQS的实现时真是震惊,以前都认为SynchronizedReentrantLock的实现原理是一致的,都是依靠java虚拟机的功能实现的,没有想到还有AQS这样一个背后大Boss在提供帮助啊.学习了这个类的原理,我们对JUC的很多类的分析就简单了很多,此外,AQS涉及的CAS操作和无锁队列的算法也为我们学习其他无锁算法提供了基础.知识的海洋是无限的啊!

上一篇文章我们主要讲解了Netty的ChannelPipeline,了解到不同的Channel可以提供基于不同网络协议的通信处理.既然涉及到网络通信,就不得不说一下多线程,同步异步相关的知识了.Netty的网络模型是多线程的Reactor模式,所有I/O请求都是异步调用,我们今天就来探讨一下一些基础概念和Java NIO的底层机制.
 为了节约你的时间,本文主要内容如下:

  • 异步,阻塞的概念
  • 操作系统I/O的类型
  • Java NIO的Linux底层实现

异步,同步,阻塞,非阻塞

同步和异步关注的是消息通信机制,所谓同步就是调用者进行调用后,在没有得到结果之前,该调用一直不会返回,但是一旦调用返回,就得到了返回值,同步就是指调用者主动等待调用结果;而异步则相反,执行调用之后直接返回,所以可能没有返回值,等到有返回值时,由被调用者通过状态,通知来通知调用者.异步就是指被调用者来通知调用者调用结果就绪*所以,二者在消息通信机制上有所不同,一个是调用者检查调用结果是否就绪,一个是被调用者通知调用者结果就绪
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.阻塞调用是指在调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会继续执行.非阻塞调用是指在不能立刻得到结构之前,调用线程不会被挂起,还是可以执行其他事情.
 两组概念相互组合就有四种情况,分别是同步阻塞,同步非阻塞,异步阻塞,异步非阻塞.我们来举个例子来分别类比上诉四种情况.
 比如你要从网上下载一个1G的文件,按下下载按钮之后,如果你一直在电脑旁边,等待下载结束,这种情况就是同步阻塞;如果你不需要一直呆在电脑旁边,你可以去看一会书,但是你还是隔一段时间来查看一下下载进度,这种情况就是同步非阻塞;如果你一直在电脑旁边,但是下载器在下载结束之后会响起音乐来提醒你,这就是异步阻塞;但是如果你不呆在电脑旁边,去看书,下载器下载结束后响起音乐来提醒你,那么这种情况就是异步非阻塞.

Unix的I/O类型

 知道上述两组概念之后,我们来看一下Unix下可用的5种I/O模型:

  • 阻塞I/O
  • 非阻塞I/O
  • 多路复用I/O
  • 信号驱动I/O
  • 异步I/O

 前4种都是同步,只有最后一种是异步I/O.需要注意的是Java NIO依赖于Unix系统的多路复用I/O,对于I/O操作来说,它是同步I/O,但是对于编程模型来说,它是异步网络调用.下面我们就以系统read的调用来介绍不同的I/O类型.
 当一个read发生时,它会经历两个阶段:

  • 1 等待数据准备
  • 2 将数据从内核内存空间拷贝到进程内存空间中

 不同的I/O类型,在这两个阶段中有不同的行为.但是由于这块内容比较多,而且多为表述性的知识,所以这里我们只给出几张图片来解释,具体解释大家可以参看这篇博文

Blocking I/O

NonBlocking I/O

Multiplexing I/O

Asynchronous I/O

对比

Java NIO的Linux底层实现

 我们都知道Netty通过JNI的方式提供了Native Socket Transport,为什么Netty要提供自己的Native版本的NIO呢?明明Java NIO底层也是基于epoll调用(最新的版本)的.这里,我们先不明说,大家想一想可能的情况.下列的源码都来自于OpenJDK-8u40-b25版本.

open方法

 如果我们顺着Selector.open()方法一个类一个类的找下去,很容易就发现Selector的初始化是由DefaultSelectorProvider根据不同操作系统平台生成的不同的SelectorProvider,对于Linux系统,它会生成EPollSelectorProvider实例,而这个实例会生成EPollSelectorImpl作为最终的Selector实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class EPollSelectorImpl extends SelectorImpl
{
.....
// The poll object
EPollArrayWrapper pollWrapper;
.....
EPollSelectorImpl(SelectorProvider sp) throws IOException {
.....
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
.....
}
.....
}

EpollArrayWapper将Linux的epoll相关系统调用封装成了native方法供EpollSelectorImpl使用.

1
2
3
4
private native int epollCreate();
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout,
int epfd) throws IOException;

 上述三个native方法就对应Linux下epoll相关的三个系统调用

1
2
3
4
5
6
//创建一个epoll句柄,size是这个监听的数目的最大值.
int epoll_create(int size);
//事件注册函数,告诉内核epoll监听什么类型的事件,参数是感兴趣的事件类型,回调和监听的fd
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
//等待事件的产生,类似于select调用,events参数用来从内核得到事件的集合
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

 所以,我们会发现在EpollArrayWapper的构造函数中调用了epollCreate方法,创建了一个epoll的句柄.这样,Selector对象就算创造完毕了.

register方法

 与open类似,ServerSocketChannelregister函数底层是调用了SelectorImpl类的register方法,这个SelectorImpl就是EPollSelectorImpl的父类.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)

{

if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
//生成SelectorKey来存储到hashmap中,一共之后获取
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
//attach用户想要存储的对象
k.attach(attachment);
//调用子类的implRegister方法
synchronized (publicKeys) {
implRegister(k);
}
//设置关注的option
k.interestOps(ops);
return k;
}

EpollSelectorImpl的相应的方法实现如下,它调用了EPollArrayWrapperadd方法,记录下Channel所对应的fd值,然后将ski添加到keys变量中.在EPollArrayWrapper中有一个byte数组eventLow记录所有的channel的fd值.

1
2
3
4
5
6
7
8
9
10
11
12
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
//获取Channel所对应的fd,因为在linux下socket会被当作一个文件,也会有fd
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
//调用pollWrapper的add方法,将channel的fd添加到监控列表中
pollWrapper.add(fd);
//保存到HashSet中,keys是SelectorImpl的成员变量
keys.add(ski);
}

 我们会发现,调用register方法并没有涉及到EpollArrayWrapper中的native方法epollCtl的调用,这是因为他们将这个方法的调用推迟到Select方法中去了.

Select方法

 和register方法类似,SelectorImpl中的select方法最终调用了其子类EpollSelectorImpldoSelect方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected int doSelect(long timeout) throws IOException {
.....
try {
....
//调用了poll方法,底层调用了native的epollCtl和epollWait方法
pollWrapper.poll(timeout);
} finally {
....
}
....
//更新selectedKeys,为之后的selectedKeys函数做准备
int numKeysUpdated = updateSelectedKeys();
....
return numKeysUpdated;
}

 由上述的代码,可以看到,EPollSelectorImpl先调用EPollArrayWapperpoll方法,然后在更新SelectedKeys.其中poll方法会先调用epollCtl来注册先前在register方法中保存的Channel的fd和感兴趣的事件类型,然后epollWait方法等待感兴趣事件的生成,导致线程阻塞.

1
2
3
4
5
6
7
int poll(long timeout) throws IOException {
updateRegistrations(); ////先调用epollCtl,更新关注的事件类型
////导致阻塞,等待事件产生
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
.....
return updated;
}

 等待关注的事件产生之后(或在等待时间超过预先设置的最大时间),epollWait函数就会返回.select函数从阻塞状态恢复.

selectedKeys方法

 我们先来看SelectorImpl中的selectedKeys方法.

1
2
3
4
5
6
//是通过Util.ungrowableSet生成的,不能添加,只能减少
private Set<SelectionKey> publicSelectedKeys;
public Set<SelectionKey> selectedKeys() {
....
return publicSelectedKeys;
}

 很奇怪啊,怎麽直接就返回publicSelectedKeys了,难道在select函数的执行过程中有修改过这个变量吗?
publicSelectedKeys这个对象其实是selectedKeys变量的一份副本,你可以在SelectorImpl的构造函数中找到它们俩的关系,我们再回头看一下selectupdateSelectedKeys方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private int updateSelectedKeys() {
//更新了的keys的个数,或在说是产生的事件的个数
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
//对应的channel的fd
int nextFD = pollWrapper.getDescriptor(i);
//通过fd找到对应的SelectionKey
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
//更新selectedKey变量,并通知响应的channel来做响应的处理
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}

后记

 看到这里,详细大家都已经了解到了NIO的底层实现了吧.这里我想在说两个问题.
 一是为什么Netty自己又从新实现了一边native相关的NIO底层方法? 听听Netty的创始人是怎麽说的吧链接
 二是看这么多源码,花费这么多时间有什么作用呢?我感觉如果从非功利的角度来看,那么就是纯粹的希望了解的更多,有时候看完源码或在理解了底层原理之后,都会用一种恍然大悟的感觉,比如说AQS的原理.如果从目的性的角度来看,那么就是你知道底层原理之后,你的把握性就更强了,如果出了问题,你可以更快的找出来,并且解决.除此之外,你还可以按照具体的现实情况,以源码为模板在自己造轮子,实现一个更加符合你当前需求的版本.
 后续如果有时间,我希望好好了解一下epoll的操作系统级别的实现原理.

 本文主要讲述Netty框架中Channel相关的知识,Netty通过Channel和Pipeline等一些组件提供了所谓的Universal Communication API.与Channel相关的知识点比较多,本篇文章就主要讲解一下ChannelPipeline的事件处理流原理.Channel,EventLoopChannelFuture的相关知识下篇文章中再进行讲述.

官方文档上的Channel

 官方文档上给出的解释是Channel是与网络Socket相关的或具有一定I/O能力的组件.一个Channel可以给用户提供:

  • 当前Channel的状态(比如,是否保存Open状态,是否处于连接状态)
  • Channel的配置参数,比如说buffer的大小
  • Channel支持的相关I/O操作,比如说read,write,connectbind
  • 提供一个ChannelPipeline来处理所有与该Channel相关的I/O事件和请求

Channel上进行的所有I/O操作都是异步的,也就是说,所有涉及I/O操作的调用都会立刻返回,并不保证操作完成,而是会返回一个ChannelFuture对象来通知你操作是否完成.
 Channel是有层级的,这样的话,你就可以很方便的利用其他已有的Channel来构建自己需要的channel,比如说基于SocketChannel来实现关于SSHChannel

 此外,当你完成某些操作之后调用close()或在close(ChannelPromise)是非常重要的,这样能确保你正确的释放了所有资源.

我眼中的Channel

 首先,我们应该都知道Netty支持很多I/O通信协议:

  • 基于TCP的NIO: NioServerSocketChannel,NioSocketChannel
  • 基于UDP的NIO:NioDatagramChannel
  • 基于TCP的OIO:OioSocketChannelOioServerSocketChannel
  • 基于UDP的OIO:OioDatagramChannel
    如果把关于Channel的类图列出来的话,你会发现支持各种协议的Channel,不信你就看一下这个类图.

 这样想一下,Channel不就是Netty框架用来封装不同协议逻辑的组件吗?,有了Channel的存在,所有于通信协议相关的逻辑都隐藏在不同的Channel实现里,然后在对外提供相对统一的API.
 说道这里,你可能还不知道,即使是OIOChannel,它提供的I/O操作也是异步的.也就是说在Netty框架中,不论是OIO还是NIO模型,读写都会阻塞.这样也体现了Universal Communication API的思想,这就使得我们切换Channel非常方便.我们只要初始化不同的Channel即可.
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(OioServerSocketChannel.class)
 有关Channel协议相关的底层知识,我们会在下一篇文章时进行介绍.

ChannelHandler和ChannelPipeline

 只有Channel的支持,还不足以实现Universal Communication API,还需要上述两个类来提供ChannelHandler的编程模式,基于ChannelHandler来开发业务逻辑,而不需要考虑网络通讯方面的事情.
 Netty源码中一张图形象的描述了这个机制
示意图

 Netty中的ChannelPipeline包含两条线路:Upstream和Downstream.Upstream对应上行,接受信息,被动的状态改变,都属于Upstream.Downstream则对应下行,发送消息,主动状态的改变.Upstream对应InBound Handler,Downstream对应Outbound Handler.从Netty内部IO线程接读到IO数据,依次经过N个Handler到达最内部的逻辑处理单元,这种称之为Inbound Handler;从Channel发出IO请求,依次经过M个Handler到达Netty内部IO线程,这种称之为Outbound Handler
 需要注意的是,这个Handler链中消息或在事件不会自动的向下或在向上流动或转发,而是需要由上一个Handler显示的调用ChannelPipeline.sendUp(down)stream来交给下一个Handler来处理.也就是说,每个Handler接受到一个ChannelEvent,处理结束后,如果需要继续处理,那么它需要向下一个或在上一个Handler发起一个事件.