To configure a Durable Subscriber Topic you can follow the steps provided in the post Steps to Configure Durable Subscriber Topic on Weblogic Server.
In this post I use stand-alone Java code to produce messages as a Producer/Client and to consume them as a Consumer/Listener. This way, if something goes wrong, we can narrow down whether the problem lies with WLS or with the code, since the issue is often with the Producer or the Consumer.
Steps to test the Durable Subscriber Topic
- Create a Directory somewhere in your file system like: “D:OracleJMSTopicDurable_Subscribers” to write the DurableTopicSend.java and DurableTopicReceive.java programs.
- In DurableTopicSend.java, copy the DurableTopicSend code given further below.
- In DurableTopicReceive.java, copy the DurableTopicReceive code given further below.
- Now open two command prompt windows and run “setWLSEnv.cmd” in both of them to set the environment (PATH & CLASSPATH).
- Then compile the DurableTopicSend.java and DurableTopicReceive.java programs.
- Now run the programs with the commands listed under Prompt-1 and Prompt-2 below.
- Now try sending messages from prompt-1; they should be received on prompt-2.
- Once you are able to receive the messages properly, STOP prompt-2 and send a few more messages from prompt-1. With a normal Topic these messages would be lost, because no one is subscribed/listening on the topic. With a durable subscriber, however, you will be able to see the messages held for that particular subscriber if you check the console path given below.
- If you start prompt-2 again, it will receive the messages that prompt-1 sent while prompt-2 was down.
/**
 * @DurableTopicSend Demo for Durable Subscriber
 *
 * @author Middleware Magic (c) 2010
 */
import java.io.*;
import java.util.*;
import javax.transaction.*;
import javax.naming.*;
import javax.jms.*;
import javax.rmi.*;

public class DurableTopicSend {
    public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";
    // Defines the JMS connection factory JNDI name.
    public final static String CONN_FACTORY = "CF";
    // Defines the Topic JNDI name.
    public final static String TOPIC = "T-1";

    protected TopicConnectionFactory dutconFactory;
    protected TopicConnection dutcon;
    protected TopicSession dutsession;
    protected TopicPublisher dutpublisher;
    protected Topic dutopic;
    protected TextMessage msg;

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("Usage: java DurableTopicSend WebLogicURL");
            return;
        }
        InitialContext ic = getInitialContext(args[0]);
        DurableTopicSend duts = new DurableTopicSend();
        duts.init(ic, TOPIC);
        readAndSendMsg(duts);
        duts.close();
    }

    // Looks up the connection factory and the topic, then creates the publisher.
    public void init(Context ctx, String topicName) throws NamingException, JMSException {
        dutconFactory = (TopicConnectionFactory) PortableRemoteObject.narrow(ctx.lookup(CONN_FACTORY), TopicConnectionFactory.class);
        dutcon = dutconFactory.createTopicConnection();
        dutsession = dutcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        dutopic = (Topic) PortableRemoteObject.narrow(ctx.lookup(topicName), Topic.class);
        dutpublisher = dutsession.createPublisher(dutopic);
        msg = dutsession.createTextMessage();
        dutcon.start();
    }

    protected static InitialContext getInitialContext(String url) throws NamingException {
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, url);
        env.put("weblogic.jndi.createIntermediateContexts", "true");
        return new InitialContext(env);
    }

    public void sendmsg(String message) throws JMSException {
        msg.setText(message);
        dutpublisher.publish(msg);
    }

    // Reads lines from standard input and publishes each non-empty line,
    // including the final "quit", which also tells the receiver to stop.
    protected static void readAndSendMsg(DurableTopicSend duts) throws IOException, JMSException {
        BufferedReader msgStream = new BufferedReader(new InputStreamReader(System.in));
        String line = null;
        do {
            System.out.print("Enter your message (\"quit\" to quit): \n");
            line = msgStream.readLine();
            if (line != null && line.trim().length() != 0) {
                duts.sendmsg(line);
                System.out.println("Sent JMS Message: " + line + "\n");
            }
        } while (line != null && !line.equalsIgnoreCase("quit"));
    }

    public void close() throws JMSException {
        dutpublisher.close();
        dutsession.close();
        dutcon.close();
    }
}
/**
 * @DurableTopicReceive Demo for Durable Subscriber
 *
 * @author Middleware Magic (c) 2010
 */
import java.io.*;
import java.util.*;
import javax.transaction.*;
import javax.naming.*;
import javax.jms.*;
import javax.rmi.*;

public class DurableTopicReceive implements MessageListener {
    public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";
    // Defines the JMS connection factory JNDI name.
    public final static String CONN_FACTORY = "CF";
    // Defines the Topic JNDI name.
    public final static String TOPIC = "T-1";

    private TopicConnectionFactory dutconFactory;
    private TopicConnection dutcon;
    private TopicSession dutsession;
    private TopicSubscriber dutsubscriber;
    private Topic dutopic;
    private boolean quit = false;

    // Called by the JMS provider for every message delivered to this subscriber.
    public void onMessage(Message msg) {
        try {
            String msgText;
            if (msg instanceof TextMessage) {
                msgText = ((TextMessage) msg).getText();
            } else {
                msgText = msg.toString();
            }
            System.out.println("Received JMS Message: " + msgText);
            if (msgText.equalsIgnoreCase("quit")) {
                synchronized (this) {
                    quit = true;
                    this.notifyAll();
                }
            }
        } catch (JMSException jmse) {
            System.err.println("An exception has occurred: " + jmse.getMessage());
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("Usage: java DurableTopicReceive WebLogicURL");
            return;
        }
        InitialContext ic = getInitialContext(args[0]);
        DurableTopicReceive tr = new DurableTopicReceive();
        tr.init(ic, TOPIC);
        System.out.println("JMS Is Now Ready To Receive Messages (To quit, send a \"quit\" message).");
        // Block the main thread until onMessage() sees a "quit" message.
        synchronized (tr) {
            while (!tr.quit) {
                try {
                    tr.wait();
                } catch (InterruptedException ie) {}
            }
        }
        tr.close();
    }

    private static InitialContext getInitialContext(String url) throws NamingException {
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, url);
        env.put("weblogic.jndi.createIntermediateContexts", "true");
        return new InitialContext(env);
    }

    public void close() throws JMSException {
        dutsubscriber.close();
        dutsession.close();
        dutcon.close();
    }

    /**
     * The code below is what makes this a Durable Subscriber:
     * it sets the Client ID and the Subscription Name.
     */
    public void init(Context ctx, String topicName) throws NamingException, JMSException {
        dutconFactory = (TopicConnectionFactory) PortableRemoteObject.narrow(ctx.lookup(CONN_FACTORY), TopicConnectionFactory.class);
        dutcon = dutconFactory.createTopicConnection();
        // ############## The Client ID is set below; here it is "Ravish" ##############
        dutcon.setClientID("Ravish");
        dutsession = dutcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        dutopic = (Topic) PortableRemoteObject.narrow(ctx.lookup(topicName), Topic.class);
        // ############## The Subscription Name is set below; here it is "Test" ##############
        dutsubscriber = dutsession.createDurableSubscriber(dutopic, "Test");
        dutsubscriber.setMessageListener(this);
        dutcon.start();
    }
}
Prompt-1
java DurableTopicSend t3://localhost:7003
Prompt-2
java DurableTopicReceive t3://localhost:7003
Note:
With the “java DurableTopicSend” and “java DurableTopicReceive” commands you have to give the <Protocol>://<IP-address>:<Port number> of the managed server, and each command has to be run in its own prompt.
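If you want to catch a mistyped URL before the JNDI lookup fails, a small check along the following lines may help. This is only a sketch: the class name ProviderUrlCheck and the method describe are made up for this example and are not part of WebLogic or JMS. It uses only java.net.URI from the JDK to confirm that the argument has the <Protocol>://<IP-address>:<Port number> shape.

```java
import java.net.URI;

/** Hypothetical helper: checks that a provider URL has protocol, host and port. */
class ProviderUrlCheck {

    /** Returns "protocol host port", or throws if any of the three parts is missing. */
    static String describe(String url) {
        URI uri = URI.create(url); // throws IllegalArgumentException on bad syntax
        if (uri.getScheme() == null || uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException(
                "Expected <Protocol>://<IP-address>:<Port number>, got: " + url);
        }
        return uri.getScheme() + " " + uri.getHost() + " " + uri.getPort();
    }

    public static void main(String[] args) {
        // Same URL as used in the prompts above.
        System.out.println(describe("t3://localhost:7003")); // prints "t3 localhost 7003"
    }
}
```

A call like describe(args[0]) at the top of main in either program would reject an argument such as "localhost:7003", where the protocol is missing.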
Console Path:
JMS Modules -> MySystemModule-1 -> T-1 -> Monitoring (Tab) -> Durable Subscribers (Sub-Tab)
Conclusion:
Thus this demo confirms that our configured durable subscriber is working fine. We received the messages that were sent to a particular subscriber while it was down and not listening, because those messages were stored in the persistent store and were delivered by WebLogic Server once the subscriber started listening again.
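The difference between a durable and a normal subscriber can also be pictured with a small in-memory model that needs no server. This is a toy sketch only, not how WebLogic implements it: the class DurableTopicModel and all of its methods are invented for this illustration, and a plain list stands in for the persistent store. The durable subscription is keyed by Client ID and Subscription Name, as in the receiver code above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of one topic; hypothetical, for illustration only. */
class DurableTopicModel {
    // Messages each subscriber has actually received.
    private final Map<String, List<String>> received = new HashMap<>();
    // Messages held for durable subscriptions that are currently offline.
    private final Map<String, List<String>> backlog = new HashMap<>();
    // Subscribers that are currently connected.
    private final Set<String> online = new HashSet<>();

    /** Connects (or reconnects) a durable subscriber and hands over its backlog. */
    void connectDurable(String clientId, String subscriptionName) {
        String key = clientId + "/" + subscriptionName;
        List<String> inbox = received.computeIfAbsent(key, k -> new ArrayList<>());
        List<String> pending = backlog.computeIfAbsent(key, k -> new ArrayList<>());
        inbox.addAll(pending); // deliver what was stored while offline
        pending.clear();
        online.add(key);
    }

    /** Connects a normal (non-durable) subscriber; nothing is ever stored for it. */
    void connectNonDurable(String name) {
        received.computeIfAbsent(name, k -> new ArrayList<>());
        online.add(name);
    }

    void disconnect(String key) {
        online.remove(key);
    }

    /** Delivers to online subscribers, stores for offline durable ones, drops the rest. */
    void publish(String message) {
        for (String key : received.keySet()) {
            if (online.contains(key)) {
                received.get(key).add(message);
            } else if (backlog.containsKey(key)) {
                backlog.get(key).add(message);
            }
        }
    }

    List<String> receivedBy(String key) {
        return new ArrayList<>(received.getOrDefault(key, new ArrayList<>()));
    }

    public static void main(String[] args) {
        DurableTopicModel topic = new DurableTopicModel();
        topic.connectDurable("Ravish", "Test");
        topic.connectNonDurable("plain");
        topic.publish("first");
        topic.disconnect("Ravish/Test"); // like stopping prompt-2
        topic.disconnect("plain");
        topic.publish("second");         // sent while both are down
        topic.connectDurable("Ravish", "Test");
        topic.connectNonDurable("plain");
        System.out.println(topic.receivedBy("Ravish/Test")); // [first, second]
        System.out.println(topic.receivedBy("plain"));       // [first]
    }
}
```

In the model, only the durable subscription gets "second" after reconnecting, which mirrors what the demo showed with prompt-2.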
Regards,
Ravish Mody
April 12th, 2013 on 2:59 pm
Hi Ravish,
Thanks for this article, it is very useful.
We have a durable subscriber configured for the topic. Whenever the managed server associated with the topic is restarted, we have to restart the subscriber to receive messages again.
We tried setting the reconnect policy to all, but we still have the same issue. Could you please provide your suggestions?
thanks
ARun