Bootstrap

Java并发编程系列——分布式锁

之前的文章写了并发下的锁控制,但是这仅能针对一个应用内部,如果是在分布式环境下,这样的锁机制是达不到效果的,所以这一篇写写如何实现分布式锁。

分布式锁有几种常见的实现方式,比如借助数据库,使用redis,还有就是zookeeper一类。数据库的方式性能通常要低不少,Redis有一致性问题,就暂时不研究了,本文介绍如何利用zookeeper实现分布式锁,现在一些库已经有完整的实现,可以直接使用,而本文希望能说明其实现的基本原理,通过简易的代码来展示。

先假设一个需求,在文件服务器中有一个顺序文件,现在有多个服务涉及使用该服务读写文件的需求。接下来就对这个问题进行拆解,一步步的来实现一下。

利用zookeeper可以有两种实现方式。至于zookeeper的安装可以查阅相关文档,在本机实验的话,使用docker会很方便。

一种是利用插入重复节点会抛出异常这个特性。实现步骤如下:

1、当要获取锁时,尝试创建同名节点

2_1、如果创建成功则获得锁

2_2、如果创建不成功(即收到异常),则创建对该节点的监听器,监听该节点的删除事件,当该节点删除事件发生时,则重复步骤1

3、锁使用后删除节点

另一种是利用插入同名有序节点时会生成有序号的子节点。实现步骤如下:

1、创建根节点

2、当切获取锁时,创建当前线程下的有序节点,获得节点路径

3_1、如果当前节点路径与根节点下的第一个有序节点同名,则获得锁

3_2、如果当前节点路径与根节点下的第一个有序节点不同名,则判断当前该节点前一个节点是否存存,存在则监听其删除事件,删除事件发生时重复3_1,如果前一节点不存在,重复3_1

4、锁使用后删除当前线程节点

上述两段可用流程表示如下

可以看出第一种实现要简单的多,但是监听都发生在同一个节点上,又是使用异常,性能必定不会太好,本机测试后者性能起码是前者的两倍。

现在开始来设计这个实现,因为要实现两个不同的锁,所以在结构上要设计得相对灵活些,容易扩展多种实现,而对外提供服务上,和一般的锁差不多,只需要对外提供lock()和unlock()方法即可。我们采用模板模式来设计一下锁的框架。如下图:

接口实现:

public interface ZkLock {
    public void lock();
    public void unLock();
}

模板类实现:

public abstract class ZkAbstrackLock implements ZkLock {
    private static final String ZK_HOST = "127.0.0.1:2181";
    protected ZkClient zkClient = new ZkClient(ZK_HOST);

    @Override
    public void lock() {
        if (tryLock()) {
            System.out.println("got lock");
        } else {
            await();
            lock();
        }
    }

    protected abstract boolean tryLock();
    protected abstract void await();
}

模板类中定义了lock的骨架,不管哪种锁实现,lock的形式统一,这也就是采用模板的意义所在。

接下来是异常方式的实现:

public class ZkLockWithException extends ZkAbstrackLock {

    private static final String LOCK_PATH = "/lockWithException";

    private CountDownLatch latch = null;

    @Override
    protected boolean tryLock() {
        try {
            zkClient.createEphemeral(LOCK_PATH);
            return true;
        } catch (Exception e) {
            System.out.println("exception thrown and wait");
            return false;
        }
    }

    @Override
    protected void await() {
        IZkDataListener dataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                if (latch != null) {
                    System.out.printf("%s removed\n", s);
                    latch.countDown();
                }
            }
        };
        zkClient.subscribeDataChanges(LOCK_PATH, dataListener);
        if (zkClient.exists(LOCK_PATH)) {
            latch = new CountDownLatch(1);
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(LOCK_PATH, dataListener);
    }

    @Override
    public void unLock() {
        zkClient.delete(LOCK_PATH);
        zkClient.close();
        System.out.println("lock released");
    }
}

这里注意监听器中有两个事件,因为我们最终都会采用将节点删除的方式,所以这里使用删除事件来进行通知,至于CountDownLatch的作用之前的文章有介绍,作用就是当计数为零时自动唤起等待的线程。

接下来是使用有序节点的实现方式:

public class ZkLockWithSort extends ZkAbstrackLock {
    private static final String LOCK_PATH = "/lockWithSort";
    private String currentPath;
    private String prePath;
    private CountDownLatch latch;

    public ZkLockWithSort() {
        try {
            zkClient.createPersistent(LOCK_PATH);
        } catch (ZkNodeExistsException e) {
            System.out.println(LOCK_PATH + " is already existed");
        }
    }

    @Override
    protected boolean tryLock() {
        if (currentPath == null || currentPath.length() <= 0) {
            currentPath = zkClient.createEphemeralSequential(LOCK_PATH + "/", null);
        }
        List children = zkClient.getChildren(LOCK_PATH);
        Collections.sort(children);
        if (currentPath.equals(LOCK_PATH + "/" + children.get(0))) {
            System.out.printf("path:%s got lock\n", currentPath);
            return true;
        } else {
            int sequence = Collections.binarySearch(children, currentPath.substring(LOCK_PATH.length() + 1));
            prePath = LOCK_PATH + "/" + children.get(sequence - 1);
        }
        System.out.printf("path:%s wait pre:%s\n", currentPath, prePath);
        return false;
    }

    @Override
    protected void await() {
        IZkDataListener dataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                if (latch != null) {
                    System.out.printf("path:%s deleted\n", s);
                    latch.countDown();
                }
            }
        };
        zkClient.subscribeDataChanges(prePath, dataListener);
        if (zkClient.exists(prePath)) {
            latch = new CountDownLatch(1);
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(prePath, dataListener);
    }

    @Override
    public void unLock() {
        zkClient.delete(currentPath);
        zkClient.close();
        System.out.printf("path:%s lock released\n", currentPath);
    }
}

回过头来我们实现之前提出的需求,读写文件,这里为体现出锁的作用,在文件中每行记录的当前的行号,最后我们启多个应用来同时执行,最后来看文件的内容是否是正确的。

文件顺序写入实现:

public class Sequence {
    private static final String FILE_PATH = "/xxx/sequence.txt";
    private static int sequence = 0;
    private ZkLock lock;

    public Sequence(ZkLock lock) {
        this.lock = lock;
    }

    public void writeSequenceToFile() {
        try {
            lock.lock();

            try (LineNumberReader lnr = new LineNumberReader(new FileReader(FILE_PATH));
                 FileWriter fw = new FileWriter(FILE_PATH, true)) {
                int lineNumber = 0;
                while (lnr.readLine() != null) {
                    lineNumber++;
                }
                fw.write(lineNumber + 1 + "\r\n");
                fw.flush();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } finally {
            lock.unLock();
        }
    }
}

最后来测试一下:

public class Main1 {
    public static void main(String[] args) throws InterruptedException {
        int count = 100;
        CountDownLatch latch = new CountDownLatch(count);
        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            new Thread(() -> {
                //指定不同的锁进行测试
                new Sequence(new ZkLockWithException()).writeSequenceToFile();
                latch.countDown();
            }).start();
        }
        latch.await();
        System.out.printf("time elapsed:%d", System.currentTimeMillis() - start);
    }
}

为了模拟分布式访问,可以同时多启动几个测试程序。最后来看下文件写入的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
……

到这里,在zookeeper下实现分布式锁的两种方式就完成了。

本系列其他文章: