To Start ActiveMq in network broker mode:
1- Configuration
$ cd $ACTIVEMQ_HOME/
$vi conf/activemq-dynamic-network-broker1.xml
--> add balise to this config file
$ cp conf/jetty.xml conf/jetty2.xml
$ vi conf/jetty.xml conf/jetty2.xml : update port number of jetty web server
$vi conf/activemq-dynamic-network-broker2.xml
--> add
2 - Developpement (producer and consumer)
- Create consumer:
package com.test.jms;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsNetworkBrokerConsumer {
private static ActiveMQConnectionFactory connectionFactory;
private static Connection connection;
private static Session session;
private static Destination destination;
private static boolean transacted = false;
private static String brokerUrl = "failover:(tcp://localhost:61616,tcp://localhost:61618)";
public static void main(String ...strings) throws Exception{
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("common.queue");
MessageConsumer consumer = session.createConsumer(destination);
connection.setExceptionListener(new ExceptionListener() {
public void onException(JMSException exception) {
System.err.println("Error:" + exception);
}
});
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
System.out.println("New Message : " + message);
}
});
}
}
- And now Create producer
package com.test.jms;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsNetworkBrokerProducer { public static void main(String[] args) throws Exception
{
// Create the connection
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(null, null, "failover:(tcp://localhost:61616,tcp://localhost:61618)");
Connection connection = connectionFactory.createConnection();
connection.start();
// Create the Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// And the Destination queue
Destination destination = session.createQueue("common.queue");
// Create the producer
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message m = session.createTextMessage("hello");
//Send 3 messages at a time
//for (int i = 0; i < 3; i++)
// producer.send(m);
producer.send(m);
//shutdown with NO Error handling
connection.close();
}
}
3- Start Servers
--> Start Server 1:
- $ bin/activemq console xbean:conf/activemq-dynamic-network-broker1.xml
--> port activeMQ 61616, port jetty 8161
--> Start Server 2 :
- $ bin/activemq console xbean:conf/activemq-dynamic-network-broker2.xml
--> port activeMQ 61618, port jetty 8162
4 - Start consumer : java JmsNetworkBrokerConsumer
5 - Start producer : java JmsNetworkBrokerProducer