import java.io.IOException; import com.ibm.mq.MQC; import com.ibm.mq.MQEnvironment; import com.ibm.mq.MQException; import com.ibm.mq.MQMessage; import com.ibm.mq.MQPutMessageOptions; import com.ibm.mq.MQQueue; import com.ibm.mq.MQQueueManager; public class MQSender implements Runnable { private static final String MQ_MANAGER = "QM2"; private static final String MQ_HOST_NAME = "10.0.16.15"; private static final String MQ_CHANNEL = "service2"; // "SYSTEM.DEF.SVRCONN";// private static final String MQ_QUEUE_NAME = "LQ1"; private static final int MQ_PROT = 1434; private static final int MQ_CCSID = 1208; public void run() { MQQueueManager mqQueueManager = null; MQQueue mqQueue = null; try { MQEnvironment.addConnectionPoolToken(); MQEnvironment.hostname = MQ_HOST_NAME; MQEnvironment.channel = MQ_CHANNEL ; MQEnvironment.port = MQ_PROT; MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); MQEnvironment.CCSID = MQ_CCSID; int sendOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; mqQueueManager = new MQQueueManager(MQ_MANAGER); mqQueue = mqQueueManager.accessQueue(MQ_QUEUE_NAME, sendOptions, null, null, null); MQPutMessageOptions mqPutMessageOptions = new MQPutMessageOptions(); MQMessage mqMessage = null; String putMessage = "你好啊"; mqMessage = new MQMessage(); mqMessage.format = MQC.MQFMT_STRING; // mqMessage.write(putMessage.getBytes()); mqMessage.writeUTF(putMessage); mqQueue.put(mqMessage, mqPutMessageOptions); } catch (MQException e) { e.printStackTrace(); } catch (IOException e1) { e1.printStackTrace(); } finally { if (mqQueue != null) { try { mqQueue.close(); } catch (MQException e) { e.printStackTrace(); } } if (mqQueueManager != null) { try { mqQueueManager.close(); } catch (MQException e) { e.printStackTrace(); } } } } public static void main(String arg[]) { MQSender client = new MQSender(); Thread mqClientThread = new Thread(client); mqClientThread.start(); } }
//将队列的里的消息读出来 while(depth-->0) { MQMessage msg = new MQMessage();// 要读的队列的消息 MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;//Get messages under sync point control(在同步点控制下获取消息) gmo.options = gmo.options + MQC.MQGMO_WAIT; // Wait if no messages on the Queue(如果在队列上没有消息则等待) gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if Qeue Manager Quiescing(如果队列管理器停顿则失败) gmo.waitInterval = 10000 ; // Sets the time limit for the wait.(设置等待的毫秒时间限制) queue.get(msg, gmo); byte[] rawData = new byte[msg.getMessageLength()]; //先转byte msg.readFully(rawData); //读出所有数据 String msgstr = new String(rawData,"gb2312"); System.out.println("readFull读取消息:"+msgstr); System.out.println("实际读取出来以后字符串长度为:"+msgstr.length()); System.out.println("getMessageLength读取消息长度为:"+msg.getMessageLength()); System.out.println("读取readUTF的字符串消息长度为:"+msg.readUTF().length());//这两个长度不获取出来都不太一样· System.out.println("---------------------------"); }收起
package com.sany.demo; import com.ibm.jms.JMSTextMessage; import com.ibm.mq.jms.JMSC; import com.ibm.mq.jms.MQQueueConnectionFactory; import javax.jms.*; import java.net.URI; import java.util.logging.Level; import java.util.logging.Logger; public class Receiver implements MessageListener { // private static Logger logger = Logger.getLogger(Receiver.class.getName()); private static final String MQ_MANAGER = "QM2"; private static final String MQ_HOST_NAME = "10.0.16.15"; private static final String MQ_CHANNEL = "service2"; // "SYSTEM.DEF.SVRCONN";// private static final String MQ_QUEUE_NAME = "LQ1"; private static final int MQ_PROT = 1434; private static final int MQ_CCSID = 1208; public static void main(String[] args) { try { MQQueueConnectionFactory rv = new MQQueueConnectionFactory(); rv.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP); rv.setHostName(MQ_HOST_NAME); rv.setQueueManager(MQ_MANAGER); rv.setChannel(MQ_CHANNEL); rv.setPort(MQ_PROT); QueueConnection conn = rv.createQueueConnection("wbiadmin","wbiadmin");; QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(MQ_QUEUE_NAME); QueueReceiver receiver = session.createReceiver(queue); Receiver instance = new Receiver(); receiver.setMessageListener(instance); conn.start(); int count = 0; while (count < 1000) { Thread.sleep(30 * 1000); count++; } //shutdown receiver.close(); session.close(); conn.close(); } catch (Exception e) { // logger.log(Level.WARNING, "Exception: ", e); e.printStackTrace(); } } @Override public void onMessage(Message message) { if (message == null) return; if (message instanceof JMSTextMessage) { try { System.out.println(((JMSTextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } else { //you need defined handle for different message format. } } }收起