我研究和阅读Spring Cloud Stream源码已经有一个多月了,但是由于自己的Spring基础知识不是很充足,所以导致很多地方都没有融会贯通,并且相关的文章一直无从下手。于是我先整理了当时阅读代码时的知识点记录,算是源码分析之前的基础知识储备吧,整理的有些杂乱,希望大家理解。
 本文涉及的Spring知识如下:

  • Spring Boot的@Import用法和原理,与ConfigurationImportBeanDefinitionRegistrar相关
  • Bean初始化各个周期的回调,比如InitializingBean,BeanPostProcessor,SmartInitializingSingleton
  • FactoryBeanMethodInterceptor
  • Aware系列回调
  • LifecycleSmartLifecycleDefaultLifecycleProcessor

BeanDefinitionRegistryPostProcessor

BeanDefinitionRegistryPostProcessor实现了BeanFactoryPostProcessor接口,是Spring框架的BeanDefinitionRegistry的后处理器,用来注册额外的BeanDefinitionpostProcessBeanDefinitionRegistry方法会在所有的BeanDefinition已经被加载了,但是所有的Bean还没有被创建前调用。BeanDefinitionRegistryPostProcessor经常被用来注册BeanFactoryPostProcessorBeanDefinition

ImportBeanDefinitionRegistrar

@Import注解用来支持在Configuration类中引入其他的配置类,包括Configuration类,ImportSelectorImportBeanDefinitionRegistrar的实现类。ImportBeanDefinitionRegistrarConfigurationClassPostProcessor处理Configuration类期间被调用,用来生成该Configuration类所需要的BeanDefinition。而ConfigurationClassPostProcessor正实现了BeanDefinitionRegistryPostProcessor接口。下面我们就来看一下其processConfigBeanDefinitions方法到底是如何处理Configuration类的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public void processConfigBeanDefinitions(BeanDefinitionRegistry registry) {
List<BeanDefinitionHolder> configCandidates = new ArrayList<>();
String[] candidateNames = registry.getBeanDefinitionNames();
//第一步:先把所有Configuration的beanDefinition找到。
for (String beanName : candidateNames) {
BeanDefinition beanDef = registry.getBeanDefinition(beanName);
//利用AnnotationMetadata是否有@Configuration这个注解。需要注意的是
//Configuration是一个元注解,它是可以使用在其他注解上的,被这些注解注释的类也被认为是Configuration
if (ConfigurationClassUtils.checkConfigurationClassCandidate(beanDef, this.metadataReaderFactory)) {
configCandidates.add(new BeanDefinitionHolder(beanDef, beanName));
}
}
//第二步:通过Order注解的值来排序,定义了Configuration的先后顺序
configCandidates.sort((bd1, bd2) -> {
int i1 = ConfigurationClassUtils.getOrder(bd1.getBeanDefinition());
int i2 = ConfigurationClassUtils.getOrder(bd2.getBeanDefinition());
return (i1 < i2) ? -1 : (i1 > i2) ? 1 : 0;
});
//..... 此处有省略
ConfigurationClassParser parser = new ConfigurationClassParser(
this.metadataReaderFactory, this.problemReporter, this.environment,
this.resourceLoader, this.componentScanBeanNameGenerator, registry);

Set<BeanDefinitionHolder> candidates = new LinkedHashSet<>(configCandidates);
Set<ConfigurationClass> alreadyParsed = new HashSet<>(configCandidates.size());
do {
//第三步:通过BeanDefinition来读取ConfigurationClass
parser.parse(candidates);
parser.validate();

Set<ConfigurationClass> configClasses = new LinkedHashSet<>(parser.getConfigurationClasses());
configClasses.removeAll(alreadyParsed);

if (this.reader == null) {
this.reader = new ConfigurationClassBeanDefinitionReader(
registry, this.sourceExtractor, this.resourceLoader, this.environment,
this.importBeanNameGenerator, parser.getImportRegistry());
}
//第四步:重点,通过ConfigurationClass来获得BeanDefinition
this.reader.loadBeanDefinitions(configClasses);
alreadyParsed.addAll(configClasses);

candidates.clear();
//第五步:由于在loadBeanDefinitions过程中会向registry中添加BeanDefinition,所以这里需要把新的Definition
//在重新检测一遍,先看是否是Configuration类,如果是的那么还要再进行一次处理。
if (registry.getBeanDefinitionCount() > candidateNames.length) {
//.....此处有省略,大致逻辑就是通过registry多出的BeanDefinition获得新的candidateNames
candidateNames = newCandidateNames;
}
}
while (!candidates.isEmpty());
//.....此处有省略
}

 接着我们直接到ConfigurationClassBeanDefinitionReader类中查看loadBeanDefinition函数的实现。它会调用loadBeanDefinitionsForConfigurationClass函数。在该函数中会处理所有和Configuration相关的BeanDefinition,其中就会调用loadBeanDefinitionsFromRegistrars来通过ImportBeanDefinitionRegistrar加载BeanDefinition
 看到这里,大家可能会有个疑问,多个Configuration和多个ImportBeanDefinitionRegistrar存在的情况下,它们之间的对应关系是如何确定的呢?
ConfigurationClassParser的parse方法会将Configuration类相关的配置信息全部解析出来。我们可以看其doProcessConfigurationClass方法的源码。通过@Import注解将Configuration类和相应的ImportBeanDefinitionRegistrar联系在一起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
protected final SourceClass doProcessConfigurationClass(ConfigurationClass configClass, SourceClass sourceClass)
throws IOException {

//首先处理内部成员类的情况
processMemberClasses(configClass, sourceClass);

// 处理 @PropertySource 注解
for (AnnotationAttributes propertySource : AnnotationConfigUtils.attributesForRepeatable(
sourceClass.getMetadata(), PropertySources.class,
org.springframework.context.annotation.PropertySource.class)) {
if (this.environment instanceof ConfigurableEnvironment) {
processPropertySource(propertySource);
}
}

// 处理 @ComponentScan 注解
Set<AnnotationAttributes> componentScans = AnnotationConfigUtils.attributesForRepeatable(
sourceClass.getMetadata(), ComponentScans.class, ComponentScan.class);
if (!componentScans.isEmpty() &&
!this.conditionEvaluator.shouldSkip(sourceClass.getMetadata(), ConfigurationPhase.REGISTER_BEAN)) {
for (AnnotationAttributes componentScan : componentScans) {
// The config class is annotated with @ComponentScan -> perform the scan immediately
Set<BeanDefinitionHolder> scannedBeanDefinitions =
this.componentScanParser.parse(componentScan, sourceClass.getMetadata().getClassName());
// Check the set of scanned definitions for any further config classes and parse recursively if needed
for (BeanDefinitionHolder holder : scannedBeanDefinitions) {
if (ConfigurationClassUtils.checkConfigurationClassCandidate(
holder.getBeanDefinition(), this.metadataReaderFactory)) {
parse(holder.getBeanDefinition().getBeanClassName(), holder.getBeanName());
}
}
}
}

// 处理 @Import 注解
processImports(configClass, sourceClass, getImports(sourceClass), true);

// 处理 @ImportResource 注解
if (sourceClass.getMetadata().isAnnotated(ImportResource.class.getName())) {
AnnotationAttributes importResource =
AnnotationConfigUtils.attributesFor(sourceClass.getMetadata(), ImportResource.class);
String[] resources = importResource.getStringArray("locations");
Class<? extends BeanDefinitionReader> readerClass = importResource.getClass("reader");
for (String resource : resources) {
String resolvedResource = this.environment.resolveRequiredPlaceholders(resource);
configClass.addImportedResource(resolvedResource, readerClass);
}
}

// 处理configuration中的 @Bean 函数
Set<MethodMetadata> beanMethods = retrieveBeanMethodMetadata(sourceClass);
for (MethodMetadata methodMetadata : beanMethods) {
configClass.addBeanMethod(new BeanMethod(methodMetadata, configClass));
}
//......有省略
return null;
}

InitializingBean,FactoryBean,MethodInterceptor

 Spring Cloud Stream的BindableProxyFactory类实现了上述接口。

1
BindableProxyFactory implements MethodInterceptor, FactoryBean<Object>, Bindable, InitializingBean

 其中,InitializingBean接口有一个afterPropertiesSet方法,该方法在bean所有的属性都被赋值后调用。bean的属性被初始化是在初始化的时候做的,与BeanPostProcessor结合来看,afterPropertiesSet方法在postProcessBeforeInitializationpostProcessAfterInitialization之间被调用。
 Spring中有两个类型的Bean,普通Bean和工厂Bean。FactoryBean有三个接口,分别是:

  • Object getObject():返回FactoryBean创建的对象实例。
  • boolean isSingleton():表示FactoryBean返回的对象实例是否为单例。
  • Class getObjectType():返回FactoryBean返回的对象类型。
     我们可以看一下BindableProxyFactory的相关实现,这里会和MethodInterceptor配合。MethodInterceptor是AOP相关的接口,用于在调用对象接口时进行切片注入或在直接实现接口。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21

    @Override
    public synchronized Object getObject() throws Exception {
    //使用AOP的ProxyFactory类,由于该类本身也是先了MethodInterceptor接口
    //所以这样配合使用,直接返回ProxyFactory类。
    if (this.proxy == null) {
    ProxyFactory factory = new ProxyFactory(this.type, this);
    this.proxy = factory.getProxy();
    }
    return this.proxy;
    }

    @Override
    public Class<?> getObjectType() {
    return this.type;
    }

    @Override
    public boolean isSingleton() {
    return true;
    }

BeanPostProcessor,ApplicationContextAware,BeanFactoryAware,SmartInitializingSingleton

 Spring Cloud Stream的StreamListenerAnnotationBeanPostProcessor实现了如下接口

1
2
3
public class StreamListenerAnnotationBeanPostProcessor
implements BeanPostProcessor, ApplicationContextAware, BeanFactoryAware, SmartInitializingSingleton,
InitializingBean

BeanPostProcessorbean的后处理器,通过它我们可以在Bean初始化前后进行处理。它的postProcessBeforeInitialization方法在Bean初始化之前被调用,而postProcessAfterInitializationBean初始化后被调用。相关原理涉及到Spring创建Bean的流程,这个之后有时间再研究吧。

Aware系列接口

 Spring中提供了一些Aware相关的接口,像是BeanFactoryAware,ApplicationContextAware等。当一个类实现了这些接口之后,Aware接口的Bean在初始化之后,可以取得相应的资源的实例。比如StreamListenerAnnotationBeanPostProcessor对象就实现了ApplicationContextAwareBeanFactoryAware接口来获取ConfigurableApplicationContextBeanFactory实例。

SmartInitializingSingleton

 当所有的singleton的bean都初始化完成之后才会调用这个接口
afterSingletonsInstantiated函数

SmartLifecycle

 之前介绍的接口都是在Bean的生命周期内的某个阶段中被调用,如果我们希望在容器本身的生命周期事件上做一些事情该怎麽办呢?Spring容器提供了Lifecycle接口。当ApplicationContext接口启动或在关闭时,它会调用本容器内所有的Lifecycle接口。

1
2
3
4
5
6
7
8
9
public interface Lifecycle {
//启动该组件
void start();
//停止组件
void stop();
//查看组件是否正在运行
boolean isRunning();

}

 如果两个对象有依赖关系,希望某一个bean先初始化完成,完成一些工作之后,再初始化另一个bean。在这个场景下,可以使用SmartLifecycle接口,该接口的getPhase方法返回一个整型数字,表明执行顺序。如果其getPhase()方法返回Integer.MIN_VALUE,那么该对象最先启动,最后停止;如果返回Integer.MAX_VALUE,那么该对象最后启动,最先停止。在Spring容器里,有DefaultLifecycleProcessor这个类来处理所有的Lifecycle的bean。在AbstractApplicationContextfinishRefresh函数中会调用到该processer的onRefresh函数,从其调用其本身的startBeans函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void startBeans(boolean autoStartupOnly) {
Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
Map<Integer, LifecycleGroup> phases = new HashMap<Integer, LifecycleGroup>();
//遍历所有的Lifecycle,按照phase分成不同的LifecycleGroup
for (Map.Entry<String, ? extends Lifecycle> entry : lifecycleBeans.entrySet()) {
Lifecycle bean = entry.getValue();
if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
int phase = getPhase(bean);
LifecycleGroup group = phases.get(phase);
if (group == null) {
group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
phases.put(phase, group);
}
group.add(entry.getKey(), bean);
}
}
if (!phases.isEmpty()) {
//按照phase排序,然后启动
List<Integer> keys = new ArrayList<Integer>(phases.keySet());
Collections.sort(keys);
for (Integer key : keys) {
phases.get(key).start();
}
}
}

 前些日子在公司接触了spring bootspring cloud,有感于其大大简化了spring的配置过程,十分方便使用者快速构建项目,而且拥有丰富的starter供开发者使用。但是由于其自动化配置的原因,往往导致出现问题,新手无法快速定位问题。这里我就来总结一下spring boot 自定义starter的过程,相信大家看完这篇文章之后,能够对spring boot starter的运行原理有了基本的认识。
 为了节约你的时间,本篇文章的主要内容有:

  • spring boot starter的自定义
  • spring boot auto-configuration的两种方式,spring.factories和注解
  • Conditional注解的使用

引入pom依赖

 相信接触过spring boot的开发者都会被其丰富的starter所吸引,如果你想给项目添加redis支持,你就可以直接引用spring-boot-starter-redis,如果你想使项目微服务化,你可以直接使用spring-cloud-starter-eureka。这些都是spring boot所提供的便利开发者的组件,大家也可以自定义自己的starter并开源出去供开发者使用。
 创建自己的starter项目需要maven依赖是如下所示:

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<version>1.4.4.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<version>1.4.4.RELEASE</version>
</dependency>

核心配置类StorageAutoConfigure

 构建starter的关键是编写一个装配类,这个类可以提供该starter核心bean。这里我们的starter提供一个类似redis的键值存储功能的bean,我们叫它为StorageService。负责对这个bean进行自动化装配的类叫做StorageAutoConfigure。保存application.properties配置信息的类叫做StorageServiceProperties。这三种类像是铁三角一样,你可以在很多的spring-boot-starter中看到他们的身影。
 我们首先来看StorageAutoConfigure的定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
@ConditionalOnClass(StorageService.class)
@EnableConfigurationProperties(StorageServiceProperties.class)
public class StorageAutoConfigure {
@Autowired
private StorageServiceProperties properties;

@Bean
@ConditionalOnMissingBean(StorageService.class)
@ConditionalOnProperty(prefix = "storage.service", value = "enabled", havingValue = "true")
StorageService exampleService() {
return new StorageService(properties);
}
}

 我们首先讲一下源码中注解的作用。

  • @Configuration,被该注解注释的类会提供一个或则多个@bean修饰的方法并且会被spring容器处理来生成bean definitions
  • @bean注解是必须修饰函数的,该函数可以提供一个bean。而且该函数的函数名必须和bean的名称一致,除了首字母不需要大写。
  • @ConditionalOnClass注解是条件判断的注解,表示对应的类在classpath目录下存在时,才会去解析对应的配置文件。
  • @EnableConfigurationProperties注解给出了该配置类所需要的配置信息类,也就是StorageServiceProperties类,这样spring容器才会去读取配置信息到StorageServiceProperties对象中。
  • @ConditionalOnMissingBean注解也是条件判断的注解,表示如果不存在对应的bean条件才成立,这里就表示如果已经有StorageService的bean了,那么就不再进行该bean的生成。这个注解十分重要,涉及到默认配置和用户自定义配置的原理。也就是说用户可以自定义一个StorageService的bean,这样的话,spring容器就不需要再初始化这个默认的bean了。
  • ConditionalOnProperty注解是条件判断的注解,表示如果配置文件中的响应配置项数值为true,才会对该bean进行初始化。

 看到这里,大家大概都明白了StorageAutoConfigure的作用了吧,spring容器会读取相应的配置信息到StorageServiceProperties中,然后依据调节判断初始化StorageService这个bean。集成了该starter的项目就可以直接使用StorageService来存储键值信息了。

配置信息类StorageServiceProperties

 存储配置信息的类StorageServiceProperties很简单,源码如下所示:

1
2
3
4
5
6
7
8
9
@ConfigurationProperties("storage.service")
public class StorageServiceProperties {
private String username;
private String password;
private String url;

......
//一系列的getter和setter函数
}

@ConfigurationProperties注解就是让spring容器知道该配置类的配置项前缀是什么,上述的源码给出的配置信息项有storage.service.username,storage.service.passwordstorage.service.url,类似于数据库的host和用户名密码。这些配置信息都会由spring容器从application.properties文件中读取出来设置到该类中。

starter提供功能的StorageService

StorageService类是提供整个starter的核心功能的类,也就是提供键值存储的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class StorageService {
private Logger logger = LoggerFactory.getLogger(StorageService.class);
private String url;
private String username;
private String password;
private HashMap<String, Object> storage = new HashMap<String, Object>();
public StorageService(StorageServiceProperties properties) {
super();
this.url = properties.getUrl();
this.username = properties.getUsername();
this.password = properties.getPassword();
logger.debug("init storage with url " + url + " name: " + username + " password: " + password);
}


public void put(String key, Object val) {
storage.put(key, val);
}

public Object get(String key) {
return storage.get(key);
}
}

注解配置和spring.factories

 自定义的starter有两种方式来通知spring容器导入自己的auto-configuration类,也就是本文当中的StorageAutoConfigure类。
 一般都是在starter项目的resources/META-INF文件夹下的spring.factories文件中加入需要自动化配置类的全限定名称。

1
org.springframework.boot.autoconfigure.EnableAutoConfiguration=starter.StorageAutoConfigure

spring boot项目中的EnableAutoConfigurationImportSelector会自动去每个jar的相应文件下查看spring.factories文件内容,并将其中的类加载出来在auto-configuration过程中进行配置。而EnableAutoConfigurationImportSelector@EnableAutoConfiguration注解中被import
 第一种方法只要是引入该starter,那么spring.factories中的auto-configuration类就会被装载,但是如果你希望有更加灵活的方式,那么就使用自定义注解来引入装配类。

1
2
3
4
5
6
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(StorageAutoConfigure.class)
@Documented
public @interface EnableStorage {
}

 有了这个注解,你可以在你引入该starter的项目中使用该注解,通过@import注解,spring容器会自动加载StorageAutoConfigure并自动化进行配置。

后记

 上述只是关于spring boot starter最为简单的定制和原理分析,后续我准备研究一下spring cloud stream的源码,主要是因为工作上一直在使用这个框架。请大家继续关注。

 前段时间一直在学习mit的分布式课程Distributed Systems,仔细阅读了raft论文,但是中间又跑去搞docker了,所以一直没有整理raft相关的文章,今天就来总结一下。
 文章中没有多少详细的图片,但是大家可以边看文章边看Raft演示动画
 之前介绍的Paxos算法一直都是分布式一致性协议的标准,但是Paxos难以理解,更难以理解。于是Stanford的教授提出了Raft协议,它是一个为真实世界应用建立的协议,主要注重协议的落地性和可理解性。这里有Raft的论文,大家有兴趣可以自行阅读一下。
 Raft是为了managing a replicated log。Raft会首先选举一个leader,然后让这个leader来管理replicated log。Raft将consensus问题(也就是一致性问题)划分成三个相互独立的子问题:

  • leader election
  • log replication
  • safety

Raft basis

 任何时间每个server都处于下列三个状态之一:leader,follower,或在candidate之一。在正常状态下,整个集群只会有一个leader并且其他所有server都处于followers状态下。followers是被动的,它们只会对leader的request进行反应。第三个状态candidate是用来选举新的leader的。
 Raft以Term来划分运行时间,你可以将其理解为任期。Term以连续的整数来命名,每个Term都以一个election开始。在一次选举中,一个或多个candidate试图成为leader。如果一个candidate赢得了election,那么它就成为leader。如果一次election中没有candidate获胜,那么就进行下一个Term,重新进行election。每个Term最多只有一leader,否则进入下一个Term,这样Term就可以作为一个logical clock。
 Raft服务器通过RPC来交互,只需要两个RPC操作,RequestVote RPCs and AppendEntries RPCs。RequestVote用于选举而AppendEntries用于leader发送请求进行relicate log entries和心跳。

Leader election

 Raft通过心跳机制来触发leader selection。当一个服务器启动时,默认位于followers状态,并且一直持续知道它一直接受到leader的RPC请求。leader会周期性发送心跳给所有的followers。如果follower一段时间内没有接受到心跳,那么就认为当前没有leader应该开始leader selection。
 开始election后,server将其Term进行加一,然后转变成candidate状态,并且给其他所有server发送RequestVote RPC请求来进行vote。这个过程一直持续到:server自己赢得election,其他的server赢得election,或者这个Term期间没有server获胜,进入下一个Term。
 candidate收到半数以上server的vote就赢得了election。每个server在一个Term中只会vote一次。server基于first-come-and-first-serve的规则来进行投票。一旦某个candidate赢得了election, 就变成了leader,并且开始周期性发送心跳。
 当等待投票时,candidate受到了其他candidate发送的AppendEntries PRC请求,如果candiate发现在包含在请求当中的Term数值大于或则等于自己的Term数值,那么该candiate主动退回到follower状态,否在拒绝该请求,继续保持candidate状态。
 当很多server变成candidate状态进行election时,选举失败的可能性就很高了。那么每个candiate会推迟随机时间之后进入下一个Term并进行新的election。以此来避免大量的选举失败的情况发生。

Log replication

 一旦一个leader被选举成功,它就开始处理client请求。每个client请求都包含一个需要被replicated state machine处理的命令,leader将这些命令当作一个新的entry添加到log中。然后给follower发送AppendEntries RPCs请求来复制这个log entry。当一个entry被safely relicated(在下一小结中会讲解),leader就会将entry交给state machine进行执行,并且将结果返回。
 当一个log entry可以被安全的交给state machine处理时,我们认为它是committed的。Raft保证所有committed的log entry一定是持久化的,并且一定被state machine执行。Log entry是committed一旦该entry在大多数follower上被replicated。一旦一个entry被committed,那么在它之前的所有log也是committed的。Leader会随时关注最大的committed的log的index,并在AppendEntries RPCs请求中携带该信息,这样follower就能知道哪些entry被committed,它们就会将其提交给自己的state machine来执行。
 当followers crash或则网络丢包时,leader会一直发送AppendEntries RPCs直到所有followers都存储了entry。

log entry的排列

 每个Log entry都有其唯一标识,entry中包括了 leader Term,index和要执行的comand。index是指entry在Log中的位置。Raft通过Log Machine Property来维护Log的合理性:

  • 如果两个entries在不同的logs中(存储在不同的server上)拥有相同的index和term,那么他们包含相同的command。
  • 如果两个entries在不同的logs中拥有相同的index和term,那么他们之前的entries也都是一致或在内容相同的。
     第一条规定保证leader每个Term中的每个index最多只能创建一个entry。而第二条规定使得followers在处理AppendEntries RPCs请求时要进行一致性检测。leader在AppendEntries请求中带上了自己logs中排在新entry之前的那个entry的index和term,如果follower在自己的logs中找不到该entry,那么就拒绝添加new entry。这样就保证了第二条规定不会被违反。
     正常情况下,leader和followers的logs都是一致的,但是当一系列的leader crash,followers crash和election之后,followers的logs可能会被当前leader的logs多出一些entry,也可能会少一些entry。在Raft中,leader通过强迫followers的logs复制leader的logs来保持一致性。这就意味着follower logs中的冲突的entry会被重写。

leader和follower的logs冲突

 为了一致化logs,leader的logs需要和follower的logs进行对比,找出它们之间最后一条相同的entry。然后将follower logs中那条entry之后的所有entry删除,并发送leader logs中那条entry之后的entry给follower。这些行为都发生在AppendEntries RPCs的一致性检查过程中。
 leader会每个follower维护一个nextIndex来记录发送给这个follower的下一条log entry的index。nextIndex初始化为leader logs的最后一条entry之后的index。如果follower的logs和leader的logs不一致,那么AppendEntries RPCs的一致性检查就会失败。leader发现自己的请求被follower拒绝了,那么就减少该follower的nextIndex然后再次发送AppendEntries请求。最终nextIndex就会变成二者log中最后一个一致的entry的index。当上述情况发生之后,AppendEntries请求就会成功,就会删除follower中多的entry和添加缺少的entry。

Safety

 这一小节主要描述在leader election过程中的一些限定。这些限定保证任何一个Term的leader的logs都包含了之前Term中所有committed的entry。这也是所谓的Leader Completeness Property。

Election限制

 Raft规定:在election过程中,new leader本身必须有之前Term中所有committed的log entry。也就是说每次election成功的leader必然包含之前所有的committed的log entry。这样保证了log的单向流动,一定是从leader到follower。
 Raft通过election vote过程来保证上述限制。一个candidate必须得到集群中多于半数的server的vote,而每个committed的log entry一定也会存在于多于半数的server的logs中。也就是说在RequestVote RPC中包含了candidate自己logs中最后一个committed的log信息,接受到该请求的server会将其和自己log中最后一个committed的log进行对比,如果自己的log晚于candiate的,那么就同意该candiate成为leader,否在拒绝。这样的话,没有包含所有committed log entry的candidate就一定不会得到超过半数的server的vote。Raft根据entry的term和index来确定每个entry的先后顺序。较大term的log entry比较新,如果log entry的term一致,那就是越大的index约新。

Committing entries from previous terms

 如果旧的leader在committing an entry时crash了,那么新的leader是否需要重新commit这个entry呢?但是为了简化,Raft重来不会提交之前Term的log entry。没有被committed的log entry就会被重写。

Followers and candidate crashs

 如果followers或在candidate在接受到RPC之前crash,leader会一直重试发送RPC。如果是在接受处理之后crash,没有发送回复,leader也是会重复发送RPC,但是因为RPC都是幂等的,所以不会造成额外的影响。

后记

 Raft的应用十分广泛,比如etcd项目就是使用Raft来保证分布式一致性的,之后我也想去研究一下etcd中Raft的实现,毕竟之前都是理论。

 学习完Docker之后,发现了kubernetes这个容器云框架,于是就自己部署来玩玩。大家也可以按照这个和我一步步部署 kubernetes 集群文章来部署。最近在这里花费了大量的时间,之后希望整理一下相关的原理介绍。

kuber1.png

kube3.png

问题列表和解决方案

 Devops的概念已经火了很久了,我一直想对这方面进行一定的了解;再加上实验室项目环境依赖比较复杂,希望使用Docker来解决,所以最近就好好研究了一波Docker的相关实践和原理。这里整理一下,希望组成一个系列,从实践到原理详细讲解一下Docker的使用。
 第一篇就讲一下Jenkins+Docker的自动化部署实践。大致的流程如下:目前我有两个服务器,分别是阿里云和bandwagon,代码存储在github上,每次push都会触发阿里云上的jenkins的构建任务,jenkins将github上的代码fetch到本地,编译打包成war文件,生成docker image并上传到docker registry上,然后通过ssh来登录bandwagon服务器pull下来新生成的image并启动。由于篇幅问题,本篇文章不会介绍有关docker image的build和docker registry的搭建,但是我会在后续文章中再做详细讲解。
 学习Docker,我推荐先在网络上找说明指南,一步一步自己尝试的使用,然后如果觉得有必要可以看一下《Docker容器和容器云》这本书。
 本文内容都是docker和jenkins的基础知识,为了节约你的时间,本文的主要内容如下:

  • docker 基础命令
  • jenkins docker版本的搭建,构建任务的配置
  • Pubish Over SSH 安装和配置
  • 通过github的webhook来触发jenkins构建任务

Docker运行jenkins

 Docker如此火爆的一个原因是因为它形成了一个良好的生态圈,基本上主流的软件应用都有相应的Docker image。如果大家不清楚Docker image的含义,建议大家看一下Docker中文指南,我们可以通过docker pull命令来下载响应的image,然后运行。比如我们希望在阿里云服务器上部署一个jenkins应用,首先可以执行下列语句来获取一个jenkins的image。

1
docker pull jenkinsci/jenkins:lts

 这里我们使用pull从docker registry上拉取image,但是目前业界上有很多共有或在私有的docker registry,比如说docker hub和daoCloud。所以image的全称就由三部分组成:域名或在ip + / + 软件名称 + : + 版本号,所以上边的这条命令就是让docker去jenkinsci这个Jenkins机构自己部署的registry上下载jenkins的lts版本的image.你也可以直接使用docker pull jenkins来下载image,但docker会默认的从docker hub上下载jenkins的laster版本。

 下载成功之后,你可以使用docker images命令来查看当前下载的image信息

 你可以通过docker run命令来运行docker容器,请注意我这里的用词,在Docker中image和container是不同的概念,你可以将他们简单的理解成Java中类和对象的关系。我们使用下面的命令来启动这个jenkins容器。

1
sudo docker run -d --name jenkins -p 9090:8080 -v /var/jenkins_home:/var/jenkins_home jenkinsci/jenkins:lts

 我们来依次讲解一下run命令的几个参数把:

  • -d 后台运行docker容器并打印容器ID。如果不加-d参数,那么容器运行会和终端绑定,如果终端关闭,那么容器也会关闭,但是容器不会被删除。但是如果你只是想试一试某个容器,运行后自动进入命令行,那么可以使用-it参数;如果你想容器关闭之后自动删除,那么就使用-rm参数。

  • --name 给docker container起一个别名,后续可以通过别名来管理容器,否在会系统会默认分配一个随机的别名。

  • -p docker容器和外侧的端口映射,jenkins服务是运行在docker容器内部的,但是docker容器默认不对外暴露接口,所以通过这个参数将内部的8080端口映射到服务器本身的9090端口上。

  • -v 数据卷的挂载。这里涉及到docker container的一个特性,container如果停止运行了,那么再次启动时,之前所有运行相关的数据和文件就都不存在了,就类似于设置了自动还原的电脑一般,无论你做了多少的操作,一旦关机重启之后就又恢复到最初的状态。数据卷就是来解决上述问题的,通过Docker container外部的文件夹的挂载,将可持久化的文件存储到外部挂载的文件夹中。

 然后你就可以根据你自己的ip地址来键入下列地址http:ip:9090来访问jenkins的主页了。
 这里有一点需要注意的是,需要注意你阿里云服务器设置的网络安全协议,是否禁用掉了9090这个端口。

Publish over SSH配置

 Jenkins的初始化配置和SSH Over Publish的安装请大家自行百度,这里我主要讲解一下SSH Over Pushlish配置。
 首先我们要在jenkins服务器上生成密钥对,使用ssh-keygen -t rsa命令来生成秘密对,这样的话,在~/.ssh/下就会有私钥id_rsa和公钥id_rsa.pub。
 然后你需要上传公钥到目标服务器上,也就是我的bandwagon服务器上,可以使用ssh-copy-id来将文件上传到服务器上,类似于scp命令的使用方式。

1
ssh-copy-id -i ~/.ssh/id_rsa.pub <username>@<host>

 最后我们需要修改目标服务器的ssh配置文件,配置文件为/etc/ssh/sshd_config。设置ssh-server允许使用私钥和公钥对的方式登录,然后使用sudo /etc/init.d/ssh restart命令重启ssh服务。

1
2
3
RSAAuthentication yes
PubkeyAuthentication yes
#AuthorizedKeysFile %h/.ssh/authorized_keys

 上述步骤成功之后,大家在系统管理中配置Publish over SSH。相关的配置信息如下图所示。

jenkins1.png

 你还可以点击下方的高级选项,来配置ssh服务器的端口,超时时间等信息,还可以点击Test Configuration来检测是否配置成功。

构建任务配置

 我们先创建一个构建任务,该任务从github repo上将代码拉取下来,然后执行构建任务,然后通过Publish Over SSH在目标服务器上进行部署。
 我们首先配置源码管理模块,选择Git选项,然后配置Repository URL 并添加认证信息。可以将自己的github帐号和密码加入其中。

jenkins2.png

 不同的项目的构建命令不同,但是我们可以在构建后操作模块设置后续操作,通过ssh登录目标服务器,让目标服务器执行命令行操作来pull最新上传的image并且执行,这样就完成了部署。

jenkins3.png

Push触发构建任务

 完成上述配置,你就可以手动在jenkins上启动构架任务了,但是要做到自动化部署,还必须设置Push操作自动触发jenkins构建任务的机制。
 我们先到首页-用户管理界面打开自己的用户界面,然后点击左侧的设置按钮,并点击show API token按钮来获取API token.然后在构建任务设置页面的构建触发器模块勾选触发远程构建选项,并将token填到里边去。这是jenkins会提示你如何通过URL来触发构建任务。
jenkins5.png

 然后我们打开github上相应库的设置页面。点击左侧的Webhooks选项,然后添加hook.将上述的url填写到Payload URL栏中,点击添加。如果添加成功之后,每次你push一个新版本,那么jenkins就会自动进行部署了。
jenkins6.png

 如果你发现webhooks发送请求失败,那可能是因为你jenkins安全设置的问题,禁止掉了发送请求自动化构建。

后记

 本篇讲的都是十分基础性的内容,后一篇文章讲一下dockerfile的原理和注意事项与docker registry。

 最近在阅读《多处理器编程艺术》一书,掌握了很多Java多线程的底层知识,现在就做一下书中链表-锁的作用一章的总结。
 为了节约你的时间,本文主要内容如下:

  • 带锁的链表队列
  • 细粒度同步
  • 乐观同步
  • 惰性同步
  • 非阻塞同步

粗粒度同步

 所谓粗粒度同步其实很简单,就是在List的add,remove,contains函数的开始就直接使用Lock加锁,然后在函数结尾释放。
add函数的代码如下所示,函数的主体就是链表的遍历添加逻辑,只不过在开始和结束进行了锁的获取和释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private Node head;
private Lock lock = new ReentrantLock();
public boolean add(T item) {
Node pred, curr;
int key = item.hashCode();
lock.lock();
try {
pred = head;
curr = pred.next;
while(curr.key < key) {
pred = curr;
curr = pred.next;
}
if (key == curr.key) {
return false;
} else {
Node node = new Node(item);
node.next = curr;
pred.next = node;
return true;
}

} finally {
lock.unlock();
}
}

 大家看到这里就会想到,这不就是类似于Hashtable的实现方式吗?把可能出现多线程问题的函数都用一个重入锁锁住。但是这个方法的缺点很明显,如果竞争激烈的话,对链表的操作效率会很低,因为add,remove,contains三个函数都需要获取锁,也都需要等待锁的释放。至于如何优化,我们可以一步一步往下看

细粒度同步

我们可以通过锁定单个节点而不是整个链表来提高并发。给每个节点增加一个Lock变量以及相关的lock()和unlock()函数,当线程遍历链表的时候,若它是第一个访问节点的线程,则锁住被访问的节点,在随后的某个时刻释放锁。这种细粒度的锁机制允许并发线程以流水线的方式遍历链表。
 使用这种方式来遍历链表,必须同时获取两个相邻节点的锁,通过“交叉手”的方式来获取锁:除了初始的head哨兵节点外,只有在已经获取pred的锁时,才能获取curr的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//每个Node对象中都有一个Lock对象,可以进行lock()和unlock()操作
public boolean add(T item) {
int key = item.hashCode();
head.lock();
Node pred = head;
try {
Node curr = pred.next;
curr.lock();

try {
while (curr.key < key) {
pred.unlock();
pred = curr;
curr = pred.next;
curr.lock();
}

if (curr.key == key) {
return false;
}
Node newNode = new Node(item);
newNode.next = curr;
pred.next = newNode;
return true;
} finally {
curr.unlock();
}

} finally {
pred.unlock();
}
}

乐观同步

 虽然细粒度锁是对单一粒度锁的一种改进,但它仍然出现很长的获取锁和释放锁的序列。而且,访问链表中不同部分的线程仍然可能相互阻塞。例如,一个正在删除链表中第二个元素的线程将会阻塞所有试图查找后继节点的线程。
 减少同步代价的一种方式就是乐观:不需要获取锁就可以查找,对找到的节点进行加锁,然后确认锁住的节点是正确的;如果一个同步冲突导致节点被错误的锁定,则释放这些锁重新开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public boolean add(T item) {
int key = item.hashCode();

while (true) { //如果不成功,就进行重试
Node pred = head;
Node curr = pred.next;
while (curr.key < key) {
pred = curr;
curr = pred.next;
}
//找到目标相关的pred和curr之后再将二者锁住
pred.lock();
curr.lock();
try {
//锁住二者之后再进行判断,是否存在并发冲突
if (validate(pred, curr)) {
//如果不存在,那么就直接进行正常操作
if (curr.key == key) {
return false;
} else {
Node node = new Node(item);
node.next = curr;
pred.next = node;
}
}
} finally {
pred.unlock();
curr.unlock();
}
}
}
public boolean validate(Node pred, Node curr) {
//从队列头开始查找pred和curr,判断是否存在并发冲突
Node node = head;
while (node.key <= pred.key) {
if (node == pred) {
return pred.next == curr;
}
node = node.next;
}
return false;
}

 由于不再使用能保护并发修改的锁,所以每个方法调用都可能遍历那些已经被删除的节点,所以在进行添加,删除获取判断是否存在的之前必须再次进行验证。

惰性同步

 当不用锁遍历两次链表的代价比使用锁遍历一次链表的代价小很多时,乐观同步的实现效果非常好。但是这种算法的缺点之一就是contains()方法在遍历时需要锁,这一点并不令人满意,其原因在于对contains()的调用要比其他方法的调用频繁得多。
使用惰性同步的方法,使得contains()调用是无等待的,同时add()和remove()方法即使在被阻塞的情况下也只需要遍历一次链表
对每个节点增加一个布尔类型的marked域,用于说明该节点是否在节点集合中。现在,遍历不再需要锁定目标结点,也没有必须通过重新遍历整个链表来验证结点是否可达。所有未被标记的节点必然是可达的

1
2
3
4
5
//add方法和乐观同步的方法一致,只有检验方法做了修改。
//只需要检测节点的marked变量就可以,并且查看pred的next是否还是指向curr,需要注意的是marked变量一定是voliate的。
private boolean validate(Node pred, Node curr) {
return !pred.marked && !curr.marked && pred.next == curr;
}

 惰性同步的优点之一就是能够将类似于设置一个flag这样的逻辑操作与类似于删除结点的链接这种对结构的物理改变分开。通常情况下,延迟操作可以是批量处理方式进行,且在某个方便的时候再懒惰地进行处理,从而降低了对结构进行物理修改的整体破裂性。惰性同步的主要缺点是add()和remove()调用是阻塞的:如果一个线程延迟,那么其他线程也将延迟。

非阻塞同步

 使用惰性同步的思维是非常有益处的。我们可以进一步将add(),remove()和contains()这三个方法都变成非阻塞的。前两个方法是无锁的,最后一个方法是无等待的。我们无法直接使用compareAndSet()来改变next域来实现,因为这样会出现问题。但是我们可以将结点的next域和marked域看作是单个的原子单位:当marked域为true时,对next域的任何修改都将失败。
 我们可以使用AtomicMarkableReference对象将指向类型T的对象引用next和布尔值marked封装在一起。这些域可以一起或单个地原子更新。可以让每个结点的next域为一个AtomicMarkableReference。线程可以通过设置结点next域中的标记位来逻辑地删除curr,和其他正在执行add()和remove()的线程共享物理删除:当每个线程遍历链表时,通过物理删除所有被标记的节点来清理链表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

public Window find(Node head, int key) {
Node pred = null, curr = null, succ = null;
boolean[] marked = {false};
boolean snip;

retry: while(true) {
pred = head;
curr = curr.next.get(marked);
while(true) {
succ = curr.next.get(marked); //获取succ,并且查看是被被标记
while (marked[0]) {//如果被标记了,说明curr被逻辑删除了,需要继续物理删除
snip = pred.next.compareAndSet(curr, succ, false, false);//
if (!snip) continue retry;
curr = succ;
succ = curr.next.get(marked);
}
//当不需要删除后,才继续遍历
if (curr.key >= key) {
return new Window(pred, curr);
}
pred = curr;
curr = succ;
}
}
}

public boolean add(T item) {
int key = item.hashCode();
while(true) {
Window window = find(head, key);
Node pred = window.pred, curr = window.curr;
if (curr.key == key) {
return false;
} else {
Node node = new Node(item);
node.next = new AtomicMarkableReference<>(curr, false);
if (pred.next.compareAndSet(curr, node, false, false)) {
return true;
}
}
}
}

public boolean remove(T item) {
int key = item.hashCode();
boolean sinp;
while(true) {
Window window = find(head, key);
Node pred = window.pred, curr = window.curr;
if (curr.key != key) {
return false;
} else {
Node succ = curr.next.getReference();
//要进行删除了,那么就直接将curr.next设置为false,然后在进行真正的物理删除。
sinp = curr.next.compareAndSet(curr, succ, false, true);
if (!sinp) {
continue;
}
pred.next.compareAndSet(curr, succ, false, false);
return true;
}
}
}


class Node {
AtomicMarkableReference<Node> next;
}

后记

 文中的代码在我的github的这个repo中都可以找到。

 和16年计划一样,建立一个计划目录,记录一下17年的计划和实现情况,进行不定时的更新。

计划清单

英语学习

单词量

  • 口语小助手 2017.1.30~

口语

  • 每天一集《摩登家庭》的听力口语练习 2017.1.30~

Android开发

Android性能方面研究

GT开源项目研究

博客文章

Android富文本
Android Trasication动画

前端开发

要阅读的书籍

  • 《暗时间》2017.1.30~

    习惯养成

生活

  • 关灯之后不准再玩手机
  • 11点半左右睡觉

思维

  • 每日的思考,总结和回顾

Update 2017.1.1

 开始制定计划,一个星期时间。

Update 2017.1.30

 制订2月计划

Android

  • 两篇博文:Android富文本和Android Trasication动画
  • Gt开源项目研究

《暗时间》研读

英语口语练习

思维的习惯,总结,回顾,自醒!!!!!

Update 2017.4.6

书籍 正在进行,预计时间3个月

  • 多核编程的艺术
  • 高效能Mysql

中间件

RPC简单框架

Netty源码分析

Java并发知识

JUC源码分析

无锁算法

Update 2017.5.3

书籍 正在进行,预计时间3个月

  • 多核编程的艺术 读完第一章
  • 高效能Mysql

mit 分布式课程 预计2个月

主页:http://nil.csail.mit.edu/6.824/2015/index.html
github:https://github.com/ztelur/mit-distributed-systems
经典课程啊,使用go语言,坚持自己把lab都做完

中间件

RPC简单框架

Update 2017.10.7

 已经很久没有指定过计划了,前段时间因为家里和校招的原因,牵扯了很多精力,而且个人也逐渐变得放松起来,确实,工作已经确定,是可以放松一下了。所以我决定之后的目标集中在个人习惯方面,并且增加对经济和金融的学习,为以后个人财务管理和投资做准备。

Java后台编程

 Spring Cloud Stream的源码已经研究很久了,但是由于之前对Spring的不熟悉,一直没有结果,希望尽快把文章写出来,并且可以发到一些比较有影响的平台上去,另外Stream和Rocket的结合也希望作为我个人的项目来尽快完成

  • Spring Cloud Stream 源码分析
  • Spring Cloud Stream Rocket binder的编写

编程习惯

 我一直认为我不算一个优秀的程序员,一是很多编程习惯不是很好,二是确实实际代码行数也不够,所以我最近一段时间从下面这些方面来提升

  • 自动化与快捷键 shell编程的利用,编程IDE的快捷键,虽然这些都是最简单的,但是我希望从这里开始入手吧
  • 编程风格:统一一致,代码大全学过很多,但是自己写代码有是另外一种情况,还得从新来过

工作生活习惯

 最近由于比较懈怠,平常学习过程中都比较随意,经常边写代码边看游戏视频,学习股票知识也都是看看就好,我希望之后的一个月内能够养成下列的习惯吧。

  • 一心一用:学习时就别看视频,也别学个10分钟就玩一下手机了。番茄工作法吧。
  • 看书,看学习视频要认真,笔记,复习
  • 每日计划和回顾

 我在前段时间写了一篇关于AQS的文章,在文章里边我说几乎所有在JUC包中的所有多线程相关的类都和AQS相关,今天我就在这里总结一下另一个依赖于AQS来实现的同步工具类:BlockingQueue。我们主要以ArrayBlockingQueue为主来分析相关的源码。

阻塞队列

 相信大多数同学都是在学习线程池相关知识时了解到阻塞队列的概念的。知道各种类型的阻塞队列对线程池初始化时的影响。在java doc中这样定义阻塞队列。当从阻塞队列获取元素但是队列为空时,当前线程会阻塞直到另一个线程向阻塞队列中添加一个元素;类似的,当向一个阻塞队列加入元素时,如果队列已经满了,当前线程也会阻塞知道另外一个线程从队列中读取一个元素。阻塞队列一般都是FIFO,用来实现生产者和消费者模式。阻塞队列的方法通过四种不同的方式来处理操作无法被立即完成的情况,这四种情况分别为抛出异常,返回特殊值(null或在是false),阻塞当前线程直到执行结束,最后一种是只阻塞固定时间,然后还未执行成功就放弃操作。这些方法都总结在下边这种表中了。

BlockingQueue

 我们就只分析puttake方法。

put和take函数

 我们都知道,使用同步队列可以很轻松的实现生产者-消费者模式,其实,同步队列就是按照生产者-消费者的模式来实现的,我们可以将put函数看作生产者的操作,take是消费者的操作。
put函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。函数的源码如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //先获得锁
try {
while (count == items.length)
//如果队列满了,就NotFull这个condition对象上进行等待
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
//这里可以注意的是ArrayBlockingList实际上使用Array实现了一个环形数组,
//当putIndex达到最大时,就返回到起点,继续插入,
//当然,如果此时0位置的元素还没有被取走,
//下次put时,就会因为cout == item.length未被阻塞。
if (++putIndex == items.length)
putIndex = 0;
count++;
//因为插入了元素,通知等待notEmpty事件的线程。
notEmpty.signal();
}

 我们会发现put函数也是使用了wait/notify的机制。与一般生产者-消费者的实现方式不同,同步队列使用ReentrantLockCondition相结合的先获得锁,再等待的机制;而不是synchronizedObject.wait的机制。这里的区别我们下一节再详细讲解。
 看完了生产者相关的put函数,我们再来看一下消费者调用的take函数。take函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//如果队列为空,那么在notEmpty对象上等待,
//当put函数调用时,会调用notEmpty的notify进行通知。
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
E x = (E) items[takeIndex];
items[takeIndex] = null; //取出takeIndex位置的元素
if (++takeIndex == items.length)
//如果到了尾部,将指针重新调整到头部
takeIndex = 0;
count--;
....
//通知notFull对象上等待的线程
notFull.signal();
return x;
}

Condition.await和Object.wait

 我们发现ArrayBlockingList并没有使用Object.wait,而是使用的Condition.await,这是为什么呢?其中又有哪些原因呢?
Condition对象可以提供和Objectwaitnotify一样的行为,但是后者必须使用synchronized这个内置的monitor锁,而Condition使用的是RenentranceLock。这两种方式在阻塞等待时都会将相应的锁释放掉,但是Condition的等待可以中断,这是二者唯一的区别。
&emsp;Condition的流程大致如下边两张图所示.

await

notify

 我们首先来看一下await函数的实现,详细的讲解都在代码中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//在condition wait队列上添加新的节点
Node node = addConditionWaiter();
//释放当前持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//由于node在之前是添加到condition wait queue上的,现在判断这个node
//是否被添加到Sync的获得锁的等待队列上。
//node在condition queue上说明还在等待事件的notify,
//notify函数会将condition queue 上的node转化到Sync的队列上。
while (!isOnSyncQueue(node)) {
//node还没有被添加到Sync Queue上,说明还在等待事件通知
//所以调用park函数来停止线程执行
LockSupport.park(this);
//判断是否被中断,线程从park函数返回有两种情况,一种是
//其他线程调用了unpark,另外一种是线程被中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//代码执行到这里,已经有其他线程调用notify函数,或则被中断,该线程可以继续执行,但是必须先
//再次获得调用await函数时的锁.acquireQueued函数在AQS文章中做了介绍.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
 ....
}

final int fullyRelease(Node node) {
//AQS的方法,当前已经在锁中了,所以直接操作
boolean failed = true;
try {
int savedState = getState();
//获取state当前的值,然后保存,以待以后恢复
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/

private int checkInterruptWhileWaiting(Node node) {
//中断可能发生在两个阶段中,一是在等待singla,另外一个是在获得signal之后
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

final boolean transferAfterCancelledWait(Node node) {
//这里要和下边的transferForSignal对应着看,这是线程中断进入的逻辑.那边是signal的逻辑
//两边可能有并发冲突,但是成功的一方必须调用enq来进入acquire lock queue中.
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//如果失败了,说明transferForSignal那边成功了,等待node 进入acquire lock queue
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

signal函数将等待事件最长时间的线程节点从等待condition的队列移动到获得lock的等待队列中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public final void signal() {
//
if (!isHeldExclusively())
//如果当前线程没有获得锁,抛出异常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//将Condition wait queue中的第一个node转移到acquire lock queue中.
doSignal(first);
}

private void doSignal(Node first) {
do {
   //由于生产者的signal在有消费者等待的情况下,必须要通知
//一个消费者,所以这里有一个循环,直到队列为空
//把first 这个node从condition queue中删除掉
//condition queue的头指针指向node的后继节点,如果node后续节点为null,那么也将尾指针也置为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
//transferForSignal将node转而添加到Sync的acquire lock 队列
}

final boolean transferForSignal(Node node) {
//如果设置失败,说明该node已经被取消了,所以返回false,让doSignal继续向下通知其他未被取消的node
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将node添加到acquire lock queue中.
Node p = enq(node);
int ws = p.waitStatus;
//需要注意的是这里的node进行了转化
//ws>0代表canceled的含义所以直接unpark线程
//如果compareAndSetWaitStatus失败,所以直接unpark,让线程继续执行await中的
//进行isOnSyncQueue判断的while循环,然后进入acquireQueue函数.
//这里失败的原因可能是Lock其他线程释放掉了锁,同步设置p的waitStatus
//如果compareAndSetWaitStatus成功了呢?那么该node就一直在acquire lock queue中
//等待锁被释放掉再次抢夺锁,然后再unpark
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

后记

 后边一篇文章主要讲解如何自己使用AQS来创建符合自己业务需求的锁,请大家继续关注我的文章啦.一起进步偶.

&emsp;对LongAdder的最初了解是从Coolshell上的一篇文章中获得的,但是一直都没有深入的了解过其实现,只知道它相较于AtomicLong来说,更加适合读多写少的并发情景。今天,我们就研究一下LongAdder的原理,探究一下它如此高效的原因。

基本原理和思想

 我们都知道AtomicLong是通过无限循环不停的采取CAS的方法去设置value,直到成功为止。那么当并发数比较多或出现更新热点时,就会导致CAS的失败机率变高,重试次数更多,越多的线程重试,CAS失败的机率越高,形成恶性循环,从而降低了效率。而LongAdder的原理就是降低对value更新的并发数,也就是将对单一value的变更压力分散到多个value值上,降低单个value的“热度”
 我们知道LongAdder的大致原理之后,再来详细的了解一下它的具体实现,其中也有很多值得借鉴的并发编程的技巧。

Add操作

1
2
3
4
5
6
7
8
9
10
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) { //step1
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || //step2
(a = as[getProbe() & m]) == null || //step3
!(uncontended = a.cas(v = a.value, v + x))) //step4
longAccumulate(x, null, uncontended); // step5
}
}

cellsLongAdder的父类Striped64中的Cell数组类型的成员变量。每个Cell对象中都包含一个value值,并提供对这个value值的CAS操作。
 我们来看一下casBase函数相关的源码吧。我们可以认为变量base就是第一个value值,也是基础value变量。先调用casBase函数来cas一下base变量,如果成功了,就不需要在进行下面比较复杂的算法,

1
2
3
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

 然后我们继续看step2第二层条件语句中执行的逻辑。如果cells数组为null或为空,就直接调用longAccumulate方法。因为cells为null或在为空,说明cells未完全初始化,所以调用longAccumulate进行初始化。否则继续判断。
 如果cells中已经有对象了,那么执行step3。我们先来理解一下getProbe() & m的这个操作吧。我们可以首先将这个操作当作一次计算”hash”值,然后将cells中这个位置的Cell对象赋值给变量a。然后判断a是否为null,如果不为null,那么就调用Cell对象自己的cas方法去设置value值。如果a为null,或在cas赋值发生冲突,那么也是开始调用longAccumulate方法。

LongAccumulate方法

longAccumulate函数比较复杂,带有我的注释的代码已经贴在了文章后边,这里我们就只讲一下其中比较关键的一些技巧和思想.
 首先,我们都知道只有当对base的cas操作失败之后,LongAdder才引入Cell数组.所以在longAccumulate中就是对Cell数组进行操作.分别涉及了数组的初始化,扩容和设置某个位置的Cell对象等操作.
 在这段代码中,关于cellBusy的cas操作构成了一个SpinLock,这就是经典的SpinLock的编程技巧,大家可以学习一下.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended)
{

int h;
if ((h = getProbe()) == 0) { //获取PROBE变量,探针变量,与当前运行的线程相关,不同线程不同
ThreadLocalRandom.current(); //初始化PROBE变量,和getProbe都使用Unsafe类提供的原子性操作。
h = getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) { //cas经典无限循环,不断尝试
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // cells不为null,并且数组size大于0
//表示cells已经初始化了
if ((a = as[(n - 1) & h]) == null) { //通过与操作计算出来需要操作的Cell对象的坐标
if (cellsBusy == 0) { //volatile 变量,用来实现spinLock,来在初始化和resize cells数组时使用。
//当cellsBusy为0时,表示当前可以对cells数组进行操作。
Cell r = new Cell(x);//将x值直接赋值给Cell对象
if (cellsBusy == 0 && casCellsBusy()) {//如果这个时候cellsBusy还是0
//就cas将其设置为非0,如果成功了就是获得了spinLock的锁.可以对cells数组进行操作.
//如果失败了,就会再次执行一次循环
boolean created = false;
try {
Cell[] rs; int m, j;
//判断cells是否已经初始化,并且要操作的位置上没有cell对象.
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r; //将之前创建的值为x的cell对象赋值到cells数组的响应位置.
created = true;
}
} finally {
//经典的spinLock编程技巧,先获得锁,然后try finally将锁释放掉
//将cellBusy设置为0就是释放锁.
cellsBusy = 0;
}
if (created)
break; //如果创建成功了,就是使用x创建了新的cell对象,也就是新创建了一个分担热点的value
continue;
}
}
collide = false; //未发生碰撞
}
else if (!wasUncontended)//是否已经发生过一次cas操作失败
wasUncontended = true; //设置成true,以便第二次进入下一个else if 判断
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
 //fn是操作类型,如果是空,就是相加,所以让a这个cell对象中的value值和x相加,然后在cas设置,如果成果
//就直接返回
break;
else if (n >= NCPU || cells != as)
  //如果cells数组的大小大于系统的可获得处理器数量或在as不再和cells相等.
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
  //再次获得cellsBusy这个spinLock,对数组进行resize
try {
if (cells == as) {//要再次检测as是否等于cells以免其他线程已经对cells进行了操作.
Cell[] rs = new Cell[n << 1]; //扩容一倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;//赋予cells一个新的数组对象
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = advanceProbe(h);//由于使用当前探针变量无法操作成功,所以重新设置一个,再次尝试
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//cells数组未初始化,获得cellsBusy lock,来初始化
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x); //设置x的值为cell对象的value值
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}//如果初始化数组失败了,那就再次尝试一下直接cas base变量,如果成功了就直接返回
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

后记

 本篇文章写的不是很好,我写完之后又看了一遍coolshell上的这篇关于LongAdder文章,感觉自己没有人家写的那么简介明了。我对代码细节的注释和投入太多了。其实很多代码大家都可以看懂,并不需要大量的代码片段加注释。以后要注意一下。之后会接着研究一下JUC包中的其他类,希望大家多多关注。

 最近在学习zookeeper原理的时候了解到了paxos算法,看了几篇文章之后还是感觉有些迷糊,后来看了知行学社的paxos视频才对这个算法有了一定的了解,这里就做一下总结.

Paxos简介

 Paxos是Lamport于1990年提出的一种基于消息传递而具有高度容错特性的分布式一致性算法.这个算法是分布式中最为重要的算法,Google Chubby的作者Mike Burrows说过这个世界上只有一种一致性算法,那就是Paxos,其他算法都是残次品.具体Paxos算法的详细内涵和故事背景大家可以参考知乎上的回答;

Paxos的使用场景和假设

 我们都知道基于消息传递通信模型的分布式系列,不可避免的会发生以下错误:进程可能会慢,被杀死或在重启,消息可能会有延迟,丢失和重复.Paxos算法解决的问题就是在一个可能发生上述异常的分布式系统中如何就某个值达成一致,保证无论发生以上任何异常,都不会破坏决议的一致性。但是Paxos算法也有一定的使用假设。一个假设是在消息传递的过程中不会出现拜占庭将军问题:即虽然有可能一个消息被传递两次,但是绝对不会出现错误的消息。另一个假设是提议不会被反对,只能被同意或在被更新的提议替换。
 Paxos协议中有三种角色,每个节点可以扮演多个角色:

  • 倡议者(Proposer):提议者可以提出提议(数值或在操作命令)以供投票表决。
  • 接受者(Acceptor):接受者可以对提议者提出的提议进行投票表决,提议有超过半数的接收者投票即被选中。
  • 学习者(Learner):学习者无投票者,只是从接收者那里获取哪个提议被选中。

 在Paxos算法中,一个或在多个Proposer都可以并发的提出提议;系统必须针对所有提议中的某个提议达成一致(超过半数大的接受者选中);最多只能对一个确定的提议达成一致;只要超过半数的节点存活且可以互相通信,整个系统一定可以达成一致,即选择一个确定的提议。
 如果直接讲解Paxos算法,大家可能会有些难以理解,这里我们就按着视频里的顺序,先从简单的分布式一致性算法开始,然后不断进行优化,最后将其演变成Paxos算法。

图解Paxos主要流程

 Paxoso算法分为两个的阶段,我们就将其分别记为Phase1和Phase2.每个Proposer都持有一个独有的变量epoch,每个Acceptor都保存三个状态:lastest_prepared__epoch,accepted_epoch和accepted_value.lastest_prepared_epoch是指Acceptor授予访问权的Proposer的epoch值,accepted_epoch是Acceptor接受提议的Proposer的epoch值,而accepted_value就是Acceptor接受的提议值喽,他们的初始值都为null。

阶段一

 Phase1过程中,Proposer向Acceptor发起Prepare(epoch)请求来获取访问权。将自己的epoch发送给Acceptor.而Acceptor只会接受比lastest_prepared_epoch更大的epoch,并给予访问权,并将epoch记录到lastest_prepared_epoch的值中,返回当前的accepted_epoch和accepted_value的值。在初始化状态下,二者都是null,所以返回的是。如果epoch小于lastest_prepared_epoch则不授予访问权,并返回
phase1
 如上图所示,Proposer1向5个Acceptor发送了Prepare(#1)的请求,其中前三个请求顺利到达,Acceptor授予访问权,返回,并修改lastest_prepared_epoch为1。而后两个请求发生了网络延迟,一直未到达相应的Acceptor。
 在阶段一中,Proposer需要获得半数以上的Acceptor的访问权和对应的一组value的取值才会进行第二阶段,这样才会确保,一个Proposer提出的确定的议案会被另外一个Proposer发现,从而在阶段二中会进行正确的操作。

阶段二

 第二阶段采取“后者认同前者”的原则进行。在肯定旧epoch无法生成确定性取值时,新的epoch会提交自己的取值,不会冲突;一旦旧epoch形成了确定性取值,那么该proposer一定可以获得该取值,并且会认同该取值,不会破坏。
 如果Proposer在第一阶段获取的value值都是null,则旧epoch无法形成确定性取值,此时让自己的成为确定性取值:

  • 向epoch对应的所有acceptor提交取值
  • 如果收到半数以上的成功应答,则返回
  • 否则返回

 如果value的取值不为null,则认同最大accepted_epoch对应的取值f,使成为确定性取值,其中epoch是自己的epoch.

  • 如果f出现半数以上,则说明f已经是确定性取值了,直接返回
  • 否则,向epoch所对应的acceptor提交取值

 Acceptor在接收到accept(epoch,V)的请求之后,先查看epoch是不是自己记录的lastest_prepared_epoch,如果是则设置 = 。否在则会返回error

paxos2

 如上图所示,由于在阶段一中Proposer1接受到的值都为null,所以,决定将自己的值设置为确定值,于是发送accept(1,V1)请求。Acceptor1接受到了这个请求,检查lastest_prepared_epoch也等于1,所以将自己存储的设置为<1,v1>。而Proposer1的另外两个accept请求发生了网络延迟。
 如果此时,Proposer2向Acceptor进行propose会怎么样呢?我们来模拟propose来分析一下。

paxos3

 如上图所示,Proposer2向Acceptor发送了prepare(#2)的请求,Acceptor1先检测一下发现2大于现在的lastest_prepared_epoch,所以同意发送访问权,将lastest_prepared_epoch修改为2,并将自己保存的accepted_epoch和acceped_value返回给Proposer2;Acceptor3的操作也是类似,只不过因为Proposer1发送的accept请求发生了延迟,所以Acceptor3返回的是;而Acceptor5的操作和我们在文章第一张图中的Acceptor1的操作相同,他们都是第一次接收到prepare请求。
 然后Proposer2进行第二阶段的操作,从所有的返回数据中,找到accepted_epoch最大的那个accepted_value.这里就是Acceptor返回的<1,v1>,所以,Proposer2会尽力让V1成为确定值,所以它向Acceptor发送accept(2,V1)的请求。然后Acceptor1,Acceptor3,Acceptor5三个Acceptor接受了这个accept请求,更新自己的。此时,已经有三个acceptor形成了一致性的值,所以V1就成了整个系统的确定性取值。
paxos7.png

 那么Proposer1对Acceptor3发送的accept请求在此时达到Acceptor3会怎么样呢?Acceptor3发现当前lastest_prepared_epoch是2,所以直接拒绝了这个请求。

后记

 不清楚大家现在对Paxos算法的过程是否已经有了清楚的了解啊?那么我来问几个问题,大家可以考虑一下:

  • 在本文的情景下,假如Proposer2向Acceptor2,3,4发送了prepare请求,而不是向Acceptor1,3,5发送的请求,会怎么样呢?
  • 为什么强调prepare阶段时必须接受到一般以上Acceptor的返回,才能进行第二阶段?
     后续希望能够分析一下Zookeeper关于Paxos的具体使用场景和算法,希望大家多多关注。