JMS消息生产者:
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQProducer {
public final static int MAX_SEND_TIMES = 100;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL.DEFAULT";
private Destination dest = null;
private Connection conn = null;
private Session session = null;
private MessageProducer producer = null;
private void initialize() {
ActiveMQConnectionFactory connFac = new ActiveMQConnectionFactory(
user, password, url);
try {
conn = connFac.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
dest = session.createQueue(subject);
producer = session.createProducer(dest);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
conn.start();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void produceMessage(String message) {
initialize();
try {
TextMessage tm = session.createTextMessage(message);
long startTime = System.currentTimeMillis();
System.out.println("-----------------Producer-->Sending Message---------------");
for(int i=0; i<MAX_SEND_TIMES; i++) {
producer.send(tm);
if((i+1)%1000 == 0) {
System.out.println("This is the" + i + " message!");
}
}
System.out.println("-------------------Producer--->Message Sent Complete!----------------");
long endTime = System.currentTimeMillis();
long executeTime = endTime - startTime;
System.out.println("ActiveMQ send" + MAX_SEND_TIMES + " messages used " + executeTime + "ms");
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void close() throws JMSException {
System.out.println("--------------------Producer--->Closing Connection----------------");
if(producer != null) {
producer.close();
}
if(session != null) {
session.close();
}
if(conn != null) {
conn.close();
}
}
}
JMS消息消费者:
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQConsumer implements MessageListener {
public static int RECEIVED_MSG_NUM = 0;
long startReceiveTime = 0;
long endReceiveTime = 0;
long receiveDuringTime = 0;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL:DEFAULT";
private Destination dest = null;
private Connection conn = null;
private Session session = null;
private MessageConsumer consumer = null;
private void initialize() {
ActiveMQConnectionFactory connFac = new ActiveMQConnectionFactory(
user, password, url);
try {
conn = connFac.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
dest = session.createQueue(subject);
consumer = session.createConsumer(dest);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void consumeMessage() {
initialize();
try {
conn.start();
System.out.println("-------------------Consumer--->Starting Listening----------------");
consumer.setMessageListener(this);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void close() throws JMSException {
System.out.println("------------Consumer--->Closing Connection--------------");
if(consumer != null) {
consumer.close();
}
if(session != null) {
session.close();
}
if(conn != null) {
conn.close();
}
}
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
if(message instanceof TextMessage) {
TextMessage tm = (TextMessage) message;
String msg = tm.getText();
if(RECEIVED_MSG_NUM == 0) {
startReceiveTime = System.currentTimeMillis();
}
RECEIVED_MSG_NUM++;
if((RECEIVED_MSG_NUM+1) % 1000 == 0) {
System.out.println("-----------------Consumer--->Received:" + RECEIVED_MSG_NUM + "--------------");
}
//Receive the last message
if(RECEIVED_MSG_NUM == ActiveMQProducer.MAX_SEND_TIMES - 1) {
endReceiveTime = System.currentTimeMillis();
receiveDuringTime = endReceiveTime - startReceiveTime;
System.out.println("---------------ActiveMQ Receive " + ActiveMQProducer.MAX_SEND_TIMES +
" messages used:" + receiveDuringTime + " ms");
} else {
System.out.println(System.currentTimeMillis() + "---Consumer--->Received:" + message);
}
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
测试代码:
import javax.jms.JMSException;
public class ActiveMQTest {
public static void main(String[] args) throws JMSException, InterruptedException {
ActiveMQConsumer consumer = new ActiveMQConsumer();
ActiveMQProducer producer = new ActiveMQProducer();
char[] tempChars = new char[1024];
for(int i=0; i<1024; i++) {
tempChars[i] = 'a';
}
String tempMsg = String.valueOf(tempChars);
//开始监听
consumer.consumeMessage();
producer.produceMessage(tempMsg);
producer.close();
//延时5000ms后关闭连接
Thread.sleep(5000);
consumer.close();
}
}
(说明:以上三个类放在同一个包下)
分享到:
相关推荐
ActiveMQ 5.2 Reference Guide v1.8 英文版
activeMQ5.2的jar包及使用实例,它既支持点到点(point-to-point)(PTP)模型和发布/订阅(Pub/Sub)模型。支持同步与异步消息发送。JDBC持久性管理使用数据库表来存储消息 。
ActiveMQ新版本的指导手册,讲述内容涵盖了activeMQ的配置到实现原理,包括持久化等特性。
activeMQ的测试工具,用于发送和接收activeMQ消息,jar包形式的,安装完jdk之后用java -jar xxx.jar命令运行
ActiveMQ接受和发送工具.rar,亲测测试可以使用,非常的好用,欢迎下载
实现了ActiveMQ的初步封装,比较适合新手入门学习,简单明了
activemq 发送,接受,监控样例程序
项目使用springboot2.0.4搭建,一个父项目包含两个子项目:发送服务;监听服务;消息服务使用ActiveMQ 5.14.3,在docker中运行。 项目中有两种协议消息:activemq和mqtt。
activeMQ的发送消息后接收者返回信息
SpringBoot+ActiveMq+MQTT实现消息的发送和接收 后台消费者、生产者、消息发送接口、发送消息业务类等相关配置
activemq消息的发送与接受封装的工具类,只要你导入jar包
NULL 博文链接:https://sswh.iteye.com/blog/1974169
这是我精力整理的ActiveMQ发送和接收protobuf协议消息的实例。 也对ActiveMQ进行了简化封装,也配置了自动重连机制,亲测可用!
接受ActiveMQ信息,通过openfire公告发送给指定用户,xml串解析
springboot集成activemq实现消息接收demo
本文来自于csdn,本文重点分析使用JMS向activeMQ中间件发送消息的过程分析,希望对您的学习有所帮助。 activeMQ发送消息客户端发送消息分为同步发送与异步发送 同步发送,发送者发送一条消息会阻塞直到broker反馈一...
根据文章中集群搭建对应的延时发送的工具类,其中包括连接池的使用。
本代码关于activemq-cpp的核心代码参考的chenxun2009的博客园,其他部分包括:从配置文件中读取消息通道,过滤条件等信息。
activeMq的使用以及简介,还有短信功能的代码逻辑分析