php rabbitmq的开发体验(二)
一、前言
在上一篇rabbitmq开发体验,我们大致介绍和安装上了rabbitmq和php扩展和界面操作rabbitmq的方法,下面正是正式的用我们php来操作消息队列的生产和消费。附上参考的网站:
- rabbitmq官网 https://www.rabbitmq.com/ 全英文看起来吃力 官方入门文档:https://www.rabbitmq.com/getstarted.html
- 官方入门文档 https://www.cnblogs.com/grimm/p/5728736.html 此博客主对官方php案例的解释
- RabbitMQ发布订阅实战-实现延时重试队列 代码项目:https://github.com/mylxsw/rabbitmq-pubsub-php
- rabbitmq高级特性 B站视频 https://www.bilibili.com/video/BV1S5411H7ef?from=search&seid=8055368004001009131
二、开发经历
对于rabbitmq的php类库,我开发是使用PHP amqplib,composer解决依赖管理。
添加composer.json:
{
"require": { "php-amqplib/php-amqplib": ">=2.6.1" } }
composer install # 或者 直接运行包引入 composer require php-amqplib/php-amqplib
我的php开发框架是yii1.1,代码如下
1.rabbitmq的连接底层类
<?php include_once(ROOT_PATH . 'protected/extensions/rabbitmq/autoload.php'); use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * rabbitmq工具类 */ class RabbitMq { protected $connection; protected $channel; protected $exchange_name; protected $query_name; protected $route_key_name; /** * 构造器 */ public function __construct() { //读取文件会导致并发性高时连接失败故写在配置文件 $config = Yii::app()->params['rabbitmq_config']; if (!$config) throw new \AMQPConnectionException('config error!'); $this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password'], $config['vhost']); if (!$this->connection) { throw \AMQPConnectionException("Cannot connect to the broker!\n"); }
//链接后的信道,生产方和消费方都需要,消息的通道 $this->channel = $this->connection->channel(); } /** * close link */ public function close() { $this->channel->close(); $this->connection->close(); } /** * RabbitMQ destructor */ public function __destruct() { $this->close(); } }
2.消息的封装类
<?php include_once(ROOT_PATH . 'protected/extensions/rabbitmq/autoload.php'); use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; class SubMessage { private $message; private $routingKey; private $params; /** * SubMessage constructor. * * @param AMQPMessage $message * @param string $routingKey * @param array $params */ public function __construct(AMQPMessage $message, $routingKey, $params = []) { $this->params = $params; //额外的参数这里主要存储重试的次数 $this->message = $message; $this->routingKey = $routingKey; } /** * Get AMQP Message * * @return AMQPMessage */ public function getAMQPMessage() { return $this->message; } /** * Get original Message * * @return Message */ public function getMessage() { return $this->message->body; } /** * Get meta params * * @return array */ public function getParams() { return is_array($this->params) ? $this->params : []; } /** * Get meta param * * @param string $key * * @return mixed|null */ public function getParam(string $key) { return isset($this->params[$key]) ? $this->params[$key] : null; } /** * Get routing key * * @return string */ public function getRoutingKey() { return $this->routingKey; } }
3.消息的核心类
<?php use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; /** * vm独立站推送 */ class VmMq extends RabbitMq { protected $exchange_name = 'master'; //主Exchange,发布消息时发布到该Exchange protected $exchange_retry_name = 'master.retry'; //重试Exchange,消息处理失败时(3次以内),将消息重新投递给该Exchange protected $exchange_failed_name = 'master.failed'; //失败Exchange,超过三次重试失败后,消息投递到该Exchange protected $query_name = 'query_vm'; //消费服务需要declare三个队列[queue_name] 队列名称,格式符合 [服务名称]@订阅服务标识 protected $query_retry_name = 'query_vm@retry'; protected $query_failed_name = 'query_vm@fail'; protected $route_key_name = 'route_key_vm'; /** * 构造器 */ public function __construct() { parent::__construct(); $this->channel->basic_qos(null, 10, false); //声明topic类型交换器 //指定交换机持久第四个参数设置为true: passive如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。 $this->channel->exchange_declare($this->exchange_name, 'topic', false, true, false); $this->channel->exchange_declare($this->exchange_retry_name, 'topic', false, true, false); $this->channel->exchange_declare($this->exchange_failed_name, 'topic', false, true, false); } /** * 生产消息 */ public function product($data){ $unique_messageId = $this->create_guid(); //生成消息的唯一标识,用来幂等性 if(!is_array($data)){ $data = array('msg' => $data); } $data['unique_messageId'] = $unique_messageId; $data = json_encode($data,JSON_UNESCAPED_UNICODE); //存入到表中,保证生产的消息100%到mq队列 $newModel = DynamicAR::model('nt_vm_message_idempotent'); $newModel->message_id = $unique_messageId; $newModel->message_content = $data; $newModel->product_status = 0; $newModel->consume_status = 0; $newModel->create_time = $newModel->update_time = time(); $newModel->isNewRecord = true; if(!$newModel->save()){ file_put_contents(ROOT_PATH.'runtime/vm_product_failed.log', '数据库保存失败' . $data .PHP_EOL, FILE_APPEND); } //推送成功的ack回调 $this->channel->set_ack_handler( function(AMQPMessage $msg){ $msgBody = json_decode($msg->getBody(),true); if(!isset($msgBody['unique_messageId']) || !$msgBody['unique_messageId']){ file_put_contents(ROOT_PATH.'runtime/vm_product_failed.log', '获取消费ID为空!' . $msg->getBody().PHP_EOL, FILE_APPEND); return; } $unique_messageId = $msgBody['unique_messageId']; $criteria = new CDbCriteria; $criteria->addCondition("message_id = '".$unique_messageId."'"); $messageIdempotent = DynamicAR::model('nt_vm_message_idempotent')->find($criteria); if (!$messageIdempotent) { file_put_contents(ROOT_PATH.'runtime/vm_product_failed.log', '该消息数据库里不存在' . $msg->getBody().PHP_EOL, FILE_APPEND); return; }else{ $connection = Yii::app()->db; $command = $connection->createCommand(" UPDATE nt_vm_message_idempotent SET product_status=1 WHERE message_id = '$unique_messageId' "); $re = $command->execute(); if($re) { // file_put_contents(ROOT_PATH.'runtime/vm_product_log.log', $messageIdempotent->message_id . $msg->getBody() .PHP_EOL, FILE_APPEND); return; }else{ file_put_contents(ROOT_PATH.'runtime/vm_product_failed.log', '数据库保存失败' . $msg->getBody() .PHP_EOL, FILE_APPEND); return; } } } ); //推送失败的nack回调 $this->channel->set_nack_handler( function(AMQPMessage $message){ file_put_contents(ROOT_PATH.'runtime/vm_product_failed.log', "消息生产到mq nack ".$message->body.PHP_EOL, FILE_APPEND); } ); //监听交换机或者路由键是否存在 $returnListener = function ( $replyCode, $replyText, $exchange, $routingKey, $message ) { file_put_contents(ROOT_PATH.'runtime/vm.log', 'replyCode ='.$replyCode.';replyText='.$replyText.';exchange='.$exchange.';routingKey='.$routingKey.';body='.$message->body.PHP_EOL, FILE_APPEND); }; //开启发送消息的return机制 $this->channel->set_return_listener($returnListener); //开启发送消息的ack回调 $this->channel->confirm_select(); //设置消息持久化。AMQPMessage的属性delivery_mode =2 $msg = new AMQPMessage($data,array( 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT )); $msg->set('application_headers', new AMQPTable([])); $this->channel->basic_publish($msg, $this->exchange_name,$this->query_name,true); $this->channel->wait_for_pending_acks(); $this->close(); } protected function create_guid($namespace = '') { static $guid = ''; $uid = uniqid("", true); $data = $namespace; $data .= $_SERVER['REQUEST_TIME']; $data .= $_SERVER['HTTP_USER_AGENT']; $data .= $_SERVER['SERVER_ADDR']; $data .= $_SERVER['SERVER_PORT']; $data .= $_SERVER['REMOTE_ADDR']; $data .= $_SERVER['REMOTE_PORT']; $hash = strtoupper(hash('ripemd128', $uid . $guid . md5($data))); $guid = '{' . substr($hash, 0, 8) . '-' . substr($hash, 8, 4) . '-' . substr($hash, 12, 4) . '-' . substr($hash, 16, 4) . '-' . substr($hash, 20, 12) . '}'; return $guid; } /** * 消费消息 */ public function consume(\Closure $callback,\Closure $shouldExitCallback = null){ $this->declareRetryQueue(); $this->declareConsumeQueue(); $this->declareFailedQueue(); $queueName = $this->query_name; $exchangeRetryName = $this->exchange_retry_name; $exchangeFailedName = $this->exchange_failed_name; // 发起延时重试 $publishRetry = function ($msg) use ($queueName,$exchangeRetryName) { /** @var AMQPTable $headers */ if ($msg->has('application_headers')) { $headers = $msg->get('application_headers'); } else { $headers = new AMQPTable(); } $headers->set('x-orig-routing-key', $this->getOrigRoutingKey($msg)); $properties = $msg->get_properties(); $properties['application_headers'] = $headers; $newMsg = new AMQPMessage($msg->getBody(), $properties); $this->channel->basic_publish( $newMsg, $exchangeRetryName, $queueName ); //发送ack信息应答当前消息处理完成 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; // 将消息发送到失败队列 $publishFailed = function ($msg) use ($queueName,$exchangeFailedName) { $this->channel->basic_publish( $msg, $exchangeFailedName, $queueName ); $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $this->channel->basic_consume( $this->query_name, '', //customer_tag false, //no_local false, //no_ack false, //exclusive 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接 false, //nowait function(AMQPMessage $msg) use ($callback, $publishRetry, $publishFailed) { /* * 需要注意的是:在消费消息之前,先获取消息ID,然后根据ID去数据库中查询是否存在主键为消息ID的记录,如果存在的话, * 说明这条消息之前应该是已经被消费过了,那么就不处理这条消息;如果不存在消费记录的话,则消费者进行消费,消费完成发送确认消息, * 并且将消息记录进行入库。 */ $msgBody = json_decode($msg->getBody(),true); if(!isset($msgBody['unique_messageId']) || !$msgBody['unique_messageId']){ file_put_contents(ROOT_PATH.'runtime/vm_consume_failed.log', '获取消费ID为空!' . $msg->getBody().PHP_EOL, FILE_APPEND); return; } $unique_messageId = $msgBody['unique_messageId']; $criteria = new CDbCriteria; $criteria->addCondition("message_id = '".$unique_messageId."' and consume_status = 0 "); $messageIdempotent = DynamicAR::model('nt_vm_message_idempotent')->find($criteria); //如果找不到,则进行消费此消息 if ($messageIdempotent) { $callback($msg, $publishRetry, $publishFailed); } else { //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费; file_put_contents(ROOT_PATH.'runtime/vm_consume_failed.log', '该消息已消费,无须重复消费!' . $msg->getBody().PHP_EOL, FILE_APPEND); return; } } ); while (count($this->channel->callbacks)) { if ($shouldExitCallback()) { return; } try { $this->channel->wait(); } catch (AMQPTimeoutException $e) { } catch (AMQPIOWaitException $e) { } } $this->close(); } /** * 重试失败的消息 * 注意: 该方法会堵塞执行 * @param \Closure $callback 回调函数,可以为空,返回true则重新发布,false则丢弃 */ public function retryFailed($callback = null) { $this->declareConsumeQueue(); $this->declareFailedQueue(); $queueName = $this->query_name; $exchangeName = $this->exchange_name; $this->channel->basic_consume( $this->query_failed_name, '', //customer_tag false, //no_local false, //no_ack true, //exclusive false, //nowait function ($msg) use ($queueName, $exchangeName, $callback) { if (is_null($callback) || $callback($msg)) { // 重置header中的x-death属性 $msg->set('application_headers', new AMQPTable([ 'x-orig-routing-key' => $this->getOrigRoutingKey($msg), ])); $this->channel->basic_publish( $msg, $exchangeName, $queueName ); } $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } ); while (count($this->channel->callbacks)) { try { $this->channel->wait(); } catch (AMQPTimeoutException $e) { return; } catch (AMQPIOWaitException $e) { } } } /** * 获取重发的次数 */ protected function getOrigRoutingKey($msg){ $retry = 0; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-death'][0]['count'])) { $retry = $headers['x-death'][0]['count']; } } return (int)$retry; } /** * 声明重试队列 */ private function declareRetryQueue() { $this->channel->queue_declare($this->query_retry_name, false, true, false, false, false,new AMQPTable(array( 'x-dead-letter-exchange' => $this->exchange_name, 'x-dead-letter-routing-key' => $this->query_name, 'x-message-ttl' => 30 * 1000, ))); $this->channel->queue_bind($this->query_retry_name, $this->exchange_retry_name, $this->query_name); } /** * 声明消费队列 */ private function declareConsumeQueue() { //声明队列 $this->channel->queue_declare( $this->query_name, //队列名称 false, //passive true, //durable false, //exclusive false, //auto_delete false //nowait ); //绑定交换机和队列 $this->channel->queue_bind($this->query_name, $this->exchange_name, $this->route_key_name); $this->channel->queue_bind($this->query_name, $this->exchange_name, $this->query_name); } /** * 声明消费失败队列 */ private function declareFailedQueue() { $this->channel->queue_declare($this->query_failed_name, false, true, false, false, false); $this->channel->queue_bind($this->query_failed_name, $this->exchange_failed_name, $this->query_name); } }
我们将会实现如下功能
- 结合RabbitMQ的Topic模式和Work Queue模式实现生产方产生消息,消费方按需订阅,消息投递到消费方的队列之后,多个worker同时对消息进行消费
- 结合RabbitMQ的 Message TTL 和 Dead Letter Exchange 实现消息的延时重试功能
- 消息达到最大重试次数之后,将其投递到失败队列,等待人工介入处理bug后,重新将其加入队列消费
具体流程见下图
- 生产者发布消息到主Exchange
- 主Exchange根据Routing Key将消息分发到对应的消息队列
- 多个消费者的worker进程同时对队列中的消息进行消费,因此它们之间采用“竞争”的方式来争取消息的消费
- 消息消费后,不管成功失败,都要返回ACK消费确认消息给队列,避免消息消费确认机制导致重复投递,同时,如果消息处理成功,则结束流程,否则进入重试阶段
- 如果重试次数小于设定的最大重试次数(3次),则将消息重新投递到Retry Exchange的重试队列
- 重试队列不需要消费者直接订阅,它会等待消息的有效时间过期之后,重新将消息投递给Dead Letter Exchange,我们在这里将其设置为主Exchange,实现延时后重新投递消息,这样消费者就可以重新消费消息
- 如果三次以上都是消费失败,则认为消息无法被处理,直接将消息投递给Failed Exchange的Failed Queue,这时候应用可以触发报警机制,以通知相关责任人处理
- 等待人工介入处理(解决bug)之后,重新将消息投递到主Exchange,这样就可以重新消费了
外部确认消息表结构
CREATE TABLE `nt_vm_message_idempotent` (
`message_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT ” COMMENT ‘消息ID’,
`message_content` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT ” COMMENT ‘消息内容’,
`product_status` tinyint(1) UNSIGNED NOT NULL DEFAULT 0 COMMENT ‘是否生产成功到mq’,
`consume_status` tinyint(1) NOT NULL COMMENT ‘是否消费成功’,
`create_time` int(10) UNSIGNED NOT NULL DEFAULT 0,
`update_time` int(10) UNSIGNED NOT NULL DEFAULT 0,
`retry_time` int(10) UNSIGNED NOT NULL DEFAULT 0 COMMENT ‘重新发送次数’,
PRIMARY KEY (`message_id`) USING BTREE,
UNIQUE INDEX `unique_message_id`(`message_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
4.消息的消费脚本
<?php include_once(ROOT_PATH . 'protected/extensions/rabbitmq/autoload.php'); use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * Created by PhpStorm. * User: tangkeji * Date: 21-4-26 * Time: 下午3:31 */ class VmMqCommand extends CConsoleCommand { private function _Output($data, $isEnd = 0) { if (is_array($data) || is_object($data)) { var_dump($data); echo "\n"; } else { echo $data . "\n"; } if ($isEnd) { Yii::app()->end(); } } /** * 消息进程 */ public function actionRun(){ $stopped = false; $autoExitCounter = 200; $mq = new VmMq(); $callback = function (AMQPMessage $msg, $publishRetry, $publishFailed) use ( &$autoExitCounter ) { $retry = $this->getRetryCount($msg); try { $routingKey = $this->getOrigRoutingKey($msg); $subMessage = new SubMessage($msg, $routingKey , [ 'retry_count' => $retry, // 重试次数 ]); $this->subscribe($subMessage); // 发送确认消息 $msg->delivery_info['channel']->basic_ack( $msg->delivery_info['delivery_tag'] ); } catch (\Exception $ex) {
if ($retry > 3) { // 超过最大重试次数,消息无法处理 $publishFailed($msg); return; } // 消息处理失败,稍后重试 $publishRetry($msg); } }; $mq->consume( $callback, function () use (&$stopped, &$autoExitCounter) { return $stopped || $autoExitCounter < 1; } ); } /** * 获取消息重试次数 * * @param AMQPMessage $msg * * @return int */ protected function getRetryCount($msg) { $retry = 0; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-death'][0]['count'])) { $retry = $headers['x-death'][0]['count']; } } return (int)$retry; } /** * 订阅消息处理 * * @param \Aicode\RabbitMQ\SubMessage $msg * * @return bool 处理成功返回true(返回true后将会对消息进行处理确认),失败throw 异常 */ public function subscribe($msg) { // TODO 业务逻辑实现 //throw new Exception("消费异常!!!"); //消费失败需要抛出异常来重新处理此消息 echo sprintf( "subscriber:<%s> %s\n", $msg->getRoutingKey(), $msg->getMessage() ); echo "----------------------------------------\n"; //存入到表中,标识该消息已消费 $msgBody = json_decode($msg->getMessage(),true); if(!isset($msgBody['unique_messageId']) || !$msgBody['unique_messageId']){ file_put_contents(ROOT_PATH.'runtime/vm_consume_failed.log', '获取消费ID为空!' . $msg->getMessage().PHP_EOL, FILE_APPEND); return true; } $unique_messageId = $msgBody['unique_messageId']; $criteria = new CDbCriteria; $criteria->addCondition("message_id = '".$unique_messageId."' and consume_status = 0 "); $messageIdempotent = DynamicAR::model('nt_vm_message_idempotent')->find($criteria); //如果找不到,则进行消费此消息 if (!$messageIdempotent) { //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费; file_put_contents(ROOT_PATH . 'runtime/vm_consume_failed.log', '该消息已消费,无须重复消费!' . $msg->getMessage() . PHP_EOL, FILE_APPEND); } else { $update_time = time(); $connection = Yii::app()->db; $command = $connection->createCommand(" UPDATE nt_vm_message_idempotent SET consume_status=1,update_time='$update_time' WHERE message_id = '$unique_messageId' "); $re = $command->execute(); if($re) { // file_put_contents(ROOT_PATH.'runtime/vm_consume_log.log', $messageIdempotent->message_id . $msg->getMessage() .PHP_EOL, FILE_APPEND); return; }else{ file_put_contents(ROOT_PATH.'runtime/vm_consume_failed.log', '数据库保存失败' . $msg->getMessage() .PHP_EOL, FILE_APPEND); return; } } return true; } private function getOrigRoutingKey(AMQPMessage $msg) { $retry = null; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-orig-routing-key'])) { $retry = $headers['x-orig-routing-key']; } } return $retry?$retry:$msg->get('routing_key'); } }
三、总结
以上是我的rabbitmq从0到有的经历,可能里面有不完美或者错误请大家指出,必会好好纠正,主要我这个消息要保证消息的可靠性,不容许丢失。里面用到rabbitmq的高级特性如ack确认机制,幂等性,限流机制,重回机制,ttl,死信队列(相当于失败消息的回收站)。
原文地址:https://www.cnblogs.com/xia-na/p/14781203.html