okspy


  • 首页

  • 标签

  • 分类

  • 归档

JAVA并发(一)线程安全

发表于 2019-10-01 | 分类于 并发

线程安全的定义:当多个线程访问某个类时,不管运行时环境采用何种调度方式或者线程如何交替执行,主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类为线程安全的。

原子性

原子性指的是一个操作可以作为一个不可分割的操作来执行。也指:对于一个线程正在使用的对象,使用过程中不会被其他线程修改。

1
2
3
public int autoIncrease(int i) {
return ++i;
}

++i或i++看上去是一个操作,但并非是原子操作,它包含了三个操作:

  • 读取i的值
  • 将值加1
  • 将计算结果写入i

这是一个操作序列:读取 - 修改 - 写入,其结果状态依赖于之前的状态。

两个线程同时执行该操作会导致线程不安全,因为两个线程可能会交替的执行上述三个操作,某一个线程读取到的值可能是无效的、过时的。

为了保证线程安全,需要确保两个线程按照顺序依次执行上述三个操作,即线程1执行完了三个操作后,线程2才能开始执行这三个操作。

在并发编程中,这种由于“不恰当”执行顺序而出现的不正确的结果称为:竞态条件。

“不恰当”的执行顺序是CPU优化导致的,是客观存在、有利于提高处理速度的,这种执行顺序总是存在的。程序员能掌控的是编写线程安全的代码,使其在竞态条件下也能满足线程安全。

竞态条件

最常见的竞态条件是:先检查后执行

下面的例子是一个懒加载单例类,它不是线程安全的:

1
2
3
4
5
6
7
8
9
10
11
public class LazyLoadSingleton {
private LazyLoadSingleton singleton = null;

public LazyLoadSingleton getSingleton() {
if (singleton == null) {
singleton = new LazyLoadSingleton();
}

return singleton;
}
}

线程A和线程B同时执行getSingleton。A看到singleton为空,因此创建一个新的LazyLoadSingleton实例;线程B同样要判断singleton是否为空,而B判断时singleton是否为空,取决于A执行到哪一步,即不可预测的执行顺序,可能会导致线程不安全。

避免竞态条件—复合操作

复合操作指将前文中的自增、先检查后执行的操作分别组合成原子操作,这样就能保证线程安全。

在java.util.concurrent.automic包中包含了一些原子变量类,用于实现在数值和对象引用的原子状态转换。比如自增的复合操作

1
2
3
4
5
6
7
public class AutoIncrease {
private final AtomicLong count = new AtomicLong(0);

public long increase() {
return count.incrementAndGet();
}
}

AtomicLong.incrementAndGet底层通过CAS实现了复合操作。

CAS

CAS相当于乐观锁,CAS对应了硬件指令CMPXCHG,该指令对应着”比较并交换的操作,如果一个值原来是A(预期值),修改为B,在CPU回写至内存时,会检查当前值是否为A(比较),如果为A,则将值更新为B(交换)。

CPU循环进行CAS操作直到成功为止。CAS虽然很高效的实现了原子性,但是CAS仍然存在三大问题:

  • ABA问题
  • 循环时间长开销大
  • 只能保证一个共享变量的原子操作。

ABA问题

CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。

ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。 从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。

自旋时间长开销大

自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用:

  • 延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源。
  • 避免在退出循环时因内存顺序冲突而引起CPU流水线被清空,从而提高CPU的执行效率。

只能保证一个共享变量原子操作

对多个共享变量操作时,循环CAS就无法保证操作的原子性,有两种办法解决:

  • 用锁(下一节重点介绍)
  • 把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了类AtomicReference、AtomicStampedReference(解决ABA)来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作。

加锁机制

加锁可以保证原子性,将多个操作复合为一组同步的操作,避免竞态条件。

将上述代码进行如下改动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 改动1
public class AutoIncrease {
private final AtomicLong count1 = new AtomicLong(0);
private final AtomicLong count2 = new AtomicLong(0);

public long increase() {
long num1 = count1.incrementAndGet();
long num2 = num1 + count2.incrementAndGet();
return num2;
}
}

// 改动2
public class AutoIncrease {
private final AtomicLong count = new AtomicLong(0);

public long increase() {
if (count.incrementAndGet() % 2 == 0) {
return count.get();
}
return count.get() + 1;
}
}

改动1中,方法increase中包含两个原子操作,但是increase方法的返回值num2,涉及到了多个变量:count1和count2,这两个变量之间不是独立的,而是某个变量的值会对其他变量的值进行约束。这就导致了increase整体成为了一个非原子的方法,不是线程安全的。

总结1:多个变量彼此不是相互独立时,不是原子操作。

改动2中,先检查count自增后是否为为偶数,为偶数则直接返回,为奇数则加1再返回,这是典型的先检查后执行操作,increase整体是一个非原子方法,不是线程安全的。

一个操作是否是原子的,要看它包含的所有操作是否是

总结2:对于先检查后执行的操作,不是原子操作

对于改动1和改动2出现的非原子操作,JAVA提供了加锁机制,用来保证在一个原子操作中更新所有相关的状态变量,即将上述操作合并为一个整体,这种合并为一个整体是语言逻辑层面的,通过对象锁实现的,不是系统指令集层面的(CAS),锁的粒度是可以在写代码时掌控的。

对象锁

对象锁又称内置锁、Monitor锁,每一个Java对象自带了一把看不见的锁,通过synchronized关键字使用该锁。它可以保证原子性、有序性、可见性。正是因为如此强大,容易导致滥用。

synchronized的原理

synchronized的实现离不开Monitor。Monitor 是线程私有的数据结构,每一个线程都有一个可用monitor record列表,同时还有一个全局的可用列表。每一个被锁住的对象都会和一个monitor关联(对象头的MarkWord中的LockWord指向monitor的起始地址),Monitor包含了下列信息:

字段 含义
owner 占有该Monitor的线程的唯一标识,为Null时表示没有线程占用
EntryQ 关联一个系统互斥量(semaphore),阻塞所有试图锁住monitor record失败的线程
RcThis 被阻止的线程的个数
Nest 计数器,用来实现重入锁,没有线程持有monitor时该值为0
HashCode 与monitor关联的对象的hashcode
Candidate 只有两个值,0表示没有需要唤醒的线程,1表示要唤醒一个继任线程来竞争锁

以下面的代码为例:

1
2
3
4
5
public void synMethod() {
synchronized(this) {
// do smothing
}
}

上述代码反编译后:

1
2
3
monitorenter
...
monitorexit

synchronized对应了两个指令:monitorenter、monitorexit

以JVM中对monitorenter的解释为例:

monitorenter :

Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows:
• If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor.
• If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count.
• If another thread already owns the monitor associated with objectref, the thread blocks until the monitor’s entry count is zero, then tries again to gain ownership.

翻译一下:每个对象都关联着一个monitor,当monitor被某一个且只能被一个线程占用后,monitor就会处于锁定状态。线程执行到monitorenter后,尝试去获取与monitor关联的对象的所有权,此时,会有两种结果:

  • 如果monitor的Nest值为0,则线程会占有monitor。
  • 如果monitor被占有了,通过owner进行判断是否为当前线程占有的,如果是,那么该线程重入一次,计数器Nest的值加1
  • 如果monitor被其他线程占有了,当前线程阻塞,直到Nest值为0

总结一下:每个对象的对象头中都有一个Mark Word用于存储运行时数据,Mark Word中包含了Lock Word,Lock Word记录了Monitor的指针,Monitor中的owner字段记录了持有该Monitor的线程唯一标识,Nest字段是一个计数器,用来表示该Monitor是被持有了几次,当线程执行到montorenter指令时,会判断计数器,计数器为0时,直接持有该锁,不为0时,进一步判断owner是否为当前线程,为当前线程则将计数器加1,继续持有该锁,不为当前线程则阻塞等待至计数器为0。

synchronized的使用

下面给出synchronized的几种常用应用场景:

  • 普通方法上,锁当前实例对象
  • 静态方法上,锁当前类的class对象
  • 代码块,锁括号里的对象
  • 继承的方法上,锁子类的实例对象,锁两次

这里比较容易令人困惑的是应用在继承方法上:首先,继承的本质是让子类拥有父类对象的引用,super关键字就是告知JVM,子类对象需要通过父类的引用调用父类的方法,因此,调用者是子类对象,锁的也是子类对象。在下面代码示例中,进入子类的paraentMethod()方法时,获取一次子类对象锁,调用super.paraentMethod()时,又一次获取了子类的对象锁,共在子类实例对象上加了两次锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class SynParent {
public synchronized void paraentMethod() {
System.out.println("method paraent start...");
}
}

public class SynchronizedTest extend SynParent{
private final volatile Object objLock = new Object();

// 用在继承的方法上,锁子类的当前实例对象
@Override
public synchronized void paraentMethod() {
super.paraentMethod();
System.out.println("method paraent end...");
}

// synchronized应用在普通方法上,锁当前实例对象
public synchronized void method1() {
System.out.println("method 1 start...");
// do something

System.out.println("method 1 end...");
}

// synchronized应用在静态方法上,锁当前类的class对象
public static synchronized void method2() {
System.out.println("method 2 start...");
// do something

System.out.println("method 2 end...");
}

// synchronized应用在代码块,锁当前实例对象
public void method3() {
synchronized(this) {
System.out.println("method 3 start...");
// do something
}

System.out.println("method 3 end...");
}

// synchronized应用在代码块,锁自定义实例对象
public void method4() {
synchronized(objLock) {
System.out.println("method 4 start...");
// do something
}

System.out.println("method 4 end...");
}
}

JVM的锁优化

以64位JVM为例,它的对象头中的Mark Word如下,分别对应了对象的四种状态,无锁、偏向锁、轻量级锁、重量级锁,此外,虚拟机还有自旋锁、锁消除、自旋自适应锁等机制,本节将会逐个介绍。

随着锁的竞争,锁可以从偏向锁升级到轻量级锁,再升级的重量级锁,但是锁的升级是单向的,也就是说只能从低到高升级,不会出现锁的降级。

1568963278276

重量级锁就是前面我们详细分析过的synchronized锁,线程需要持有与对象相关联的monitor,montior中包含了线程唯一表示、系统互斥量、计数器等信息。系统互斥量导致该锁是重量级。重量级锁不属于锁优化,所以不再单独列为一节。

轻量级锁

synchronized原理中已提到过,线程栈帧中有一个名为Lock Record(锁记录,又叫Lock Word)的空间,用于存储对象的Mark Word的拷贝。

线程尝试获取轻量级锁时,虚拟机使用CAS将对象的Mark Word更新为指向Lock Record的指针,如果此次更新成功,那么这个线程就拥有了该对象的锁。锁标志位更新为00,之所以称之为轻量级,是去除了同步使用的互斥量。

如果CAS操作失败,虚拟机首先检查对象的Mrak Word是否指向当前线程,如果是,那就可以直接进入同步块执行,如果对象的Mark Word没有指向当前线程,说明锁已经被其他线程抢占了,轻量级锁不再有效,膨胀为重量级锁,锁标志位变为10。

偏向锁

如果说轻量级锁是在无竞争的状态下使用CAS操作去除同步使用的互斥量,那偏向锁就是在无竞争的状态下把整个同步都消除掉,连CAS操作也省去。

对象头使用54bit存储偏好的线程ID,再使用2bit存储epoch(偏向锁获取的时间戳),当锁对象第一次被线程获取时,进入偏向模式,同时会进行一次CAS(只进行一次),把获取到该锁的线程ID记录在Mark Word中,该线程以后再进入与锁相关的同步块时,虚拟机不再执行任何同步操作,直至另外一个线程尝试获取该锁。偏向锁可以提高带有同步(如synchronized关键字)但实际运行中无线程竞争的代码的效率,即只有一个线程获取该锁,那么使用偏向锁模式。

偏向锁失败后,并不会立即膨胀为重量级锁,而是先升级为轻量级锁。-XX:+UseBiasedLocking开启偏向锁。

轻量级锁与偏向锁的区别:

  • 偏向锁只执行一次CAS,后续同一个线程获取锁时完全没有同步操作,偏向锁每次都要执行CAS
  • 偏向锁在有其他线程尝试获取锁时就失效,轻量级锁在其他线程获取锁成功后才会失效

自旋锁

如果有两个以上的处理器,处理器A的线程获取了锁,线程B请求获取同一个对象的锁时会阻塞,在大多数情况下,线程A占有锁的时间不会太久,为了这段很短的时间去挂起和恢复线程B并不值得。

因此,JVM让后面请求锁的那个线程B执行一个忙循环(自旋),不放弃处理器的执行时间,看看处理器A的线程是否很快是否锁。这种情况适用于处理器A的线程只需要很短的时间就释放锁,省去了B线程挂起去等待A释放锁和B线程恢复的时间。自旋锁默认开启,默认次数10次,使用-XX:PreBlockSpin设置次数 。

自旋自适应锁

如果对于某个锁,自旋很少成功过,以后获取该锁可能省去自旋过程。如果对于某个锁,经常很短时间就成功,虚拟机认为这次自旋很有可能再次成功,会允许自旋等待更长的时间。有效的解决了自旋等待时间过长时白白耗费CPU资源的问题。

锁消除

消除锁是虚拟机另外一种锁的优化,这种优化更彻底,Java虚拟机在JIT编译时(可以简单理解为当某段代码即将第一次被执行时进行编译,又称即时编译),通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁,通过这种方式消除没有必要的锁,可以节省毫无意义的请求锁时间。

比如编写了一段看起来没有同步的代码,但是经javac编译后,发现包含了三个sb.append()操作,每个sb.appen()方法都包含一个同步块,锁就是sb对象,虚拟机观察sb,发现它的动态作用于被限制在concatString()方法内部,也就是说,其他线程访问不到当前线程的sb对象,因此,这里虽然有锁,但是可以消除。JVM就会消除该锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 一段看起来没有同步的代码
public String concatString(String s1, String s2, String s3) {
return s1 + s2 + s3;
}
// javac 转化后,可以看出包含了三个sb.append()操作,这些操作都是同步的,耗费性能
public String concatString(String s1, String s2, String s3) {
StringBuilder sb = new StringBuilder();
sb.append(s1);
sb.append(s2);
sb.append(s3);

return sb.toString();
}

锁粗化

原则上,在编写代码时,锁的粒度越小越好,但是如果一系列连续操作都对同一个对象反复加锁和解锁,甚至加锁出现在循环体中,即使没有线程竞争,频繁的进行互斥同步操作也会导致不必要的性能损耗。

如下面的例子中,第一个while循环对当前实例对象加锁1次,第二个while循环对当前实例对象加锁99次,虚拟机优化后,只加锁了1次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class TooMuchLock {
private int sum;
private int i;
private int j;

public void setSum() {
synchronized(this) {
while(i< 100) {
i++;
}
}
System.out.println("suming...");

while(j < 100) {
synchronized(this) {
j++;
}
}

sum = i + j;
}
}


// 虚拟机锁粗化后
public void setSum() {
synchronized(this) {
while(i < 100) {
i++;
}
System.out.println("suming...");
while(j < 100) {
j++;
}
sum = i + j;
}
}

性能提升原则

开发过程中,尽量遵循以下原则:

  • 尽量将不影响共享状态且执行时间较长的操作从同步代码块中分离出去,比如尽量不要对I/O操作加锁。
  • 不要频繁的对同一个对象加锁,即使虚拟机有锁粗化机制
  • 不要盲目的为了提高性能而细化锁的粒度,细化锁的粒度时,要时刻警惕线程安全性。

可见性

通常处理器和内存之间都有几级缓存来提高处理速度,处理器先将内存中的数据读取到内部缓存后再进行操作,但是对于缓存写会内存的时机则无法得知,因此在一个处理器里修改的变量值,不一定能及时写会缓存,这种变量修改对其他处理器变得“不可见”了。

因此,可见性指的是内存可见性,当一个线程修改了对象状态后,其他线程能够看到发生的状态变化。与原子性对比如下:

  • 原子性:一个线程使用对象期间,对象不被其他线程修改。

  • 可见性:一个线程A使用对象期间,对象可以被其他线程修改,但是线程A能够看到发生的变化。

通过加锁实现的原子性可以保证可见性:线程A执行某个同步代码块时,线程B随后进入同一个锁保护的同步代码块,在这种情况下,可以保证线程B获取锁后可以看到线程A之前在同一个同步代码块中的所有操作。因此,加锁的含义不仅仅局限于互斥行为,还包括内存可见性。

但在不要求互斥、只要求内存可见性的情况下,再使用锁就显得有些重了,此时可以使用volatile变量,它可以保证内存可见性,变量的修改通知到其他线程。

volatile原理

Java代码

1
private volatile TestInstance instance = new TestInstance();

上述代码的汇编代码:

1
2
0x01a3de1d: movb $0x0,0x1104800(%esi);
0x01a3de24: lock addl $0x0,(%esp);

有Violatile修饰的变量在汇编阶段,会多出一条lock前缀指令,它在多核处理器下引发两件事情:

  • 将当前处理器缓存行的数据写回内存
  • 写回内存的操作会使在其他CPU里缓存了该内存地址的数据无效(缓存一致性协议)

回写内存

处理器为了提高处理速度,不直接和内存进行通讯,而是先将系统内存的数据读到内部缓存(L1,L2或其他)后再进行操作,但操作完之后不知道何时回写内存,如果对声明了Volatile变量进行写操作,JVM就会向处理器发送一条lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存

缓存一致性协议

使用Volatile修饰的变量,在写操作的时候,会强制将这个变量所在缓存行的数据写回到内存中,但即使写回到内存,其他处理器也有可能使用内部的缓存数据,从而导致变量不一致;所以,在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议:每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期,如果处理器发现自己缓存行对应的内存地址被修改,就会将该缓存行设置成无效状态,下次要使用就会重新从内存中读取。

lock信号

volatile的核心是lock前缀指令,它负责通知cpu将当前操作立即回写内存,正是因为回写内存的存在,指令重排无法跨过lock信号对应的指令。因此,lock前缀实际上是一种内存屏障,cpu不会跨过该屏障进行重排序,volatie不仅可以保证可见性,也保证有序性。

volatile使用场景

当且仅当满足以下所有条件时,才应该使用volatile变量:

  • 对该变量的写入操作不依赖变量的当前值,或者能确保只有一个线程更新变量的值

  • 该变量不与其他变量一起纳入不变性条件中(因为volatile变量不能确保原子性)

举例一些应用场景:标识一些事件的发生,如初始化、销毁、判断是否处于某个状态,状态的变化只有一个线程能够触发。

小结

本篇主要讨论了以下内容:

  • 通过CAS、对象锁保证原子性;
  • 通过volatile、对象锁保证可见性、有序性。
  • CAS存在的三个问题
  • 缓存一致性协议的定义
  • JVM的锁优化方法。

下一篇主要介绍对象的安全发布。

JAVA并发(二)对象安全

发表于 2019-10-01 | 分类于 并发

this逸出

this逸出就是:在对象还未实例化完成时,就能被其他对象锁获取(发布)。

什么是this逸出

对于一个类C来说,“外部方法”指的是行为不完全由类C规定的方法,包括其他类定义的方法,以及类C中可以被改写的方法。当把类C的对象传递给某个外部方法时,相当于发布了该对象,此时如果C的实例未完成实例化,就称为类C的实例的this逸出。最常见的“外部方法”使用场景是在构造器中生成内部类实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
1  // 定义一个事件监听的接口
2 public interface EventListener {
3 void onEvent();
4 }
5
6 // 定义一个管理事件监听器类
7 public class EventSource {
8 private List<EventListener> source = new ArrayList<>(10);
9
10 public void registerListener(EventListener listener) {
11 try {
12 Thread.sleep(500L);
13 } catch (InterruptedException e) {
14 e.printStackTrace();
15 }
16 listener.onEvent(); //假设listener注册500ms就被调用了
17 source.add(listener);
18 }
19 }
20
21 // this逸出类
22 public class ThisEscape {
23 private String name;
24 private Thread t;
25
26 public ThisEscape(EventSource source, String initName) {
27 name = initName;
28
29 // 在构造器中启动线程
30 t = new Thread(new Runnable() {
31 @Override
32 public void run() {
33 name = "threadName"; // this可能逸出至其他线程
34 }
35 });
36 t.start(); // 一旦启动该线程,this就逸出了,name随时有可能被修改为threadName
37
38 source.registerListener(new EventListener() {
39 @Override
40 public void onEvent() {
41 name = "eventName"; // this隐式逸出
42 }
43 });
44 // 构造函数中需要耗时才能完成this构建,这里为了明显的看到this逸出的效果,设为1s
45 try {
46 Thread.sleep(1000L);
47 } catch (InterruptedException e) {
48 e.printStackTrace();
49 }
50 }
51
52 public static void main(String[] args) {
53 ThisEscape thisEscape = new ThisEscape(new EventSource(), "initName");
54 System.out.println("name = " + thisEscape.name);
55 }
56 }

上述代码中存在两处this逸出,一个是33行、另一个是在41行,最后name的值取决于1115行、3036行、45~49行这三处,它们分别是:

  • 11~15行:listener注册完成后多久才会调用到onEvent?
  • 30~36行:线程t何时启动?
  • 45~49行:thisEscape对象需要多久才能构造完成?

前两处在实际应用的过程中都有可能不是构造器能够控制的,无论是Runable还是EventListener,它们的本质是相同的:在构造器中初始化一个内部类的实例,导致this隐式的泄露。

避免this逸出

为了避免this逸出,有如下策略:

  • 可以在构造器中创建线程,但不要直接启动该线程,应该确保在对象初始化完成后再启动该线程
  • 只要将构造器设置为private,然后使用工厂方法发布对象,就一定不存在this逸出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 基于工厂方法防止this引用逸出
public class SafeListener {
private String name;
private final EventListener listener;

private SafeListener(String initName) {
name = initName;
listener = new EventListener() {
public void onEvent() {
name = "eventName";
}
};
}

public static SafeListener newInstance(EventSource source, String initName) {
SafeListener listener = new SafeListener(initName);
source.registerListener(listener); // listener已构造完成,不存在this泄露
return listener;
}
}

线程封闭

共享的对象在堆中可以被所有线程访问到,所以会存在线程不安全的问题,但如果某个对象只能被单线程访问,就不存在线程安全问题。这种仅在单线程内访问对象、将某个对象封闭起来的技术称为线程封闭。

单线程写入volatile变量

在volatile变量上存在一种特殊的线程封闭,只要能确保只有单个线程对共享的volatile变量执行写入操作,那么就可以安全地在这些共享volatile变量上执行“读取-修改-写入”操作,这种情况相当于将修改操作封闭在单个线程中,避免了竞态条件,并且volatile变量的可见性可以保证其他线程能够看到最新的修改。

栈封闭

栈封闭是线程封闭的一种特例,在栈封闭中,只有通过局部变量才能访问对象,局部变量都存在于栈中,因此,它是线程安全的。例如:

1
2
3
4
5
6
public int calculte(List<Integer> list) {
List<Integer> localList = new ArrayList<>();

localList.addAll(list);
// 后续对localList进行操作,线程安全
}

ThreadLocal

维持线程封闭另一种做法是使用ThreadLocal,这个类能够使线程与对象关联起来,在线程的上下文都可以获取某一个对象,常用在服务器会话上下文变量的传递等场景下。详见ThreadLocal原理。

不变性

不可变的对象一定是线程安全的。不可变对象指的是:只有一种状态,且该状态在构造函数内完成,一旦对象构造完成,不再改变。

final关键字

final关键字用于构造不可变对象,final类型的域是不能修改的。与c/c++的constant常量有些相似;但JMM中,final关键字被增强了,还有另外一层语义:初始化过程是安全的,final域禁止处理器把final域的写重排序到构造函数之外,一个对象的final域的初始化一定在该对象初始化完成之前完成。

finnal域的写

原理图如下,在写final域b=2操作后,添加了一个storestore屏障,然后才是构造函数执行结束,而普通域a,则有可能重排序到构造函数执行后。

final禁止重排序

final域的读

同样的,还有一个loadload屏障用于读final域,初次读对象与读对象的final域之间有一个loadload屏障,一个对象的final域的初始化一定在该对象初始化完成之前完成。

img

final与this逸出

如果出现了this逸出(this逸出:对象在构造函数执行结束前就能被其他对象或线程获取),上述storestore屏障相当于失效了,因此,final的安全性建立在没有this逸出的前提下。

final域的安全性

在storestore、loadload屏障、没有this逸出的保证下,final关键字声明的域是可以安全发布的,一旦构造完成就不可变,且无法读取到未构造完的final域,对象为null时读取不到final域。

综上,final保证对象只能被初始化一次,且初始化过程是安全的:

1
2
3
4
5
private final List<String> list = new ArrayList<>(16);

public void updateList(List newList) {
list = newList;
}

然而,若final域引用的对象是可变的,这些被引用的对象可以被修改,还是存在线程不安全。比如下面的例子,虽然list不能被初始化两次,仍然可以修改list的内容。

1
2
3
4
5
6
7
8
9
private final List<String> list = new ArrayList<>(16);

public void add(String s) {
list.add(s);
}

public void get() {
return list;
}

hashMap源码解读

发表于 2019-09-14 | 分类于 JAVA集合

JDK1.8的hashMap采用数组+链表+红黑树的结构,jdk1.7的hashMap采用数组+链表的结构,红黑树的目的是为了解决过长链表效率低的问题。JDK1.8修改了扩容机制,扩容时不需要计算hash,这个在后文会详细说明。

HashMap有几个重要的成员变量,见下表:

变量 含义
initialCapacity HashMap容量
loadFactory 负载因子
threadshold 扩容阈值,capacity*loadFactory
modCount 修改次数,用于并发时判断数据是否过期(CAS)

通过调节负载因子,可使 HashMap 时间和空间复杂度上有不同的表现。

  • 调低负载因子时,HashMap 所能容纳的键值对数量变少。扩容时,重新将键值对存储新的桶数组里,键的键之间产生的碰撞会下降,链表长度变短。此时,HashMap 的增删改查等操作的效率将会变高,这里是典型的拿空间换时间。
  • 增加负载因子(负载因子可以大于1),HashMap 所能容纳的键值对数量变多,空间利用率高,但碰撞率也高。这意味着链表长度变长,效率也随之降低,这种情况是拿时间换空间。至于负载因子怎么调节,这个看使用场景了。一般情况下,用默认值就可以了

计算哈希

hashMap的所有操作都离不开hash值的计算,因此,有必要将hash值的计算写在前面。

1
2
3
4
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

从源码可以看出,hash值的计算是由key的hashCode进行位运算后得到的,在Java中,hash是int型,32位,前16位为高位,后16位为低位,先取hash的低位,和高位进行异或运算,得到hash值,位运算如下图:

1567838656869

这么做的好处就是最大限度的发挥高位和低位的作用,提高hash值的复杂度(当我们覆写hashCode方法时,有可能会写出分布性不佳的hashCode方法)。

计算一个键值对在数组中的下标时,采用了公式:(n-1) & hash,n为数组容量,n-1与hash值进行与运算,等价于hash%n,往往只有低位参与了计算,因此,hashMap在计算key的hash值进行的位运算,有利于hash值的高位与低位均可以参与到计算数组下标中去,这就是为什么不直接使用key.hashCode()的原因。

为什么说N-1 & hash 等价于 hash%N

N总是2的n次幂,即只有一位为1,N-1的后img位全部为1(此处不明白的可以查看2.4.3节的表2.1《数组容量与二进制值》),N-1 & hash 即hash的后img位为1的位组成的值,它正是hash%N的余数,如下图:

1567840630947

put

先定位要插入的键值对属于哪个桶,定位到桶后,再判断桶是否为空。如果为空,则将键值对存入即可。如果不为空,则需将键值对放置于链表最后一个位置或插入红黑树中,或者覆盖键完全相同的值。最后,以put后的容量对比threadshold决定是否需要扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
1 public V put(K key, V value) {
2 return putVal(hash(key), key, value, false, true);
3 }
4
5 final V putVal(int hash, K key, V value, boolean onlyIfAbsent,boolean evict) {
7 Node<K,V>[] tab; Node<K,V> p; int n, i;
// 初次使用时,初始化table
8 if ((tab = table) == null || (n = tab.length) == 0)
9 n = (tab = resize()).length;
// 数组中tab[(n-1)&hash]尚未使用,不存在哈希碰撞,直接将键值对存入tab[(n-1)&hash]
10 if ((p = tab[i = (n - 1) & hash]) == null)
11 tab[i] = newNode(hash, key, value, null);
12 else { // 待存入的键key与已存在的键值对p有哈希冲突,解决冲突
13 Node<K,V> e; K k;
// 若键已存在于桶中第一个节点,将e指向该节点,目的是省去稍后树查找或遍历链表
14 if (p.hash == hash &&((k = p.key) == key || (key != null && key.equals(k))))
15 e = p;
// 若p为TreeNode,说明该数组下标处(该桶中)已经有8个以上的哈希冲突了,调用红黑树插入方法
16 else if (p instanceof TreeNode)
17 e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
// 当前桶中使用的是链表
18 else {
// 遍历链表,插入尾部
19 for (int binCount = 0; ; ++binCount) {
// 判断是否为尾节点,若是,将键值对插入至尾节点后面
20 if ((e = p.next) == null) {
21 p.next = newNode(hash, key, value, null);
22 if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
23 treeifyBin(tab, hash);
24 break;
25 }
// 若链表存在该键,跳出循环,此时e=p.next
26 if (e.hash == hash &&
27 ((k = e.key) == key || (key != null && key.equals(k))))
28 break;
29 p = e;
30 } // end for loop
31 }

// 上面的if、elseif、else用于确定待插入节点e的位置,现在确定key相同时,是否覆盖value
32 if (e != null) { // existing mapping for key
33 V oldValue = e.value;
34 if (!onlyIfAbsent || oldValue == null)
35 e.value = value;
36 afterNodeAccess(e);
37 return oldValue;
38 }
39 } // 结束哈希冲突的解决,已完成键值对的插入
40 ++modCount; // 更新hashMap的修改次数
// 键值对数量+1 超过threadshold时,提前扩容,便于下一次put
41 if (++size > threshold)
42 resize();
43 afterNodeInsertion(evict);
44 return null;
45 }

上述代码块的核心逻辑是:

  • 1)数组table是否为空?为空则通过扩容的方式初始化
  • 2)要插入的键是否与桶中第一个节点的键是同一个(equals)?若是,则标记该节点并进行第4步,若不是,则遍历树或链表。
  • 3 ) 遍历树或链表,待插入的键是否已存在?若存在,则标记该节点进行第4步,若不存在,则插入链表的尾节点或红黑树对应节点。
  • 4)上述3步已完成了带插入节点e的定位,根据onlyIfAbsent判断是否用新值覆盖旧值
  • 5)最后,根据键值对数量与threadshold的比较,判断是否需要进行扩容

get

hashMap查找操作比插入操作更简单,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}

/**
* Implements Map.get and related methods
*
* @param hash hash for key
* @param key the key
* @return the node, or null if none
*/
final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
// 定位键值在数组中的下标位置,记为first
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
// 判断first是否为要查找的值,目的是省去后面遍历链表或树
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
// fisrt不是要查找的值,遍历链表或树,找到就跳出循环,找不到返回Null
if ((e = first.next) != null) {
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}

扩容机制

在调用put方法将元素插入后,会判断是否超出负载因子*容量,超出后便调用resize()方法进行扩容。

那么hashmap什么时候进行扩容呢?当hashmap中的元素个数超过(数组大小*loadFactor)时,就会进行数组扩容,loadFactor的默认值为0.75,也就是说,默认情况下,数组大小为16,那么当hashmap中元素个数超过160.75=12的时候,就把数组的大小扩展为216=32,即扩大一倍,然后重新计算每个元素在数组中的位置,而这是一个非常消耗性能的操作

JDK1.7的扩容

JDK1.7版本的HashMap是数组+链表的结构

扩容机制:新生成一个数组,然后拷贝旧数组里面的每一个数组元素表示的链表(Entry)到新数组里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) { //当当前数据长度已经达到最大容量
threshold = Integer.MAX_VALUE;
return;
}

Entry[] newTable = new Entry[newCapacity]; // 创建新的数组
boolean oldAltHashing = useAltHashing;
useAltHashing |= sun.misc.VM.isBooted() &&
(newCapacity >= Holder.ALTERNATIVE_HASHING_THRESHOLD);
boolean rehash = oldAltHashing ^ useAltHashing; // 是否需要重新计算hash值
transfer(newTable, rehash); // 将table的数据转移到新的table中
table = newTable; // 数组重新赋值
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1); //重新计算阈值
}


// transfer()方法负责创建将旧数组移动至新数组
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {

while(null != e) {
Entry<K,V> next = e.next; // 先把next存下来,最后再处理
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity); // 计算每个元素在新数组中的位置
// 将元素插入新数组,使用头部插入法
e.next = newTable[i]; // e.next 指向 newTable[i]
newTable[i] = e; // newTable[i]赋值为e,完成了e插入newTable[i]对应链表的头部
e = next; // 处理next节点
}
}
}

假设有下面的数组大小为2,loadFactory为默认值0.75的HashMap,在插入第二个元素后(key = 5)会进行扩容,扩容的新数组大小为4(假设hash的计算方法是key%数组大小)

1567598584397

创建新数组,假设hash的计算方法是key%数组大小,那么key为3和7的Entry均应落在新数组下标为3的位置,即newTable[i]的i为3。此时进入while(null != e)开始遍历旧数组下标为1的Entry

1567599032354

resize初始阶段
执行`e.next = newTable[i]`,经计算,i的值为3,新的e.next 指向了新数组`newTable[3]`,`newTable[3]`是空的,所以新的e.next指向了null;

执行newTable[i] = e,将e插入newTable[3]对应的链表的头部

执行e=next,此处的next存放的是旧的e.next,即上图中e.next(key=7), 将e置为key的7的元素。

经过上述三步,HashMap处于下图的状态,记为resize第二阶段完成:

1567599724514

resize第二阶段结束
然后,再进行一次while循环,将key的7的元素移动至`newTable[3]`的头部,next指向了key=3的元素,这样就完成了整个Resize的过程。

1567599830776

resize完成
## JDK1.7扩容的线程不安全问题

为了便于理解,将transfer简化,只留下关键步骤:假设两个线程同时执行put操作,进入了transfer环节。

1
2
3
4
5
6
7
8
0  while(null != e) {
1 Entry<K, V> next = e.next; // 线程1开始,此时e.next指向key为7的Entry
2 e.next = newTable[i];//线程1继续,e.next指向newTable[3],newTable[3]为空,e.next指向null
3 newTable[i] = e; //线程1继续,将newtTable[3]赋值为key为3的Entry
// 线程2在此刻开始执行第1步 Entry<K, V> next = e.next = null
// 线程2执行第2步,e.next = newTable[3] = e
4 e = next;
5 }

代码块的注释是按照时间顺序的,在第3行线程1将newTable[3]的赋值为key为3的Entry后(未执行e=next),线程1的状态如下;
1567645309918

线程1执行完1~3步
线程2开始执行第1步,` Entry next = e.next`,`e.next`此时被线程1修改为了null,所以对于线程2,`next = e.next = null`,然后线程2执行第二步,`e.next = newTable[3]`,`newTable[3]`已被线程1插入了e,所以对于线程2`e.next = e`,然后,线程2执行第3步,将key为3的Entry赋给`newTable[3]`,这个与线程1的操作重复了,不会有影响。此时,线程2的状态如下图。

1567647285939

然后,线程2执行最后一步,e = next,发现e = null不满足while条件,跳出循环,此时,并没有出现死循环的问题,现在的问题仅仅是链表出现了闭环。

待下一次扩容时,才会出现死循环的问题,e.next永远为e,再也无法跳出循环。

上述只是链表闭环的一种情况,而且是简化版,实际情况过多,逐个赘述没有意义,不再赘述,重点是需要了解到多线程put时出现resize,可能会导致链表闭环,从而CPU占用率达到100%。

JDK1.8的扩容

在JAVA 8版本,HashMap改进了扩容的方式,不再使用JDK 1.7的头部插入法。

为了便于理解,只摘取源码中扩容相关的核心代码:一个for循环用于将旧的数组每个元素e迁移至新数组中。

  • 如果e.next == null,表示旧数组中该位置没有哈希碰撞,直接计算e在新数组中的位置并赋值
  • 如果e是TreeNode的实例,计算在新数组的下标,并添加到新数组对应位置的红黑树中,先不赘述
  • e是链表,按照链表的逻辑插入,本节主要介绍链表情况下的插入,用以比较与JDK1.7的不同。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1  for (int j = 0; j < oldCap; ++j) {
2 Node<K,V> e;
3 if ((e = oldTab[j]) != null) {
4 oldTab[j] = null;
5 if (e.next == null)
6 newTab[e.hash & (newCap - 1)] = e; //如果e后面没有元素,说明没有哈希碰撞,直接赋值
7 else if (e instanceof TreeNode)
8 ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);//如果e是红黑树,添加到红黑树
9 else { // preserve order
10 Node<K,V> loHead = null, loTail = null;
11 Node<K,V> hiHead = null, hiTail = null;
12 Node<K,V> next;
13 do {
14 next = e.next;
15 if ((e.hash & oldCap) == 0) { // 此处是重点,后文有分析
16 if (loTail == null) // 判断链表尾节点是否为null,即链表是否为空(不为空17 的链表尾节点不可能为null,只包含一个节点的链表头节点与尾节点是同一个节点)
18 loHead = e; // 如果为null,表示链表为空,loHead = e
19 else
20 loTail.next = e; // 不为null,说明链表已有元素,将尾部指向e
21
22 loTail = e; // 将e插入链表尾部
23 }
24 else { // 与e.hash & oldCap) == 0原理一样,只不过是插入至另一个链表
25 if (hiTail == null)
26 hiHead = e;
27 else
28 hiTail.next = e;
29
30 hiTail = e;
31 }
32 } while ((e = next) != null);
33
34 // 链表尾节点指向null、数组放置链表头节点
35 if (loTail != null) {
36 loTail.next = null;
37 newTab[j] = loHead;
38 }
39 // 与loTail相同
40 if (hiTail != null) {
41 hiTail.next = null;
42 newTab[j + oldCap] = hiHead;
43 }
44 } // else end
45 } // for (int j = 0; j < oldCap; ++j) end

e是链表的情况下,声明了loHead、loTail、hiHead、hiTail,他们分别是lo和hi链表的头结点与尾节点。lo链表会插入与旧数组相同的下标处newTab[j] = loHead,hi链表插入旧下标偏移旧容量处newTab[j+oldCap] = hiHead。

上述代码块都很好理解,建立了两个链表,依次在末端插入新的元素,在多线程环境下,JDK 1.7由于头部插入法导致了链表闭环,JDK 1.8采用尾部插入法,有效的避免了JDK1.7链表闭环的问题。但是

  • 没有解决JDK1.7中的数据丢失的问题,比如多个线程同时put的时候,当index相同而又同时达到链表的末尾时,另一个线程put的数据会把之前线程put的数据覆盖掉,就会产生数据丢失。
  • 多线程下操作同一对象时,对象内部属性的不一致性还会导致死循环

不过,HashMap本来就不是为多线程环境设计的,多线程应该使用ConcurrentHashMap。

现在把目光放在上述代码块第15行的if ((e.hash & oldCap) == 0),它用来判断元素放置在lo链表还是hi链表,这两个链表分别插入在数组下标为旧下标newTab[j]和旧下标+旧容量处newTab[j+oldCap]。上述代码块的第一个注释为如果e后面没有元素,说明没有哈希碰撞,直接赋值,此处将e放置在了newTab[e.hash & (newCap - 1)]处。而在前面介绍的put方法中,也出现了tab[(n - 1) & hash],我们发现总是将n-1、hash进行与操作(&),n表示数组容量,它总是2的n次幂,n的二进制如下:

表2.1 数组容量与二进制值
| 数组容量N的十进制值 | 数组容量N的二进制值 | N-1的二进制值 | | ------------------- | ------------------- | ------------- | | 2 | 0000 0010 | 0000 0001 | | 4 | 0000 0100 | 0000 0011 | | 8 | 0000 1000 | 0000 0111 | | 16 | 0001 0000 | 0000 1111 | | .... | 000100000.. | 0001111... |

数组容量N的二进制值总是只有1位为1,其他位全是0,对于N-1的二进制,后img位全部为1,将img位记为标记位M,那么对于2N-1的二进制,它的标记位为M+1。(n-1)&hash实际上就是hash%(n-1),只不过JDK1.8采用了速度更快的位运算。
根据HashMap的数据散落原理,是取hash值然后对数组的大小取余,且每次扩容后,容量为扩容前的二倍,那么将旧数组容量oldCap记为N:

​ e.hash &(N-1) = oldPos // 标记位为M

​ e.hash & (2N-1) = newPos // 标记位为M+1

它们等价于:

​ e.hash & 0000 0111…. = oldPos // M个1

​ e.hash & 0000 1111…. = newPos // M+1 个1

上述两个式子分别用来计算e在新旧数组中的位置newPos、oldPos,它们实际上是e.hash对一个低位全部为1,高位全部为0的二进制的与运算,所以在计算Position时,e.hash比标记位更高的位是无意义的,这个标记位取决于数组的容量大小:

  • 对于旧的数组,容量N、标记位M,只考虑e.hash的低M位。
  • 对于新的数组,容量2N、标记位M+1,只考虑e.hash的低M+1位。

现在来看if ((e.hash & oldCap) == 0)的意义:

​ e.hash & N = 0 ==> e.hash & 0000…01000 = 0

N的二进制中唯一的1出现在标记位M+1处,即oldCap的唯一的1出现在此处,所以这个if语句用来判断e的hash值的倒数M+1位是否为0:

  • 如果e.hash倒数第M+1位为0,有下面三个式子:

    e.hash & 0000 0111…. = oldPos // M个1

    e.hash & 0000 1111…. = newPos // M+1 个1

    e.hash & 0000 1000…..= 0

    因此,newPos = e.hash & 0000 1111… = e.hash & 0000 0111…. =oldPos

    只考虑e.hash的低M位,与旧数组中位置计算方式一样,得到的结果也必然相同,所以e在新旧数组中位置相同。

  • 如果 e.hash倒数第M+1位不为0(为1),则有下面的三个式子:

    e.hash & 0000 0111…. = oldPos // M个1

    e.hash & 0000 1111…. = newPos // M+1 个1

    e.hash & 0000 1000…..= 000..10000 // 倒数第M+1位为1

    因此,newPos = e.hash & 0000 1111… = (e.hash & 0000 0111….)+ (e.hash & 0000 1000…..) = oldPos + oldCap

通过以上的位运算,不需要再重新计算hash值,即可完成旧数组向新数组的迁移,大大地提高了效率

equals和hashCode

equals

Object的equals比较的是对象的内存地址。源码如下:

1
2
3
public boolean equals(Object obj) {    
return (this == obj); // 如果对象内存地址相同,返回true
}

而在实际的业务中,往往判断两个对象是否equals时,是根据对象所对应的“值”去判断的,这时,就需要重写equals方法。比如JDK的String类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean equals(Object anObject) {    
if (this == anObject) { // 如果对象内存地址相同,返回true
return true;
}

if (anObject instanceof String) {
String anotherString = (String)anObject;
int n = value.length;
if (n == anotherString.value.length) { // 比较两个字符串长度是否相等
char v1[] = value;
char v2[] = anotherString.value;
int i = 0;
while (n-- != 0) { // 逐个字符比较,全部相等时返回true
if (v1[i] != v2[i])
return false;
i++;
}
return true;
}
}

return false;
}

hashcode

java.lang.Object中对hashCode的约定:如果两个对象根据equals方法比较是相等的,那么调用这两个对象的任意一个hashcode方法都必须产生相同的结果。比如String类,因为重写了equals,那么必须重写hashcode。

首先看Object的hashcode的计算方式(参考博客),下面的代码为openjdk1.8的Native源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static inline intptr_t get_next_hash(Thread * Self, oop obj) {
intptr_t value = 0 ;
if (hashCode == 0) {
// This form uses an unguarded global Park-Miller RNG,
// so it's possible for two threads to race and generate the same RNG.
// On MP system we'll have lots of RW access to a global, so the
// mechanism induces lots of coherency traffic.
value = os::random() ;
} else
if (hashCode == 1) {
// This variation has the property of being stable (idempotent)
// between STW operations. This can be useful in some of the 1-0
// synchronization schemes.
intptr_t addrBits = cast_from_oop<intptr_t>(obj) >> 3 ;
value = addrBits ^ (addrBits >> 5) ^ GVars.stwRandom ;
} else
if (hashCode == 2) {
value = 1 ; // for sensitivity testing
} else
if (hashCode == 3) {
value = ++GVars.hcSequence ;
} else
if (hashCode == 4) {
value = cast_from_oop<intptr_t>(obj) ;
} else {
// Marsaglia's xor-shift scheme with thread-specific state
// This is probably the best overall implementation -- we'll
// likely make this the default in future releases.
unsigned t = Self->_hashStateX ;
t ^= (t << 11) ;
Self->_hashStateX = Self->_hashStateY ;
Self->_hashStateY = Self->_hashStateZ ;
Self->_hashStateZ = Self->_hashStateW ;
unsigned v = Self->_hashStateW ;
v = (v ^ (v >> 19)) ^ (t ^ (t >> 8)) ;
Self->_hashStateW = v ;
value = v ;
}

value &= markOopDesc::hash_mask;
if (value == 0) value = 0xBAD ;
assert (value != markOopDesc::no_hash, "invariant") ;
TEVENT (hashCode: GENERATE) ;
return value;
}

JDK1.8默认采用的是最后一个else语句中的计算方式xor-shift算法,该算法根据四个初始值可以生成一系列随机数。

1
2
3
4
5
// xor-shift 伪随机数生成算法
unsigned long xor128(){
static unsigned long x=123456789,y=362436069,z=521288629,w=88675123;
unsigned long t;
t=(xˆ(x<<11));x=y;y=z;z=w; return( w=(wˆ(w>>19))ˆ(tˆ(t>>8)) );

其中,hashStateX计算方式为hashStateX=Slef->os::random(),hashStateY、hashStateZ、hashStateW都是固定初始值,最终的Value由这四个值计算得出。

在第一个if语句if(hashCode==0)中有一行注释: so it’s possible for two threads to race and generate the same RNG,说明了两个线程竞争调用os::random()时有可能产生相同的随机数,这个是我们不想看到的,因此,JDK采用了xor-shift算法,即便初始值相同,产生的随机数也不同,有效的规避了这种情况,最终产生的随机数是线程相关的,支持多线程并发,有可能是目前最好的hashCode算法。(最后一个else语句中的注释:Marsaglia’s xor-shift scheme with thread-specific state This is probably the best overall implementation )

总之,默认地,Object.hashCode()产生一个线程安全的唯一随机值,可以通过在JVM启动参数中添加-XX:hashCode=2,改变默认的hashCode计算方式,JVM参数里的hashCode为0时hashCode的计算方式是os随机值,多线程竞争时可能会出现重复,为1时是对象的内存地址做位移运算后与一个随机数进行异或得到的结果,为2时是固定值、为3时是自增值

再看String类的hashcode方法,String对象的hash值与每个字符都有关,所以该hash值更看重的是业务层面的“值”的比较。这也对应了重写了equals,那么必须重写hashcode的约定。

1
2
3
4
5
6
7
8
9
10
11
public int hashCode() {    
int h = hash;
if (h == 0 && value.length > 0) {
char val[] = value;
for (int i = 0; i < value.length; i++) { // 逐个字符修改原始hash值
h = 31 * h + val[i];
}
hash = h;
}
return h; //返回的hash值与每个字符相关。满足equals的两个不同的new String对象,对应完全相同的hash值
}

使用重写过equals()和hashcode()的对象作为Map的键

在使用hashMap时,判断是不是同一个键的条件如下,为true时表示同一个键

1
if (e.hash == hash &&((k = e.key) == key || (key != null && key.equals(k))))

对于第一个条件:比较的是Node的hash值,它通过构造方法设置:

1
2
3
4
5
6
7
// Node的hash值通过构造方法设置
Node(int hash, K key, V value, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}

以put为例,调用过程如下,可以看出,Node的hash值等于key的hash值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true); // 调用putVal
}

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null); // 调用newNode()
else
... // 省略,与此处逻辑无关
}

Node<K,V> newNode(int hash, K key, V value, Node<K,V> next) {
return new Node<>(hash, key, value, next);
}

因此,满足下面任一个条件即视为同一个键:

  • key的hash值相同且key为同一个(==表示指向同一个堆地址,是同一个对象)
  • 两个键满足equals

对于第一个条件,与key的hash值完全相关,前面介绍了JDK1.8hash值默认是一个线程相关的随机数,而在业务层面上,往往键都是有实际意义的,因此,尽量不要使用JDK的随机数作为对象的hash值。如果不注意,将没有重写hashCode()的对象作为了Map的Key,在调用get、put、remove方法时,可能会出现一些意外的错误,比如,你不想让两个键相同(这里的相同指的是值相同,业务意义相同,比如人名、身份证ID)的对象同时出现在Map中,那么,有必要重写hashCode,否则会出现冲突,例子如下:。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Person {
String name;

public Person(String name) {
this.name = name;
}
}

Person tom = new Person("Tom")
Person tom2 = new Person("Tom")

Map<Person, Blog> map = new HashMap<>();
map.put(tom);
map.put(tom2);

上面的例子中,你本来不想让姓名重复的用户插入到Map中,但是因为没有重写Person的hashCode,导致这两个对象tom、tom2的hashCode不同,都成功插入Map。

对于第二个条件,Object.equals()比较的是键的内存地址,这也与业务无关。示例可参考上面的示例。

因此,尽量使用那些已经覆写了equals和hashCode的类,比如String、Integer等,如果要使用自定义的类作为hashMap的key,要覆写equals和hashCode方法,将它改为值的判断,避免使用时出现意外的错误。

细节疑问

  1. 为什么HashMap实现了Serializable接口,却将table声明为transient?(声明为transient,不会被序列化)

    答:参考StackOverfolw回答,HashMap使用wirteObject、readObject实现自定义的序列化和反序列化,序列化时记录了的table的size、键值对的size、以及所有的key-value映射,没有序列化table(Node数组),Node包含了hash值、key、value、以及下一个Node指针,Node的作用是便于遍历键值对,table不序列化的目的只是为了节省空间,当反序列化的时候,通过readObject重新构建table。

  2. 为什么链表长度为8时进行树化,怎么不是2、4、16、32?

    红黑树占据的空间是链表的两倍,删除和新增数据都需要调整树,所以会尽量避免使用红黑树,在hashMap源码196行的有一段注释提到:当选择计算hash值的算法足够好时,数据均匀分布,呈现泊松状,同一个桶中的节点数等于8的概率为亿分之6,因此,为8时链表转换为红黑树的概率已经极低了。

    • 一个桶中链表长度达到4的概率为1.5%,此时链表查询复杂度为4,红黑树为2,差距不大
    • 一个桶中链表长度为8的概率为亿分之6,此时链表复杂度为8,红黑树为3,差距开始明显,有必要树化
    • 一个桶中长度为16的概率更低,除非你重写的hashCode方法真的很烂,此时链表复杂度为16,红黑树为4,差距较大,树化已经晚了
  3. 为什么是树的节点数是6的时候,退化成链表,怎么不是8?

    如果是8,或者7,假如有大量的操作在长度7和8之间来回切换,这种结构的变换导致耗时更多,所以用6进行一个过渡。

threadLocal原理

发表于 2019-09-12 | 分类于 并发编程

强引用、软引用、弱引用、虚引用

在了解threadLocal之前,有必要了解JAVA中的四种引用:

  • 强引用:正常new出来对象就是强引用,当内存不够的时候,JVM宁可抛出异常,也不会回收强引用对象。
  • 软引用(SoftReference):软引用生命周期比强引用低,在内存不够的时候,会进行回收软引用对象。软引用对象经常和引用队列ReferenceQueue一起使用,在软引用所引用的对象被GC回收后,会把该引用加入到引用队列中。
  • 弱引用(WeakReference):弱引用生命周期比软引用要短,在下一次GC的时候,扫描到它所管辖的区域存在这样的对象: 一个对象仅仅被weak reference指向, 而没有任何其他strong reference指向,,不管当前内存是否够,该对象都会被回收。弱引用和软引用一样,也会经常和引用队列ReferenceQuene一起使用,在弱引用所引用的对象被GC回收后,会把该引用加入到引用队列中。
  • 虚引用(PhantomReference):又叫幻象引用,与软引用,弱引用不同,虚引用指向的对象十分脆弱,我们不可以通过get方法来得到其指向的对象。它的唯一作用就是当其指向的对象将被回收时,自己被加入到引用队列,用作记录该引用指向的对象即将被销毁。

finallized方法: 当对象变成(GC Roots)不可达时(第一次回收),GC会判断该对象是否覆盖了finalize方法,若未覆盖,则直接将其回收。否则,若对象未执行过finalize方法,将其放入F-Queue队列,由一低优先级线程执行该队列中对象的finalize方法。执行finalize方法完毕后,GC会再次判断该对象是否可达(第二次回收),若不可达,则进行回收,否则,对象“复活”。因此,对于重写了finallized方法的对象,会出现两个垃圾回收周期,这两个周期之间可能相隔了很久(取决于finalized方法执行是否及时),所以可能会出现大部分堆被标记为垃圾却还没有被回收,出现内存溢出的错误。

使用虚引用,上述情况将引刃而解,当一个虚引用加入到引用队列时,你绝对没有办法得到一个销毁了的对象。因为这时候,对象已经从内存中销毁了。因为虚引用不能被用作让其指向的对象重生,所以其对象会在垃圾回收的第一个周期就将被清理掉。

ThreadLocal

通常情况下,线程中对全局变量赋值后,可以被任何一个线程访问并修改的。

而创建全局变量ThreadLocal,通过ThreadLocal全局变量传递局部变量,该局部变量只能被当前线程访问,而且可以在线程的上下文传递,其他线程则无法访问和修改。

1
2
3
4
5
6
7
public class Test {
private final ThreadLocal<String> mystr = new ThreadLocal<>();

public void methodA() {
mystr.set("test_str_1");
}
}

实际上通过ThreadLocal设置的值是放入了当前线程的一个ThreadLocalMap实例中,所以只能在本线程中访问,其他线程无法访问。

ThreadLocal的实现原理

每个Thread维护一个ThreadLocalMap映射表,这个映射表的key是ThreadLocal实例,value是真正需要存储的Object。

从set()方法的实现,理解ThreadLocal实现

1
2
3
4
5
6
7
8
9
// jdk1.8 source code 
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

在调用set方法时

  • 首先获取当前线程 Thread.currentThread()
  • 利用当前线程获取一个ThreadLocalMap对象
  • 判断map是否为空,若为空,创建这个ThreadLocalMap对象并设置值,不为空,则设置值。

getMap()方法:

1
2
3
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

在Thread类中,定义了两个属性,threadLocals的初始化是在调用ThreadLocal类中的getMap()方法时完成的,当线程退出时,会将threadLocals和inheritableThreadLocals置为null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ThreadLocal.ThreadLocalMap threadLocals = null; // ThreadLocalMap对象
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; // 子类可继承的ThreadLocalMap对象

// 线程退出后,将threadLocals和inheritableThreadLocals置为null
private void exit() {
if (group != null) {
group.threadTerminated(this);
group = null;
}
/* Aggressively null out all reference fields: see bug 4006245 */
target = null;
/* Speed the release of some of these resources */
threadLocals = null;
inheritableThreadLocals = null;
inheritedAccessControlContext = null;
blocker = null;
uncaughtExceptionHandler = null;
}

现在,完成了前两步,获取当前线程的ThreadLocalMap对象。

ThreadLocalMap是ThreadLocal的静态内部类,是基于Entry数组的map。Entry的key是ThreadLocal弱引用,目的是当线程退出时把threadLocal实例置为null时,不再有强引用指向threadLocal实例,不影响threadLocal实例的垃圾回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static class ThreadLocalMap {

/**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

在threadlocal的生命周期中,存在这些引用. 看下图: 实线代表强引用,虚线代表弱引用.

img

与上面的分析一致,Entry的key为弱引用,它的引用链是ThreadLocalRef -> ThreadLocal ---> key,当栈中的ThreadLocalRef与堆中的ThreadLocal断开时,ThreadLocal实例就会被垃圾回收。

value为强引用,它的引用链是CurrentThreadRef -> CurrentThread -> ThreadLocalMap -> Entry -> value,只要当前线程没有关闭,CurrentThreadRef -> CurrentThread的引用就不会断开,value就不会被垃圾回收。只有当前thread结束以后, CurrentThread就不会存在栈中,强引用断开, CurrentThread, Map, value将全部被GC回收.

是否存在内存泄露?

上节提到当前线程没有退出,将会一直存在CurrentThread至value的引用链,即便将threadLocal手动设置为null也依然存在CurrentThread至value的引用链。这会给开发者产生一种内存泄露的错觉(错觉:value是通过threadLocal设置的,我明明将threadLocal设置为了null,为什么value还会占用内存?),尤其在使用线程池时更容易出现这样的错觉,因为线程池的线程结束后,会放回线程池中不销毁。

可以理解为:threadLocal没有内存泄露,泄露的是Entry。

JDK的优化

为了减缓这种错觉的产生,Java会在调用threadLocal实例的get、set方法且key为null时,清除Entry。以get方法为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// threadlocal.get()
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this); //此处调用threadlocalMap.getEntry()
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

// threadlocalMap.getEntry()
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e); // 没找到该key(threadlocal)时,调用该方法
}

// hash未命中时调用该方法
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;

while (e != null) { // ThreadRef这条链还没断,thread未被销毁,entry不为Null
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null) // threadLocalRef这条链已断开,threadLocal实例为Null
expungeStaleEntry(i); // 删除所有key为null的Entry
else
i = nextIndex(i, len);
e = tab[i];
}

return null;
}

不仅在调用get方法,在调用set、remove方法时,threadLocal为null时,也会最终调用到expungeStaleEntry()方法 ,清除所有threadLocal为null时entry的强引用,这里不赘述了。

因此,正确的使用方式是,首先判断是否存在场景:threadLocal置为null?

如果存在,在调用完set、get后,记得调用remove方法显示的清除Entry的强引用。如果不存在,threadLocal一直在使用,没有被回收的必要,也不care脏读的情况,那更没必要去回收threadLocalMap中的Entry了。

脏读

示例如下,创建一个大小为8的线程池,向该线程池提交100次任务,因为使用的是线程池,线程不会被销毁,所以假设某一个线程写入了值,然后该线程处于空闲态,然后该线程再次读取时,读取到的是上次该线程运行时设置的值。

可能下面的例子很明显就看的出问题所在,但是当项目复杂时,在多处调用get,就比较容易出现这种问题。

不过这种情况也很容易避免,有两种方法:

  • set、get成对出现,set在前、get在后
  • 使用remove
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Test {
private final ThreadLocal<String> mystr = new ThreadLocal<>();

public void methodA() {
ExecutorService executor = Executors.newFixedThreadPool(8);
for (int i=0; i<100; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
if (i % 4 == 0) {
String s = mystr.get();
}
mystr.set("test"+i);
}
});
}

}
}

Hash碰撞

在某个线程中,每new一个ThreadLocal实例,该线程的ThreadLocalMap中就会新增的一个key,当ThreadLocal实例过多时,自然会出现hash碰撞。

和HashMap的最大的不同在于,ThreadLocal.ThreadLocalMap结构非常简单,没有next引用,也就是说ThreadLocalMap中解决Hash冲突的方式并非链表/红黑树的方式,而是采用线性探测的方式,所谓线性探测,就是根据初始key的hashcode值确定元素在table数组中的位置,如果发现这个位置上已经有其他key值的元素被占用,则利用固定的算法寻找一定步长的下个位置,依次判断,直至找到能够存放的位置。

线性探测的方式解决Hash冲突的效率很低,如果有大量不同的ThreadLocal对象放入map中时发送冲突,或者发生二次冲突,则效率很低。所以在开发的过程中,要避免这一点,提高运行效率。

与synchronized的区别

  • ThreadLocal用于处理线程内部上下文变量的传递,变量不会被其他线程访问,而synchronized修饰的变量,只要其他线程获取了锁,就能访问、修改
  • ThreadLocal没有锁的机制,没有锁的开销

MongoDB系列(四)MongoDB副本集与分片

发表于 2019-09-08 | 分类于 数据库

副本集

副本集是一组服务器,其中有一个主服务器(primary),用于处理客户端请求,还有多个备用服务器(secondary),用于保存主服务器的数据副本,如果主服务器崩溃了,备份服务器会自动选举出一个新的主服务器。

一般的,只有主服务器才会用作写操作,备用服务器最多支持读操作,甚至读写均不支持,只用来做备份。

大多数

副本集中有一个很重要的概念是”大多数“,选择主节点时由”大多数“决定,主节点只有在得到”大多数“支持时才能继续作为主节点,这里的”大多数“被定义为副本集中一半以上的成员。它是动态变化的,如果某一个节点挂了,那么”大多数“就有可能发送变化,举个例子:

副本集的成员总数 ”大多数“
1 1
2 2
3 2
4 3
5 3
6 4
7 4

$$
“大多数” >= (总数+1)/2
$$

下图是一个包含5个节点的副本集,其中3个位于数据中心A,另外2个位于数据中心B,如果节点1、2、3全挂了,节点4、5不能选举出主节点。这种规定是有目的的:

对于节点4、5而言,节点1、2、3全挂掉与下图情况(数据中心A与B之间的链路中断了)完全相同,此时,数据中心A中还有3个可用节点,满足”大多数“,会选举出一个主节点,数据中心B只有两个可用节点,如果允许数据中心B选举出主节点,那么会出现两个主节点,这就是为什么一定要保证”大多数“的原因,时刻保证只有一个主节点可用,避免多个主节点写入出现冲突所导致的开发的复杂性。

1566905821363

还有另外一种情况,同样是两个数据中心,A与B的节点数完全相等,A与B链路断开时,任何一边都无法满足”大多数“。为避免这种情况,有两种做法:

  • 将大多数放在同一个数据中心,如节点1、2、3放置在中心A,节点4放在中心B,这样做很简单,但是还会遇到上面的问题,节点1、2全部挂掉了,节点3、4无法提供服务
  • 添加一个仲裁节点,只用来做仲裁,不用来做备份,放置在数据中心C,这样任何一个数据中心的服务器都可以满足”大多数“的条件,这样做的缺点是:将服务器分布到三个地方。

1566906750134

选举机制

当一个备份节点A无法与主节点连通时,它就会请求其他的副本集成员将自己选举为主节点。其他副本集成员会进行以下检查:

  • 自身能否与主节点连通
  • 通过比较A的oplog与自身的oplog,确定A节点的数据是否最新
  • 是否有其他优先级更高的节点请求被选举为主节点

赞成票的权重为1,反对票的权重为-10000,所以即使”大多数“成员中只有一个否决了本次选举,选举就会取消。选举的过程一般只需要几毫秒,实际情况可能会遇到网络问题、服务器过载导致响应慢、选举打成平局,平局后每个成员需要等待30s才能进行下一轮选举,所以,如果有太多错误发生的话,选举可能需要几分钟。

首先关注第二点,确定被选举人的数据是否是最新的。如下图所示,一共有3个节点,主节点网络故障,备用节点1发现主节点无法连通,向备用节点2请求选举自身为主节点,备用节点2首先检查到主节点无法连通,然后将对比备用节点1的local.oplog.rs与自身的local.oplog.rs数据,发现备用节点1的oplog中没有记录自身的最新写操作5,投出反对票。此时,备用节点1获悉自身数据不是最新的,会向备用节点2请求最新的oplog,然后再次请求选举为主节点,新一轮的选举中,之前投否决票的的可以重新投票;同时,备用节点2也可以向备用节点1发起请求,请求被选举为主节点。最终,只要备用节点1与备用节点2之间保持连通的状态,一定能选举出新的主节点。

1566982864577

上面描述的是备用节点1和备用节点2相等时的情况,假如备用节点1和备用节点2已经选举出新的主节点—备用节点1。运行一段时间后,使用下面的命令新添加一个节点,称之为节点3,节点3此时没有任何数据,会从新的主节点(之前的备用节点1)获取最新的oplog,数据更新到最新后,新的主节点(之前的备用节点1)检测到节点3为高优先级节点,新的主节点(之前的备用节点1)主动退位,重新进行选举,直到选举节点3为主节点为止。

1
rs.add({"_id":4, "host":"172.28.70.1:27017", "priority":1.5})

让主节点永远保持最新的oplog是非常重要的,因此,所有的写操作都在主节点进行,在对读取数据一致性要求不高或希望主节点挂掉仍能读数据的场景下,备用节点可以分担主节点读操作的压力。

通过设置readPreference=secondaryPreferred将读请求设置路由至备用节点,建立索引会消耗内存和硬盘空间、降低写操作性能,为进一步缓解主节点压力,

  • 可以设置一个与主节拥有不同索引的备份节点
  • 也可以使驱动程序创建一个直接连接到目标备用节点用作读操作(而不是连接到整个副本集)
  • 甚至部分数据从主节点读另一部分从备用节点读。

心跳

每个节点都需要知道其他成员的状态,用来确定下列信息:

  • 哪个是主节点
  • 哪个挂掉了
  • 是否满足”大多数”
  • 是否有比主节点优先级更高的节点

为了维护集合的最新视图,每个成员每隔2s就会向其他成员发送一次心跳,心跳请求的信息量非常小,用来检查每个成员的状态,获取简要信息。

节点状态

  • STARTUP 节点刚刚启动,还未加载副本集配置
  • STARTUP2 节点加载副本集配置,进行初始化同步
  • RECOVERING 初始化完成,进行检查以确保自身处于有效状态,当节点与其他节点脱节时,也会进入该状态,这时,这个成员处于无效状态,需要重更新同步(不是初始化同步,不过于初始化同步动作是一样的,都是同步oplog),同步完成后,回到正常状态(主节点状态、备份节点状态)
  • PRIMARY 主节点正常运行的状态
  • SECONDARY 备份节点正常运行的状态
  • ARBITER 在正常操作中,仲裁节点始终处于该状态,仲裁节点没有oplog,没有数据
  • DOWN 节点无法连通
  • UNKNOW 所有的节点都无法连通该节点
  • REMOVED 节点被踢出副本集
  • ROLLBACK 回滚,主节点执行一个写操作后挂掉了,备份节点没有复制该操作,新的主节点也会漏掉该操作,旧的主节点重新上线后,会回滚该操作,然后重新同步。
  • FATAL 节点发生不可挽回的错误,也不再尝试恢复正常,这是应该重启该节点、重新同步。

分片

分片(sharding)是指将数据拆分,将其分散在不同的机器上的过程。分片可以分为:

  • 手动分片(manual sharding)
  • 自动分片(autosharing)

几乎所有的数据库都支持手动分片,应用维护与各个服务器之间的连接,每个连接是完全独立的,由应用管理数据的路由规则,这种方式的缺点是:难以维护、向集群新增节点或删除节点都很麻烦、调整分布以及负载模式也不轻松,因此,MongoDB提供了自动分片机制。

基于Mongos的自动分片

MongoDB支持自动分片,使得数据库架构对应用不可见,对于应用而言,好像始终在使用一台单机的MongoDB服务器一样,同时,MongoDB自动处理数据在分片上的分布,也更加容易新增或删除节点。

img

Mongos作为分片集群的访问入口,所有的请求都由mongos来路由、分发、合并,这些动作对客户端驱动透明,用户连接mongos就像连接mongod一样使用,mongos可以是一个或多个,一般部署两个做高可用即可。

Mongos会根据请求类型及片键将请求路由到对应的分片

查询请求

  • 查询请求不包含片键,则必须将查询分发到所有的分片,然后合并查询结果返回给客户端
  • 查询请求包含片键,则直接根据片键计算出需要查询的块(chunk),向对应的分片发送查询请求

插入请求

写操作必须包含片键,mongos根据片键算出文档应该存储到哪个chunk,然后将写请求发送到chunk所在的分片。

更新/删除请求

更新、删除请求的查询条件必须包含片键或者_id,如果是包含片键,则直接路由到指定的chunk,如果只包含_id,则需将请求发送至所有的分片。

其他命令请求

除增删改查外的其他命令请求处理方式都不尽相同,有各自的处理逻辑,比如listDatabases命令,会向每个分片转发listDatabases请求,然后将结果进行合并。

何时分片

通常,不必太早分片,因为分片不仅会增加部署的复杂度、还要求做出设计决策,而且该决策在以后很难再改。

另外,不能太晚分片,因为在一个过载的系统上不停机进行分片是非常困难的。

分片的目的:

  • 增加可用内存空间
  • 增加可用磁盘空间
  • 减轻单台服务器的负载
  • 处理单个MongoDB服务器无法承受的吞吐量

随着不断增加分片的数量,系统性能大致会呈线性增长,但是,如果从一个未分片的系统转换为只有几个分片的系统,性能通常会有所下降。由于迁移数据、维护元数据、路由等开销,少量分片的系统与未分片的系统相比,通常延迟更大,吞吐量甚至更小。一般的,至少应该创建3个或以上的分片。

选择片键

使用分片时,最重要、最困难的任务时选择数据的分发方式。对集合分片时,要选择一个或两个字段用于拆分数据。这个键(或这些键)称为片键。一旦拥有多个分片,再修改片键几乎是不可能的,所以必需在一开始就确定好片键。

最常见的片键有三种:

  • 升序片键(ascending key)
  • 随机分发的片键(random key)
  • 基于位置的片键(location-based key)

升序片键

升序片键有点类似于“data“字段或_id字段,是一种会随着时间稳定增长的字段。

假设已存在一个集合,有5百万条数据,以简化的_id(实际_id是24个16进制字符组成,这里简化便于理解)建立三个分片0001-0003,集合根据_id拆分为多个范围的块,$maxKey指正无穷,5000000->$maxKey是一个最大块。之后插入的数据都会在最大块中,最大块不是无限大的,它会继续拆分成多个小块,不过还是有以下缺点:

  • 会导致之后所有的写操作均被路由至分片0003中
  • MongoDB必须不断地将一些块从分片0003移动至其他分片。

优点是:很好的满足范围查询的请求,比如想查询范围在2500000~2500010的文档,mongos直接路由至分片0002就能查询出所有符合条件的文档。

1566993777773

随机分发的片键

随机分发的片键可以是用户名、邮件地址、设备id、md5散列值等没有规律的键。

假设片键是0-1之间的随机数,数据的随机性意味着新插入的数据会比较均衡的分发至不同的块中,因此,各分片的增长速度大致相同,这就减少了需要进行迁移的次数。

随机分发片键的缺点:范围查询要分发到后端所有的分片才能找出满足条件的文档

1566995020969

基于位置的片键

片键是用户IP、经纬度、地址等,数据会依据这个位置进行分组,与该位置接近的文档会保存在同一个范围的块中,优点是可以将数据与相关联的用户、相关联的数据保存在一起。

片键策略

好的片键策略应该拥有如下特性:

  • key 分布足够离散
  • 写请求均匀分布
  • 读请求均匀分布,尽量避免 scatter-gather 查询 (所有读请求皆在一个分片上,targeted read)

目前主要支持2种数据分布的策略,范围分片(Range based sharding)或hash分片(Hash based sharding)

  • 范围分片的策略的一种实现是升序片键,能很好的满足『范围查询』的需求,缺点在于,如果片键有明显递增(或者递减)趋势,则新插入的文档多会分布到同一个块,无法扩展写的能力。
  • Hash分片是根据用户的片键计算hash值(64bit整型),根据hash值按照『范围分片』的策略将文档分布到不同的块。Hash分片与范围分片互补,能将文档随机的分散到各个chunk,充分的扩展写能力,弥补了范围分片的不足,但不能高效的服务范围查询,所有的范围查询要分发到后端所有的分片才能找出满足条件的文档。

举个例子,某IOT应用使用 MongoDB分片集群存储海量设备(假设100W台)的工作日志,设备每10s向 MongoDB汇报一次日志数据(这个量级,无论从写入还是数据量上看,都应该使用 分片,以便能水平扩张),日志包含deviceId,timestamp信息,应用最常见的查询请求是:查询某个设备某个时间内的日志信息

  • 方案1:使用时间戳作为片键,采用范围分片策略

    时间戳是递增的,支持范围分片策略,新的写入都是连续的时间戳,写入请求会集中到同一个分片上,写请求分布不均匀,但是deviceId不是片键,根据deviceId查询会分散到所有的分片上,效率低下。

  • 方案2:使用时间戳作为片键,采用hash分片策略

    由于采用了hash分片策略,保证了写请求均匀分布,与方案1一样,deviceId不是片键,根据deviceId查询会分散到所有的分片上,效率低下。

  • 方案3:使用deviceId作为片键,采用范围分片策略

    如果deviceId是没有明显规则的,写请求会均匀分布,根据deviceId的查询均会路由至该分片,查询的要求是某个设备的某个时间段,所以,路由至该分片后,还需要全表扫描并排序,才能找出该设备某时间段内的日志信息。

  • 方案4:使用deviceId作为片键,采用hash分片

    与方案3deviceId无规则时基本一致。

  • 方案5:使用deviceId+时间戳作为片键,建立复合索引,采用范围分片策略

    同一个设备的数据能够根据时间戳进一步分散到多个chunk,根据deviceId查询时间范围的数据,能够利用复合索引来完成,性能是最优的,不过前提是deviceId无明显规则。

MongoDB系列(三)MongoDB索引

发表于 2019-09-01 | 分类于 数据库

索引是特殊的数据结构,它以易于遍历的形式存储部分集合数据集。索引存储特定字段或字段集的值,按字段值排序。

MongoDB的索引几乎与传统的关系型数据库索引一模一样,第二章提到的_id实际上也是一个索引,MongoDB的数据按照_id的顺序存储在内存页与磁盘块上。但是,_id与业务毫无关联,在业务相关的条件查询时,还是需要进行全表扫描才能找到对应页,效率并不高。

  • 为了避免性能瓶颈,可以根据常用的查询建立索引
  • 索引的值是按照一定的顺序排列的,因此,使用索引键对文档进行排序效率非常高。

不过,使用索引也是有代价的,不仅会增加磁盘与内存的消耗,对于添加的每一个索引,每次写操作(插入、更新、删除)都会耗费更多时间,这是因为,数据发生变动时,还需要额外的开销更新索引。

聚簇索引与非聚簇索引

在介绍索引之前,先了解下聚簇索引与非聚簇索引。
磁盘上的数据某一时刻只能有一种排序方式,而聚簇索引的特点是:索引顺序与数据存储顺序一致,所以聚簇索引只能有一个。

《数据库原理》中对聚簇索引的定义:聚簇索引的叶子节点是数据节点,非聚簇索引的叶子节点仍然是索引节点,只不过有指向对应数据块的指针。

所以Mysql的InnoDB引擎的主键索引是聚簇索引、MyIsam引擎使用的是非聚簇索引。

MongoDB不会将_id索引与文档内容放在一起,所以MongoDB的_id索引不是聚簇索引,mogoDB将数据与索引分开存放,通过RecordId间接引用。假设为字段”name“创建了索引,主键id为主键索引,那么该集合就通过索引查找RecordId,再查找数据。

1566960990983

主键索引

前面提到的_id索引是默认的主键索引,与业务相关联的项不适合用作主键(难以保障全局唯一、非null),建议使用_id作为主键。

单字段索引

即对单个filed建立索引,也是常说的“普通索引”;建立索引时可以指定索引数据的order:正序还是倒序。MongoDB 3.0后的版本,使用createIndex、ensureIndex是一样的,均是创建索引的命令。

1
2
db.mycollection.ensureIndex({"name":1}) //对score字段建立索引、1表示正序、-1表示倒序
db.mycollection.createIndex({"name":1}) // MongoDB 3.0后的版本,可以使用createIndex

复合索引

两个或两个以上的键建立索引,可以减小检索的范围。复合索引与Mysql一样,也是按照左侧匹配规则,这里不赘述,主要介绍下复合索引与排序共用的情况。

首先,在集合”myc“上创建一个复合索引:

1
db.myc.ensureIndex({"age":1,"name":1}); // 索引1

再创建一个:

1
db.myc.ensureIndex({"name":1,"age":1}); // 索引2

这两个复合索引的唯一区别就是键顺序不同,排序规则都是正序(1表示正序、-1表示倒序)。

由于存在了多个索引,使用hint命令指明使用哪个索引。

1
2
3
4
5
// 查询1使用索引1
db.myc.find({"age":{"$gte":21, "$lte":30}}).sort({"name":1}).hint({"age":1,"name":1});

// 查询2使用索引2
db.myc.find({"age":{"$gte":21, "$lte":30}}).sort({"name":1}).hint({"name":1,"age":1});
  • 对于查询1,先根据索引age查找复合条件的结果集,然后在内存中排序(age索引是有序的,但是排序规则用不到)
  • 对于查询2,遍历整个索引树,找出所有匹配的文档,不需要排序(name索引本身就是有序的),按正序遍历即可。

查询1和查询2究竟哪个性能更强,取决于结果集的大小,一般的,结果集越大,在内存中排序耗时越久,超过一定大小(32MB)后,MongoDB会抛出异常,拒绝对如此多的数据排序。一般的:

  • 结果集只有几条、十几条,使用查询1,排序的开销跟遍历树的开销相比并不大
  • 结果集有几百条、甚至几千条,使用查询2,排序的开销显得过大。
  • 结果集有几万条,使用查询1或查询2建议具体比较一下

结果集的大小可以使用limit关键字人为限制:

1
db.myc.find({"age":{"$gte":21, "$lte":30}}).sort({"name":1}).limit(1000).hint({"name":1,"age":1}); \\使用查询2,并限制结果集

最后,具体使用哪种查询,使用explain关键字在shell中比较一下再做选择。

1
2
3
db.myc.find({"age":{"$gte":21, "$lte":30}}).sort({"name":1}).hint({"age":1,"name":1}).explain()[`millis`]; \\获取查询1耗时

db.myc.find({"age":{"$gte":21, "$lte":30}}).sort({"name":1}).hint({"age":1,"name":1}).explain()[`millis`]; \\获取查询2耗时

唯一索引

唯一索引用来确保集合的每一个文档的指定键都有唯一值,允许null值。例如:在集合mycollection中,给”name“键建立唯一索引,试图插入重复name的值时,会抛出异常,也会影响效率。

1
db.mycollection.ensureIndex({"name":1}, {"unique":true});

使用场景:应对偶尔可能会出现重复的键重复问题,而不是在运行时对重复键进行过滤。比如:为避免消息重复消费,可以为”消息id“键创建唯一索引。

复合唯一索引

复合的唯一索引,单个键的值可以相同,但所有键的组合值必须是唯一的。

例如,如果有一个{”username”:1, “age”:100}上的唯一索引,下面的插入是合法的,不会报错。

1
2
3
db.mycollection.insert({"username":"bob"});
db.mycollection.insert({"username":"bob", "age":23});
db.mycollection.insert({"username":"fred", "age":23});

去除重复

在已有的集合上创建唯一索引时,可能会失败,因为集合中可能已经存在重复的值了。此时,有三种办法:

  • 找出重复数据,想办法去除

  • 直接删除重复的值,创建索引时使用”dropDups“选项,如果遇到重复的值,只会保留第一个值。正是由于这种不确定性(不确定哪条记录被删除),MongoDB 3.0以后移除了该选项。

  • 新建一个集合,建立索引,然后把旧集合的数据拷贝至新集合

    1
    db.mycollection.ensureIndex({"username":"bob"},{"unique":true,"dropDups":true})

稀疏索引

唯一索引会把null看做值,假如集合中有以下两个文档,假设对键”age“建立唯一索引,则文档2中的"age"就是null

1
2
{"name":"bob", "age":23} // 文档1
{"name":"bob"} // 文档2

现在想新增文档3,是无法添加的,因为文档3中”age“也是null,与文档2冲突了,违反了唯一性。

1
{"name":"bob", "addresss":"sz"} // 文档3

此时,应该使用稀疏索引(sparse index),就可以插入文档3,同时也能保证文档4无法插入,满足唯一性。

1
2
3
4
// 创建稀疏索引
db.ensureIndex({"age":1}, {"unique": true, "sparse": true});

{"name":"dod", "age":23} // 文档4

稀疏索引定义如下:如果集合中的文档存在索引键,则必须是唯一的,如果文档不存在索引键,则不要求该文档的唯一性。

注意事项:

根据是否使用稀疏索引,查询结果可能有所不同。例如:对于下面的查询,查询1和查询2是完全相同的语句,不同的是,查询1对应未创建稀疏索引的情况,查询2对应创建稀疏索引的情况。

1
2
db.mycollection.find({"age":{"$ne":23}}) // 查询1,未创建稀疏索引
db.mycollection.find({"age":{"$ne":23}}) // 查询2

查询结果如下,查询2没有查询到文档,这是因为建立了稀疏索引后,查询只根据索引查询,不再全表扫描,因此,会遗漏那些没有索引键的文档。如果一定要获取与查询1相同的结果,通过hint命令指明不使用索引,执行全表扫描。

1
2
3
4
5
6
// 查询1的查询结果
{"name":"bob"}
{"name":"bob", "addresss":"sz"}

// 查询2的查询结果
// nothing...

TTL索引

TTL(Time-to-live index)索引指具有生命周期的索引,这种索引会为文档设置一个超时时间,一旦文档存活时间超过该时间就会被删除。这种类型的索引可以用在:消息日志、服务器会话等具有时效性的场景。

在"createdTime"字段上创建TTL索引:

1
db.mycollection.createIndex({"createdTime":1}, "expireAfterSecs": 60*60*24)

"createdTime"字段必需是日期类型,一般设置为当前时间,

​ 记录被删除的时间点="createdTime"字段对应的时间点+"expireAfterSecs"对应的单位为秒的时间段

为了避免活跃的会话被删除,可以在会话上有活动发生时,更新"createdTime"为当前时间。

一个集合上可以创建多个TTL索引。

全文索引

与Mysql一样,MongoDB也支持全文检索。创建全文索引的开销较大,MongoDB本身就很耗内存,在一个操作频繁的集合上创建全文索引更容易导致内存不足,全文本索引的集合写入性能更差、分片时迁移速度更慢,一般的,如果不是特别强烈的业务需要,不建议使用全文索引。

在"mytext"字段上创建全文索引:

1
db.mycollection.ensureIndex({"mytext":"text"})

使用全文索引检索关键字"keyword":

1
db.mycollection.find({$text:{$search:"keyword"}})

地理空间索引

MongoDB支持几种类型的索引,最常见的是2dsphere索引(用于球面图)和2d索引(用于平面图)。这里只简单介绍下这两种索引的创建:

1
2
db.mycollection.ensureIndex({"myloc":"2dsphere"})
db.mycollection.ensureIndex({"myloc":"2d"})

优化

如果数据库中已有大量数据,此时建立索引将会导致大量的IO操作(内存,磁盘读写),耗时较长。MongoDB提供了2种方式:foreground和background。

  • foreground即前台操作,它会阻塞用户对数据的读写操作直到index构建完毕,即任何需要获取read、write锁的操作都会阻塞,默认情况下为foreground;
  • background即后台模式,不阻塞数据读写操作,独立的后台线程异步构建索引,此时仍然允许对数据的读写操作;其中background比foreground更加耗时。

查询优化

  • 对频繁访问的查询,尽量使用覆盖索引,如果一个索引包含(或者说覆盖)所有需要查询的数据,就称为“覆盖索引”,使用覆盖索引时,需要强制不显示objectId字段。

    1
    2
    db.mycollection.createIndex({"name", 1})
    db.mycollection.find({"name":bob, "_id":0}) // 0表示不显示该字段
  • 选用差异性较强的字段作为索引,不要选用类似于性别、国家这种字段作为索引键。

  • 需要哪些字段查询哪些字段,尽量不要查询整个文档

  • 使用hint强制使用特定的索引

  • 使用explain对比分析多种查询方式的性能

写操作优化

  • 尽量不要创建过多的索引,索引会增加该集合写入、更新、删除的开销,因为要额外维护索引
  • 合理设置journal相关参数,journal日志实现日志预写功能,开启journal保证了数据持久化,但也会存在一定的性能消耗,合理的设置commitIntercalMs控制journal写入磁盘的频率,该参数过大,影响MongoDB写操作的性能,该参数过小,MongoDB意外宕机期间预写日志未持久化的可能增大。

MongoDB系列(二)MongoDB存储引擎

发表于 2019-08-22 | 分类于 数据库

在MongoDB 2.6版本之前(包括2.6),只有一种存储引擎:MMAP(Memory mapping,内存映射引擎)。MongoDB 3.0以后,MMAP升级为MMAPv1, 同时提供了插件式引擎API,引入wiredTiger,mongoDB 3.2默认使用WiredTiger引擎,MongoDB 4.0版本删除了MMAP引擎。

MMAPv1引擎

常规的文件系统操作(调用read等函数)为了提高读写效率和保护磁盘,采用的是页缓存机制,读文件时需要先将文件页从磁盘拷贝至页缓存中,页缓存处在内核空间,不能被用户进程直接寻址,还需要将页缓存中的数据再次拷贝到内存对应的用户空间中,所以需要通过两次数据拷贝的过程,才能完成进程对文件内容的获取。

在MMAP操作文件时,创建新的虚拟存储区域,建立文件磁盘地址与虚拟地址的映射关系,此时MMAP只是在虚拟内存分配了地址空间,所以32位的机器,最多支持2GB的文件映射。之后访问数据时,通过已建立好的映射关系,只使用一次数据拷贝,就从磁盘中将数据传入内存的用户空间中,供进程使用。

img

在MongoDB的MMAPv1引擎机制中,服务器启动时,其内存对所有数据文件进行映射,接下来完全由操作系统负责将数据刷新到磁盘,以及管理内存中数据页的交换。

MMAPv1引擎的命名空间与区段

MMAPv1引擎中,每个数据库由一个.ns文件和若干数据文件组成,数据文件从0开始编号,mydb.0、mydb.1、mydb.2等,文件大小从64MB起,依次倍增,最大为2GB。这一特性使得较小的数据库不会浪费过多的空间,而较大的数据库可使用连续的磁盘空间。图中,mydb.1、mydb.2、mydb.3(为便于理解,此处省略了mydb.0)分别是数据库mydb的三个数据文件,mydb.ns的文件用于保存mydb数据库的命名空间元数据,图中未给出。

每个数据库包含多个命名空间(namespace),存放在.ns文件中,单个命名空间128字节,数据库按照命名空间进行组织,每个命名空间中存放特定集合的数据,集合中的文档、索引都拥有自己的命名空间。mydb.ns文件实际是一个hash表,用于快速定位某个namespace的起始位置。

如下图,数据库mydb包含了两个集合c1、c2,对应两个命名空间mydb.c1、mydb.c2,mydb.$freelist是一个特殊的命名空间,用于跟踪记录不再使用的区段(如被删除的集合或索引所在的区段),最后,还有一个预分配命名空间。

每个命名空间的数据可以在磁盘上分为几组数据,即区段。这几个区段在磁盘上未必是连续的(图例中不连续)。

MongoDB也会预分配数据文件,数据文件一旦被填满,就开始预分配,这意味着MongoDB服务器总会为每个数据库维护一个额外的空白数据文件(如图中的mydb.3),以提前避免文件分配失败。使用 -- noprealloc选项可以关闭预分配功能。

mydb.1、mydb.2,分成了分属于不同命名空间的区段,mydb.1有三个区段、mydb.2有四个区段、mydb.3只有一个区段,该区段属于预分配空间。在为命名空间分配一个新的区段时,会先搜索空闲列表mydb.$freelist,查看是否存在合适大小的区段。

1566543786631

前面提到,mydb.ns是一个hash表,一个namespace对应一个集合或索引,该hash表中,一个节点元数据结构如下,每个节点628字节,16MB的.ns文件最多存储26715个namespace。哈希碰撞的概率也较低,采用的线性探针的方式解决哈希冲突。

1
2
3
4
5
struct Node {
Namespace key;
int hash;
NamespcaeDetails value;
}
  • key为namespace的名字,固定分配128字节的空间
  • hash为namespace的hash值
  • value包含该namespace的所有元数据,定义如下
1
2
3
4
5
6
7
8
9
10
11
12
class NamespaceDetails {
DiskLoc firstExtent; // 第一个区段
DiskLoc lastExtent; // 最后一个区段
// 不同大小的删除列表
DiskLoc deletedListSmall[SmallBuckets];
...
}

class DiskLoc {
int _a; // 数据文件编号,mydb.1编号为1,定位文件
int ofs; // 文件内部偏移量,定位文件内部的存储位置
}
  • firstExtent描述了第一个区段的位置
  • lastExtent描述了最后一个区段的位置
  • deletedList描述了各个被删除的元素

通过这些信息,可以遍历一个namespace下的所有区段的有效数据,区段的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct Extent {
unsigned magic; // 魔法数,校验合法性
DiskLoc myLocation; // extent自身位置指针
DiskLoc next; // 下一个extent位置指针
DiskLoc pre; // 上一个extent位置指针
int len; // extent长度
DiskLoc firstRecord; // extent内第一个record位置指针
DiskLoc lastRecord; // extent内最后一个record位置指针
char _extentData[4];
}

class Record {
int _len;
int _extentOfs;
int _nextOfs;
int _preOfs;
char _data[4];
}

// record 被删除后,以deletedRecord存储
class DeletedRecord {
int _len; // record长度
int _extentOfs; // record所在的extent位置指针
DiskLoc _nextDeleted; // 下一个已删除记录的位置
};

一条record对应mongoDB的一个文档,即一条数据记录。同一个区段(extent)下的所有record以双向链表的形式组织,record 被删除后,以deletedRecord存储,deletedRecord以单向链表的形式组织。

MMAPv1引擎CRUD

写入

1、检查namespaceDetail中的deletedList中是否有合适的deletedRecord可以利用,如果有,则删除该记录并复用删除空间。

2、检查数据文件的$freelist里是否有大小合适的不再使用的区段,如果有则复用该空间

3、第1、2步均不成功,创建新的区段,如果当前数据文件没有足够空间创建新区段,创建新数据文件。

删除

删除的记录会以DeleteRecord的形式插入到对应集合的删除链表里,删除的空间在下一次写入新的记录时可能会被利用上;但也有可能一直用不上而浪费。比如某个128Bytes大小的记录被删除后,接下来写入的记录一直大于128B,则这个128B的DeletedRecord不能有效的被利用。当删除很多时,可能产生很多不能重复利用的”存储碎片”,从而导致存储空间大量浪费;可通过compact命令整理碎片。该命令会持有数据库级别的锁。

1
db.runCommand ( { compact: '<collection>' } )

更新

更新跟删除类似,也有可能产生很多存储碎片

  • 更新的Record比原来小,可以直接复用现有的空间(原地更新);多余的空间如果足够多,会将剩余空间插入到DeletedRecord链表;
  • 更新的Record比原来大,更新相当于删除 + 新写入,原来的空间会插入到DeletedRecord链表里。文档需要移动到文件中的其他位置,这种因更新导致的文档位置移动会严重降低写入性能,因为一旦文档移动,集合中的所有索引都要同步修改文档新的存储位置,可通过设置填充因子(paddingFactor)进行优化,比如:如果填充因子为2,一个大小为200字节的文档插入是,会自动在文档后填充100个字节的空间,这样在更新时,会使用第一种方式(更新的record比原来小)。

查询

没有索引的情况下,查询某个Record需要遍历整个集合,读取出符合条件的Record;如果经常需要根据每个纬度查询Record,则需要给集合建立索引以提供查询效率。

MMAPv1引擎锁粒度

MMAPv1 3.0版本之前锁粒度是库,3.0版本后所粒度是集合,即表级锁,不支持事务,原子操作是文档的保存、修改、删除。

WiredTiger引擎

mongoDB 3.2设置wiredTiger为默认的存储引擎(之前版本默认MMAPv1),WiredTiger存储引擎负责将写操作写入cache(B树结构),满足条件后持久化(默认条件每隔60s或达到2GB)

与MMAPv1一样,journal日志(预写式日志,write ahead log,WAL)用于数据恢复。对于write操作,首先被持久写入journal,然后在内存中保存变更数据,条件满足后提交一个新的检测点checkpoint,即检测点之前的数据只是在journal中持久存储,但并没有在mongodb的数据文件中持久化,延迟持久化可以提升磁盘效率,如果在提交checkpoint之前,mongodb异常退出,此后再次启动可以根据journal日志恢复数据。,如果60s内机器宕机,且未开启journal日志,会丢失这60s的数据。journal日志默认每100毫秒同步磁盘一次,每100M数据生成一个新的journal文件,journal默认使用了snappy压缩,检测点创建后,此前的journal日志即可清除。

1566802633253

WiredTiger引擎文件空间分配

MMAPv1引擎中,集合和索引都以命名空间的方式混合存储在数据库文件中mydb.1、mydb.2等,同一个数据库文件存在多个集合的数据,例如mydb.1保存了集合c1、c2,即便删除了某个集合或索引,其占用的磁盘空间也会产生碎片难易清除。文件的存储级别是数据库级别

WiredTiger引擎中,文件的存储级别是集合和索引级别。将每个数据库中的所有集合和索引分别存储在单独的文件中,删除了集合或索引后,对应的文件自动清除,磁盘回收效率更高。

data/db目录下的文件如下:

1566795516284

整体的目录结构如下图:

1566796884855

  • collection.wt存储集合信息,以编号不同区分,collection1.wt、collection2.wt
  • index.wt存储索引信息,编号区分
  • WiredTiger.lock定义锁操作
  • WiredTiger.wt 存储collection.wt与index.wt的元数据
  • WiredTiger.turtle 存储WiredTiger.wt 元数据
  • journal 目录 存储journal日志

WiredTiger引擎存储模型

WiredTiger在执行写入任务时,不是直接写入到磁盘,首先写入的是cache,然后批量持久化,这也是MongoDB吃内存的主要原因,cache中使用B树保存数据,每个B树对应磁盘上的一个物理文件,树节点对应一个内存页、硬盘块,所以根节点与内部节点均会用于存储数据,目的是尽可能减少磁盘IO从而提高性能。

1566801831809

cache中的一个Page对应磁盘上的一个Extent(Mysql Innodb是1对4的关系),Extent大小为4K,存储了一系列键值对。

1566869294605

WiredTiger引擎更新、插入、删除

  • 遍历B树,找到待更新的页(如果cahce中没有热数据,从磁盘中获取,生成一个WT_ROW)
  • 如果有必要,生成预写日志
  • 在待更新的页执行更新、插入、删除操作

当对某个键的值进行更新、删除时,将创建一个用于更新的结构,包含了事务id、已更改数据、指向后续更新的指针,之后的更新会将自己添加到前一个结构的末尾,随着时间的推移创建一个不同版本值的链式结构,N次更新组成长度为N的linkedlist。

当进行插入时,生成一个skip linkedlist用于保存插入的信息,N次插入生成长度为N的保存了各skip linkedlist头信息的linkedlist—WT_INSERT_HEAD。

Copy on write

WiredTiger引擎采用Copy on write的方式管理修改操作(insert、update、delete),修改操作会先缓存在cache里,持久化时,修改操作不会在原来的leaf page上进行,而是写入新分配的page,每次checkpoint都会产生一个新的Root Page。

1566804600903

与MMAPv1引擎类似,当一个文档被删除时,WiredTiger不会立即归还该空间,会在后续的删除、更新、插入操作中优先复用该空间,可能会存在碎片,但影响不大,如果要整理碎片,可以调用compact命令。

为什么WiredTiger引擎使用B树而不是B+树

img

img

B树与B+树的简要图如上,二者最大的区别就是B树所有节点都用来存储key+data,而B+树只有叶子节点存储key+data,根节点与中间节点保存的是key的副本,相应的页存储的是key+指针,所有 data 存储在叶节点导致查询时间复杂度固定为 log n。而B-树查询时间复杂度不固定,与 key 在树中的位置有关,最好为O(1)。

无论是MongoDB选择B树,还是Mysql的InnoDB、MyIsam引擎选择B+树,目的都是尽可能减少磁盘IO。

MongoDB是一种聚合型数据库,它组织数据的特点就是将经常访问的数据放在一块(同一个JSON下包含所有信息),对于单个查询能够在与数据库的一次交互中将所有数据全部取出来,对于上图中key为37的数据,无论是B树还是B+树,都是3次IO,而对于key为50的数据,使用B树只需要1次IO,使用B+树需要3次IO。

而Mysql是关系型数据库,使用B+树提高根节点和内部节点存放的信息量(由于内节点无 data 域,每个节点能索引的范围更大更精确),从而减少查询次数,达到减小磁盘IO的目的。最重要的是,B+树由于数据全部存储在叶子节点,并且通过指针串在一起,这样就很容易的进行区间遍历甚至全部遍历,然而,MongoDB很少有区间访问的需求,也就没有这种磁盘预读机制的需求。

WiredTiger引擎锁粒度

WiredTiger的锁粒度为文档,对应Mysql的行级锁。

WiredTiger引擎4.0——事务

mongoDB对一个文档的写操作,会产生三个动作:

  • 对存储数据的Btree执行写操作
  • 对存储索引的Btree执行写操作
  • 对oplog(option log,与预写日志不是一回事,一个是已发生的,一个是将要发生的)执行写操作

MongoDB的单文档事务指:上述三个动作的更新是原子的,处于同一个事务中。不存在索引段中的某个RecordId,在数据段中找不到,也不存在一条记录的更改被应用,但是没有记录到oplog中, 反之亦然。

MongoDB 4.0提供了事务API,开始支持事务操作。它的事务是基于快照SANPSHOT、MVCC(Multi-Version Concurrency Control,多版本并发控制)实现的。

sanpshot

snapshot即快照。事务开始时,对整个WiredTiger内部正在执行或将要执行的所有事务进行一次截屏,保存当时整个引擎所有事务的状态。

1566873597953

snapshot_object保存了快照信息。

  • snap_min 最小执行事务
  • snap_max 最大执行事务
  • snap_array 位于snap_min与snap_max之间正在执行的事务,是不可见的。
1
2
3
4
5
snapshot_object = {
snap_min=T1,
snap_max=T5,
snap_array={T1, T4, T5},
};

凡是出现在snap_array中或事务ID>snap_max的事务均是不可见的。即便建立snapshot之后T1、T4、T5提交了,T6也无法访问的T1、T4、T5的修改。

MVCC

MVCC基于事务ID和记录值实现一个链表,新的事务与相应的修改value,插入链表头部,链表中的节点定义抽象如下:

1
2
3
4
5
wt_mvcc{
transaction_id: 本次修改事务的ID
value: 本次修改后的值
next;
}

读取值时从链表头开始,根据snapshot来判断是否可读,如果不可读,向链表尾方向移动,直到找到第一个能够读的数据版本,下图中,读事务T5与读事务T3均会读到V1。

1566880968502

事务隔离

传统的数据库事务隔离分为:Read-Uncommited(未提交读)、Read-Commited(提交读)、Repeatable-Read(可重复读)和Serializable(串行化),WiredTigerT引擎并没有按照传统的事务隔离实现这四个等级,而是基于snapshot的特点实现了下列事务隔离方式:

  • Read-Uncommited
  • Read-Commited
  • snapshot-Isolation(快照隔离)

Read-Uncommited

又称为脏读,是最低的隔离级别,总是读取到系统中最新的修改(包括未提交)。WiredTiger的实现方式很简单,将snap_array置为空即可。在上图中,隔离级别设置为脏读后,事务T5读取到的值为V4。一般数据库不会设置成这种隔离方式,它违背了事务的ACID的特性。

Read-Commited

又称为幻读,总是读取到最新的、已提交的修改。这种隔离级别可能在一个长事务多次读取一个值的时候前后读到的值可能不一样。

假设上图中的T5包含了两次读操作,中间sleep了2s,在这2s内T4提交了,则事务T5中,第一次读到的值为V1,第二次读到的值为V4。

snapshot-Isolation(快照隔离)

只在事务开始时生成一次快照,无论事务持续的过程中其他事务修改了几次值,该快照都不改变,所以值在整个事务执行过程中只有一个版本。

事务T4的修改对T5不可见,如果T5也是一个写事务,在T5开始时,T4未提交,T5执行过程中,T4提交了,T5再去修改值,会产生失败回滚。这样做的目的是防止忽略不可见数据的修改。

与Mysql事务的区别

通过上面对三种事务隔离方式的分析,WiredTiger并没有使用传统的事务独占锁和共享访问锁来保证事务隔离,而是通过对系统中写事务的snapshot截屏来实现。这样做的目的是在保证事务隔离的情况下又能提高系统事务并发的能力。

MongoDB系列(一)MongoDB简介

发表于 2019-08-15 | 分类于 数据库

MongoDB是一款强大、灵活、易于扩展的、由C++编写的聚合型、文档型、NoSQL数据库,支持二级索引、范围查询、排序、聚合、地理空间索引等功能。

MongoDB是面向文档的数据库,不是关系型数据库,不采用关系模型是为了更好的扩展性。与关系型数据库相比,有以下特点:

  • 没有“一行数据”的概念,取而代之的是“一条文档”
  • 不再有预定义模式,文档的键和值不再是固定的类型和大小,可以随意扩展。
  • 易于扩展,随着数据的增长,横向扩展(增加机器)变得更加方便,MongoDB能够自动处理跨集群的数据和负载,自动重新分配文档、自动将用户请求路由至正确的机器上。
  • 功能丰富,支持索引、聚合、文件存储、以及特殊集合,如会过期的数据:日志、会话等。
  • 性能更强,4.0版本前不支持事务,但提供了诸多原子操作:文档的保存、修改、删除。

img

文档与集合

文档是MongoDB中数据的基本单元,类似于关系型数据库管理系统中的行,但灵活性更高。例如:

{"name":"xiaoming","age":20}

集合是一组文档,可以看做是一个拥有动态模式的表。因此,MongoDB中的文档没有预定义模,一个集合中可以保存多个完全不同的文档,例如:

1
2
3
1  {"name":"xiaoming", "age":20}
2 {"name":"xiaohei", "age":20, "man":true, "address":"sz"}
3 {"device":"rcu","devId":"1234567890"}

备注:集合命名时,不能有空字符串、不能包含\0、不能包含$、不能以system.开头

数据库

多个集合组成一个数据库、一个MongoDB实例包含多个数据库,每个数据库拥有独立的权限,在磁盘上,不同的数据库放置在不同的文件夹中,一般的,一个应用程序使用一个数据库。

有三个数据库是默认保留的:admin(root库)、local(用于副本集模式)、config(用于分片模式)

数据类型

MongoDB的文档与JavaScript中对象相近,类似于JSON,MongoDB支持JSON包含的6种数据类型:null、布尔型、数值、字符串、数组、对象,MongoDB还添加了其他一些数据类型:日期、正则表达式、数组、内嵌文档、对象id、二进制数据、JavaScript代码。

下面只介绍:日期、数组、内嵌文档、对象id。

日期

对应JAVA、JS中的Date类,存入数据库的是Date对象,不是日期字符串。数据库中存储的日期为新纪元以来的毫秒数,并未存储时区,Date对象与毫秒之间的转换,由MongoDB负责,客户端不需要关心。

数组

与JAVA数组不同的是,数组可以包含不同类型的元素,数组可以嵌套数组。

1
{"things":["apple", 3.1415926]}

内嵌文档

文档可以作为键的值,这样的文档就是内嵌文档。类似于JSON嵌套。

1
2
3
4
5
6
{
"things":{
"name":"apple"
"price":3.14
}
}

objectId

MongoDB存储的文档必需有一个”_id“键,这个键可以是任意类型,默认是个ObjectId对象。objectId作为文档的唯一标识符。如果插入文档时,没有设置objectId,MongoDB客户端驱动会自动创建一个。

object_id大小为12字节,是一个由24个十六进制数字组成的字符串。

1566887583799

  • 时间戳:占用前4个字节,从标准纪元开始的时间戳,单位为秒,

  • 机器标识位:占用3个字节,是所在主机名的唯一标识,通常是机器名的hash值。

  • PID:产生 objectId的进程的ID

  • 计数器: 前9个字节保证了同一秒钟不同机器不同进程的ObjectId是唯一的,最后三个字节用来确保同一秒同机器同进程的objectId的唯一性。所以每个进程同一秒最多允许产生
    $$
    (2^8)^3=16777216
    $$
    个不同的objectId。对于服务器来说,1600W个足够用了。

从objectId的设计可以看出,MongoDB的设计初衷就是用作分布式数据库,能够在副本集、分片环境下生成全局唯一id。

springboot+RabbitMQ系列(四)MessageConvert

发表于 2019-08-13 | 分类于 springboot

可以为ListenerContainer和RabbitTemplate设置MessageConverter。这样就不用每次都写重复的消息格式转换代码了。spring提供的Message Converter均是双向的,负责将入站消息转换为特定结构(如:字节数组、序列化java对象、字符串、自定义的消息domain对象),将特定格式转换为出站消息。

消息格式

springboot-amqp涉及到两种消息格式,定义如下:

  1. org.springframework.messaging.Message<?> message,spring框架中通用的Message。简称spring-messaging Message。

    1
    2
    3
    4
    public interface Message<T> {    
    T getPayload();
    MessageHeaders getHeaders();
    }
  2. spring AMQP Message,spring为了适配AMQP协议,简化接口参数引入的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public class Message {

    private final MessageProperties messageProperties;

    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
    this.body = body;
    this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
    return this.body;
    }

    public MessageProperties getMessageProperties() {
    return this.messageProperties;
    }
    }

spring-messaging Message中的payload对应了spring AMQP Message中的byte[] body,他们均是Rabbit Client 中的body,即消息内容。

spring-messaging Message中的MessageHeaders对应了spring AMQP Message中的MessageProperties,他们均是Rabbit Client 中的BasicProperties,即消息头。

因此,后文的MessgeConverter如不加特殊说明,均指的消息内容的格式转换。消息头的格式转换见2.3.7 MessagePropertiesConverter

@RabbitListener底层实现原理

在了解MessageConveter之前,有必要清楚spring底层消息处理机制,此处以最常用的@RabbitListener为例。

通过注解@RabbitListener声明一个消费者时,底层由MessagingMessageListenerAdapter的onMessage()负责处理消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1 public void onMessage(Message amqpMessage, Channel channel) throws Exception { 
2
3 org.springframework.messaging.Message<?> message =
4 this.toMessagingMessage(amqpMessage);
5
6 if (this.logger.isDebugEnabled()) {
7 this.logger.debug("Processing [" + message + "]");
8 }
9
10 try {
11 Object result = this.invokeHandler(amqpMessage, channel, message);
12 if (result != null) {
13 this.handleResult(result, amqpMessage, channel, message);
14 } else {
15 this.logger.trace("No result object given - no result to handle");
16 }
17 } catch (ListenerExecutionFailedException var7) {
18 ListenerExecutionFailedException e = var7;
19 if (this.errorHandler != null) {
20 try {
21 Object result = this.errorHandler.handleError(amqpMessage, message, e); 22
23 if (result != null) {
24 this.handleResult(result, amqpMessage, channel, message); 25
26 } else {
27 this.logger.trace("Error handler returned no result"); 28
29 }
30 } catch (Exception var6) {
31 this.returnOrThrow(amqpMessage, channel, message, var6, var6); 32
33 }
34 } else {
35 this.returnOrThrow(amqpMessage, channel, message, var7.getCause(), var7); 36
37 }
38 }
39 }

3~4行,通过toMessagingMessage()将spring AMQP的Message转换为spring-messaging的Message。this.getMessagingMessageConverter是一个内部类的实例,内部类继承了MessagingMessageConverter,最终调用的是MessagingMessageConverter的fromMessage。完成Spring AMQP Message至spring-messaging Message的转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected org.springframework.messaging.Message<?> toMessagingMessage(Message amqpMessage) {    
return (org.springframework.messaging.Message)this
.getMessagingMessageConverter()
.fromMessage(amqpMessage);
}

// fromMessage
public Object fromMessage(Message message) throws MessageConversionException {
if (message == null) {
return null;
} else {
Map<String, Object> mappedHeaders =
this.headerMapper.toHeaders(message.getMessageProperties());

Object convertedObject = this.extractPayload(message);

MessageBuilder<Object> builder = convertedObject instanceof
org.springframework.messaging.Message ?
MessageBuilder.fromMessage((org.springframework.messaging.Message)convertedObject)
: MessageBuilder.withPayload(convertedObject);

return builder.copyHeadersIfAbsent(mappedHeaders).build();
}
}

11行通过反射处理消息,调用的是HandlerAdapter中的invoke(),为了简便,把这些调用全部放在了同一个代码块中,一般地,使用@RabbitListener时不会自定义invokHandler,所以调用的是代理的反射方法:delegatingHandler.invoke()

再继续关注下getMethodArgumentValues,包含了两部分,一部分是预设参数转换,如:Message、Channel,这个也是最开始传入Spring AMQP的Message的原因,它的作用就是作为预设参数,另一部分是Listener中消息处理的其他自定义参数,如@Payload注解、@Headers注解等声明的参数,args[i] == null时,抛出MethodArgumentResolutionException异常,这就是1.3中异常抛出的地方,参数为空。该异常会一直向上抛,直至17行被捕获,如果在Listener容器中注册了errorHandler,调用errorHandler处理异常。

还有一点值得注意的是:在整个过程中,真正作为消息载体的就是spring-messaging.Message而不是Spring AMQP的`Message。因此,消息处理的过程实际如下:

调用RabbitMQ JAVA API接收消息并封装为Spring AMQP Message,在消息处理onMessage中调用toMessagingMessage(Message amqpMessage)将消息转换至spring-messaging.Message,通过反射处理消息。

因此,消息转换实际上包含了两个过程,一个是消息的反序列化并封装为Spring AMQP Message,另一个是Spring AMQP Message与spring-messaging.Message之间的转换。后文中所指的消息转换如果不加特别说明,均指第一个转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
private Object invokeHandler(Message amqpMessage, Channel channel, org.springframework.messaging.Message<?> message) {    
try {
return this.handlerMethod.invoke(message, new Object[]{amqpMessage, channel});
} catch (MessagingException var5) {
throw new
ListenerExecutionFailedException(this.createMessagingErrorMessage("Listener method could not be invoked with the incoming message", message.getPayload()),
var5, amqpMessage);
}
catch (Exception var6) {
throw new ListenerExecutionFailedException("Listener method '" + this.handlerMethod.getMethodAsString(message.getPayload()) + "' threw exception", var6, amqpMessage);
}
}

//HandlerAdapter中的invoke
public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
if (this.invokerHandlerMethod != null) {
return this.invokerHandlerMethod.invoke(message, providedArgs);
} else if (this.delegatingHandler.hasDefaultHandler()) {
Object[] args = new Object[providedArgs.length + 1];
args[0] = message.getPayload();
System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
return this.delegatingHandler.invoke(message, args);
} else {
return this.delegatingHandler.invoke(message, providedArgs);
}
}

// delegatingHandler.invoke()
public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
Class<? extends Object> payloadClass = message.getPayload().getClass();
InvocableHandlerMethod handler = this.getHandlerForPayload(payloadClass);
Object result = handler.invoke(message, providedArgs);
if (message.getHeaders().get("amqp_replyTo") == null) {
Expression replyTo = (Expression)this.handlerSendTo.get(handler);
if (replyTo != null) {
result = new ResultHolder(result, replyTo);
}
}

return result;
}

// 代理完成的invoke
@Nullable
public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
Object[] args = this.getMethodArgumentValues(message, providedArgs);
if (this.logger.isTraceEnabled()) {
this.logger.trace("Invoking '" + ClassUtils.getQualifiedMethodName(this.getMethod(), this.getBeanType())
+ "' with arguments " + Arrays.toString(args));
}

Object returnValue = this.doInvoke(args);
if (this.logger.isTraceEnabled()) {
this.logger.trace("Method [" + ClassUtils.getQualifiedMethodName(this.getMethod(),this.getBeanType())
+ "] returned [" + returnValue + "]");
}

return returnValue;
}

// 关注下getMethodArgumentValues
private Object[] getMethodArgumentValues(Message<?> message, Object... providedArgs) throws Exception {
MethodParameter[] parameters = this.getMethodParameters();
Object[] args = new Object[parameters.length];

for(int i = 0; i < parameters.length; ++i) {
MethodParameter parameter = parameters[i];
parameter.initParameterNameDiscovery(this.parameterNameDiscoverer);
args[i] = this.resolveProvidedArgument(parameter, providedArgs);//预设参数的转换
if (args[i] == null) {
if (this.argumentResolvers.supportsParameter(parameter)) {
try {
args[i] = this.argumentResolvers
.resolveArgument(parameter, message);// 自定义的参数的转换
} catch (Exception var8) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(this.getArgumentResolutionErrorMessage(
"Failed to resolve", i), var8);
}

throw var8;
}
} else if (args[i] == null) {
throw new MethodArgumentResolutionException(
message, parameter, this.getArgumentResolutionErrorMessage(
"No suitable resolver for", i));
}
}
}
return args;
}

已有的MessageConverter

前面提到,消息格式转换有两次,第一次转换完成序列化与反序列化的工作,被称为Message Converter,spring AMQP提供了默认的转换器SimpleMessageConverter。以反序列化为例,将Spring AMQP Message转换为字符串、序列化对象、字节数组,这次转换也是文中所指的Message Convert。反序列化源码如下:

第二次转换,springboot默认使用的是GenericMessageConverter。它是属于org.springframework.messaging.包下的,继承了该包下的SimpleMessageConverter(第一次转换的SimpleMessageConverter在org.springframework.amqp包下),默认情况下,不需要特别的设置。

spring-messaging的MessageConverter是所有消息转换器(无论是spring-messaging还是spring AMQP`)最底层的接口。

1
2
3
4
5
6
7
8
9
10
11
package org.springframework.messaging.converter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
public interface MessageConverter {
@Nullable
Object fromMessage(Message<?> var1, Class<?> var2);

@Nullable Message<?> toMessage(Object var1, @Nullable MessageHeaders var2);

}

SimpleMessageConverter

spring AMQP的SimpleMessageConverter实现了MessageConverter接口(最底层),是默认的消息转换器。在未给RabbitTemplate配置message conveter时,将会调用SimpleMessageConverter的fromMessage和createMessage处理消息,从源码可以看出,支持三种类型:字符串、序列化java对象,字节数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package org.springframework.amqp.support.converter;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.remoting.rmi.CodebaseAwareObjectInputStream;
import org.springframework.util.ClassUtils;

public class SimpleMessageConverter extends WhiteListDeserializingMessageConverter implements BeanClassLoaderAware {
public static final String DEFAULT_CHARSET = "UTF-8";
private volatile String defaultCharset = "UTF-8";
private String codebaseUrl;
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
public SimpleMessageConverter() { }

public void setBeanClassLoader(ClassLoader beanClassLoader) {
this.beanClassLoader = beanClassLoader;
}

public void setCodebaseUrl(String codebaseUrl) {
this.codebaseUrl = codebaseUrl;
}

public void setDefaultCharset(String defaultCharset) {
this.defaultCharset = defaultCharset != null ? defaultCharset : "UTF-8";
}

public Object fromMessage(Message message) throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();

if (properties != null) {
String contentType = properties.getContentType();
if (contentType != null && contentType.startsWith("text")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = this.defaultCharset;
}

try {
content = new String(message.getBody(), encoding);
} catch (UnsupportedEncodingException var8) {
throw new MessageConversionException("failed to convert text-based Message content", var8);
}
} else if (contentType != null
&& contentType.equals("application/x-java-serialized-object")) {
try {
content = SerializationUtils
.deserialize(this.createObjectInputStream(
new ByteArrayInputStream(message.getBody()),this.codebaseUrl)); } catch (IllegalArgumentException | IllegalStateException | IOException var7) {
throw new MessageConversionException("failed to convert serialized Message content", var7);
}
}
}

if (content == null) {
content = message.getBody();
}

return content;

}


protected Message createMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[])((byte[])object);
messageProperties.setContentType("application/octet-stream");
} else if (object instanceof String) {
try {
bytes = ((String)object).getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException var6) {
throw new MessageConversionException("failed to convert to Message content", var6);
}
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding(this.defaultCharset);
} else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
} catch (IllegalArgumentException var5) {
throw new MessageConversionException("failed to convert to serialized Message content", var5);
}
messageProperties.setContentType("application/x-java-serialized-object");
}

if (bytes != null) {
messageProperties.setContentLength((long)bytes.length);
return new Message(bytes, messageProperties);
} else {
throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " +
object.getClass().getName());
}
}

protected ObjectInputStream createObjectInputStream(InputStream is, String
codebaseUrl) throws IOException {
return new CodebaseAwareObjectInputStream(is, this.beanClassLoader, codebaseUrl)
{
protected Class<?> resolveClass(ObjectStreamClass classDesc) throws
IOException, ClassNotFoundException {
Class<?> clazz = super.resolveClass(classDesc);
SimpleMessageConverter.this.checkWhiteList(clazz);
return clazz;
}
};
}


}

SerializerMessageConverter

与SimpleMessageConverter类似,唯一不同的是,多了一个属性用来自定义序列化与反序列化规则。

Jackson2JsonMessageConverter

消息载体是网络字节序时,使用默认的SimpleMessageConverter就足够了,但是消息载体为java序列化对象application/x-java-serialized-object时,不利于跨语言和跨平台,更推荐使用JSON作为消息的载体,Jackson2JsonMessageConverter负责JSON和java bean之间转换。使用时将jsonConverter注入rabbitTemplate实例中,替换SimpleMessageConverter。在替换后,收发消息可以直接发送消息Object的实例,大大得简化了开发。

注意事项:

使用时,需要生产者额外在消息头中添加一个字段”__ TypeId __ “用于注明该消息映射的domain对象,在下方的示例中,头信息中的字段"__ TypeId __"分别"foo"和”bar“如果生产者未注明,可以为classMapper设置默认值映射domain对象,例如:classMapper.setDefaultType(MyMessage.class)。

需要生产者在消息头注明contentType为application/json或text/x-json 或者生产者也使用 Jackson2JsonMessageConverter,它会自动在消息头中声明contentType。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}

@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("foo", Foo.class);
idClassMapping.put("bar", Bar.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}

ContentTypeDelegatingMessageConverter

顾名思义,ContentTypeDelegatingMessageConverter是一个根据消息头中content-Type动态选择MessageConverter的Message Converter。当content-Type为空或根据content-Type匹配不到MessageConverter时,将Message Convert的任务委托给SimpleMessageConverter

1
2
3
4
5
6
7
8
<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</map>
</property>
</bean>

MarshallingMessageConverter

负责Spring的Object与XML之间的转换。

为默认的Message Converter设置反序列化权限

在处理 content-type 为application/x-java-serialized-object的java序列化对象时,默认会扫描所有的packages/classes,为了提高安全性,可以设置白名单,所有的Message Converter都有一个属性whiteListPatterns,示例如下:

1
2
3
4
5
6
7
8
SimpleMessageConverter messageConverter = new SimpleMessageConverter();List<String> 

myWhiteList = new ArrayList<>(10);
myWhiteList.add("safe.*");
myWhiteList.add("unstable.recent.SafeClass");
myWhiteList.add("*.MySafeClass");

messageConverter.setWhiteListPatterns(myWhiteList);

注意:该属性仅在Message Converter使用DefaultDeserializer有效,即不要主动去配置DefaultDeserializer。

特殊的Conerter——MessagePropertiesConverter

前面介绍的MessageConverter负责body的转换,MessagePropertiesConverter 负责Rabbit Client的BasicProperties与Spring AMQP MessageProperties之间的转换,它的默认实现是DefaultMessagePropertiesConverter,足以满足绝多数场景下的需求。部分源码如下,仅截取了构造器和属性声明,当BasicProperties中的某一元素长度小于等于longStringLimit时,转化为MessageProperties中的String属性,当BasicProperties中的某一元素长度超过longStringLimit时,根据convertLongLongStrings判断是否需要转换为LongString,如果不需要则转换为DataInputStream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class DefaultMessagePropertiesConverter implements MessagePropertiesConverter {    
private static final int DEFAULT_LONG_STRING_LIMIT = 1024;
private final int longStringLimit;
private final boolean convertLongLongStrings;

public DefaultMessagePropertiesConverter() {
this(1024, false);
}

public DefaultMessagePropertiesConverter(int longStringLimit) {
this(longStringLimit, false);
}

public DefaultMessagePropertiesConverter(int longStringLimit,
boolean convertLongLongStrings) {
this.longStringLimit = longStringLimit;
this.convertLongLongStrings = convertLongLongStrings;
}

private Object convertLongString(LongString longString, String charset) {
try {
if (longString.length() <= (long)this.longStringLimit) {
return new String(longString.getBytes(), charset);
} else {
return this.convertLongLongStrings ? longString.getStream() : longString;
}
} catch (Exception var4) {
throw RabbitExceptionTranslator.convertRabbitAccessException(var4);
}
}

}

springboot+RabbitMQ系列(三)消费者

发表于 2019-08-08 | 分类于 springboot

消息接收有两种模式,简单的模式是消费者不断地去轮询,轮询到一条就消费一条,复杂些的模式是注册一个异步Listener,由容器负责接收消息并选择对应的Listener处理消息。

轮询模式消费者

在springboot-amqp中,可使用AmqpTemplate(rabbitTemplate)实现轮询模式接收消息,默认是阻塞的,有消息时就拉去消息,没有消息时,立刻返回null。

从springboot1.5版本以后,可以设置单次轮8询的超时时间,即消费者接收消息的阻塞时间,超时时间设置为负值,意味着无限期阻塞。并发量高时,建议使用异步Listener或将超时时间设为0.

如果需要转换消息格式,需要预先为AmpqTemplate设置MessageConverter,然后调用receiveAndConvert()方法接收消息。

如果需要Replay,可以在调用amqpTemplate.receiveAndReply()接口时传入ReceiveAndReplyCallBack。

异步消费者

异步消费者中有一个预取消息(prefetch)的概念,即一个消费者预取一定数目的消息,这可能会导致多消费者情况下其他消费者利用率不足。springboot 2.0之前,预取消息默认值是1,spring boot 2.0以后,默认值为250, 预取值的设置取决于你的业务,要尽可能保证所有消费者的高效运行从而提升吞吐量。比如:

  1. 当单条消息体很大,消息处理的又比较慢时,预取值如果设置的过大,将导致客户端内存占用率飙升。
  2. 如果严格的要求执行顺序时,建议预取值设置为1
  3. 在消息吞吐量不高、消费者又多时,预取值设置的过大会导致消费者利用率不足。
  4. 在手动确认的模式下,预取值应该设置为1,如果prefech不为1,basicAck是异步的操作,如果出现异常时,消费者会继续处理其他预取消息,但是不会ack(批量成功时才会ack),因此,其他的消息处于unack的状态,其他的消费者会重新获取该消息,消息会出现重复消费的情况。

Message Consuming callback

异步消费是通过回调实现的,消息的消费逻辑在回调方法onMessage()中实现,springboot AMQP提供了两个回调接口,MessageListener、ChannelAwareMessageListener,究竟用哪个取决于你是否需要获取channel信息,比如手动ack时必须要有channel才可以。

MessageListenerAdapter

接口回调已经可以实现消息的消费了,这还不够灵活,如果业务需要动态的指定queue或tag对应哪个methodName时,可以继承MessageListenerAdapter。它包含了下面的构造器,使用该构造器时,与前面提到的接口回调本质上是一样的。

1
2
3
4
5
public MessageListenerAdapter(Object delegate) {    
this.queueOrTagToMethodName = new HashMap();
this.defaultListenerMethod = "handleMessage";
this.doSetDelegate(delegate);
}

delegate(代理)即自定义的消费者bean,它必须是ChannelAwareMessageListener或MessageListener的实例,否则,它不会生效。

1
2
3
4
5
6
7
8
9
10
11
12
13
Object delegate = this.getDelegate();
if (delegate != this) {
if (delegate instanceof ChannelAwareMessageListener) {
if (channel != null) { ((ChannelAwareMessageListener)delegate).onMessage(message, channel); return;
}
if (!(delegate instanceof MessageListener)) {
throw new AmqpIllegalStateException("MessageListenerAdapter cannot handle a ChannelAwareMessageListener delegate if it hasn't been invoked with a Channel itself");
}
}
if (delegate instanceof MessageListener) { ((MessageListener)delegate).onMessage(message);
return;
}
}

动态的指定消息处理method由下面两个接口实现。

1
2
3
4
5
6
7
public void setDefaultListenerMethod(String defaultListenerMethod) {
this.defaultListenerMethod = defaultListenerMethod;
}

public void setQueueOrTagToMethodName(Map<String, String> queueOrTagToMethodName) {
this.queueOrTagToMethodName.putAll(queueOrTagToMethodName);
}

它们在getListenerMethodName中被调用,如果你没有定义queueOrTagToMethodName,那么将会调用你设置的defaultListenerMethod,如果你都没有设置,那么默认值是“handleMessage”。

1
2
3
4
5
6
7
8
9
10
11
12
13
protected String getListenerMethodName(Message originalMessage, Object extractedMessage) throws Exception {    
if (this.queueOrTagToMethodName.size() > 0) {
MessageProperties props = originalMessage.getMessageProperties();
String methodName = (String)this.queueOrTagToMethodName.get(props.getConsumerQueue());
if (methodName == null) {
methodName = (String)this.queueOrTagToMethodName.get(props.getConsumerTag()); }
if (methodName != null) {
return methodName;
}
}

return this.getDefaultListenerMethod();
}

容器完成了回调

前面介绍了如何实现异步消费的回调接口,而真正完成回调的是容器(container),容器是有生命周期的,如启动、运行、停止,容器本质上就是桥接AMQP queue和MessageListener的实例。因此,如果想实现消费者的功能,必须为容器配置connectionFactory、队列、MessageLisener, 即告知容器:如何连接MQ服务器、成功连接后哪个队列的消息应该交给哪个消费者。

在springboot 2.0之前,只有一种容器:SimpleMessageListenerContainer。在springboot2.0以后,新增了一种容器:DirectMessageListenerContainer。二者的区别就是消费者的线程与RabbitMQ客户端线程是否共用,对于SimpleMessageListenerContainer,每一个消费者配置一个线程,如果为容器配置了多个队列,可能会使用同一个线程处理多个队列(消费者数量<队列数)。并发性能取决于你设置的消费者的数conCurrentConsumer,它等于消费者线程数。

一个消息的后半生是这样的:spring-amqp负责将消息从MQ服务器传递给消费者,springboot-amqp提供了默认的容器,用来从MQ服务器接收消息,我们提供的消费者完成onMessage()的消息处理逻辑,并将它注册到容器上。

当消息从RabbitMQ客户端传递过来时,客户端线程通过队列将消息传递给消费线程(消息处理线程)。这是由于早期MQ客户端不支持并发传递消息,一个队列只会有一个线程传递消息,更不可能让它完成消息的处理,这个机制的设置当然是低效的,会增加线程之间切换的开销。

在新版本后,MQ客户端已经支持并发了,完全可以使用MQ客户端的线程完成消息的接收、处理工作,在DirectMessageListenerContainer中,不再区分客户端线程和消费线程,并发的控制由参数consumersPerQueue控制,不再使用conCurrentConsumer、maxConCurrentConsumer、txSize(事务大小,一次事务中传递txSize条消息,用来减少ack的次数,这个参数>1且消息消费出现异常时,会导致同一个事务中后续的消息重复消费)。DirectMessageListenerContainer中提供了messagesPerAck,但是它不是事务,每一条消息都有一个独立的事务用来传递和确认,出现异常时,后续的消息会一直处于unack的状态,所以,不会重复消费。

自定义容器

前面介绍了两种容器,而且springboot-amqp会提供默认地容器,如果想个性化的设置或有需要设置多种容器时,就需要考虑自定义容器了,在实际项目中,建议使用自定义容器。

现在以SimpleMessageListenerContainer为例介绍下容器的使用方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Configuration
public class ExampleAmqpConfiguration {

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}

@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}

@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}

还有一种方式是使用@RabbitListener注解+Java Config。

1
2
@RabbitListener(queues = "example.queue", containerFactory = "exampleContainer")
public void onMessage(Message message, Channel channel) throws Exception {}

容器工厂

容器工厂专门用来配合@RabbitListener使用,前面提到,springboot为@RabbitListener提供了默认的容器,但为了个性化的设置,建议自定义容器工厂,然后在@RabbitListener中设置”containerFactory“属性。容器和容器工厂的对应如下:

  1. SimpleMessageListenerContainer 对应 SimpleRabbitListenerContainerFactory
  2. DirectMessageListenerContainer 对应 DirectRabbitListenerContainerFactory
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class AppConfig {
@Bean(name="myContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
}
1
2
@RabbitListener(queues = ”myqueue“, containerFactory = "myContainerFactory")
public void onMessage(Message message, Channel channel) throws Exception {}

容器属性

前文涉及的conCurrentConsumer、connectionFactory都是容器的属性,还有其他常用的属性:

Springboot2.0.3.RELEASE容器属性介绍

channelTransacted

是否在事务中ack(确认)所有消息。为true时表示需要确认事务中的所有消息。

transactionManager

提供给Listener的外部事务管理器,是channelTransacted的补充,如果channel是事务的,它的事务会与外部事务进行同步。

acknowledgeMode

  • NONE 配合channelTransacted=false使用,不发送ACK,MQ服务器认为所有的消息都会被确认,所以 在RabbitMQ中称为自动ACK,但在springboot中,称为NONE ACK,视角不同导致的称谓不同。
  • MANUAL 消费者必须手动确认所有的消息,包括异常情况
  • AUTO 容器自动确认消息,除非Listener抛出容器无法自动处理的异常。该模式也是channelTransacted为true时的默认模式。追求并发时可以配合使用。

prefetchCount

每个消费者能够持有的未ack的消息数,该值越大消息传输给消费者的速度越快。该参数越大,消息的顺序处理性越差。值得注意的是:在AcknowledgeMode.NONE模式下,该参数的设置是无效的,这是由于该模式下根本不存在ack。

txSize

适用于SimpleMessageListenerContainer,该参数仅在AcknowledgeMode.AUTO模式下生效,容器在发送一次ack之前批量处理txSize条消息,这一批消息处于同一个事务中,会一直等待它们到超时时间,如果prefetchCount小于txSize,会自动将prefetchCount设置的与txSize相等。只能用于channelTransacted为true的场景下。

messagePerAck

适用于DirectMessageListenerContainer,容器在两次ack之间处理的消息数目,目的是减少向MQ服务器发送ack的次数,代价就是在出现异常时,增大重传消息的可能性,往往用在高并发场景下。在出现异常时(比如拒绝了一批消息中的某一条),则其他消息不管有没有消费完都会被ack,异常的消息被拒绝。所以它不能用在channelTransacted为true的场景。

errorHandler

自定义未捕获的异常的处理机制,默认使用ConditionalRejectingErrorHandler

@RabbitListener

异步接收消息最简单的方式是通过注解实现,前面的@RabbitListener即是,底层是MessagingMessageListenerAdapter实现的,使用注解,不需要指定methodName,因为@RabbitListener已经注明了该方法用来接收消息。在一个类中,可以定义多个不同的Listener,如下:

processOrder中,使用@QueueBinding声明了队列、routingkey、exchange以及他们的绑定关系

processInvoice中,绑定、声明了一个匿名队列,也可以声明多个@QueueBinding

1
2
3
4
5
6
7
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "myqueue",         
durable = "true",
autoDelete = "false"),
exchange = @Exchange(value = "ss")),
@QueueBinding(value = @Queue(value = "myqueue2"),
exchange = @Exchange(value = "ss1"))})
public void rcv(Message message, Channel channel) {}

handleWithSimpleDeclare中,没有声明exchange,routingkey,使用默认地exchange,routingkey与队列名称相同,是direct模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
public class MyService {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)
public void processOrder(Order order) {
...
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(Invoice invoice) {
...
}

@RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
public String handleWithSimpleDeclare(String data) {
...
}

}

在多个方法使用同种@RabbitListener时,可以自定义元注解(常用于广播fanout模式),比如下面的例子,就使用了一个自动删除(默认地,如果不想自动删除,需要设置auto-delete为false)、匿名、广播模式的队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
public @interface MyAnonFanoutListener {
}



// 使用元注解
public class MetaListener {

@MyAnonFanoutListener
public void handle1(String foo) {
...
}

@MyAnonFanoutListener
public void handle2(String foo) {
...
}

}

@RabbitListener异常处理

Springboot2.0版之后,@RabbitListener注解新增了errorHandler和returnException属性,默认是无配置的。自定义errorHandler,需要实现RabbitListenerErrorHander接口, 并将其配置在@RabbitListener上。第二个参数messaging.Message是Message Converter产生的,ListenerExecutionFailedException是Listener抛出的。可以在自定义的handleError中处理异常,或抛出其他异常至容器,默认地,如果没有自定义errorHandler,异常将会抛至容器中。由容器的errorHandler处理,见后文。

1
2
3
4
5
6
@FunctionalInterface
public interface RabbitListenerErrorHandler {

Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,ListenerExecutionFailedException exception) throws Exception;

}

returnException属性为“true”时,表示异常需要通知到生产者,一般地,不需要设置该属性,毕竟引入MQ的目的是解耦。

容器的errorHandler

前面提到,没有自定义errorHandler时,异常会抛至容器默认异常处理器ConditionalRejectingErrorHandler,包含两个构造器,一个无参,一个含参,通过含参构造器可以自定义异常处理策略。若使用的无参构造器,则默认使用内部类DefaultExceptionStrategy中定义的异常处理策略。

ConditionalRejectingErrorHandler源码如下,11行是处理异常的接口,在13行中,异常如果不是AmqpRejectAndDontRequeueException而且是致命异常时,会抛出AmqpRejectAndDontRequeueException,致命异常的判断在37行,首先判断异常产生的原因:

MessagingException位于spring-messiging包下的,是异常MessageConversionException、MethodArgumentResolutionException、MessageDeliveryException、MessageHandlingException、DestinationResolutionException、MethodArgumentNotValidException、MethodArgumentTypeMismatchException、MissingSessionUserException的父类。

ListenerExecutionFailedException是所有异常被抛出时的最上层栈信息,所有异常都以该形式抛出。

39~42行是一个for循环,遍历异常产生的栈信息一层层解析异常产生的原因,一旦有MessageConversionException、MethodArgumentResolutionException就跳出循环,处理该异常,如果遍历完整个循环都没有出现,则处理最后一个异常(42行cause = cause.getCause())。因此,如果有多个异常均需要处理时,建议重写isFatal方法。

59-67行判断cause是否致命,有六种:

  • Spring AMQP的MessageConversionException异常
  • spring-messaging的MessageConversionException异常
  • spring-messaging的MethodArgumentResolutionException异常
  • NoSuchMethodException异常
  • ClassCastException异常
  • 自定义的异常isUserCauseFatal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package org.springframework.amqp.rabbit.listener;

// omit the package import for brevity

1 public class ConditionalRejectingErrorHandler implements ErrorHandler {
2 protected final Log logger = LogFactory.getLog(this.getClass());
3 private final FatalExceptionStrategy exceptionStrategy;
4 public ConditionalRejectingErrorHandler() {
5 this.exceptionStrategy = new
6 ConditionalRejectingErrorHandler.DefaultExceptionStrategy();
7 }
8 public ConditionalRejectingErrorHandler(FatalExceptionStrategy exceptionStrategy) { 9 this.exceptionStrategy = exceptionStrategy; 10 }
11 public void handleError(Throwable t) {
12 this.log(t);
13 if (!this.causeChainContainsARADRE(t) && this.exceptionStrategy.isFatal(t)) { 14 throw new AmqpRejectAndDontRequeueException("Error Handler converted 15 exception to fatal", t);
16 }
17 }
18 protected void log(Throwable t) {
19 if (this.logger.isWarnEnabled()) {
20 this.logger.warn("Execution of Rabbit message listener failed.", t);
21 }
22 }
23 protected boolean causeChainContainsARADRE(Throwable t) {
24 for(Throwable cause = t.getCause(); cause != null; cause = cause.getCause()) { 25 if (cause instanceof AmqpRejectAndDontRequeueException) {
26 return true;
27 }
28 }
29 return false;
30 }
31
32 public static class DefaultExceptionStrategy implements FatalExceptionStrategy { 33
34 protected final Log logger = LogFactory.getLog(this.getClass());
35 public DefaultExceptionStrategy() {
36 }
37 public boolean isFatal(Throwable t) {
38 Throwable cause;
39 for(cause = t.getCause(); cause instanceof MessagingException
40 && !(cause instanceof MessageConversionException)
41 && !(cause instanceof MethodArgumentResolutionException);
42 cause = cause.getCause()) {
43 ;
44 }
45 if (t instanceof ListenerExecutionFailedException
46 && this.isCauseFatal(cause)) {
47 if (this.logger.isWarnEnabled()) {
48 this.logger.warn("Fatal message conversion error; message rejected; 49 it will be dropped or routed to a dead letter exchange, if so 50 configured: " + ((ListenerExecutionFailedException)t)
51 .getFailedMessage());
52 }
53 return true;
54 } else {
55 return false;
56 }
57 }
58
59 private boolean isCauseFatal(Throwable cause) {
60 return cause instanceof
61 org.springframework.amqp.support.converter.MessageConversionException
62 || cause instanceof MessageConversionException
63 || cause instanceof MethodArgumentResolutionException
64 || cause instanceof NoSuchMethodException
65 || cause instanceof ClassCastException
66 || this.isUserCauseFatal(cause);
67 }
68
69 protected boolean isUserCauseFatal(Throwable cause) {
70 return false;
71 }
72 }
73 }

当然,也可以选择完全自定义异常处理。

1
2
3
4
5
6
listenerContainerFactory.setErrorHandler(new ErrorHandler() {    
@Override
public void handleError(Throwable t) {
// do something handle error
}
});

补充说明

对于消息的异常处理springboot2.0.3还不够完善。比如使用的@RabbitListener、AcknowledgeMent.MANUAL发送一条空的消息,这条消息无法到达自定义的onMessage()方法,提前抛出了异常,若尝试通过RabbitListenerErrorHandler处理异常,并按照下面的方式自定义了一个errorHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Bean(name = "myErrorHandler")
public RabbitListenerErrorHandler rabbitListenerErrorHandler() {
return new RabbitListenerErrorHandler() {
@Override
public Object handleError(Message message,
org.springframework.messaging.Message<?> message1,
ListenerExecutionFailedException e) throws Exception { // 如果消息是空的,这条消息不再归队
if (message.getBody().length == 0) {
Channel channel = message1.getHeaders().get(AmqpHeaders.CHANNEL,
Channel.class);
logger.error("rcv error");
Channel channel1 = (Channel)message1.getHeaders().getReplyChannel();
Channel channel2 = (Channel)message1.getHeaders().getReplyChannel();

logger.error("channel1={}", channel);
logger.error("channel2={}", channel2);
if (channel == null) {
logger.error(”channel null,can not send ack...“)
throw e;
}
channel.basicReject(message1.getHeaders().get(AmqpHeaders.DELIVERY_TAG,
Long.class), false);
throw new AmqpRejectAndDontRequeueException("msg format error");
}
else {
throw e;
}
}
};
}

发送10条empty的消息,打印了10次:channel null,can not send ack…

这10条消息一直处于unack的状态,消息并没有被reject,而是一直处于unack的状态,但是该消息是无意义的,不应该再重新入队.

原因:
springboot ListenerContainer负责传递消息给消费者,容器通过反射调用自定义的Listener并处理消息时出现参数错误异常,message转byte异常报错。
由于方法参数反射错误,无法调用到onMessage方法,又采用的手动确认的方式,导致没办法通过channel.basicReject拒绝该消息,所以这条消息会一直处于unack的状态。

解决的方法有四种:
1、升级springboot至2.1.6版本,在Listener容器中注册RabbitListenerErrorHandler,该版本中,可以通过org.springframework.messaging.Message获取channel信息。通过channel拒绝该消息。在之前的版本中,获取到的channel都是null(如代码所示,channel、channel1、channel2均为null,说明spring没有将channel信息封装在org.springframework.messaging.Message),无法给MQ服务器发送ACK。
2、修改onMessage()的参数,使用Message类型作为消息的载体,不再使用byte、string等其他类型,定义MessageConverter或使用默认的MessageConverter实现消息格式转换。
3、使用springboot的Acknowledge.AUTO模式,该模式下Listener容器会自动发送ACK给MQ服务器

4、自定义MessageConveter

最后使用了方法2,springboot升级会带来兼容性的问题,比如数据库驱动、数据库版本等,方法3中交给容器自动确认更适合高并发设置多个消费者同时消费1个队列、添加一些批量拉取消息、批量事务处理的场景,该场景会牺牲消息的有序性、异常时可能会有重复消费的问题。我们现在的业务场景更偏向于保证消息的可靠性,一个队列只会有一个消费者,消费一条拉取一条,消息处理的线程池也是自定义的,这样的方式更灵活稳定。所以最后否定了方法3,选用方案2

12

shipengyang

12 日志
5 分类
2 标签
© 2019 shipengyang
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4