rabbit-mq
发布时间:2021-12-16 12:09:09编辑:admin
- 安装rabbit
- brew install rabbit
- 启动停止rabbit
- 启动 /usr/local/Cellar/rabbitmq/3.9.3/sbin/rabbitmqctl start_app
- 停止 /usr/local/Cellar/rabbitmq/3.9.3/sbin/rabbitmqctl stop_app
- 重置 /usr/local/Cellar/rabbitmq/3.9.3/sbin/rabbitmqctl reset
- 交换机类型
- Direct Exchange
- Direct直连交换机:
- 直连交换机是一种带路由功能的交互机,一个队列通过routing_key与一个交换机绑定,当消息被发送的时候,需要指定一个routing_key,这个消息被送达交换机的时候,就会被交换机送到指定的队列里面去。同样一个routing_key也是支持应用到多个队列中的,当一个交换机绑定多个队列时,消息就会被送到对应的队列去处理。
- 适用场景:有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
- Fanout Exchange
- 扇形交换机,转发消息到所有绑定队列(速度最快)
- 扇形交换机是最基本的交换机类型,它做的事情很简单--广播信息。Fanout交换机会把接收到的消息全部转发到绑定的队列上。因为广播不需要“思考”,所以Fanout交换机是四种交换机中速度最快的。
- 适用场景:需要随时增加减少业务处理的队列,例如注册、下单等功能需要增加送积分功能,只需要增加一个绑定到交换机的队列去处理新业务,无需修改旧的业务逻辑,从而达到业务解耦,非常容易扩展。
- Topic Exchange
- Topic主题交换机:
- 直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,那么这个交换机需要绑定上非常多的routing_key,假设每个交换机上都绑定一堆的routing_key连接到各个队列上。那么消息队列的管理就会异常的困难。
- 所以RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。
- 主题交换机的routing_key需要有一定的规则,交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.分开,其中:*表示一个单词#表示任意数量(零个或多个)单词。
- 适用场景:消息需要基于多重条件进行路由到达对应队列,例如:日志系统,不仅可以根据日志的级别而且能根据日志的来源进行订阅。
- Headers Exchanges
- header exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据。
- 主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值
示例:
composer.json(composer加载php-amqplib库)
1 2 3 4 5 | { "require" : { "php-amqplib/php-amqplib" : ">=2.6.1" } } |
rabbit-mq.config.php(rabbitMQ配置文件)
1 2 3 4 5 6 7 8 9 10 11 | <?php return [ 'rabbitmq' => [ 'host' => '127.0.0.1' , 'port' => '5672' , 'login' => 'guest' , 'password' => 'guest' , 'vhost' => '/' ] ]; ?> |
RabbitMQ.php(封装rabbitMQ)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | <?php require_once dirname(dirname( __FILE__ )). '/vendor/autoload.php' ; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; use PhpAmqpLib\Exchange\AMQPExchangeType; class RabbitMQ { private $host ; private $port ; private $user ; private $password ; protected $connection ; protected $channel ; /** * RabbitMQ constructor. */ public function __construct() { $config = require dirname(dirname( __FILE__ )). "/model/rabbit-mq.config.php" ; $this ->host = $config [ 'rabbitmq' ][ 'host' ]; $this ->port = $config [ 'rabbitmq' ][ 'port' ]; $this ->user = $config [ 'rabbitmq' ][ 'login' ]; $this ->password = $config [ 'rabbitmq' ][ 'password' ]; $this ->connection = new AMQPStreamConnection( $this ->host, $this ->port, $this ->user, $this ->password); $this ->channel = $this ->connection->channel(); } /** * 创建交换机 */ public function createExchange( $exchangeName , $channelType ){ $this ->channel->exchange_declare( $exchangeName , $channelType , false, false, false); } /** * 绑定交换机 */ public function bindQueue( $queueName , $exchangeName , $severity ){ $this ->channel->queue_bind( $queueName , $exchangeName , $severity ); } /*创建队列 * queue 队列名称 * durable 是否持久化。true持久化,队列会保存磁盘。服务器重启时可以保证不丢失相关信息 已创建的队列,不在支持修改该参数 * exclusive 设置是否排他。true排他的。如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。 排它是基于连接可见的,同一个连接不同信道是可以访问同一连接创建的排它队列, “首次”是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列,即使这个队列是持久化的, 一旦连接关闭或者客户端退出,该排它队列会被自动删除,这种队列适用于一个客户端同时发送与接口消息的场景。 * autoDelete 设置是否自动删除。true是自动删除。自动删除的前提是:致少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时, 才会自动删除生产者创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列 * */ public function createQueue( $queueName , $passive = false, $durable = false, $exclusive = false, $autoDelete = false, $nowait = false, $args = []){ $this ->channel->queue_declare( $queueName , $passive , $durable , $exclusive , $autoDelete , $nowait , $args ); } /** * 生成信息 * @param $message */ public function sendMessage( $message , $routeKey = '' , $exchange = '' , $properties = []) { $data = new AMQPMessage( $message , $properties ); $this ->channel->basic_publish( $data , $exchange , $routeKey ); } /** * 创建延时队列 * @param $ttl * @param $delayExName * @param $delayQueueName * @param $queueName */ public function createDelayQueue( $ttl , $delayExName , $delayQueueName , $queueName ) { $args = new AMQPTable([ 'x-dead-letter-exchange' => $delayExName , 'x-message-ttl' => $ttl , //消息存活时间 'x-dead-letter-routing-key' => $queueName ]); $this ->channel->queue_declare( $queueName , false, true, false, false, false, $args ); //绑定死信queue $this ->channel->exchange_declare( $delayExName , AMQPExchangeType::DIRECT, false, true, false); $this ->channel->queue_declare( $delayQueueName , false, true, false, false); $this ->channel->queue_bind( $delayQueueName , $delayExName , $queueName , false); } /** * 消费消息 * @param $queueName * @param $callback * @throws \ErrorException */ public function consumeMessage( $queueName , $callback ) { /* * 创建队列 * queue 队列名 * consumer_tag 消费者标签 * no_local 这个功能属于AMQP的标准,但是rabbitMQ并没有做实现. * no_ack 收到消息后,是否不需要回复确认即被认为被消费 * exclusive 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接 * nowait 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错 * callback 回调函数 * */ $this ->channel->basic_consume( $queueName , '' , false, false, false, false, $callback ); while ( $this ->channel->is_consuming()) { $this ->channel->wait(); } } /** * @throws \Exception */ public function __destruct() { $this ->channel->close(); $this ->connection->close(); } } |
producer.php(生产者)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | <?php require_once './vendor/autoload.php' ; include ( './model/RabbitMQ.php' ); $data = implode( ' ' , array_slice ( $argv , 1)); $num = implode( ' ' , array_slice ( $argv , 2)); $severity = implode( ' ' , array_slice ( $argv , 3)); $exchangeName = 'direct_logs1' ; $channelType = 'direct' ; $queueName = 'direct_queue1' ; // $ttl = 1000 * 10;//10s后超时 // $delayExName = 'delay-order-exchange';//超时exchange // $delayQueueName = 'delay-order-queue';//超时queue // $queueName = 'ttl-order-queue';//订单queue $rabbitObj = new RabbitMQ(); $rabbitObj ->createExchange( $exchangeName , $channelType ); $rabbitObj ->createQueue( $queueName ,false,true); //$rabbitObj->createDelayQueue($ttl,$delayExName,$delayQueueName,$queueName); for ( $i = 1; $i <= $num ; $i ++){ $properties = [ 'delivery_mode' => 2]; $rabbitObj ->sendMessage( $i . '-' . $data , $severity , $exchangeName , $properties ); //$rabbitObj->sendMessage($i.'-'.$data,$queueName); echo " [x] Sent " . $i . '-' . $data . '-' . $severity . "\n" ; } |
consume.php(消费者)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | <?php require_once './vendor/autoload.php' ; include ( './model/RabbitMQ.php' ); $severity = implode( ' ' , array_slice ( $argv , 1)); $exchangeName = 'direct_logs1' ; $channelType = 'direct' ; $queueName = 'direct_queue1' ; $callBack = function ( $msg ) { echo " [x] Received " , $msg ->body, "\n" ; //根据"."数量个数获取延迟时间,单位秒 //sleep(2); //模拟业务执行时间延迟 echo " [x] Done" , "\n" ; $msg ->delivery_info[ 'channel' ]->basic_ack( $msg ->delivery_info[ 'delivery_tag' ]); }; $rabbitObj = new RabbitMQ(); $rabbitObj ->createExchange( $exchangeName , $channelType ); $rabbitObj ->createQueue( $queueName ,false,true); $rabbitObj ->bindQueue( $queueName , $exchangeName , $severity ); $rabbitObj ->consumeMessage( $queueName , $callBack ); |