您好,可以通过任意语言SDK订阅该队列内容,以下是node.js的实现,代码为截取的,没有经过测试,您可以根据情况进行调整。
/**
* Module: rabbitmq.js
*
* Collection of functions for dealing with amqp.
*/
var amqp = require('amqp');
var log = require('./logger')...
显示全部您好,可以通过任意语言SDK订阅该队列内容,以下是node.js的实现,代码为截取的,没有经过测试,您可以根据情况进行调整。
/**
* Module: rabbitmq.js
*
* Collection of functions for dealing with amqp.
*/
var amqp = require('amqp');
var log = require('./logger');
var logger = log.setLogger();
var setLogger = function(exLogger) {
logger = log.setLogger(exLogger);
};
exports.setLogger = setLogger;
var subscribe = function(options) {
var conn = options['conn'];
var consumerQueue = options['consumer_queue'];
var exchange = options['exchange'];
var topicName = options['topic_name'];
var topicTypes = options['topic_types'];
var implOptions = {
'reconnect': true
};
var connection = amqp.createConnection(conn, implOptions);
connection.on('error', function(e) {
options.error({'type': e.name, 'message': e});
});
topicTypes.forEach(function(topicType) {
connection.on('ready', function() {
var cQueue = consumerQueue + '.' + topicType;
connection.queue(cQueue, function(queue) {
topic = topicName + '.' + topicType;
queue.bind(exchange, topic);
queue.subscribe(function(message, contentType, params) {
options.success(message, params.exchange);
});
}); // end connection.queue
}); // end connection on ready
});
} // end var subscribe
exports.subscribe = subscribe;
"exchange_names": ["nova", "glance", "cinder", "neutron"],
"topic_name": "notifications",
"topic_types": ["info", "warn", "error"]
// Get message from MQ and send update message
function queueSync() {
var exchangeNames = config.get('exchange_names');
exchangeNames.forEach(function(exchange) {
logger.debug('Trying to connect rabbitmq %s:%s',
config.get('rabbitmq:host'), config.get('rabbitmq:port'));
var options = {
'conn': config.get('rabbitmq'),
'consumer_queue': config.get('consumer_queue'),
'exchange': exchange,
'topic_name': config.get('topic_name'),
'topic_types': config.get('topic_types'),
success: function(message, exc) {
handleMessage(message, exc);
},
error: function(error) {
logger.error(error);
}
};
rabbitmq.subscribe(options);
});
}
收起