Posted By

ryantxr on 07/26/14


Tagged

php rabbitmq


Versions (?)

RabbitMQ from PHP


 / Published in: PHP
 

This is a class to encapsulate rabbit mq sending and receiving.

  1. <?php
  2. use PhpAmqpLib\Connection\AMQPConnection;
  3. use PhpAmqpLib\Message\AMQPMessage;
  4.  
  5. class RabbitQueue
  6. {
  7. public $rabbit;
  8.  
  9. public function connect($server = RABBIT_SERVER, $port = RABBIT_PORT, $user = RABBIT_USER, $pass = RABBIT_PASS, $vhost = RABBIT_VHOST) {
  10. // TODO - this can be cleaned up once Cake supports namespaces...
  11. $path = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Connection' . DS;
  12. $channel = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Channel' . DS;
  13. $wireio = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Wire' . DS . 'IO' . DS;
  14. $wire = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Wire' . DS;
  15. $helper = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Helper' . DS;
  16. $helperprotocol = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Helper' . DS . 'Protocol' . DS;
  17. $exception = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Exception' . DS;
  18. $message = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Message' . DS;
  19. include_once($exception . 'AMQPExceptionInterface.php');
  20. include_once($exception . 'AMQPRuntimeException.php');
  21. include_once($exception . 'AMQPProtocolException.php');
  22. include_once($exception . 'AMQPProtocolChannelException.php');
  23. include_once($wire . 'GenericContent.php');
  24. include_once($message . 'AMQPMessage.php');
  25.  
  26. include_once($helperprotocol . 'MethodMap091.php');
  27. include_once($helperprotocol . 'Wait091.php');
  28. include_once($helperprotocol . 'Protocol091.php');
  29. include_once($wire . 'Constants091.php');
  30. include_once($helper . 'MiscHelper.php');
  31. include_once($wire . 'AMQPWriter.php');
  32. include_once($wire . 'AMQPReader.php');
  33. include_once($wireio . 'AbstractIO.php');
  34. include_once($wireio . 'StreamIO.php');
  35. include_once($channel . 'AbstractChannel.php');
  36. include_once($channel . 'AMQPChannel.php');
  37. include_once($path . 'AbstractConnection.php');
  38. include_once($path . 'AMQPStreamConnection.php');
  39. include_once($path . 'AMQPConnection.php');
  40.  
  41. $this->server = $server; $this->port = $port; $this->user = $user; $this->pass = $pass; $this->vhost = $vhost;
  42. $this->rabbit = new AMQPConnection($server, $port, $user, $pass, $vhost);
  43. }
  44.  
  45. public function consumeSetup($queueName, $callback, $exchangeName = RABBIT_ADAM_EXCHANGE) {
  46. $immediateChannel = $this->rabbit->channel();
  47. $immediateChannel->queue_declare($queueName, false, true, false, false);
  48. $immediateChannel->exchange_declare($exchangeName, 'direct', false, true, false);
  49. $immediateChannel->queue_bind($queueName, $exchangeName, $queueName);
  50. $immediateChannel->basic_consume($queueName, 'consumer122', false, false, false, false, $callback);
  51. return $immediateChannel;
  52. }
  53.  
  54. public function delete($message) {
  55. $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
  56. }
  57.  
  58. public function getJob($message) {
  59. return unserialize($message->body);
  60. }
  61.  
  62. public function publish($queueName, $job, $delay, $exchangeName = RABBIT_ADAM_EXCHANGE, $options=array()) {
  63. $immediateChannel = $this->rabbit->channel();
  64.  
  65. // class AMQPChannel ...
  66. // function queue_declare(
  67. // @param $queue = ""
  68. // @param $passive = false
  69. // @param $durable = false
  70. // @param $exclusive = false
  71. // @param $auto_delete = true
  72. // @param $nowait = false
  73. // @param $arguments = null
  74. // @param $ticket = null
  75. $immediateChannel->queue_declare($queueName, false, true, false, false);
  76.  
  77. // function exchange_declare(
  78. // @param $exchange
  79. // @param $type
  80. // @param $passive = false
  81. // @param $durable = false
  82. // @param $auto_delete = true
  83. // @param $internal = false
  84. // @param $nowait = false
  85. // @param $arguments = null
  86. // @param $ticket = null
  87. $immediateChannel->exchange_declare($exchangeName, 'direct', false, true, false);
  88.  
  89. // function queue_bind(
  90. // @param $queue
  91. // @param $exchange
  92. // @param $routing_key = ""
  93. // @param $nowait = false
  94. // @param $arguments = null
  95. // @param $ticket = null
  96. $immediateChannel->queue_bind($queueName, $exchangeName, $queueName);
  97.  
  98. // node consumes non serialize data while we consume it serialized
  99. // that is why the following logic is needed.
  100. switch($tube){
  101. case \TYPES\JOB_TUBE_TYPE::ADAM_REPORTS:
  102. case \TYPES\JOB_TUBE_TYPE::CREATE_UPDATE_REPORTS:
  103. case \TYPES\JOB_TUBE_TYPE::PREPARE_REPORT_DB:
  104. $jobPayload = $job;
  105. break;
  106. default:
  107. $jobPayload = serialize($job);
  108. }
  109.  
  110. // If we have a delay, then set up a dead letter exchange/queue
  111. if ( $delay ){
  112. ///$delayRabbit = new AMQPConnection($this->server, $this->port, $this->user, $this->pass, $this->vhost);
  113.  
  114. $delayChannel = $immediateChannel;//$delayRabbit->channel();
  115. // CakeLog::debug('Setup a delay queue');
  116. if ( isset($options['wait-queue'])){
  117. $queueNameDelayed = $options['wait-queue'];
  118. $exchangeNameDelayed = $queueNameDelayed . '_EXCHANGE';
  119. }
  120. else{
  121. $queueNameDelayed = $queueName . '_DELAYED';
  122. $exchangeNameDelayed = $exchangeName . '_DELAYED';
  123. }
  124. $arguments = array(
  125. "x-dead-letter-exchange" => array('S', $exchangeName),
  126. "x-dead-letter-routing-key" => array('S', $queueName),
  127. "x-message-ttl" => array('I', 3600000),// 1 hour
  128. // "x-expires" => array("I", 6000)
  129. );
  130. // CakeLog::debug('Creating queue '.$queueNameDelayed);
  131. $delayChannel->queue_declare($queueNameDelayed, false, true, false, false, false, $arguments);
  132. $delayChannel->exchange_declare($exchangeNameDelayed, 'direct', false, true, false);
  133. $delayChannel->queue_bind($queueNameDelayed, $exchangeNameDelayed, $queueNameDelayed);
  134. //CakeLog::debug(sprintf('Delayed queue %s delay=%d', $queueNameDelayed, $delay));
  135. }
  136.  
  137. $msg = new \PhpAmqpLib\Message\AMQPMessage($jobPayload, array('content_type' => 'text/plain', 'delivery_mode' => 2, "expiration" => $delay));
  138.  
  139. // function basic_publish
  140. // @param AMQPMessage $msg
  141. // @param string $exchange = ""
  142. // @param string $routing_key = ""
  143. // @param bool $mandatory = false
  144. // @param bool $immediate = false
  145. // @param null $ticket = null
  146. if ( $delay ){
  147. //CakeLog::debug('Publish to '.$queueNameDelayed);
  148. $delayChannel->basic_publish($msg, $exchangeNameDelayed, $queueNameDelayed);
  149. }
  150. else{
  151. //CakeLog::debug('Publish to '.$queueName);
  152. $immediateChannel->basic_publish($msg, $exchangeName, $queueName);
  153. }
  154.  
  155. //CakeLog::debug($jobPayload);
  156.  
  157. //$immediateChannel->close();
  158. //$conn->close();
  159. }
  160. }

Report this snippet  

You need to login to post a comment.