博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JavaEE(5) - JMS实现企业Pub-Sub消息处理
阅读量:6353 次
发布时间:2019-06-22

本文共 9607 字,大约阅读时间需要 32 分钟。

1. 在Weblogic服务器上配置Pub-Sub消息目的

向已有的JMS模块中添加消息主题:

Services-->Messaging-->JMS Modules--><Module Name>-->Configuration-->New-->Topic(Name: MessageTopic)

2. 可靠的JMS订阅(NetBeans创建java project: DurablePubSub)

#1. 编写Pub-Sub消息的生产者(MessageSender.java)

package lee;import javax.jms.*;import javax.naming.*;import java.util.Properties;public class MessageSender {    public void sendMessage() throws NamingException, JMSException {        //定义WebLogic默认连接工厂的JNDI        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";        //获取JNDI服务所需的Context        Context ctx = getInitialContext();        //通过JNDI查找获取连接工厂        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);        //通过JNDI查找获取消息目的        Destination dest = (Destination) ctx.lookup("MessageTopic");        //连接工厂创建连接        Connection conn = connFactory.createConnection();        //JMS连接创建JMS会话        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);        //JMS会话创建消息生产者        MessageProducer sender = session.createProducer(dest);        //设置消息生产者生产出来的消息的传递模式、有效时间。        sender.setDeliveryMode(DeliveryMode.PERSISTENT);        sender.setTimeToLive(20000);        //通过JMS会话创建一个文本消息        TextMessage msg = session.createTextMessage();        //msg.setStringProperty("ConType","txt");        //设置消息内容        msg.setText("Hello");        //发送消息        sender.send(msg);        msg.setText("Welcome to JMS");        //再次发送消息        sender.send(msg);        //关闭资源        session.close();        conn.close();    }    //工具方法,用来获取命名服务的Context对象    private Context getInitialContext() {        // 参看(4)    }    public static void main(String[] args) throws Exception {        MessageSender sender = new MessageSender();        sender.sendMessage();    }}

#2. 编写Pub-Sub消息的同步接收者(SyncConsumer.java)

package lee;import javax.jms.*;import javax.naming.*;import java.util.Properties;public class SyncConsumer {    public void receiveMessage() throws JMSException, NamingException {        //定义WebLogic默认连接工厂的JNDI        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";        //获取JNDI服务所需的Context        Context ctx = getInitialContext();        //通过JNDI查找获取连接工厂        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);        //通过JNDI查找获取消息目的            Topic dest = (Topic) ctx.lookup("MessageTopic");        //连接工厂创建连接        Connection conn = connFactory.createConnection();        //将客户端ID设为crazyit.org        conn.setClientID("crazyit.org");        //启动JMS连接,让它开始传输JMS消息        conn.start();        //JMS连接创建JMS会话        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);        //创建可靠的消息订阅者        MessageConsumer receiver = session.createDurableSubscriber(dest, "crazyit.org");        //同步接收消息,如果没有接收到消息,该方法会阻塞线程        TextMessage msg = (TextMessage) receiver.receiveNoWait();        System.out.println(msg);        if (msg != null) {            System.out.println("同步接收到的消息:" + msg.getText());        }        //关闭资源        session.close();        conn.close();    }    //工具方法,用来获取命名服务的Context对象    private Context getInitialContext() {        // 参看(4)    }    public static void main(String[] args) throws Exception {        SyncConsumer sender = new SyncConsumer();        sender.receiveMessage();    }}

#3. 编写Pub-Sub消息的异步接收者(AsyncConsumer.java)

package lee;import javax.jms.*;import javax.naming.*;import java.util.Properties;//JMS异步消费者就是一个监听器,故实现MessageListener接口public class AsyncConsumer implements MessageListener {    public AsyncConsumer() throws NamingException, JMSException, InterruptedException {        //定义WebLogic默认连接工厂的JNDI        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";        //获取JNDI服务所需的Context        Context ctx = getInitialContext();        //通过JNDI查找获取连接工厂        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);        //通过JNDI查找获取消息目的        Topic dest = (Topic) ctx.lookup("MessageTopic");        //连接工厂创建连接        Connection conn = connFactory.createConnection();        //将客户端ID设为crazyit.org        conn.setClientID("leegang.org");        //启动JMS连接,让它开始传输JMS消息        conn.start();        //JMS连接创建JMS会话            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);        //创建可靠的消息订阅者        MessageConsumer receiver = session.createDurableSubscriber(dest, "leegang.org");        //为JMS消息消费者绑定消息监听器        receiver.setMessageListener(this);        //程序暂停20s,在此期间内以异步方式接收消息        Thread.sleep(20000);        //关闭资源        session.close();        conn.close();    }    //实现消息监听器必须实现的方法。    public void onMessage(Message m) {        TextMessage msg = (TextMessage) m;        System.out.println(msg);        try {            System.out.println("异步接收的消息:" + msg.getText());        }         catch (JMSException ex) {            ex.printStackTrace();        }    }    //工具方法,用来获取命名服务的Context对象    private Context getInitialContext() {        // 参看(4)    }    public static void main(String[] args) throws Exception {        AsyncConsumer consumer = new AsyncConsumer();    }}

 

3. 不可靠的JMS订阅(NetBeans创建java project: JmsPubSub)

#1. 编写Pub-Sub消息的生产者(MessageSender.java)

package lee;import javax.jms.*;import javax.naming.*;import java.util.Properties;public class MessageSender {    public void sendMessage() throws NamingException, JMSException {        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";        Context ctx = getInitialContext();                ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(CONNECTION_FACTORY_JNDI);        Destination dest = (Destination)ctx.lookup("MessageTopic");                Connection conn = connFactory.createConnection();        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);                MessageProducer sender = session.createProducer(dest);        sender.setDeliveryMode(DeliveryMode.PERSISTENT);        sender.setTimeToLive(20000);                TextMessage msg = session.createTextMessage();                msg.setText("Hello");        sender.send(msg);                msg.setText("Welcome to JMS");        sender.send(msg);        session.close();        conn.close();    }    private Context getInitialContext() {        // 参看(4)    }        public static void main(String[] args) throws Exception {        MessageSender sender = new MessageSender();        sender.sendMessage();    }}

#2. 编写Pub-Sub消息的同步接收者(SyncConsumer.java)

package lee;import javax.jms.*;import javax.naming.*;import java.util.Properties;public class SyncConsumer {    public void receiveMessage() throws JMSException, NamingException {        //定义WebLogic默认连接工厂的JNDI        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";        //获取JNDI服务所需的Context        Context ctx = getInitialContext();        //通过JNDI查找获取连接工厂        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);        //通过JNDI查找获取消息目的            Destination dest = (Destination) ctx.lookup("MessageTopic");        //连接工厂创建连接        Connection conn = connFactory.createConnection();        //启动JMS连接,让它开始传输JMS消息        conn.start();        //JMS连接创建JMS会话        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);        //JMS会话创建消息消费者        MessageConsumer receiver = session.createConsumer(dest);        //同步接收消息,如果没有接收到消息,该方法会阻塞线程        TextMessage msg = (TextMessage) receiver.receive();        System.out.println(msg);        System.out.println("同步接收到的消息:" + msg.getText());        //关闭资源        session.close();        conn.close();    }    //工具方法,用来获取命名服务的Context对象    private Context getInitialContext() {        // 参看(4)    }    public static void main(String[] args) throws Exception {        SyncConsumer consumer = new SyncConsumer();        consumer.receiveMessage();    }}

#3. 编写Pub-Sub消息的异步接收者(AsyncConsumer.java)

package lee;import javax.jms.*;import javax.naming.*;import java.util.Properties;//JMS异步消费者就是一个监听器,故实现MessageListener接口public class AsyncConsumer implements MessageListener {    public AsyncConsumer() throws NamingException, JMSException, InterruptedException {        //定义WebLogic默认连接工厂的JNDI        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";        //获取JNDI服务所需的Context        Context ctx = getInitialContext();        //通过JNDI查找获取连接工厂        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);        //通过JNDI查找获取消息目的        Destination dest = (Destination) ctx.lookup("MessageTopic");        //连接工厂创建连接        Connection conn = connFactory.createConnection();        //启动JMS连接,让它开始传输JMS消息        conn.start();        //JMS连接创建JMS会话            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);        //JMS会话创建消息消费者        MessageConsumer receiver = session.createConsumer(dest);        //为JMS消息消费者绑定消息监听器        receiver.setMessageListener(this);        //程序暂停20s,在此期间内以异步方式接收消息        Thread.sleep(20000);        //关闭资源        session.close();        conn.close();    }    //实现消息监听器必须实现的方法。    public void onMessage(Message m) {        TextMessage msg = (TextMessage) m;        System.out.println(msg);        try {            System.out.println("异步接收的消息:" + msg.getText());        } catch (JMSException ex) {            ex.printStackTrace();        }    }    //工具方法,用来获取命名服务的Context对象    private Context getInitialContext() {        // 参看(4)    }    public static void main(String[] args) throws Exception {        AsyncConsumer consumer = new AsyncConsumer();    }}

 

转载于:https://www.cnblogs.com/thlzhf/p/4249143.html

你可能感兴趣的文章
[原][osgEarth]添加自由飞行漫游器
查看>>
代码审查 Code Review
查看>>
fastjson如何指定字段不序列化
查看>>
[日常] Go语言圣经--示例: 并发的Echo服务
查看>>
BZOJ1969: [Ahoi2005]LANE 航线规划(LCT)
查看>>
linux内存管理之malloc、vmalloc、kmalloc的区别
查看>>
GreenDao 数据库升级 连接多个DB文件 或者指定不同的model&dao目录
查看>>
M1卡破解(自从学校升级系统之后,还准备在研究下)【转】
查看>>
vue 访问子组件示例 或者子元素
查看>>
linux内核--自旋锁的理解
查看>>
银行卡的三个磁道
查看>>
OpenSSL 提取 pfx 数字证书公钥与私钥
查看>>
Keepalived详解(四):通过vrrp_script实现对集群资源的监控【转】
查看>>
CollapsingToolbarLayoutDemo【可折叠式标题栏,顺便带有CardView卡片式布局】
查看>>
CentOS7.4安装配置mysql5.7 TAR免安装版
查看>>
解决IE二级链接无法打开故障
查看>>
Windows phone应用开发[16]-数据加密
查看>>
SQL Server 迁移数据到MySQL
查看>>
通用数据压缩算法简介
查看>>
The next Industry Standard in IT Monitoring, a python implementation Nagios like tool --- Shinken
查看>>