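I. Introduction to ActiveMQ
ActiveMQ is a powerful open-source message bus from Apache and one of the most popular of its kind. It is a JMS provider that fully supports the JMS 1.1 and J2EE 1.4 specifications. The JMS specification has been around for a long time, yet JMS still plays an important role in today's J2EE applications. Its main features:
⒈ Clients can be written in many languages and over many protocols. Languages: Java, C, C++, C#, Ruby, Perl, Python, PHP. Application protocols: OpenWire, Stomp, REST, WS Notification, XMPP, AMQP
⒉ Full support for the JMS 1.1 and J2EE 1.4 specifications (persistence, XA messages, transactions)
⒊ Spring support: ActiveMQ can easily be embedded in systems that use Spring, and it also supports Spring 2.0 features
⒋ Tested with common J2EE servers (such as Geronimo, JBoss 4, GlassFish, WebLogic); by configuring its JCA 1.5 resource adaptors, ActiveMQ can be deployed automatically to any commercial server compatible with J2EE 1.4
⒌ Multiple transport protocols: in-VM, TCP, SSL, NIO, UDP, JGroups, JXTA
⒍ Fast message persistence through JDBC and a journal
⒎ Designed for high-performance clustering, client-server, and peer-to-peer communication
⒏ Ajax support
⒐ Integration with Axis
⒑ Can easily be used as an embedded JMS provider for testing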
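II. Installing ActiveMQ
1. Download ActiveMQ
Open a browser, go to http://activemq.apache.org/activemq-590-release.html, and download ActiveMQ.
2. Extract the archive into a directory, as shown in the figure below.
3. Start ActiveMQ
Run the program highlighted in green in the figure above; the result is shown in the figure below.
As the figure above shows, ActiveMQ is now running.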
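Before moving on, it can help to confirm that the broker is actually accepting connections. The sketch below is not part of ActiveMQ; BrokerPortCheck is a hypothetical helper that simply tries to open a TCP connection. It assumes the broker runs on localhost and listens on port 61616, the address used by the commands later in this article; adjust both if your setup differs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of ActiveMQ: checks whether a TCP port accepts connections.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: localhost:61616, as in the tcp://localhost:61616 URL used in this article.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 61616;
        boolean open = isPortOpen(host, port, 2000);
        System.out.println(host + ":" + port
                + (open ? " is accepting connections" : " is not reachable"));
    }
}
```

An open port only shows that something is listening there; the producer in the next section is the real test that the broker works.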
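III. Writing the producer program
1. Create a Java project
Create a Java project as shown in the figure above, and add the two classes listed below, CommandLineSupport and ProducerTool.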
package cn.iigrowing.www.mq.producer;

import java.util.ArrayList;

import org.apache.activemq.util.IntrospectionSupport;

/** Maps "--option-name=value" command-line arguments onto the setters of a target object. */
public final class CommandLineSupport {

    private CommandLineSupport() {
    }

    /**
     * Applies every "--name=value" argument to the matching setter on target.
     * A bare "--name" is treated as "--name=true".
     *
     * @return the "--" options that could not be applied
     */
    public static String[] setOptions(Object target, String[] args) {
        ArrayList<String> rc = new ArrayList<String>();
        for (int i = 0; i < args.length; i++) {
            if (args[i] == null) {
                continue;
            }
            if (args[i].startsWith("--")) {
                // Split "--name=value" into name and value.
                String value = "true";
                String name = args[i].substring(2);
                int p = name.indexOf("=");
                if (p > 0) {
                    value = name.substring(p + 1);
                    name = name.substring(0, p);
                }
                if (name.length() == 0) {
                    rc.add(args[i]);
                    continue;
                }
                // "message-count" becomes "messageCount", which selects setMessageCount(...).
                String propName = convertOptionToPropertyName(name);
                if (!IntrospectionSupport.setProperty(target, propName, value)) {
                    rc.add(args[i]);
                    continue;
                }
            }
        }
        String r[] = new String[rc.size()];
        rc.toArray(r);
        return r;
    }

    /** Converts a dash-separated option name to a camelCase property name. */
    private static String convertOptionToPropertyName(String name) {
        String rc = "";
        int p = name.indexOf("-");
        while (p > 0) {
            // Copy the part before the dash, then upper-case the character after it.
            rc += name.substring(0, p);
            name = name.substring(p + 1);
            if (name.length() > 0) {
                rc += name.substring(0, 1).toUpperCase();
                name = name.substring(1);
            }
            p = name.indexOf("-");
        }
        return rc + name;
    }
}
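To make the option handling above concrete, the following sketch reproduces only the name conversion and the name/value split, using nothing but the standard library. The class name OptionNameDemo and the method splitOption are made up for illustration; the real class hands the actual setter call to ActiveMQ's IntrospectionSupport, which is left out here.

```java
import java.util.Arrays;

// Illustrative stand-in (hypothetical names); it mirrors the conversion logic
// of CommandLineSupport but does not call any ActiveMQ class.
class OptionNameDemo {

    // Converts a dash-separated option name to camelCase.
    static String toPropertyName(String name) {
        StringBuilder rc = new StringBuilder();
        int p = name.indexOf('-');
        while (p > 0) {
            rc.append(name, 0, p);                 // text before the dash
            name = name.substring(p + 1);          // drop the dash
            if (!name.isEmpty()) {
                rc.append(Character.toUpperCase(name.charAt(0)));
                name = name.substring(1);
            }
            p = name.indexOf('-');
        }
        return rc.append(name).toString();
    }

    // Splits "--name=value" into {propertyName, value}; a bare "--name" yields "true".
    static String[] splitOption(String arg) {
        String name = arg.substring(2);
        String value = "true";
        int p = name.indexOf('=');
        if (p > 0) {
            value = name.substring(p + 1);
            name = name.substring(0, p);
        }
        return new String[] { toPropertyName(name), value };
    }

    public static void main(String[] args) {
        String[] samples = { "--message-count=2000", "--time-to-live=0", "--verbose", "--user=" };
        for (String sample : samples) {
            System.out.println(sample + " -> " + Arrays.toString(splitOption(sample)));
        }
    }
}
```

Note that an option such as --user= yields an empty string rather than leaving the default untouched, which is how the sample commands in this article pass blank credentials.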
package cn.iigrowing.www.mq.producer;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;

/** Command-line tool that sends test messages to an ActiveMQ queue or topic. */
public class ProducerTool extends Thread {

    private Destination destination;
    private int messageCount = 10; // 0 means keep sending forever
    private long sleepTime;
    private boolean verbose = true;
    private int messageSize = 255;
    private static int parallelThreads = 1;
    private long timeToLive;
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private String subject = "TOOL.DEFAULT";
    private boolean topic;
    private boolean transacted;
    private boolean persistent;
    private static Object lockResults = new Object();

    public static void main(String[] args) {
        System.out
                .println("usage:\r\n"
                        + "--url=tcp://192.168.65.188:61616 --topic=false --subject=TEST.FOO --persistent=false --message-count=2000 --message-size=1000 --parallel-threads=1 --time-to-live=0 --sleep-time=0 --transacted=false --verbose=true --user= --password=\r\n");
        ArrayList<ProducerTool> threads = new ArrayList<ProducerTool>();
        // The first instance only validates the options and prints the configuration.
        ProducerTool producerTool = new ProducerTool();
        String[] unknown = CommandLineSupport.setOptions(producerTool, args);
        if (unknown.length > 0) {
            System.out.println("Unknown options: " + Arrays.toString(unknown));
            System.exit(-1);
        }
        producerTool.showParameters();
        // Start one configured producer thread per requested parallel thread.
        for (int threadCount = 1; threadCount <= parallelThreads; threadCount++) {
            producerTool = new ProducerTool();
            CommandLineSupport.setOptions(producerTool, args);
            producerTool.start();
            threads.add(producerTool);
        }
        // Poll once per second until every producer thread has finished.
        while (true) {
            Iterator<ProducerTool> itr = threads.iterator();
            int running = 0;
            while (itr.hasNext()) {
                ProducerTool thread = itr.next();
                if (thread.isAlive()) {
                    running++;
                }
            }
            if (running <= 0) {
                System.out.println("All threads completed their work");
                break;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        producerTool.showParameters();
    }
    public void showParameters() {
        System.out.println("Connecting to URL: " + url + " (" + user + ":"
                + password + ")");
        System.out.println("Publishing a Message with size " + messageSize
                + " to " + (topic ? "topic" : "queue") + ": " + subject);
        System.out.println("Using "
                + (persistent ? "persistent" : "non-persistent") + " messages");
        System.out.println("Sleeping between publish " + sleepTime + " ms");
        System.out.println("Running " + parallelThreads + " parallel threads");
        if (timeToLive != 0) {
            System.out.println("Messages time to live " + timeToLive + " ms");
        }
    }

    public void run() {
        Connection connection = null;
        try {
            // Create the connection.
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    user, password, url);
            connection = connectionFactory.createConnection();
            connection.start();
            // Create the session
            Session session = connection.createSession(transacted,
                    Session.AUTO_ACKNOWLEDGE);
            if (topic) {
                destination = session.createTopic(subject);
            } else {
                destination = session.createQueue(subject);
            }
            // Create the producer.
            MessageProducer producer = session.createProducer(destination);
            if (persistent) {
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            } else {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            }
            if (timeToLive != 0) {
                producer.setTimeToLive(timeToLive);
            }
            // Start sending messages
            sendLoop(session, producer);
            System.out.println("[" + this.getName() + "] Done.");
            // Print connection statistics one thread at a time so the output does not interleave.
            synchronized (lockResults) {
                ActiveMQConnection c = (ActiveMQConnection) connection;
                System.out.println("[" + this.getName() + "] Results:\n");
                c.getConnectionStats().dump(new IndentPrinter());
            }
        } catch (Exception e) {
            System.out.println("[" + this.getName() + "] Caught: " + e);
            e.printStackTrace();
        } finally {
            // The connection is still null if createConnection() failed.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable ignore) {
                }
            }
        }
    }
    // Number of messages sent by all producer threads together.
    static int total_count1 = 0;

    public int incCount() {
        synchronized (ProducerTool.class) {
            total_count1++;
            return total_count1;
        }
    }

    public int getCount() {
        synchronized (ProducerTool.class) {
            return total_count1;
        }
    }

    protected void sendLoop(Session session, MessageProducer producer)
            throws Exception {
        // A messageCount of 0 means the loop never ends.
        for (int i = 0; i < messageCount || messageCount == 0; i++) {
            TextMessage message = session
                    .createTextMessage(createMessageText(i));
            String msg = message.getText();
            if (verbose) {
                // When verbose is set, shorten the logged text to 50 characters.
                if (msg.length() > 50) {
                    msg = msg.substring(0, 50) + "...";
                }
            }
            producer.send(message);
            int c = incCount();
            System.out.println(c + "\t\t[" + this.getName()
                    + "] Sending message: '" + msg + "'");
            if (transacted) {
                System.out
                        .println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> commit message <<<<<<<<<<<<<");
                session.commit();
            }
            if (sleepTime > 0) {
                Thread.sleep(sleepTime);
            }
        }
    }

    /** Builds a message body of exactly messageSize characters, padded with spaces. */
    private String createMessageText(int index) {
        StringBuilder buffer = new StringBuilder(messageSize);
        buffer.append("Message: " + index + " sent at: " + new Date());
        if (buffer.length() > messageSize) {
            return buffer.substring(0, messageSize);
        }
        for (int i = buffer.length(); i < messageSize; i++) {
            buffer.append(' ');
        }
        return buffer.toString();
    }
    public void setPersistent(boolean durable) {
        this.persistent = durable;
    }

    public void setMessageCount(int messageCount) {
        this.messageCount = messageCount;
    }

    public void setMessageSize(int messageSize) {
        this.messageSize = messageSize;
    }

    public void setPassword(String pwd) {
        this.password = pwd;
    }

    public void setSleepTime(long sleepTime) {
        this.sleepTime = sleepTime;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public void setTimeToLive(long timeToLive) {
        this.timeToLive = timeToLive;
    }

    public void setParallelThreads(int parallelThreads) {
        if (parallelThreads < 1) {
            parallelThreads = 1;
        }
        ProducerTool.parallelThreads = parallelThreads;
    }

    public void setTopic(boolean topic) {
        this.topic = topic;
    }

    public void setQueue(boolean queue) {
        this.topic = !queue;
    }

    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void setVerbose(boolean verbose) {
        this.verbose = verbose;
    }
}
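ProducerTool.main waits for its workers by polling isAlive() once per second, and the shared message counter is guarded by a synchronized block. As an alternative sketch (not the author's code), the same bookkeeping can be done with Thread.join() and an AtomicInteger. The class JoinWorkersDemo is hypothetical, and the counter increment merely stands in for producer.send(message), so no broker is needed to run it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: waits for worker threads with join() instead of polling.
class JoinWorkersDemo {

    // Starts parallelThreads workers, each "sending" messagesPerThread messages,
    // waits for all of them, and returns the total number of messages counted.
    static int runWorkers(int parallelThreads, int messagesPerThread)
            throws InterruptedException {
        AtomicInteger sent = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < parallelThreads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    sent.incrementAndGet(); // stands in for producer.send(message)
                }
            });
            worker.start();
            threads.add(worker);
        }
        // join() blocks until the worker has finished, so no sleep loop is needed.
        for (Thread worker : threads) {
            worker.join();
        }
        return sent.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Total messages counted: " + runWorkers(4, 1000));
    }
}
```

The polling loop in the article works just as well for a test tool; join() simply returns as soon as the last worker finishes instead of up to a second later.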
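2. Package the project with the fatjar plugin
Pay special attention to the option highlighted in green in the figure above: it selects the main class, which ensures that this class is launched when the jar is run.
Use the Easy Explore plugin to jump straight to the project directory, as shown in the figure below.
Open a Windows console (command prompt).
Enter the following command: java -jar cn.iigrowing.www.mq01_fat.jar --url=tcp://localhost:61616 --topic=false --subject=TEST.FOO --persistent=false --message-count=20000000 --message-size=1000 --parallel-threads=1 --time-to-live=0 --sleep-time=0 --transacted=false --verbose=true --user= --password=
The producer starts and begins sending messages, as shown in the figure below.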
IV. Writing the consumer program
1. Create a Java project, as shown in the figure below
2. Create the consumer class ConsumerTool
package cn.iigrowing.www.mq.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Iterator;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Command-line tool that receives test messages from an ActiveMQ queue or topic.
 * Note: CommandLineSupport is not imported, so this code assumes a copy of the
 * class from the producer project exists in this package.
 */
public class ConsumerTool extends Thread implements MessageListener,
        ExceptionListener {

    private boolean running;
    private Session session;
    private Destination destination;
    private MessageProducer replyProducer;
    private boolean pauseBeforeShutdown = false;
    private boolean verbose = true;
    private int maxiumMessages;
    private static int parallelThreads = 1;
    private String subject = "TOOL.DEFAULT";
    private boolean topic;
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private boolean transacted;
    private boolean durable;
    private String clientId;
    private int ackMode = Session.AUTO_ACKNOWLEDGE;
    private String consumerName = "James";
    private long sleepTime;
    private long receiveTimeOut;
    private long batch = 10; // Default batch size for CLIENT_ACKNOWLEDGEMENT or
                             // SESSION_TRANSACTED
    private long messagesReceived = 0;

    public static void main(String[] args) {
        System.out.println(" --url=tcp://localhost:61616 --topic=false --subject=TEST.FOO --durable=false --maxium-messages=2000 --client-id=consumer1 --parallel-threads=1 --transacted=false --sleep-time=0 --verbose=true --ack-mode=AUTO_ACKNOWLEDGE --receive-time-out=0 --batch=10 --user= --password= ");
        ArrayList<ConsumerTool> threads = new ArrayList<ConsumerTool>();
        // The first instance only validates the options and prints the configuration.
        ConsumerTool consumerTool = new ConsumerTool();
        String[] unknown = CommandLineSupport.setOptions(consumerTool, args);
        if (unknown.length > 0) {
            System.out.println("Unknown options: " + Arrays.toString(unknown));
            System.exit(-1);
        }
        consumerTool.showParameters();
        // Start one configured consumer thread per requested parallel thread.
        for (int threadCount = 1; threadCount <= parallelThreads; threadCount++) {
            consumerTool = new ConsumerTool();
            CommandLineSupport.setOptions(consumerTool, args);
            consumerTool.start();
            threads.add(consumerTool);
        }
        // Poll once per second until every consumer thread has finished.
        while (true) {
            Iterator<ConsumerTool> itr = threads.iterator();
            int running = 0;
            while (itr.hasNext()) {
                ConsumerTool thread = itr.next();
                if (thread.isAlive()) {
                    running++;
                }
            }
            if (running <= 0) {
                System.out.println("All threads completed their work");
                break;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        consumerTool.showParameters();
    }
    public void showParameters() {
        System.out.println("Connecting to URL: " + url + " (" + user + ":"
                + password + ")");
        System.out.println("Consuming " + (topic ? "topic" : "queue") + ": "
                + subject);
        System.out.println("Using a " + (durable ? "durable" : "non-durable")
                + " subscription");
        System.out.println("Running " + parallelThreads + " parallel threads");
    }

    public void run() {
        try {
            running = true;
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    user, password, url);
            Connection connection = connectionFactory.createConnection();
            // A durable subscription needs a client ID, set before the connection is used.
            if (durable && clientId != null && clientId.length() > 0
                    && !"null".equals(clientId)) {
                connection.setClientID(clientId);
            }
            connection.setExceptionListener(this);
            connection.start();
            session = connection.createSession(transacted, ackMode);
            if (topic) {
                destination = session.createTopic(subject);
            } else {
                destination = session.createQueue(subject);
            }
            // Producer without a fixed destination, used to answer JMSReplyTo requests.
            replyProducer = session.createProducer(null);
            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            MessageConsumer consumer = null;
            if (durable && topic) {
                consumer = session.createDurableSubscriber((Topic) destination,
                        consumerName);
            } else {
                consumer = session.createConsumer(destination);
            }
            if (maxiumMessages > 0) {
                // Receive a fixed number of messages, then close everything.
                consumeMessagesAndClose(connection, session, consumer);
            } else {
                if (receiveTimeOut == 0) {
                    // Listener mode: run() returns here and messages keep arriving
                    // in onMessage() on the connection's own threads.
                    consumer.setMessageListener(this);
                } else {
                    // Receive until no message arrives within receiveTimeOut, then close.
                    consumeMessagesAndClose(connection, session, consumer,
                            receiveTimeOut);
                }
            }
        } catch (Exception e) {
            System.out.println("[" + this.getName() + "] Caught: " + e);
            e.printStackTrace();
        }
    }
    // Number of messages received by all consumer threads together.
    static int total_count1 = 0;

    public int incCount() {
        synchronized (ConsumerTool.class) {
            total_count1++;
            return total_count1;
        }
    }

    public int getCount() {
        synchronized (ConsumerTool.class) {
            return total_count1;
        }
    }

    public void onMessage(Message message) {
        messagesReceived++;
        try {
            int c = incCount();
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                if (verbose) {
                    String msg = txtMsg.getText();
                    int length = msg.length();
                    if (length > 50) {
                        msg = msg.substring(0, 50) + "...";
                    }
                    System.out.println(c + "\t\t[" + this.getName()
                            + "] Received: '" + msg + "' (length " + length
                            + ")");
                }
            } else {
                if (verbose) {
                    System.out.println(c + "\t\t[" + this.getName()
                            + "] Received: '" + message + "'");
                }
            }
            // Answer request/reply style messages.
            if (message.getJMSReplyTo() != null) {
                replyProducer.send(
                        message.getJMSReplyTo(),
                        session.createTextMessage("Reply: "
                                + message.getJMSMessageID()));
            }
            // Commit or acknowledge once every `batch` messages.
            if (transacted) {
                if ((messagesReceived % batch) == 0) {
                    System.out.println("Committing transaction for last "
                            + batch + " messages; messages so far = "
                            + messagesReceived);
                    session.commit();
                }
            } else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
                if ((messagesReceived % batch) == 0) {
                    System.out.println("Acknowledging last " + batch
                            + " messages; messages so far = "
                            + messagesReceived);
                    message.acknowledge();
                }
            }
        } catch (JMSException e) {
            System.out.println("[" + this.getName() + "] Caught: " + e);
            e.printStackTrace();
        } finally {
            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                }
            }
        }
    }

    public synchronized void onException(JMSException ex) {
        System.out.println("[" + this.getName()
                + "] JMS Exception occurred. Shutting down client.");
        running = false;
    }

    synchronized boolean isRunning() {
        return running;
    }
    protected void consumeMessagesAndClose(Connection connection,
            Session session, MessageConsumer consumer) throws JMSException,
            IOException {
        System.out.println("[" + this.getName()
                + "] We are about to wait until we consume: " + maxiumMessages
                + " message(s) then we will shutdown");
        for (int i = 0; i < maxiumMessages && isRunning();) {
            Message message = consumer.receive(1000);
            if (message != null) {
                i++;
                onMessage(message);
            }
        }
        System.out.println("[" + this.getName() + "] Closing connection");
        consumer.close();
        session.close();
        connection.close();
        if (pauseBeforeShutdown) {
            System.out.println("[" + this.getName()
                    + "] Press return to shut down");
            System.in.read();
        }
    }

    protected void consumeMessagesAndClose(Connection connection,
            Session session, MessageConsumer consumer, long timeout)
            throws JMSException, IOException {
        System.out
                .println("["
                        + this.getName()
                        + "] We will consume messages while they continue to be delivered within: "
                        + timeout + " ms, and then we will shutdown");
        Message message;
        while ((message = consumer.receive(timeout)) != null) {
            onMessage(message);
        }
        System.out.println("[" + this.getName() + "] Closing connection");
        consumer.close();
        session.close();
        connection.close();
        if (pauseBeforeShutdown) {
            System.out.println("[" + this.getName()
                    + "] Press return to shut down");
            System.in.read();
        }
    }
    public void setAckMode(String ackMode) {
        if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
            this.ackMode = Session.CLIENT_ACKNOWLEDGE;
        }
        if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
            this.ackMode = Session.AUTO_ACKNOWLEDGE;
        }
        if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
            this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
        }
        if ("SESSION_TRANSACTED".equals(ackMode)) {
            this.ackMode = Session.SESSION_TRANSACTED;
        }
    }

    public void setClientId(String clientID) {
        this.clientId = clientID;
    }

    public void setConsumerName(String consumerName) {
        this.consumerName = consumerName;
    }

    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    public void setMaxiumMessages(int maxiumMessages) {
        this.maxiumMessages = maxiumMessages;
    }

    public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) {
        this.pauseBeforeShutdown = pauseBeforeShutdown;
    }

    public void setPassword(String pwd) {
        this.password = pwd;
    }

    public void setReceiveTimeOut(long receiveTimeOut) {
        this.receiveTimeOut = receiveTimeOut;
    }

    public void setSleepTime(long sleepTime) {
        this.sleepTime = sleepTime;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public void setParallelThreads(int parallelThreads) {
        if (parallelThreads < 1) {
            parallelThreads = 1;
        }
        ConsumerTool.parallelThreads = parallelThreads;
    }

    public void setTopic(boolean topic) {
        this.topic = topic;
    }

    public void setQueue(boolean queue) {
        this.topic = !queue;
    }

    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void setVerbose(boolean verbose) {
        this.verbose = verbose;
    }

    public void setBatch(long batch) {
        this.batch = batch;
    }
}
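The batching in onMessage commits (or acknowledges) only when messagesReceived is an exact multiple of batch. The stdlib-only sketch below counts what that rule implies; BatchAckDemo and simulate are hypothetical names, and no JMS classes are involved. The takeaway is my reading of the code rather than a measured result: when the total is not a multiple of batch, the trailing messages are never committed or acknowledged by this logic before the session is closed, so with a transacted or CLIENT_ACKNOWLEDGE session the broker would be expected to redeliver them.

```java
// Hypothetical demo class: models the "every batch-th message" rule from onMessage.
class BatchAckDemo {

    // Returns {number of commits, messages received after the last commit}.
    static long[] simulate(long totalMessages, long batch) {
        long commits = 0;
        long received = 0;
        long lastCommitted = 0;
        for (long i = 0; i < totalMessages; i++) {
            received++;                       // same as messagesReceived++
            if (received % batch == 0) {      // same test as in onMessage
                commits++;                    // stands in for session.commit()
                lastCommitted = received;
            }
        }
        return new long[] { commits, received - lastCommitted };
    }

    public static void main(String[] args) {
        long[] result = simulate(25, 10);
        // prints "commits=2, uncommitted=5"
        System.out.println("commits=" + result[0] + ", uncommitted=" + result[1]);
    }
}
```

If this matters for a test run, choosing a message count that is a multiple of --batch avoids the leftover.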
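3. Package the project into a runnable jar
4. Open a console (type cmd in the Run dialog)
5. Run the following command: java -jar cn.iigrowing.www.consumer01.jar --url=tcp://localhost:61616 --topic=false --subject=TEST.FOO --durable=false --maxium-messages=200000000 --client-id=consumer1 --parallel-threads=1 --transacted=false --sleep-time=0 --verbose=true --ack-mode=AUTO_ACKNOWLEDGE --receive-time-out=0 --batch=10 --user= --password=
V. Consuming messages with the consumer
1. Start the consumer program and have it exit after receiving 100000 messages (for example, by passing --maxium-messages=100000)
2. Start the producer program
3. Observe how the two programs behave
Program download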
http://pan.baidu.com/s/1pJHcww7