当前位置: 首页 > 分布式系统 > 正文

Zookeeper C系列:应用示例

1 星2 星3 星4 星5 星 (2 次投票, 评分: 5.00, 总分: 5)
Loading ... Loading ...
baidu_share

设想如下场景:

假设程序 A 需要 7* 24 小时在线对外提供服务,但是 A 程序在生产环境下总是不稳定,时常崩溃,不过幸运的是解决方案很简单,在 A 程序崩溃以后只需要重启它就可以了。当然如此简单的问题你可以提出多种解决方案,比方说自己实现一个服务程序,每隔一定时间去轮询 A 的状态,如果发现 A 崩溃了,立即重启它,并向管理人员报告问题。不过我们并不打算这么做,毕竟本文主题是讲 Zookeeper C API 的应用,所以我们采用 Zookeeper 服务来解决该问题。

若采用 Zookeeper 服务可以按照如下方案解决问题,程序 A 在启动时创建一个临时(ZOO_EPHEMERAL) znode 节点 /A,然后按照正常流程对外提供服务。另外监控程序对 /A 节点设置监视,当 /A 节点消失(说明 A 程序已经崩溃)时,重启 A 程序。假设 A 的名称是 QueryServer,即对外提供查询服务的程序,具体提供什么查询服务由应用自身决定,我们这里只是简单地模拟一下。QueryServer 在启动时创建一个 /QueryServer 的临时节点(ZOO_EPHEMERAL),然后,程序 QueryServerd 监控 /QueryServer 节点,当 /QueryServer 节点消失(说明 A 程序已经崩溃)时,重启 QueryServer 程序。

下面是 QueryServer 的实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_log.h>
 
void QueryServer_watcher_g(zhandle_t* zh, int type, int state,
        const char* path, void* watcherCtx)
{
    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            printf("[[[QueryServer]]] Connected to zookeeper service successfully!\n");
        } else if (state == ZOO_EXPIRED_SESSION_STATE) { 
            printf("Zookeeper session expired!\n");
        }
    }  
}
 
void QueryServer_string_completion(int rc, const char *name, const void *data)
{
    fprintf(stderr, "[%s]: rc = %d\n", (char*)(data==0?"null":data), rc);
    if (!rc) {
        fprintf(stderr, "\tname = %s\n", name);
    }
}
 
void QueryServer_accept_query()
{
    printf("QueryServer is running...\n");
}
 
int main(int argc, const char *argv[])
{
    const char* host = "127.0.0.1:2181,127.0.0.1:2182,"
        "127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
    int timeout = 30000;
 
    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
    zhandle_t* zkhandle = zookeeper_init(host,
            QueryServer_watcher_g, timeout, 0, "hello zookeeper.", 0);
    if (zkhandle == NULL) {
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");
        exit(EXIT_FAILURE);
    }
 
    // struct ACL ALL_ACL[] = {{ZOO_PERM_ALL, ZOO_ANYONE_ID_UNSAFE}};
    // struct ACL_vector ALL_PERMS = {1, ALL_ACL};
    int ret = zoo_acreate(zkhandle, "/QueryServer", "alive", 5,
           &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL,
           QueryServer_string_completion, "zoo_acreate");
    if (ret) {
        fprintf(stderr, "Error %d for %s\n", ret, "acreate");
        exit(EXIT_FAILURE);
    }
 
    do {
        // 模拟 QueryServer 对外提供服务.
        // 为了简单起见, 我们在此调用一个简单的函数来模拟 QueryServer.
        // 然后休眠 5 秒,程序主动退出(即假设此时已经崩溃).
        QueryServer_accept_query();
        sleep(5);
    } while(false);
 
    zookeeper_close(zkhandle);
}

Makefile如下:

1
2
3
4
5
6
7
8
9
10
11
12
all:QueryServer
 
QueryServer:QueryServer.o
    gcc -L/usr/local/lib/ -lzookeeper_mt -o $@ $^ 
 
QueryServer.o:QueryServer.c
    gcc -DTHREADED -I/usr/local/include/zookeeper -o $@ -c $^
 
.PHONY:clean
 
clean:
    rm QueryServer.o QueryServer

QueryServerd 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_log.h>
 
void QueryServerd_watcher_global(zhandle_t * zh, int type, int state,
                            const char *path, void *watcherCtx);
static void QueryServerd_dump_stat(const struct Stat *stat);
void QueryServerd_stat_completion(int rc, const struct Stat *stat,
                             const void *data);
void QueryServerd_watcher_awexists(zhandle_t *zh, int type, int state,
                              const char *path, void *watcherCtx);
static void QueryServerd_awexists(zhandle_t *zh);
 
void
QueryServerd_watcher_global(zhandle_t * zh, int type, int state,
                            const char *path, void *watcherCtx)
{
    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            printf("Connected to zookeeper service successfully!\n");
        } else if (state == ZOO_EXPIRED_SESSION_STATE) { 
            printf("Zookeeper session expired!\n");
        }
    }
}
 
static void
QueryServerd_dump_stat(const struct Stat *stat)
{
    char tctimes[40];
    char tmtimes[40];
    time_t tctime;
    time_t tmtime;
 
    if (!stat) {
        fprintf(stderr, "null\n");
        return;
    }
    tctime = stat->ctime / 1000;
    tmtime = stat->mtime / 1000;
 
    ctime_r(&tmtime, tmtimes);
    ctime_r(&tctime, tctimes);
 
    fprintf(stderr, "\tctime = %s\tczxid=%llx\n"
            "\tmtime=%s\tmzxid=%llx\n"
            "\tversion=%x\taversion=%x\n"
            "\tephemeralOwner = %llx\n",
            tctimes, stat->czxid,
            tmtimes, stat->mzxid,
            (unsigned int) stat->version, (unsigned int) stat->aversion,
            stat->ephemeralOwner);
}
 
void
QueryServerd_stat_completion(int rc, const struct Stat *stat,
                             const void *data)
{
    // fprintf(stderr, "%s: rc = %d Stat:\n", (char *) data, rc);
    // QueryServerd_dump_stat(stat);
}
 
void
QueryServerd_watcher_awexists(zhandle_t *zh, int type, int state,
                              const char *path, void *watcherCtx)
{
    if (state == ZOO_CONNECTED_STATE) {
        if (type == ZOO_DELETED_EVENT) {
            printf("QueryServer gone away, restart now...\n");
            // re-exists and set watch on /QueryServer again.
            QueryServerd_awexists(zh);
            pid_t pid = fork();
            if (pid < 0) {
                fprintf(stderr, "Error when doing fork.\n");
                exit(EXIT_FAILURE);
            }
            if (pid == 0) { /* child process */
                // 重启 QueryServer 服务.
                execl("/tmp/QueryServer/QueryServer", "QueryServer", NULL);
                exit(EXIT_SUCCESS);
            }
            sleep(1); /* sleep 1 second for purpose. */
        } else if (type == ZOO_CREATED_EVENT) {
            printf("QueryServer started...\n");
        }
    }
 
    // re-exists and set watch on /QueryServer again.
    QueryServerd_awexists(zh);
}
 
static void
QueryServerd_awexists(zhandle_t *zh)
{
    int ret =
        zoo_awexists(zh, "/QueryServer",
                     QueryServerd_watcher_awexists,
                     "QueryServerd_awexists.",
                     QueryServerd_stat_completion,
                     "zoo_awexists");
    if (ret) {
        fprintf(stderr, "Error %d for %s\n", ret, "aexists");
        exit(EXIT_FAILURE);
    }
}
 
int
main(int argc, const char *argv[])
{
    const char *host = "127.0.0.1:2181,127.0.0.1:2182,"
        "127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
    int timeout = 30000;
 
    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
    zhandle_t *zkhandle = zookeeper_init(host,
                                         QueryServerd_watcher_global,
                                         timeout,
                                         0, "QueryServerd", 0);
    if (zkhandle == NULL) {
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");
        exit(EXIT_FAILURE);
    }
 
    QueryServerd_awexists(zkhandle);
    // Wait for asynchronous zookeeper call done.
    getchar();
 
    zookeeper_close(zkhandle);
 
    return 0;
}

Makefile 如下:

1
2
3
4
5
6
7
8
9
10
11
12
all:QueryServerd
 
QueryServerd:QueryServerd.o
    gcc -L/usr/local/lib/ -lzookeeper_mt -o $@ $^ 
 
QueryServerd.o:QueryServerd.c
    gcc -g -DTHREADED -I/usr/local/include/zookeeper -o $@ -c $^
 
.PHONY:clean
 
clean:
    rm QueryServerd.o QueryServerd

首先执行 QueryServerd

1
2
forhappy@haiping-ict:/tmp/QueryServerd$ ./QueryServerd 
Connected to zookeeper service successfully!

然后执行 QueryServer,

1
2
3
4
5
forhappy@haiping-ict:/tmp/QueryServer$ ./QueryServer 
QueryServer is running...
[[[QueryServer]]] Connected to zookeeper service successfully!
[zoo_acreate]: rc = 0
    name = /QueryServer

可见 Queryerver 创建了 /QueryServer 节点,5 秒后 QueryServer 模拟程序崩溃而退出,那么此时在 QueryServerd 端输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Connected to zookeeper service successfully!
QueryServer started... # QueryServerd 感知到 QueryServer 已正常启动.
QueryServer gone away, restart now... # 5 秒钟后,QueryServer 崩溃,QueryServerd 准备重启 QueryServer.
QueryServer is running... # QueryServer 正在运行,以下 3 行是 QueryServer 输出结果。
[[[QueryServer]]] Connected to zookeeper service successfully!
[zoo_acreate]: rc = 0
    name = /QueryServer
QueryServer started... # QueryServerd 感知到 QueryServer 已正常启动.
QueryServer gone away, restart now...# 又过了 5 秒钟后,QueryServer 崩溃,QueryServerd 准备重启 QueryServer.
QueryServer is running... # QueryServer 再次运行,以下 3 行是 QueryServer 输出结果。
[[[QueryServer]]] Connected to zookeeper service successfully!
[zoo_acreate]: rc = 0
    name = /QueryServer
QueryServer started... # QueryServerd 再次感知到 QueryServer 已正常启动,如此反复.
QueryServer gone away, restart now...
QueryServer is running...
[[[QueryServer]]] Connected to zookeeper service successfully!
[zoo_acreate]: rc = 0
    name = /QueryServer
QueryServer started...

本文固定链接: http://www.chepoo.com/zookeeper-c-apply-example.html | IT技术精华网

Zookeeper C系列:应用示例:等您坐沙发呢!

发表评论