Bootstrap

源码分析-Netty: 并发编程的实践(二)

系列文章:

摘要

前面一篇介绍了多线程在Netty中的大概使用情况,本篇将结合源码,详细描述使用方式,以及值得我们思考、学习和借鉴的地方。

一 synchronized使用

关键字synchronized,我们在并发编程的艺术系列文章中有过描述,用于保证在同一时刻,只有一个线程能够执行某个方法或代码块。同步的作用,既有互斥,也有保证共享变量的可见性:当某个线程修改了变量值并释放锁后,其他线程可以立即获取被修改变量的最新值。

以ServerBootstrap进行分析。在类中,定义了一个final变量childOptions,值为LinkedHashMap。在构造方法中,对bootstrap.childOptions进行了同步。这是因为,LinkedHashMap是非线程安全的,所以当存在多线程并发创建ServerBootstrap的实例时,访问和修改这个变量,必须在外部做好同步,否则会导致不可预料的后果。

private final Map, Object> childOptions = new LinkedHashMap();

private ServerBootstrap(ServerBootstrap bootstrap) {
        super(bootstrap);
        this.childGroup = bootstrap.childGroup;
        this.childHandler = bootstrap.childHandler;
        synchronized(bootstrap.childOptions) {
            this.childOptions.putAll(bootstrap.childOptions);
        }

        this.childAttrs.putAll(bootstrap.childAttrs);
    }

二 正确使用锁

大家都知道使用锁的必要性,但对于锁使用的准确时机、锁的范围(粒度),还有锁之间如何协同的了解不够,这会导致使用出错、或者因为锁范围过大导致不必要的资源消耗,并且降低系统/执行效率。

java.util.concurrent包内,有一个抽象类ForkJoinTask。有这样一个场景,externalAwaitDone方法,直到指定的条件满足时,才会继续执行,代码如下所示:

/**
     * Blocks a non-worker-thread until completion.
     * @return status upon completion
     */
    private int externalAwaitDone() {
        int s = ((this instanceof CountedCompleter) ? // try helping
                 ForkJoinPool.common.externalHelpComplete(
                     (CountedCompleter)this, 0) :
                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
        if (s >= 0 && (s = status) >= 0) {
            boolean interrupted = false;
            do {
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    synchronized (this) {
                        if (status >= 0) {
                            try {
                                wait(0L);
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                        else
                            notifyAll();
                    }
                }
            } while ((s = status) >= 0);
            if (interrupted)
                Thread.currentThread().interrupt();
        }
        return s;
    }

其中,最重要的逻辑是do... while循环;并且在同步代码块下,有对wait() 和 notifyAll() 方法的使用。这里面涉及几个关键点:

1)wait方法用于让线程等待某个条件,且方法必须在同步块内部调用,否则会提示语法错误。使用示例如下:

2)需要在循环内调用wait方法,而不能在循环制外调用。尽管有可能不满足唤醒条件,但因为其他线程可能会调用notifyAll()方法,这会使被阻塞的线程被意外唤醒。这样对锁保护约定的破坏会导致约束失败,从而导致无法预知的结果。

3)notify和notifyAll都是唤醒线程的方法。当不确定应该调用哪个方法时,notifyAll可以唤醒所有等待的线程。从优化角度来看,如果等待状态的线程都是在等待同一个条件,并且每次只有一个线程可以从这个条件被唤醒,那么应该使用notify。

三 volatile

java中,volatile关键字可以认为是一个轻量级的同步机制。内存模型语义是保证线程可见,以及禁止指令重排序。

NioEventLoop中,定义了一个私有int类型变量:ioRatio。这个变量没有注释,但从定义上看,是用于设置I/O执行时间比例的。

private volatile int ioRatio = 50;

    public void setIoRatio(int ioRatio) {
        if (ioRatio > 0 && ioRatio <= 100) {
            this.ioRatio = ioRatio;
        } else {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
        }
    }

在NioEventLoop中,线程并没有调用这个设置方法,而是由外部发起设置动作。通常来说,会是业务线程调用这个方法,重新设置参数。 这种情况下,就形成了一个线程写,一个线程读的场景。在这样的场景之下,vplatile可以用来替代synchronize以提升并发访问性能。

四 CAS与原子类

synchronized这类互斥同步,由于进行线程阻塞和唤醒,会带来较大的性能损耗,从而也被成为阻塞同步。从乐观/悲观锁的角度来说,是一种悲观的并发策略,所以属于悲观锁。相对地,非阻塞同步,也可以成为乐观锁。这种方法可以简单描述为,先进行操作,在操作完成后判断是否成功(是否有并发问题),如果有,就做失败补偿,如果没有就说明操作成功。通过这样的方式,在一些场景下避免了同步锁的弊端,降低了不必要的资源消耗。

CAS就是这样的一种非阻塞同步实现方式。IA64和X86这两种指令集下,都是通过cmpxchg指令来完成CAS。其他系统中使用的命令不同。

Netty中,ChannelOutboundBuffer类,就有CAS的使用实例。为了统计发送的总字节数,类中定于了totalPendingSize变量用于记录字节数,这是一个volatile变量。我们已经明确,volatile无法保证多线程并发修改的安全性,所以在类中又定义了一个AtomicIntegerFieldUpdater类型边领:TOTAL_PENDING_SIZE_UPDATER,通过它来实现totalPendingSize的院子更新。

AtomicIntegerFieldUpdater是一个抽象类,注释说明如下:

/**
 * A reflection-based utility that enables atomic updates to
 * designated {@code volatile long} fields of designated classes.
 * This class is designed for use in atomic data structures in which
 * several fields of the same node are independently subject to atomic
 * updates.
 *
 * 

Note that the guarantees of the {@code compareAndSet} * method in this class are weaker than in other atomic classes. * Because this class cannot ensure that all uses of the field * are appropriate for purposes of atomic access, it can * guarantee atomicity only with respect to other invocations of * {@code compareAndSet} and {@code set} on the same updater. * * @since 1.5 * @author Doug Lea * @param The type of the object holding the updatable field */

另外:

    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size != 0L) {
            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
            if (newWriteBufferSize > (long)this.channel.config().getWriteBufferHighWaterMark()) {
                this.setUnwritable(invokeLater);
            }

        }
    }

这里没有使用锁,而是用了TOTAL_PENDING_SIZE_UPDATER的addAndGet方法。继续查看这个addAndGet方法:

/**
     * Atomically adds the given value to the current value of the field of
     * the given object managed by this updater.
     *
     * @param obj An object whose field to get and set
     * @param delta the value to add
     * @return the updated value
     */
    public long addAndGet(T obj, long delta) {
        long prev, next;
        do {
            prev = get(obj);
            next = prev + delta;
        } while (!compareAndSet(obj, prev, next));
        return next;
    }

底层是通过compareAndSet()实现的。

对新旧值的操作,CAS通常的使用方法就是:

1)先对oldValue进行更新,oldValue = totalPendingSize;

2)重新对更新值进行计算: newWriteBufferSize = oldValue + size;

3)继续循环进行CAS,直到成功为止。

JDK中提供的Atomic原子类,可以避免同步锁带来的并发访问性能额外损耗的问题,所以Netty中我们在很多地方都可以看到int、long、boolean等类型的变量使用对应的原子类的情况。这也是Netty高性能保障的一个重要来源。