`
Riddick
  • 浏览: 633476 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

ActiveMQ5.2发送和接收TextMessage

    博客分类:
  • JMS
阅读更多

 JMS消息生产者:

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Minimal JMS producer: sends the same text message {@link #MAX_SEND_TIMES}
 * times to the queue named by {@code subject} and prints the elapsed time.
 *
 * <p>The connection is created on the first {@link #produceMessage(String)}
 * call and reused by later calls until {@link #close()} is invoked.
 */
public class ActiveMQProducer {
	
	/** Number of times one {@code produceMessage} call sends its message. */
	public final static int MAX_SEND_TIMES = 100;
	private String user = ActiveMQConnection.DEFAULT_USER;
	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
	private String subject = "TOOL.DEFAULT";
	private Destination dest = null;
	private Connection conn = null;
	private Session session = null;
	
	private MessageProducer producer = null;
	
	/**
	 * Creates connection, session, queue and producer if they do not exist yet.
	 * On failure the stack trace is printed, any partially created resources
	 * are released and all fields are left {@code null}.
	 */
	private void initialize() {
		if (producer != null) {
			// Already set up by an earlier call; reuse instead of opening
			// (and leaking) another connection.
			return;
		}
		ActiveMQConnectionFactory connFac = new ActiveMQConnectionFactory(
				user, password, url); 
		try {
			conn = connFac.createConnection();
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			dest = session.createQueue(subject);
			producer = session.createProducer(dest);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			conn.start();
		} catch (JMSException e) {
			e.printStackTrace();
			discardPartialSetup();
		}
	}
	
	/** Closes a half-initialized connection, reporting but not propagating errors. */
	private void discardPartialSetup() {
		try {
			if (conn != null) {
				// Closing the connection also releases its sessions and producers.
				conn.close();
			}
		} catch (JMSException closeFailure) {
			closeFailure.printStackTrace();
		} finally {
			clearReferences();
		}
	}
	
	/** Resets all JMS handles so the next call to initialize() starts fresh. */
	private void clearReferences() {
		producer = null;
		session = null;
		dest = null;
		conn = null;
	}
	
	/**
	 * Sends {@code message} as a {@link TextMessage} {@link #MAX_SEND_TIMES}
	 * times and prints how long the sends took. JMS errors are printed, not
	 * thrown.
	 *
	 * @param message text body of every message sent
	 */
	public void produceMessage(String message) {
		initialize();
		if (producer == null) {
			// initialize() already reported the cause; nothing can be sent.
			System.err.println("Producer is not connected; message not sent.");
			return;
		}
		try {
			TextMessage tm = session.createTextMessage(message);
			long startTime = System.currentTimeMillis();
			System.out.println("-----------------Producer-->Sending Message---------------");
			for(int i=0; i<MAX_SEND_TIMES; i++) {
				producer.send(tm);
				int sentCount = i + 1;
				if(sentCount % 1000 == 0) {
					// Report the number of messages sent so far (1-based),
					// matching the condition that triggers the report.
					System.out.println("This is the" + sentCount + " message!");
				}
			}
			System.out.println("-------------------Producer--->Message Sent Complete!----------------");
			long endTime = System.currentTimeMillis();
			long executeTime = endTime - startTime;
			System.out.println("ActiveMQ send" + MAX_SEND_TIMES + " messages used " + executeTime + "ms");
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * Closes producer, session and connection. The connection is closed even
	 * if closing the producer or session fails, and the instance can be used
	 * again afterwards (a new connection is opened on the next send).
	 *
	 * @throws JMSException if any of the close operations fails
	 */
	public void close() throws JMSException {
		System.out.println("--------------------Producer--->Closing Connection----------------");
		try {
			if(producer != null) {
				producer.close();
			}
			if(session != null) {
				session.close();
			}
		} finally {
			try {
				if(conn != null) {
					conn.close();
				}
			} finally {
				clearReferences();
			}
		}
	}
}

 JMS消息消费者:

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Minimal JMS consumer: listens asynchronously on the queue named by
 * {@code subject}, counts received text messages in
 * {@link #RECEIVED_MSG_NUM} and prints the total receive time once
 * {@code ActiveMQProducer.MAX_SEND_TIMES} messages have arrived.
 */
public class ActiveMQConsumer implements MessageListener {
	
	/** Running count of text messages received across all instances. */
	public static int RECEIVED_MSG_NUM = 0;
	long startReceiveTime = 0;
	long endReceiveTime = 0;
	long receiveDuringTime = 0;
	
	private String user = ActiveMQConnection.DEFAULT_USER;
	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
	// Must be the same queue name the producer sends to ("TOOL.DEFAULT");
	// a different name is a different queue and nothing would be received.
	private String subject = "TOOL.DEFAULT";
	private Destination dest = null;
	private Connection conn = null;
	private Session session = null;
	private MessageConsumer consumer = null;
	
	/**
	 * Creates connection, session, queue and consumer. On failure the stack
	 * trace is printed, partially created resources are released and all
	 * fields are left {@code null}.
	 */
	private void initialize() {
		ActiveMQConnectionFactory connFac = new ActiveMQConnectionFactory(
				user, password, url);
		try {
			conn = connFac.createConnection();
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			dest = session.createQueue(subject);
			consumer = session.createConsumer(dest);
		} catch (JMSException e) {
			e.printStackTrace();
			discardPartialSetup();
		}
	}
	
	/** Closes a half-initialized connection, reporting but not propagating errors. */
	private void discardPartialSetup() {
		try {
			if (conn != null) {
				// Closing the connection also releases its sessions and consumers.
				conn.close();
			}
		} catch (JMSException closeFailure) {
			closeFailure.printStackTrace();
		} finally {
			clearReferences();
		}
	}
	
	/** Resets all JMS handles. */
	private void clearReferences() {
		consumer = null;
		session = null;
		dest = null;
		conn = null;
	}
	
	/**
	 * Connects to the broker, starts the connection and registers this object
	 * as the message listener. JMS errors are printed, not thrown.
	 */
	public void consumeMessage() {
		initialize();
		if (consumer == null) {
			// initialize() already reported the cause; nothing to listen on.
			System.err.println("Consumer is not connected; not listening.");
			return;
		}
		try {
			conn.start();
			System.out.println("-------------------Consumer--->Starting Listening----------------");
			consumer.setMessageListener(this);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * Closes consumer, session and connection. The connection is closed even
	 * if closing the consumer or session fails.
	 *
	 * @throws JMSException if any of the close operations fails
	 */
	public void close() throws JMSException {
		System.out.println("------------Consumer--->Closing Connection--------------");
		try {
			if(consumer != null) {
				consumer.close();
			}
			if(session != null) {
				session.close();
			}
		} finally {
			try {
				if(conn != null) {
					conn.close();
				}
			} finally {
				clearReferences();
			}
		}
	}

	/**
	 * Handles one delivered message. Non-text messages are ignored. For text
	 * messages the counter is incremented; the receive time is printed when
	 * the counter reaches {@code ActiveMQProducer.MAX_SEND_TIMES}, otherwise
	 * the message itself is printed.
	 *
	 * @param message the message delivered by the JMS provider
	 */
	@Override
	public void onMessage(Message message) {
		try {
			if(message instanceof TextMessage) {
				TextMessage tm = (TextMessage) message;
				// Retained from the original; the text itself is not used below.
				String msg = tm.getText();
				if(RECEIVED_MSG_NUM == 0) {
					startReceiveTime = System.currentTimeMillis();
				}
				RECEIVED_MSG_NUM++;
				
				// The counter already includes this message, so test it
				// directly rather than adding one again.
				if(RECEIVED_MSG_NUM % 1000 == 0) {
					System.out.println("-----------------Consumer--->Received:" + RECEIVED_MSG_NUM + "--------------");
				}
				
				// The last message is the one that brings the count up to
				// MAX_SEND_TIMES (not MAX_SEND_TIMES - 1, since the counter
				// was incremented above).
				if(RECEIVED_MSG_NUM == ActiveMQProducer.MAX_SEND_TIMES) {
					endReceiveTime = System.currentTimeMillis();
					receiveDuringTime = endReceiveTime - startReceiveTime;
					System.out.println("---------------ActiveMQ Receive " + ActiveMQProducer.MAX_SEND_TIMES + 
							" messages used:" + receiveDuringTime + " ms");
				} else {
					System.out.println(System.currentTimeMillis() + "---Consumer--->Received:" + message);
				}
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}	
	}
}

 

测试代码:

import javax.jms.JMSException;

/**
 * Demo driver: starts a consumer, sends a 1024-character message through the
 * producer, waits five seconds for delivery and then shuts both sides down.
 */
public class ActiveMQTest {
	
	/**
	 * Runs the send/receive demo.
	 *
	 * @param args unused
	 * @throws JMSException if closing the producer or consumer fails
	 * @throws InterruptedException if the wait for delivery is interrupted
	 */
	public static void main(String[] args) throws JMSException, InterruptedException {
		ActiveMQConsumer consumer = new ActiveMQConsumer();
		ActiveMQProducer producer = new ActiveMQProducer();
		// Build a 1024-character payload consisting only of 'a'.
		char[] tempChars = new char[1024];
		for(int i=0; i<1024; i++) {
			tempChars[i] = 'a';
		}
		
		String tempMsg = String.valueOf(tempChars);
		try {
			// Start listening before anything is sent.
			consumer.consumeMessage();
			try {
				producer.produceMessage(tempMsg);
			} finally {
				producer.close();
			}
			
			// Wait 5000 ms so the listener can receive the messages,
			// then close the consumer connection.
			Thread.sleep(5000);
		} finally {
			// Always release the consumer, even if the producer failed to
			// close or the wait was interrupted.
			consumer.close();
		}
	}
}

 (说明:以上三个类放在同一个包下)

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics