源码分析-Netty: 并发编程的实践(二)
系列文章:
摘要
前面一篇介绍了多线程在Netty中的大概使用情况,本篇将结合源码,详细描述使用方式,以及值得我们思考、学习和借鉴的地方。
一 synchronized使用
关键字synchronized,我们在并发编程的艺术系列文章中有过描述,用于保证在同一时刻,只有一个线程能够执行某个方法或代码块。同步的作用,既有互斥,也有保证共享变量的可见性:当某个线程修改了变量值并释放锁后,其他线程可以立即获取被修改变量的最新值。
以ServerBootstrap进行分析。在类中,定义了一个final变量childOptions,值为LinkedHashMap。在构造方法中,对bootstrap.childOptions进行了同步。这是因为,LinkedHashMap是非线程安全的,所以当存在多线程并发创建ServerBootstrap的实例时,访问和修改这个变量,必须在外部做好同步,否则会导致不可预料的后果。
private final Map, Object> childOptions = new LinkedHashMap();
private ServerBootstrap(ServerBootstrap bootstrap) {
super(bootstrap);
this.childGroup = bootstrap.childGroup;
this.childHandler = bootstrap.childHandler;
synchronized(bootstrap.childOptions) {
this.childOptions.putAll(bootstrap.childOptions);
}
this.childAttrs.putAll(bootstrap.childAttrs);
}
二 正确使用锁
大家都知道使用锁的必要性,但对于锁使用的准确时机、锁的范围(粒度),还有锁之间如何协同的了解不够,这会导致使用出错、或者因为锁范围过大导致不必要的资源消耗,并且降低系统/执行效率。
java.util.concurrent包内,有一个抽象类ForkJoinTask。有这样一个场景,externalAwaitDone方法,直到指定的条件满足时,才会继续执行,代码如下所示:
/**
* Blocks a non-worker-thread until completion.
* @return status upon completion
*/
private int externalAwaitDone() {
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
其中,最重要的逻辑是do... while循环;并且在同步代码块下,有对wait() 和 notifyAll() 方法的使用。这里面涉及几个关键点:
1)wait方法用于让线程等待某个条件,且方法必须在同步块内部调用,否则会提示语法错误。使用示例如下:

2)需要在循环内调用wait方法,而不能在循环制外调用。尽管有可能不满足唤醒条件,但因为其他线程可能会调用notifyAll()方法,这会使被阻塞的线程被意外唤醒。这样对锁保护约定的破坏会导致约束失败,从而导致无法预知的结果。
3)notify和notifyAll都是唤醒线程的方法。当不确定应该调用哪个方法时,notifyAll可以唤醒所有等待的线程。从优化角度来看,如果等待状态的线程都是在等待同一个条件,并且每次只有一个线程可以从这个条件被唤醒,那么应该使用notify。
三 volatile
java中,volatile关键字可以认为是一个轻量级的同步机制。内存模型语义是保证线程可见,以及禁止指令重排序。
NioEventLoop中,定义了一个私有int类型变量:ioRatio。这个变量没有注释,但从定义上看,是用于设置I/O执行时间比例的。
private volatile int ioRatio = 50;
public void setIoRatio(int ioRatio) {
if (ioRatio > 0 && ioRatio <= 100) {
this.ioRatio = ioRatio;
} else {
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
}
}
在NioEventLoop中,线程并没有调用这个设置方法,而是由外部发起设置动作。通常来说,会是业务线程调用这个方法,重新设置参数。 这种情况下,就形成了一个线程写,一个线程读的场景。在这样的场景之下,vplatile可以用来替代synchronize以提升并发访问性能。
四 CAS与原子类
synchronized这类互斥同步,由于进行线程阻塞和唤醒,会带来较大的性能损耗,从而也被成为阻塞同步。从乐观/悲观锁的角度来说,是一种悲观的并发策略,所以属于悲观锁。相对地,非阻塞同步,也可以成为乐观锁。这种方法可以简单描述为,先进行操作,在操作完成后判断是否成功(是否有并发问题),如果有,就做失败补偿,如果没有就说明操作成功。通过这样的方式,在一些场景下避免了同步锁的弊端,降低了不必要的资源消耗。
CAS就是这样的一种非阻塞同步实现方式。IA64和X86这两种指令集下,都是通过cmpxchg指令来完成CAS。其他系统中使用的命令不同。
Netty中,ChannelOutboundBuffer类,就有CAS的使用实例。为了统计发送的总字节数,类中定于了totalPendingSize变量用于记录字节数,这是一个volatile变量。我们已经明确,volatile无法保证多线程并发修改的安全性,所以在类中又定义了一个AtomicIntegerFieldUpdater类型边领:TOTAL_PENDING_SIZE_UPDATER,通过它来实现totalPendingSize的院子更新。
AtomicIntegerFieldUpdater是一个抽象类,注释说明如下:
/**
* A reflection-based utility that enables atomic updates to
* designated {@code volatile long} fields of designated classes.
* This class is designed for use in atomic data structures in which
* several fields of the same node are independently subject to atomic
* updates.
*
* Note that the guarantees of the {@code compareAndSet}
* method in this class are weaker than in other atomic classes.
* Because this class cannot ensure that all uses of the field
* are appropriate for purposes of atomic access, it can
* guarantee atomicity only with respect to other invocations of
* {@code compareAndSet} and {@code set} on the same updater.
*
* @since 1.5
* @author Doug Lea
* @param The type of the object holding the updatable field
*/
另外:
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size != 0L) {
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > (long)this.channel.config().getWriteBufferHighWaterMark()) {
this.setUnwritable(invokeLater);
}
}
}
这里没有使用锁,而是用了TOTAL_PENDING_SIZE_UPDATER的addAndGet方法。继续查看这个addAndGet方法:
/**
* Atomically adds the given value to the current value of the field of
* the given object managed by this updater.
*
* @param obj An object whose field to get and set
* @param delta the value to add
* @return the updated value
*/
public long addAndGet(T obj, long delta) {
long prev, next;
do {
prev = get(obj);
next = prev + delta;
} while (!compareAndSet(obj, prev, next));
return next;
}
底层是通过compareAndSet()实现的。
对新旧值的操作,CAS通常的使用方法就是:
1)先对oldValue进行更新,oldValue = totalPendingSize;
2)重新对更新值进行计算: newWriteBufferSize = oldValue + size;
3)继续循环进行CAS,直到成功为止。
JDK中提供的Atomic原子类,可以避免同步锁带来的并发访问性能额外损耗的问题,所以Netty中我们在很多地方都可以看到int、long、boolean等类型的变量使用对应的原子类的情况。这也是Netty高性能保障的一个重要来源。