使用Zookeeper实现Leader(Master)选举

使用Zookeeper实现Leader(Master)选举

http://blog.csdn.net/MassiveStars/article/details/53894551

应用场景

分布式系统最典型的架构就是一主多从。在很多时候,虽然处理大规模的数据、图像和文件等,这种工作极其耗资源而且数据、文件等都是共享的,若全部机器都计算处理一次会浪费保贵的计算资源;我们可以把这些工作交给一台机器处理,其它机器则通过数据库、分布式文件系统等方式共享计算成果Leader(Master)。另外,对于数据库、缓存等组件读写分离是惯用的提高性能的方式;读写分离是把写全部给leader(master),查询则使用follower的机器。使用Zookeeper提供的API可轻松实现leader选举。

具体步骤

1、客户端连接时,在指定的目录(这里假定为"/leader")创建一个EPHEMERAL_SEQUENTIAL的节点,把内网的IP数据存入创建节点。

2、获取目录的子点节,并取得序列号最小的节点,我们把这个节点设置为leader。当此节点被删除时,证明leader断线。

3、其它机器监听leader节点,当leader节点的删除时,再取目录的最小子点节作为leader

 

详细代码

zk客户端工具类

[java] view plain copy print?

package org.massive.common;  

  

import org.apache.zookeeper.WatchedEvent;  

import org.apache.zookeeper.Watcher;  

import org.apache.zookeeper.ZooKeeper;  

  

import java.io.IOException;  

import java.util.concurrent.CountDownLatch;  

import java.util.concurrent.TimeUnit;  

  

/** 

 * Created by Massive on 2016/12/18. 

 */  

public class ZooKeeperClient {  

  

    private static String connectionString = "localhost:2181";  

    private static int sessionTimeout = 10000;  

  

  

    public static ZooKeeper getInstance() throws IOException, InterruptedException {  

        //--------------------------------------------------------------  

        // 为避免连接还未完成就执行zookeeperget/create/exists操作引起的(KeeperErrorCode = ConnectionLoss)  

        // 这里等Zookeeper的连接完成才返回实例  

        //--------------------------------------------------------------  

        final CountDownLatch connectedSignal = new CountDownLatch(1);  

        ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {  

            @Override  

            public void process(WatchedEvent event) {  

                if (event.getState() == Event.KeeperState.SyncConnected) {  

                    connectedSignal.countDown();  

                }  

            }  

        });  

        connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);  

        return zk;  

    }  

  

    public static int getSessionTimeout() {  

        return sessionTimeout;  

    }  

  

    public static void setSessionTimeout(int sessionTimeout) {  

        ZooKeeperClient.sessionTimeout = sessionTimeout;  

    }  

  

}  


LeaderElection

[java] view plain copy print?

package org.massive.group;  

  

import org.apache.zookeeper.*;  

import org.apache.zookeeper.data.Stat;  

import org.massive.common.ZooKeeperClient;  

  

import java.io.IOException;  

import java.io.UnsupportedEncodingException;  

import java.net.*;  

import java.util.*;  

import java.util.concurrent.CountDownLatch;  

  

/** 

 * Created by Massive on 2016/12/25. 

 */  

public class LeaderElection {  

  

    private ZooKeeper zk;  

    private int sessionTimeout;  

  

    private static byte[] DEFAULT_DATA = {0x12,0x34};  

    private static Object mutex = new Object();  

  

    private static String ROOT = "/leader";  

  

    private byte[] localhost = getLocalIpAdressBytes();  

  

    private String znode;  

  

    private static CountDownLatch firstElectionSignal = new CountDownLatch(1);  

  

    //----------------------------------------------------  

    // leaderIP地址  

    //----------------------------------------------------  

    private static String leader;  

  

    public LeaderElection() throws IOException, InterruptedException, KeeperException {  

        this.zk = ZooKeeperClient.getInstance();  

        this.sessionTimeout = zk.getSessionTimeout();  

        ensureExists(ROOT);  

        ensureLocalNodeExists();  

  

        System.out.println("-------------------------------------");  

        System.out.println("local IP: " + getLocalIpAddress());  

        System.out.println("local created node: " + znode);  

        System.out.println("-------------------------------------");  

  

  

    }  

  

    /** 

     * 检查本机是否已创建节点,不存在则创建 

     * @throws KeeperException 

     * @throws InterruptedException 

     */  

    public void ensureLocalNodeExists() throws KeeperException, InterruptedException {  

        List<String> list = zk.getChildren(ROOT,new NodeDeleteWatcher());  

        for (String node : list) {  

            Stat stat = new Stat();  

            String path = ROOT + "/" + node;  

            byte[] data = zk.getData(path,false,stat);  

  

            if (Arrays.equals(data,localhost)) {  

                znode = path;  

                return;  

            }  

        }  

        znode = zk.create(ROOT + "/", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);  

  

    }  

  

    public void ensureExists(String path) {  

        try {  

            Stat stat = zk.exists(path, false);  

            if (stat == null) {  

                zk.create(path, DEFAULT_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  

            }  

        } catch (KeeperException e) {  

            e.printStackTrace();  

        } catch (InterruptedException e) {  

            e.printStackTrace();  

        }  

    }  

  

    public void start() throws KeeperException, InterruptedException, UnsupportedEncodingException {  

  

        do{  

            synchronized (mutex) {  

  

                System.out.println("begin leader election...");  

  

                List<String> nodes = zk.getChildren(ROOT, null);  

                SortedSet<String> sortedNode = new TreeSet<String>();  

                for (String node : nodes) {  

                    sortedNode.add(ROOT + "/" + node);  

                }  

                //----------------------------------------------------  

                // 取出序列号最小的消息  

                //----------------------------------------------------  

                String first = sortedNode.first();  

                leader = first;  

                //----------------------------------------------------  

                // 监控序列最小节点(非本机创建节点)  

                //----------------------------------------------------  

                NodeDeleteWatcher watcher = znode.equals(first) ? null : new NodeDeleteWatcher();  

                byte[] data = zk.getData(first, watcher, null);  

                leader = new String(data, "UTF-8");  

  

                System.out.println("leader election end, the leader is : " + leader);  

  

                if (firstElectionSignal.getCount() != 0) {  

                    firstElectionSignal.countDown();  

                }  

  

                if (znode.equals(first)) {  

                    return;  

                }  

  

                mutex.wait();  

            }  

        } while (true);  

  

  

    }  

  

    class NodeDeleteWatcher implements Watcher {  

        @Override  

        public void process(WatchedEvent event) {  

            if (event.getType() == Event.EventType.NodeDeleted) {  

                synchronized (mutex) {  

                    mutex.notify();  

                }  

            }  

        }  

    }  

  

    /** 

     * 获取本地内网IP地址 

     * @return 

     * @throws SocketException 

     */  

    public static String getLocalIpAddress() throws SocketException {  

        // 获得本机的所有网络接口  

        Enumeration<NetworkInterface> nifs = NetworkInterface.getNetworkInterfaces();  

  

        while (nifs.hasMoreElements()) {  

            NetworkInterface nif = nifs.nextElement();  

  

            // 获得与该网络接口绑定的 IP 地址,一般只有一个  

            Enumeration<InetAddress> addresses = nif.getInetAddresses();  

            while (addresses.hasMoreElements()) {  

                InetAddress addr = addresses.nextElement();  

  

                String ip = addr.getHostAddress();  

                // 只关心 IPv4 地址  

                if (addr instanceof Inet4Address && !"127.0.0.1".equals(ip)) {  

                    return ip;  

                }  

            }  

        }  

        return null;  

    }  

  

    public static byte[] getLocalIpAdressBytes() throws SocketException {  

        String ip = getLocalIpAddress();  

        return ip == null ? null : ip.getBytes();  

    }  

  

    public static String getLeader() throws InterruptedException {  

        //----------------------------------------------------  

        // 第一次leader选举完成后才释放阀门  

        //----------------------------------------------------  

        firstElectionSignal.await();  

        return leader;  

    }  

  

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {  

  

        //-------------------------------------------------------  

        // 启动一条线程用处理leader选举  

        //-------------------------------------------------------  

        new Thread(new Runnable() {  

            @Override  

            public void run() {  

                try {  

                    LeaderElection leaderElection = new LeaderElection();  

                    leaderElection.start();  

                } catch (KeeperException e) {  

                    e.printStackTrace();  

                } catch (InterruptedException e) {  

                    e.printStackTrace();  

                } catch (UnsupportedEncodingException e) {  

                    e.printStackTrace();  

                } catch (IOException e) {  

                    e.printStackTrace();  

                }  

            }  

        }).start();  

  

        String leader = LeaderElection.getLeader();  

  

    }  

}  

 

测试

运行main方法,本机器的某次输出结果

[plain] view plain copy print?

-------------------------------------  

local IP: 192.168.1.103  

local created node: /leader/0000000017  

-------------------------------------  

begin leader election...  

leader election end, the leader is : 192.168.1.103  


注:本文只进行了简单的测试,要在生产环境中使用请先同时多在个JVM上进行测试。

 

 

参考文章:  

http://zookeeper.apache.org/doc/r3.4.9/recipes.html

 

本文为原创,转载请注明出处http://blog.csdn.net/massivestars/article/details/53894551

 

 

ZooKeeper实现分布式锁和队列可参考:

使用Zookeeper实现分布式锁

使用ZooKeeper实现队列

相关文章

相关标签/搜索