Revision: 66967
Initial Code
Initial URL
Initial Description
Initial Title
Initial Tags
Initial Language
at July 26, 2014 05:31 by ryantxr
Initial Code
<?php
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitQueue
{
public $rabbit;
public function connect($server = RABBIT_SERVER, $port = RABBIT_PORT, $user = RABBIT_USER, $pass = RABBIT_PASS, $vhost = RABBIT_VHOST) {
// TODO - this can be cleaned up once Cake supports namespaces...
$path = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Connection' . DS;
$channel = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Channel' . DS;
$wireio = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Wire' . DS . 'IO' . DS;
$wire = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Wire' . DS;
$helper = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Helper' . DS;
$helperprotocol = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Helper' . DS . 'Protocol' . DS;
$exception = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Exception' . DS;
$message = APP . 'Vendor' . DS . 'php-amqplib-master' . DS . 'PhpAmqpLib' . DS . 'Message' . DS;
include_once($exception . 'AMQPExceptionInterface.php');
include_once($exception . 'AMQPRuntimeException.php');
include_once($exception . 'AMQPProtocolException.php');
include_once($exception . 'AMQPProtocolChannelException.php');
include_once($wire . 'GenericContent.php');
include_once($message . 'AMQPMessage.php');
include_once($helperprotocol . 'MethodMap091.php');
include_once($helperprotocol . 'Wait091.php');
include_once($helperprotocol . 'Protocol091.php');
include_once($wire . 'Constants091.php');
include_once($helper . 'MiscHelper.php');
include_once($wire . 'AMQPWriter.php');
include_once($wire . 'AMQPReader.php');
include_once($wireio . 'AbstractIO.php');
include_once($wireio . 'StreamIO.php');
include_once($channel . 'AbstractChannel.php');
include_once($channel . 'AMQPChannel.php');
include_once($path . 'AbstractConnection.php');
include_once($path . 'AMQPStreamConnection.php');
include_once($path . 'AMQPConnection.php');
$this->server = $server; $this->port = $port; $this->user = $user; $this->pass = $pass; $this->vhost = $vhost;
$this->rabbit = new AMQPConnection($server, $port, $user, $pass, $vhost);
}
public function consumeSetup($queueName, $callback, $exchangeName = RABBIT_ADAM_EXCHANGE) {
$immediateChannel = $this->rabbit->channel();
$immediateChannel->queue_declare($queueName, false, true, false, false);
$immediateChannel->exchange_declare($exchangeName, 'direct', false, true, false);
$immediateChannel->queue_bind($queueName, $exchangeName, $queueName);
$immediateChannel->basic_consume($queueName, 'consumer122', false, false, false, false, $callback);
return $immediateChannel;
}
public function delete($message) {
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}
public function getJob($message) {
return unserialize($message->body);
}
public function publish($queueName, $job, $delay, $exchangeName = RABBIT_ADAM_EXCHANGE, $options=array()) {
$immediateChannel = $this->rabbit->channel();
// class AMQPChannel ...
// function queue_declare(
// @param $queue = ""
// @param $passive = false
// @param $durable = false
// @param $exclusive = false
// @param $auto_delete = true
// @param $nowait = false
// @param $arguments = null
// @param $ticket = null
$immediateChannel->queue_declare($queueName, false, true, false, false);
// function exchange_declare(
// @param $exchange
// @param $type
// @param $passive = false
// @param $durable = false
// @param $auto_delete = true
// @param $internal = false
// @param $nowait = false
// @param $arguments = null
// @param $ticket = null
$immediateChannel->exchange_declare($exchangeName, 'direct', false, true, false);
// function queue_bind(
// @param $queue
// @param $exchange
// @param $routing_key = ""
// @param $nowait = false
// @param $arguments = null
// @param $ticket = null
$immediateChannel->queue_bind($queueName, $exchangeName, $queueName);
// node consumes non serialize data while we consume it serialized
// that is why the following logic is needed.
switch($tube){
case \TYPES\JOB_TUBE_TYPE::ADAM_REPORTS:
case \TYPES\JOB_TUBE_TYPE::CREATE_UPDATE_REPORTS:
case \TYPES\JOB_TUBE_TYPE::PREPARE_REPORT_DB:
$jobPayload = $job;
break;
default:
$jobPayload = serialize($job);
}
// If we have a delay, then set up a dead letter exchange/queue
if ( $delay ){
///$delayRabbit = new AMQPConnection($this->server, $this->port, $this->user, $this->pass, $this->vhost);
$delayChannel = $immediateChannel;//$delayRabbit->channel();
// CakeLog::debug('Setup a delay queue');
if ( isset($options['wait-queue'])){
$queueNameDelayed = $options['wait-queue'];
$exchangeNameDelayed = $queueNameDelayed . '_EXCHANGE';
}
else{
$queueNameDelayed = $queueName . '_DELAYED';
$exchangeNameDelayed = $exchangeName . '_DELAYED';
}
$arguments = array(
"x-dead-letter-exchange" => array('S', $exchangeName),
"x-dead-letter-routing-key" => array('S', $queueName),
"x-message-ttl" => array('I', 3600000),// 1 hour
// "x-expires" => array("I", 6000)
);
// CakeLog::debug('Creating queue '.$queueNameDelayed);
$delayChannel->queue_declare($queueNameDelayed, false, true, false, false, false, $arguments);
$delayChannel->exchange_declare($exchangeNameDelayed, 'direct', false, true, false);
$delayChannel->queue_bind($queueNameDelayed, $exchangeNameDelayed, $queueNameDelayed);
//CakeLog::debug(sprintf('Delayed queue %s delay=%d', $queueNameDelayed, $delay));
}
$msg = new \PhpAmqpLib\Message\AMQPMessage($jobPayload, array('content_type' => 'text/plain', 'delivery_mode' => 2, "expiration" => $delay));
// function basic_publish
// @param AMQPMessage $msg
// @param string $exchange = ""
// @param string $routing_key = ""
// @param bool $mandatory = false
// @param bool $immediate = false
// @param null $ticket = null
if ( $delay ){
//CakeLog::debug('Publish to '.$queueNameDelayed);
$delayChannel->basic_publish($msg, $exchangeNameDelayed, $queueNameDelayed);
}
else{
//CakeLog::debug('Publish to '.$queueName);
$immediateChannel->basic_publish($msg, $exchangeName, $queueName);
}
//CakeLog::debug($jobPayload);
//$immediateChannel->close();
//$conn->close();
}
}
Initial URL
Initial Description
This is a class to encapsulate rabbit mq sending and receiving.
Initial Title
RabbitMQ from PHP
Initial Tags
php
Initial Language
PHP