JMS消息生产者:
import java.io.File;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
public class BlobMessageSendTest {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";
private String subject = "Blob Queue";
private Destination destination = null;
private ActiveMQConnection connection = null;
private ActiveMQSession session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = (ActiveMQConnection) connectionFactory.createConnection();
/*
* !!!!!!!!!!!!!!!!!!!!!!!!! very important. If it is set to true
* (default) the uploader is lost in translation ;)
* !!!!!!!!!!!!!!!!!!!!!!!!!
*/
connection.setCopyMessageOnSend(false);
session = (ActiveMQSession) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
// destination = session.createTopic(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 发送消息
public void produceMessage(File file) throws JMSException, Exception {
initialize();
BlobMessage msg = session.createBlobMessage(file);
connection.start();
System.out.println("Producer:->Sending message: " + file.getName());
producer.send(msg);
System.out.println("Producer:->Message sent complete!");
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}
JMS消息消费者:
package iprai.ace.activemq;
import java.io.File;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
public class BlobMessageSendTest {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";
private String subject = "Blob Queue";
private Destination destination = null;
private ActiveMQConnection connection = null;
private ActiveMQSession session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = (ActiveMQConnection) connectionFactory.createConnection();
/*
* !!!!!!!!!!!!!!!!!!!!!!!!! very important. If it is set to true
* (default) the uploader is lost in translation ;)
* !!!!!!!!!!!!!!!!!!!!!!!!!
*/
connection.setCopyMessageOnSend(false);
session = (ActiveMQSession) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
// destination = session.createTopic(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 发送消息
public void produceMessage(File file) throws JMSException, Exception {
initialize();
BlobMessage msg = session.createBlobMessage(file);
connection.start();
System.out.println("Producer:->Sending message: " + file.getName());
producer.send(msg);
System.out.println("Producer:->Message sent complete!");
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}
测试代码:
import java.io.File;
public class BlobMessageTest {
/**
* topic方式,必须先启动消费者,然后是生产者,否则接收不到消息。 queue方式,最好先启动生产者,然后启动消费者,否则也容易收不到消息。
*
* @param args
*/
public static void main(String[] args) throws Exception {
BlobMessageSendTest producer = new BlobMessageSendTest();
BlobMessageReceiveTest consumer = new BlobMessageReceiveTest();
String fileName = "D:/装Win7后装XP.txt";
// String fileName = "d:/JAVA+开发视频会议系统详细设计.doc";
File file = new File(fileName);
producer.produceMessage(file);
producer.close();
// 延时500毫秒之后停止接受消息
Thread.sleep(2000);
// 开始监听
consumer.consumeMessage();
// 延时500毫秒之后发送消息
Thread.sleep(2000);
consumer.close();
}
}
分享到:
相关推荐
ActiveMQ 5.2 Reference Guide v1.8 英文版
activeMQ5.2的jar包及使用实例,它既支持点到点(point-to-point)(PTP)模型和发布/订阅(Pub/Sub)模型。支持同步与异步消息发送。JDBC持久性管理使用数据库表来存储消息 。
ActiveMQ新版本的指导手册,讲述内容涵盖了activeMQ的配置到实现原理,包括持久化等特性。
activeMQ的测试工具,用于发送和接收activeMQ消息,jar包形式的,安装完jdk之后用java -jar xxx.jar命令运行
ActiveMQ接受和发送工具.rar,亲测测试可以使用,非常的好用,欢迎下载
实现了ActiveMQ的初步封装,比较适合新手入门学习,简单明了
activemq 发送,接受,监控样例程序
项目使用springboot2.0.4搭建,一个父项目包含两个子项目:发送服务;监听服务;消息服务使用ActiveMQ 5.14.3,在docker中运行。 项目中有两种协议消息:activemq和mqtt。
activeMQ的发送消息后接收者返回信息
SpringBoot+ActiveMq+MQTT实现消息的发送和接收 后台消费者、生产者、消息发送接口、发送消息业务类等相关配置
activemq消息的发送与接受封装的工具类,只要你导入jar包
NULL 博文链接:https://sswh.iteye.com/blog/1974169
这是我精力整理的ActiveMQ发送和接收protobuf协议消息的实例。 也对ActiveMQ进行了简化封装,也配置了自动重连机制,亲测可用!
接受ActiveMQ信息,通过openfire公告发送给指定用户,xml串解析
springboot集成activemq实现消息接收demo
本文来自于csdn,本文重点分析使用JMS向activeMQ中间件发送消息的过程分析,希望对您的学习有所帮助。 activeMQ发送消息客户端发送消息分为同步发送与异步发送 同步发送,发送者发送一条消息会阻塞直到broker反馈一...
根据文章中集群搭建对应的延时发送的工具类,其中包括连接池的使用。
本代码关于activemq-cpp的核心代码参考的chenxun2009的博客园,其他部分包括:从配置文件中读取消息通道,过滤条件等信息。
activeMq的使用以及简介,还有短信功能的代码逻辑分析