博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
zeromq_传说中最快的消息队列
阅读量:5972 次
发布时间:2019-06-19

本文共 4675 字,大约阅读时间需要 15 分钟。

Zeromq的资源:

Zeromq模式:

zeromq主页:

Zeromq Guild:

Zeromq 中文简介:

Zero wiki:

zeromq系列:

Zeromq资源阅读:

ØMQ(Zeromq) 是一个更为高效的传输层

优势是:

1 程序接口库是一个并发框架

2 在集群和超级计算机上表现得比TCP更快

3 通过inproc, IPC, TCP, 和 multicast进行传播消息

4 通过发散,订阅,流水线,请求的方式连接

5 对于不定规模的多核消息传输应用使用异步IO

6 有非常大并且活跃的开源社区

7 支持30+的语言

8 支持多种系统

 

Zeromq定义为“史上最快的消息队列”

从网络通信的角度看,它处于会话层之上,应用层之下。

ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.

Zeromq中传递的数据格式是由用户自己负责,就是说如果server发送的string是有带"\0"的,那么client就必须要知道有这个

 

Pub_Sub模式。

the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,使用proxy。

Zeromq示例:

1 获取例子

git clone --depth=1 git://github.com/imatix/zguide.git

2 服务器端:

(当服务器收到消息的时候,服务器回复“World”)

*/ $context = new ZMQContext(1); // Socket to talk to clients $responder = new ZMQSocket($context, ZMQ::SOCKET_REP); $responder->bind("tcp://*:5555"); while(true) { // Wait for next request from client $request = $responder->recv(); printf ("Received request: [%s]\n", $request); // Do some 'work' sleep (1); // Send reply back to client $responder->send("World"); }

3 客户端:

(客户端发送消息)

*/ $context = new ZMQContext(); // Socket to talk to server echo "Connecting to hello world server…\n"; $requester = new ZMQSocket($context, ZMQ::SOCKET_REQ); $requester->connect("tcp://localhost:5555"); for($request_nbr = 0; $request_nbr != 10; $request_nbr++) { printf ("Sending request %d…\n", $request_nbr); $requester->send("Hello"); $reply = $requester->recv(); printf ("Received reply %d: [%s]\n", $request_nbr, $reply);}
 

天气气候订阅系统:(pub-sub)

1 server端:

*/ // Prepare our context and publisher $context = new ZMQContext(); $publisher = $context->getSocket(ZMQ::SOCKET_PUB); $publisher->bind("tcp://*:5556"); $publisher->bind("ipc://weather.ipc"); while (true) { // Get values that will fool the boss $zipcode = mt_rand(0, 100000); $temperature = mt_rand(-80, 135); $relhumidity = mt_rand(10, 60); // Send message to all subscribers $update = sprintf ("%05d %d %d", $zipcode, $temperature, $relhumidity); $publisher->send($update); }

2 client端:

*/ $context = new ZMQContext(); // Socket to talk to server echo "Collecting updates from weather server…", PHP_EOL; $subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB); $subscriber->connect("tcp://localhost:5556"); // Subscribe to zipcode, default is NYC, 10001 $filter = $_SERVER['argc'] > 1 ? $_SERVER['argv'][1] : "10001"; $subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter); // Process 100 updates $total_temp = 0; for ($update_nbr = 0; $update_nbr < 100; $update_nbr++) { $string = $subscriber->recv(); sscanf ($string, "%d %d %d", $zipcode, $temperature, $relhumidity); $total_temp += $temperature; } printf ("Average temperature for zipcode '%s' was %dF\n", $filter, (int) ($total_temp / $update_nbr));
------------------------
pub-sub的proxy模式:
图示是:

Proxy节点的代码:

*/ $context = new ZMQContext(); // This is where the weather server sits $frontend = new ZMQSocket($context, ZMQ::SOCKET_SUB); $frontend->connect("tcp://192.168.55.210:5556"); // This is our public endpoint for subscribers $backend = new ZMQSocket($context, ZMQ::SOCKET_PUB); $backend->bind("tcp://10.1.1.0:8100"); // Subscribe on everything $frontend->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, ""); // Shunt messages out to our own subscribers while(true) { while(true) { // Process all parts of the message $message = $frontend->recv(); $more = $frontend->getSockOpt(ZMQ::SOCKOPT_RCVMORE); $backend->send($message, $more ? ZMQ::SOCKOPT_SNDMORE : 0); if(!$more) { break; // Last message part } }}
其实就是proxy同时是作为pub又作为sub的

----------------------

作者:yjf512(轩脉刃)

出处:http://www.cnblogs.com/yjf512/

本文版权归yjf512和cnBlog共有,欢迎转载,但未经作者同意必须保留此段声明

你可能感兴趣的文章
我的友情链接
查看>>
java中的匿名内部类总结
查看>>
多线程(一、线程安全案例)
查看>>
mysql之DDL操作--数据库
查看>>
java json格式的转换和读取
查看>>
find的命令的使用和文件名的后缀
查看>>
恢复WORD2010的默认模板2011-05-03
查看>>
Test2 unit2
查看>>
shell脚本逻辑判断,文件目录属性判断,if,case用法
查看>>
我的友情链接
查看>>
一个用了统计CPU 内存 硬盘 使用率的shell脚本
查看>>
如何恢复默认域策略和默认域控制器策略
查看>>
Nginx配置文件nginx.conf (Apache)
查看>>
jquery和JavaScript区别
查看>>
pxe方式安装gentoo
查看>>
Project Management Library项目管理甘特图控件
查看>>
MySQL存储过程详解
查看>>
解决查看框架源码时 class file editor source not found
查看>>
JDBC接口
查看>>
脏读,不可重复读,幻读
查看>>