好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

zeromq_传说中最快的消息队列

zeromq_传说中最快的消息队列

Zeromq的资源:

Zeromq模式:

http://blog.codingnow.com/2011/02/zeromq_message_patterns.html

zeromq主页:

http://www.zeromq.org/

Zeromq Guild:

http://zguide.zeromq.org/page:all#Fixing-the-World

Zeromq 中文简介:

http://blog.csdn.net/program_think/article/details/6687076

Zero wiki:

http://en.wikipedia.org/wiki/%C3%98MQ

zeromq系列:

http://iyuan.iteye.com/blog/972949

Zeromq资源阅读:

?MQ(Zeromq) 是一个更为高效的传输层

优势是:

1 程序接口库是一个并发框架

2 在集群和超级计算机上表现得比TCP更快

3 通过inproc, IPC, TCP, 和 multicast进行传播消息

4 通过发散,订阅,流水线,请求的方式连接

5 对于不定规模的多核消息传输应用使用异步IO

6 有非常大并且活跃的开源社区

7 支持30+的语言

8 支持多种系统

Zeromq定义为“史上最快的消息队列”

从网络通信的角度看,它处于会话层之上,应用层之下。

?MQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ?MQ is from iMatix and is LGPL open source.

Zeromq中传递的数据格式是由用户自己负责,就是说如果server发送的string是有带"\0"的,那么client就必须要知道有这个

Pub_Sub模式。

the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,使用proxy。

Zeromq示例:

1 获取例子

git clone --depth=1 git://github.com/imatix/zguide.git

2 服务器端:

(当服务器收到消息的时候,服务器回复“World”)

?

<?php

     /*

     *  Hello World server

     *  Binds REP socket to tcp:// *:5555

     *  Expects "Hello" from client, replies with "World"

     * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>

     */

      

     $context = new ZMQContext(1);

      

     //  Socket to talk to clients

     $responder = new ZMQSocket( $context , ZMQ::SOCKET_REP);

     $responder ->bind( " tcp:// *:5555" );

      

     while (true) {

         //  Wait for next request from client

         $request = $responder ->recv();

         printf ( "Received request: [%s]\n" , $request );

      

         //  Do some 'work'

         sleep (1);

      

         //  Send reply back to client

         $responder ->send( "World" );   

 

}

3 客户端:

(客户端发送消息)

?

<?php

     /*

     *  Hello World client

     *  Connects REQ socket to tcp://localhost:5555

     *  Sends "Hello" to server, expects "World" back

     * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>

     */

      

     $context = new ZMQContext();

      

     //  Socket to talk to server

     echo "Connecting to hello world server…\n" ;

     $requester = new ZMQSocket( $context , ZMQ::SOCKET_REQ);

     $requester ->connect( " tcp://localhost:5555 " );

      

     for ( $request_nbr = 0; $request_nbr != 10; $request_nbr ++) {

         printf ( "Sending request %d…\n" , $request_nbr );

         $requester ->send( "Hello" );

          

         $reply = $requester ->recv();

         printf ( "Received reply %d: [%s]\n" , $request_nbr , $reply );

 

}

?

 

天气气候订阅系统:(pub-sub)

1 server端:

?

<?php

     /*

     *  Weather update server

     *  Binds PUB socket to tcp:// *:5556

     *  Publishes random weather updates

     * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>

     */

      

     //  Prepare our context and publisher

     $context = new ZMQContext();

     $publisher = $context ->getSocket(ZMQ::SOCKET_PUB);

     $publisher ->bind( " tcp:// *:5556" );

     $publisher ->bind( " ipc://weather.ipc " );

      

     while (true) {

         //  Get values that will fool the boss

         $zipcode      = mt_rand(0, 100000);

         $temperature = mt_rand(-80, 135);

         $relhumidity = mt_rand(10, 60);

      

         //  Send message to all subscribers

         $update = sprintf ( "%05d %d %d" , $zipcode , $temperature , $relhumidity );

         $publisher ->send( $update );

     }

2 client端:

?

<?php

     /*

     *  Weather update client

     *  Connects SUB socket to tcp://localhost:5556

     *  Collects weather updates and finds avg temp in zipcode

     * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>

     */

      

     $context = new ZMQContext();

      

     //  Socket to talk to server

     echo "Collecting updates from weather server…" , PHP_EOL;

     $subscriber = new ZMQSocket( $context , ZMQ::SOCKET_SUB);

     $subscriber ->connect( " tcp://localhost:5556 " );

      

     //  Subscribe to zipcode, default is NYC, 10001

     $filter = $_SERVER [ 'argc' ] > 1 ? $_SERVER [ 'argv' ][1] : "10001" ;

     $subscriber ->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter );

      

     //  Process 100 updates

     $total_temp = 0;

     for ( $update_nbr = 0; $update_nbr < 100; $update_nbr ++) {

         $string = $subscriber ->recv();

         sscanf ( $string , "%d %d %d" , $zipcode , $temperature , $relhumidity );

         $total_temp += $temperature ;

     }

     printf ( "Average temperature for zipcode '%s' was %dF\n" ,

         $filter , (int) ( $total_temp / $update_nbr ));

?

------------------------

?

pub-sub的proxy模式:

?

图示是:

Proxy节点的代码:

?

<?php

     /*

     *  Weather proxy device

     * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>

     */

      

     $context = new ZMQContext();

      

     //  This is where the weather server sits

     $frontend = new ZMQSocket( $context , ZMQ::SOCKET_SUB);

     $frontend ->connect( " tcp://192.168.55.210:5556 " );

      

     //  This is our public endpoint for subscribers

     $backend = new ZMQSocket( $context , ZMQ::SOCKET_PUB);

     $backend ->bind( " tcp://10.1.1.0:8100 " );

      

     //  Subscribe on everything

     $frontend ->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "" );

      

     //  Shunt messages out to our own subscribers

     while (true) {

         while (true) {

             //  Process all parts of the message

             $message = $frontend ->recv();

             $more = $frontend ->getSockOpt(ZMQ::SOCKOPT_RCVMORE);

             $backend ->send( $message , $more ? ZMQ::SOCKOPT_SNDMORE : 0);

             if (! $more ) {

                 break ; // Last message part

             }

         }

 

}

其实就是proxy同时是作为pub又作为sub的

----------------------

作者:yjf512(轩脉刃)

出处:http://www.cnblogs.com/yjf512/

本文版权归yjf512和cnBlog共有,欢迎转载,但未经作者同意必须保留此段声明

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于zeromq_传说中最快的消息队列的详细内容...

  阅读:77次