ZeroMQ的学习和研究

分类:技术文档 - PHP文档 | 阅读(4480) | 发布于:2014-03-31 09:21

ZeroMQ的学习和研究

一、ZeroMQ的背景介绍

引用官方的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是 “成为标准网络协议栈的一部分,之后进入Linux内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接 字之上的一 层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。”

近几年有关”Message Queue”的项目层出不穷,知名的就有十几种,这主要是因为后摩尔定律时代,分布式处理逐渐成为主流,业界需要一套标准来解决分布式计算环境中节点之间 的消息通信。几年的竞争下来,Apache基金会旗下的符合AMQP/1.0标准的RabbitMQ已经得到了广泛的认可,成为领先的MQ项目。

与RabbitMQ相比,ZMQ并不像是一个传统意义上的消息队列服务器,事实上,它也根本不是一个服务器,它更像是一个底层的网络通讯库,在Socket API之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的API接口。

二、ZMQ是什么?

阅读了ZMQ的Guide文档后,我的理解 是,这是个类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏 蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。

三、本文的目的

在集群对外提供服务的过程中,我们有很多的配置,需要根据需要随时更新,那么这个信息如果推动到各个节点?并且保证信息的一致性和可靠性?本文在介 绍ZMQ基本理论的基础上,试图使用ZMQ实现一个配置分发中心。从一个节点,将信息无误的分发到各个服务器节点上,并保证信息正确性和一致性。

四、ZMQ的三个基本模型

ZMQ提供了三个基本的通信模型,分别是“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline”,我们从这三种模式一窥ZMQ的究竟

ZMQ的hello world!

由Client发起请求,并等待Server回应请求。请求端发送一个简单的hello,服务端则回应一个world。请求端和服务端都可以是 1:N 的模型。通常把 1 认为是 Server ,N 认为是Client 。ZMQ 可以很好的支持路由功能(实现路由功能的组件叫作Device),把 1:N 扩展为N:M (只需要加入若干路由节点)。如图1所示:

图1:ZMQ的Request-Reply 通信

服务端的php程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?php
/*
* Hello World server
* Binds REP socket to tcp://*:5555
* Expects "Hello" from client, replies with "World"
* @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt;
*/
$context=newZMQContext(1);
// Socket to talk to clients
$responder=newZMQSocket($context, ZMQ::SOCKET_REP);
$responder-&gt;bind("tcp://*:5555");
while(true) {
// Wait for next request from client
$request=$responder-&gt;recv();
printf ("Received request: [%s]\n",$request);
 
// Do some 'work'
sleep (1);
 
// Send reply back to client
$responder-&gt;send("World");
}

Client程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<?php
 
/*
 
 *  Hello World client
 
 *  Connects REQ socket to tcp://localhost:5555
 
 *  Sends "Hello" to server, expects "World" back
 
 * @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt;
 
 */
$context=newZMQContext();
 
//  Socket to talk to server
 
echo"Connecting to hello world server...\n";
 
$requester=newZMQSocket($context, ZMQ::SOCKET_REQ);
 
$requester-&gt;connect("tcp://localhost:5555");
 
for($request_nbr= 0;$request_nbr!= 10;$request_nbr++) {
 
    printf ("Sending request %d...\n",$request_nbr);
 
    $requester-&gt;send("Hello");
 
    $reply=$requester-&gt;recv();
 
    printf ("Received reply %d: [%s]\n",$request_nbr,$reply);
 
}

从以上的过程,我们可以了解到使用ZMQ写基本的程序的方法,需要注意的是:

a) 服务端和客户端无论谁先启动,效果是相同的,这点不同于Socket。

b) 在服务端收到信息以前,程序是阻塞的,会一直等待客户端连接上来。

c) 服务端收到信息以后,会send一个“World”给客户端。值得注意的是一定是client连接上来以后,send消息给Server,然后 Server再rev然后响应client,这种一问一答式的。如果Server先send,client先rev是会报错的。

d) ZMQ通信通信单元是消息,他除了知道Bytes的大小,他并不关心的消息格式。因此,你可以使用任何你觉得好用的数据格式。Xml、Protocol Buffers、Thrift、json等等。

e) 虽然可以使用ZMQ实现HTTP协议,但是,这绝不是他所擅长的。

ZMQ的Publish-subscribe模式

我们可以想象一下天气预报的订阅模式,由一个节点提供信息源,由其他的节点,接受信息源的信息,如图2所示:

图2:ZMQ的Publish-subscribe

示例代码如下 :

Publisher:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?php
/*
* Weather update server
* Binds PUB socket to tcp://*:5556
* Publishes random weather updates
* @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt;
*/
 
// Prepare our context and publisher
$context=newZMQContext();
$publisher=$context-&gt;getSocket(ZMQ::SOCKET_PUB);
$publisher-&gt;bind("tcp://*:5556");
 
while(true) {
// Get values that will fool the boss
$zipcode= mt_rand(0, 100000);
$temperature= mt_rand(-80, 135);
$relhumidity= mt_rand(10, 60);
 
// Send message to all subscribers
$update= sprintf ("%05d %d %d",$zipcode,$temperature,$relhumidity);
$publisher-&gt;send($update);
}</pre>
Subscriber
<pre>&lt;?php
/*
* Weather update client
* Connects SUB socket to tcp://localhost:5556
* Collects weather updatesandfinds avg temp in zipcode
* @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt;
*/
 
$context=newZMQContext();
 
// Socket to talk to server
echo"Collecting updates from weather server…", PHP_EOL;
$subscriber=newZMQSocket($context, ZMQ::SOCKET_SUB);
$subscriber-&gt;connect("tcp://localhost:5556");
 
// Subscribe to zipcode, default is NYC, 10001
$filter=$_SERVER['argc'] &gt; 1 ?$_SERVER['argv'][1] :"10001";
$subscriber-&gt;setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE,$filter);
 
// Process 100 updates
$total_temp= 0;
for($update_nbr= 0;$update_nbr&lt; 100;$update_nbr++) {
$string=$subscriber-&gt;recv();
sscanf ($string,"%d %d %d",$zipcode,$temperature,$relhumidity);
$total_temp+=$temperature;
}
printf ("Average temperature for zipcode '%s' was %dF\n",
$filter, (int) ($total_temp/$update_nbr));

这段代码讲的是,服务器端生成随机数zipcode、temperature、relhumidity分别代表城市代码、温度值和湿度值。然后不断的广播信息,而客户端通过设置过滤参数,接受特定城市代码的信息,收集完了以后,做一个平均值。

a) 与Hello World不同的是,Socket的类型变成SOCKET_PUB和SOCKET_SUB类型。

b) 客户端需要$subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);设置一个过滤值,相当于设定一个订阅频道,否则什么信息也收不到。

c) 服务器端一直不断的广播中,如果中途有Subscriber端退出,并不影响他继续的广播,当Subscriber再连接上来的时候,收到的就是后来发送 的新的信息了。这对比较晚加入的,或者是中途离开的订阅者,必然会丢失掉一部分信息,这是这个模式的一个问题,所谓的Slow joiner。稍后,会解决这个问题。

d) 但是,如果Publisher中途离开,所有的Subscriber会hold住,等待Publisher再上线的时候,会继续接受信息。

ZMQ的PipeLine模型

想象一下这样的场景,如果需要统计各个机器的日志,我们需要将统计任务分发到各个节点机器上,最后收集统计结果,做一个汇总。PipeLine比较适合于这种场景,他的结构图,如图3所示。

图3:ZMQ的PipeLine模型

Parallel task ventilator in PHP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<?php
/*
* Task ventilator
* Binds PUSH socket to tcp://localhost:5557
* Sends batch of tasks to workers via that socket
* @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt;
*/
 
$context=newZMQContext();
 
// Socket to send messages on
$sender=newZMQSocket($context, ZMQ::SOCKET_PUSH);
$sender-&gt;bind("tcp://*:5557");
 
echo"Press Enter when the workers are ready: ";
$fp=fopen('php://stdin','r');
$line=fgets($fp, 512);
fclose($fp);
echo"Sending tasks to workers…", PHP_EOL;
 
// The first message is "0" and signals start of batch
$sender-&gt;send(0);
 
// Send 100 tasks
$total_msec= 0;// Total expected cost in msecs
for($task_nbr= 0;$task_nbr&lt; 100;$task_nbr++) {
// Random workload from 1 to 100msecs
$workload= mt_rand(1, 100);
$total_msec+=$workload;
$sender-&gt;send($workload);
 
}
printf ("Total expected cost: %d msec\n",$total_msec);
sleep (1);// Give 0MQ time to deliver

Parallel task worker in PHP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?php
/*
* Task worker
* Connects PULL socket to tcp://localhost:5557
* Collects workloads from ventilator via that socket
* Connects PUSH socket to tcp://localhost:5558
* Sends results to sink via that socket
* @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt;
*/
 
$context=newZMQContext();
 
// Socket to receive messages on
$receiver=newZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver-&gt;connect("tcp://localhost:5557");
 
// Socket to send messages to
$sender=newZMQSocket($context, ZMQ::SOCKET_PUSH);
$sender-&gt;connect("tcp://localhost:5558");
 
// Process tasks forever
while(true) {
$string=$receiver-&gt;recv();
 
// Simple progress indicator for the viewer
echo$string, PHP_EOL;
 
// Do the work
usleep($string* 1000);
 
// Send results to sink
$sender-&gt;send("");
}

Parallel task sink in PHP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php
/*
* Task sink
* Binds PULL socket to tcp://localhost:5558
* Collects results from workers via that socket
* @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt;
*/
 
// Prepare our context and socket
$context=newZMQContext();
$receiver=newZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver-&gt;bind("tcp://*:5558");
 
// Wait for start of batch
$string=$receiver-&gt;recv();
 
// Start our clock now
$tstart= microtime(true);
 
// Process 100 confirmations
$total_msec= 0;// Total calculated cost in msecs
for($task_nbr= 0;$task_nbr&lt; 100;$task_nbr++) {
$string=$receiver-&gt;recv();
if($task_nbr% 10 == 0) {
echo":";
}else{
echo".";
}
}
 
$tend= microtime(true);
 
$total_msec= ($tend-$tstart) * 1000;
echoPHP_EOL;
printf ("Total elapsed time: %d msec",$total_msec);
echoPHP_EOL;

从程序中,我们可以看到,task ventilator使用的是SOCKET_PUSH,将任务分发到Worker节点上。而Worker节点上,使用SOCKET_PULL从上游接受任 务,并使用SOCKET_PUSH将结果汇集到Slink。值得注意的是,任务的分发的时候也同样有一个负载均衡的路由功能,worker可以随时自由加 入,task ventilator可以均衡将任务分发出去。

五、其他扩展模式

通常,一个节点,即可以作为Server,同时也能作为Client,通过PipeLine模型中的Worker,他向上连接着任务分发,向下连接 着结果搜集的Sink机器。因此,我们可以借助这种特性,丰富的扩展原有的三种模式。例如,一个代理Publisher,作为一个内网的 Subscriber接受信息,同时将信息,转发到外网,其结构图如图4所示。

图4:ZMQ的扩展模式

六、多个服务器

ZMQ和Socket的区别在于,前者支持N:M的连接,而后者则只是1:1的连接,那么一个Client连接多个Server的情况是怎样的呢,我们通过图5来说明。

图5:ZMQ的N:1的连接情况

我们假设Client有R1,R2,R3,R4四个任务,我们只需要一个ZMQ的Socket,就可以连接四个服务,他能够自动均衡的分配任务。如 图5所示,R1,R4自动分配到了节点A,R2到了B,R3到了C。如果我们是N:M的情况呢?这个扩展起来,也不难,如图6所示。

图6:N:M的连接

我们通过一个中间结点(Broker)来进行负载均衡的功能。我们通过代码了解,其中的Client和我们的Hello World的Client端是一样的,而Server端的不同是,他不需要监听端口,而是需要连接Broker的端口,接受需要处理的信息。所以,我们重 点阅读Broker的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
标签:消息处理队列多线程