当前位置: 首页 > 分布式系统 > 正文

ZooKeeper Java API 使用样例

关键字:
1 星2 星3 星4 星5 星 (1 次投票, 评分: 5.00, 总分: 5)
Loading ... Loading ...
baidu_share

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务框架,包含一组简单的原语集合。通过这些原语言的组合使用,能够帮助我们解决更高层次的分布式问题,关于ZooKeeper的典型使用场景,请查看这个文章《ZooKeeper典型应用场景一览

本文主要针对ZooKeeper提供的Java API,通过实际代码讲述如何使用API。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package com.taobao.taokeeper.research.sample; 
 
import java.io.IOException; 
import java.util.concurrent.CountDownLatch; 
 
import org.apache.zookeeper.CreateMode; 
import org.apache.zookeeper.KeeperException; 
import org.apache.zookeeper.WatchedEvent; 
import org.apache.zookeeper.Watcher; 
import org.apache.zookeeper.Watcher.Event.KeeperState; 
import org.apache.zookeeper.ZooDefs.Ids; 
import org.apache.zookeeper.ZooKeeper; 
 
import common.toolkit.java.util.ObjectUtil; 
 
/** 
 * ZooKeeper Java Api 使用样例<br> 
 * ZK Api Version: 3.4.3 
 *  
 * @author nileader/nileader@gmail.com 
 */ 
public class JavaApiSample implements Watcher { 
 
    private static final int SESSION_TIMEOUT = 10000; 
    private static final String CONNECTION_STRING = "test.zookeeper.connection_string:2181"; 
    private static final String ZK_PATH = "/nileader"; 
    private ZooKeeper zk = null; 
 
    private CountDownLatch connectedSemaphore = new CountDownLatch( 1 ); 
 
    /** 
     * 创建ZK连接 
     * @param connectString  ZK服务器地址列表 
     * @param sessionTimeout   Session超时时间 
     */ 
    public void createConnection( String connectString, int sessionTimeout ) { 
        this.releaseConnection(); 
        try { 
            zk = new ZooKeeper( connectString, sessionTimeout, this ); 
            connectedSemaphore.await(); 
        } catch ( InterruptedException e ) { 
            System.out.println( "连接创建失败,发生 InterruptedException" ); 
            e.printStackTrace(); 
        } catch ( IOException e ) { 
            System.out.println( "连接创建失败,发生 IOException" ); 
            e.printStackTrace(); 
        } 
    } 
 
    /** 
     * 关闭ZK连接 
     */ 
    public void releaseConnection() { 
        if ( !ObjectUtil.isBlank( this.zk ) ) { 
            try { 
                this.zk.close(); 
            } catch ( InterruptedException e ) { 
                // ignore 
                e.printStackTrace(); 
            } 
        } 
    } 
 
    /** 
     *  创建节点 
     * @param path 节点path 
     * @param data 初始数据内容 
     * @return 
     */ 
    public boolean createPath( String path, String data ) { 
        try { 
            System.out.println( "节点创建成功, Path: " 
                    + this.zk.create( path, // 
                                              data.getBytes(), // 
                                              Ids.OPEN_ACL_UNSAFE, // 
                                              CreateMode.EPHEMERAL ) 
                    + ", content: " + data ); 
        } catch ( KeeperException e ) { 
            System.out.println( "节点创建失败,发生KeeperException" ); 
            e.printStackTrace(); 
        } catch ( InterruptedException e ) { 
            System.out.println( "节点创建失败,发生 InterruptedException" ); 
            e.printStackTrace(); 
        } 
        return true; 
    } 
 
    /** 
     * 读取指定节点数据内容 
     * @param path 节点path 
     * @return 
     */ 
    public String readData( String path ) { 
        try { 
            System.out.println( "获取数据成功,path:" + path ); 
            return new String( this.zk.getData( path, false, null ) ); 
        } catch ( KeeperException e ) { 
            System.out.println( "读取数据失败,发生KeeperException,path: " + path  ); 
            e.printStackTrace(); 
            return ""; 
        } catch ( InterruptedException e ) { 
            System.out.println( "读取数据失败,发生 InterruptedException,path: " + path  ); 
            e.printStackTrace(); 
            return ""; 
        } 
    } 
 
    /** 
     * 更新指定节点数据内容 
     * @param path 节点path 
     * @param data  数据内容 
     * @return 
     */ 
    public boolean writeData( String path, String data ) { 
        try { 
            System.out.println( "更新数据成功,path:" + path + ", stat: " + 
                                                        this.zk.setData( path, data.getBytes(), -1 ) ); 
        } catch ( KeeperException e ) { 
            System.out.println( "更新数据失败,发生KeeperException,path: " + path  ); 
            e.printStackTrace(); 
        } catch ( InterruptedException e ) { 
            System.out.println( "更新数据失败,发生 InterruptedException,path: " + path  ); 
            e.printStackTrace(); 
        } 
        return false; 
    } 
 
    /** 
     * 删除指定节点 
     * @param path 节点path 
     */ 
    public void deleteNode( String path ) { 
        try { 
            this.zk.delete( path, -1 ); 
            System.out.println( "删除节点成功,path:" + path ); 
        } catch ( KeeperException e ) { 
            System.out.println( "删除节点失败,发生KeeperException,path: " + path  ); 
            e.printStackTrace(); 
        } catch ( InterruptedException e ) { 
            System.out.println( "删除节点失败,发生 InterruptedException,path: " + path  ); 
            e.printStackTrace(); 
        } 
    } 
 
    public static void main( String[] args ) { 
 
        JavaApiSample sample = new JavaApiSample(); 
        sample.createConnection( CONNECTION_STRING, SESSION_TIMEOUT ); 
        if ( sample.createPath( ZK_PATH, "我是节点初始内容" ) ) { 
            System.out.println(); 
            System.out.println( "数据内容: " + sample.readData( ZK_PATH ) + "\n" ); 
            sample.writeData( ZK_PATH, "更新后的数据" ); 
            System.out.println( "数据内容: " + sample.readData( ZK_PATH ) + "\n" ); 
            sample.deleteNode( ZK_PATH ); 
        } 
 
        sample.releaseConnection(); 
    } 
 
    /** 
     * 收到来自Server的Watcher通知后的处理。 
     */ 
    @Override 
    public void process( WatchedEvent event ) { 
        System.out.println( "收到事件通知:" + event.getState() +"\n"  ); 
        if ( KeeperState.SyncConnected == event.getState() ) { 
            connectedSemaphore.countDown(); 
        } 
 
    } 
 
}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
收到事件通知:SyncConnected 
 
节点创建成功, Path: /nileader, content: 我是节点初始内容 
 
获取数据成功,path:/nileader 
数据内容: 我是节点初始内容 
 
更新数据成功,path:/nileader, stat: 42950186407,42950186408,1350820182392,1350820182406,1,0,0,232029990722229433,18,0,42950186407 
 
获取数据成功,path:/nileader 
数据内容: 更新后的数据 
 
删除节点成功,path:/nileader

本文固定链接: http://www.chepoo.com/zookeeper-java-api-example.html | IT技术精华网

ZooKeeper Java API 使用样例:等您坐沙发呢!

发表评论