如何从openstack消息队列中获取消息?

openstack 消息队列中,用rabbitmqctl list_queues命令,看到队列notifications.info,它是不是包含openstack操作过程的一些信息,如何用python提取队列里的消息呢??? 显示全部
openstack 消息队列中,用rabbitmqctl list_queues命令,看到队列notifications.info,它是不是包含openstack操作过程的一些信息,如何用python提取队列里的消息呢??? 收起
参与4

查看其它 2 个回答博飞信息科技 的回答

博飞信息科技 博飞信息科技 CTO 博飞信息科技(上海)有限公司
您好,可以通过任意语言SDK订阅该队列内容,以下是node.js的实现,代码为截取的,没有经过测试,您可以根据情况进行调整。
/**
 *  Module: rabbitmq.js
 *
 * Collection of functions for dealing with amqp.
 */

var amqp = require('amqp');

var log = require('./logger');
var logger = log.setLogger();

var setLogger = function(exLogger) {
    logger = log.setLogger(exLogger);
};
exports.setLogger = setLogger;

var subscribe = function(options) {
    var conn = options['conn'];
    var consumerQueue = options['consumer_queue'];
    var exchange = options['exchange'];
    var topicName = options['topic_name'];
    var topicTypes = options['topic_types'];
    var implOptions = {
        'reconnect': true
    };

    var connection = amqp.createConnection(conn, implOptions);

    connection.on('error', function(e) {
        options.error({'type': e.name, 'message': e});
    });

    topicTypes.forEach(function(topicType) {
        connection.on('ready', function() {
            var cQueue = consumerQueue + '.' + topicType;
            connection.queue(cQueue, function(queue) {
                topic = topicName + '.' + topicType;
                queue.bind(exchange, topic);
                queue.subscribe(function(message, contentType, params) {
                    options.success(message, params.exchange);
                });
            }); // end connection.queue
        }); // end connection on ready
    });
} // end var subscribe
exports.subscribe = subscribe;

    "exchange_names": ["nova", "glance", "cinder", "neutron"],
    "topic_name": "notifications",
    "topic_types": ["info", "warn", "error"]

// Get message from MQ and send update message
function queueSync() {
    var exchangeNames = config.get('exchange_names');
    exchangeNames.forEach(function(exchange) {
        logger.debug('Trying to connect rabbitmq %s:%s',
                config.get('rabbitmq:host'), config.get('rabbitmq:port'));
        var options = {
            'conn': config.get('rabbitmq'),
            'consumer_queue': config.get('consumer_queue'),
            'exchange': exchange,
            'topic_name': config.get('topic_name'),
            'topic_types': config.get('topic_types'),
            success: function(message, exc) {
                handleMessage(message, exc);
            },
            error: function(error) {
                logger.error(error);
            }
        };
        rabbitmq.subscribe(options);
    });
}
软件开发 · 2015-03-09
浏览2655

回答者

博飞信息科技
CTO 博飞信息科技(上海)有限公司
擅长领域: 云计算私有云云管平台
评论133

博飞信息科技 最近回答过的问题

回答状态

  • 发布时间:2015-03-09
  • 关注会员:1 人
  • 回答浏览:2655
  • X社区推广