ActiveMQ学习入门-快速安装与测试

一. ActiveMQ简介

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中仍然占据着特殊的地位。

⒈ 支持多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp,REST,WS Notification,XMPP,AMQP

⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ自动地部署到任何兼容J2EE 1.4 的商业服务器上

⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

⒍ 支持通过JDBC和journal提供高速的消息持久化

⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点

⒏ 支持Ajax

⒐ 支持与Axis的整合

⒑ 可以很容易地调用内嵌JMS provider,进行测试

二.ActiveMQ安装

1. 下载ActiveMQ

打开浏览器,输入如下地址http://activemq.apache.org/activemq-590-release.html, 下载mq。

2. 解压缩文件到一个目录中, 如下图

3. 启动ActiveMQ

如上图, 运行绿色区域中相关程序,如下图

如上图, 启动了ActiveMQ程序。

三.编写生产者程序

1. 创建java项目

如上图, 创建一个java项目, 在项目中创建两个类, 如上图

package cn.iigrowing.www.mq.producer;

import java.util.ArrayList;

import org.apache.activemq.util.IntrospectionSupport;

/**
 * Static helper that applies {@code --name=value} style command line
 * arguments to the setters of a target object.
 *
 * <p>Each option name is converted from dashed form to a camel-case property
 * name and handed to {@code IntrospectionSupport.setProperty}.
 */
public final class CommandLineSupport {

    private CommandLineSupport() {
        // Static utility class; never instantiated.
    }

    /**
     * Applies every {@code --option[=value]} argument to {@code target}.
     *
     * <p>An option without {@code =value} is applied with the value
     * {@code "true"}. Arguments that are {@code null} or do not start with
     * {@code --} are skipped and are not reported.
     *
     * @param target object whose properties are set
     * @param args   raw command line arguments; {@code null} is treated as empty
     * @return the {@code --} arguments that had an empty name or that
     *         {@code IntrospectionSupport.setProperty} refused
     */
    public static String[] setOptions(Object target, String[] args) {
        ArrayList<String> rejected = new ArrayList<String>();
        if (args == null) {
            return new String[0];
        }

        for (String arg : args) {
            if (arg == null || !arg.startsWith("--")) {
                continue;
            }

            // Options without an explicit value act as enabled boolean flags.
            String value = "true";
            String name = arg.substring(2);

            int equalsAt = name.indexOf('=');
            if (equalsAt > 0) {
                value = name.substring(equalsAt + 1);
                name = name.substring(0, equalsAt);
            }

            // A bare "--" has no name and is reported back as rejected.
            if (name.length() == 0) {
                rejected.add(arg);
                continue;
            }

            String propName = convertOptionToPropertyName(name);
            if (!IntrospectionSupport.setProperty(target, propName, value)) {
                rejected.add(arg);
            }
        }

        return rejected.toArray(new String[rejected.size()]);
    }

    /**
     * Converts a dashed option name such as {@code time-to-live} into the
     * camel-case property name {@code timeToLive}.
     *
     * <p>Every dash after the first character is dropped and the character
     * that follows it is upper-cased, so single-letter segments
     * ({@code a-b-c} becomes {@code aBC}) are handled as well. A dash in the
     * first position is kept literally.
     *
     * @param name option name without the leading {@code --}
     * @return the derived property name
     */
    private static String convertOptionToPropertyName(String name) {
        StringBuilder property = new StringBuilder(name.length());
        boolean upperNext = false;

        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            if (c == '-' && i > 0) {
                upperNext = true;
                continue;
            }
            // Character.toUpperCase does not consult the default locale, so
            // the result is the same on every machine.
            property.append(upperNext ? Character.toUpperCase(c) : c);
            upperNext = false;
        }

        return property.toString();
    }

}

package cn.iigrowing.www.mq.producer;

import java.util.Arrays;

import java.util.ArrayList;

import java.util.Date;

import java.util.Iterator;

import javax.jms.Connection;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.apache.activemq.util.IndentPrinter;

/**
 * Command line JMS producer that sends text messages to an ActiveMQ broker.
 *
 * <p>{@link #main(String[])} applies the {@code --option=value} arguments to
 * the setters of this class through {@code CommandLineSupport} and then
 * starts {@code parallelThreads} instances. Each instance opens its own
 * connection in {@link #run()} and sends {@code messageCount} messages.
 */
public class ProducerTool extends Thread {

    /** Serializes the statistics dump so output of several threads does not interleave. */
    private static final Object lockResults = new Object();

    /** Number of producer threads started by {@code main}; shared by all instances. */
    private static int parallelThreads = 1;

    /** Number of messages sent so far by all producer threads. */
    static int total_count1 = 0;

    private Destination destination;
    /** Messages to send per thread; 0 makes {@code sendLoop} run without an upper bound. */
    private int messageCount = 10;
    /** Pause in milliseconds after each message; values of 0 or less disable it. */
    private long sleepTime;
    /** When true, every sent message is logged (truncated to 50 characters). */
    private boolean verbose = true;
    /** Exact length, in characters, of every generated message body. */
    private int messageSize = 255;
    /** Producer time-to-live in milliseconds; 0 leaves the producer default untouched. */
    private long timeToLive;
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    /** Name of the queue or topic to publish to. */
    private String subject = "TOOL.DEFAULT";
    /** True publishes to a topic, false to a queue. */
    private boolean topic;
    private boolean transacted;
    private boolean persistent;

    /**
     * Parses the options, starts the producer threads and waits for them.
     *
     * <p>Exits with status -1 when an option is rejected by
     * {@code CommandLineSupport.setOptions}.
     *
     * @param args command line options in {@code --name=value} form
     */
    public static void main(String[] args) {
        System.out
                .println("useage:\r\n"
                        + "--url=tcp://192.168.65.188:61616 --topic=false --subject=TEST.FOO --persistent=false --message-count=2000 --message-size=1000 --parallel-threads=1 --time-to-live=0 --sleep-time=0 --transacted=false --verbose=true --user= --password=\r\n");

        // The first instance only validates the options and prints them.
        ProducerTool producerTool = new ProducerTool();
        String[] unknown = CommandLineSupport.setOptions(producerTool, args);
        if (unknown.length > 0) {
            System.out.println("Unknown options: " + Arrays.toString(unknown));
            System.exit(-1);
        }
        producerTool.showParameters();

        ArrayList<ProducerTool> threads = new ArrayList<ProducerTool>();
        for (int threadCount = 1; threadCount <= parallelThreads; threadCount++) {
            producerTool = new ProducerTool();
            CommandLineSupport.setOptions(producerTool, args);
            producerTool.start();
            threads.add(producerTool);
        }

        if (awaitAll(threads)) {
            System.out.println("All threads completed their work");
        } else {
            System.out.println("Interrupted while waiting for producer threads");
        }

        producerTool.showParameters();
    }

    /**
     * Blocks until every given thread has terminated.
     *
     * @param threads threads to wait for
     * @return true when all threads finished, false when the wait was
     *         interrupted (the interrupt flag is restored in that case)
     */
    private static boolean awaitAll(ArrayList<ProducerTool> threads) {
        for (ProducerTool worker : threads) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /** Prints the effective connection and publishing settings to standard output. */
    public void showParameters() {
        System.out.println("Connecting to URL: " + url + " (" + user + ":"
                + password + ")");
        System.out.println("Publishing a Message with size " + messageSize
                + " to " + (topic ? "topic" : "queue") + ": " + subject);
        System.out.println("Using "
                + (persistent ? "persistent" : "non-persistent") + " messages");
        System.out.println("Sleeping between publish " + sleepTime + " ms");
        System.out.println("Running " + parallelThreads + " parallel threads");
        if (timeToLive != 0) {
            System.out.println("Messages time to live " + timeToLive + " ms");
        }
    }

    /**
     * Connects to the broker, sends the configured messages, dumps the
     * connection statistics and closes the connection.
     *
     * <p>Any exception is printed; it is not propagated out of the thread.
     */
    @Override
    public void run() {
        Connection connection = null;
        try {
            // Create the connection.
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    user, password, url);
            connection = connectionFactory.createConnection();
            connection.start();

            // Create the session.
            Session session = connection.createSession(transacted,
                    Session.AUTO_ACKNOWLEDGE);
            if (topic) {
                destination = session.createTopic(subject);
            } else {
                destination = session.createQueue(subject);
            }

            // Create the producer.
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(persistent
                    ? DeliveryMode.PERSISTENT
                    : DeliveryMode.NON_PERSISTENT);
            if (timeToLive != 0) {
                producer.setTimeToLive(timeToLive);
            }

            // Start sending messages.
            sendLoop(session, producer);

            System.out.println("[" + this.getName() + "] Done.");

            synchronized (lockResults) {
                ActiveMQConnection c = (ActiveMQConnection) connection;
                System.out.println("[" + this.getName() + "] Results:\n");
                c.getConnectionStats().dump(new IndentPrinter());
            }
        } catch (Exception e) {
            System.out.println("[" + this.getName() + "] Caught: " + e);
            e.printStackTrace();
        } finally {
            // The connection is null when the factory call itself failed.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignored) {
                    // Best-effort close; the work is already done or has already failed.
                }
            }
        }
    }

    /**
     * Increments the counter shared by all producer threads.
     *
     * @return the counter value after the increment
     */
    public int incCount() {
        synchronized (ProducerTool.class) {
            total_count1++;
            return total_count1;
        }
    }

    /**
     * @return the number of messages counted so far across all producer threads
     */
    public int getCount() {
        synchronized (ProducerTool.class) {
            return total_count1;
        }
    }

    /**
     * Sends {@code messageCount} text messages, or keeps sending without an
     * upper bound when {@code messageCount} is 0.
     *
     * <p>Each message is logged only when {@code verbose} is set. A
     * transacted session is committed after every message.
     *
     * @param session  session used to create the messages and to commit
     * @param producer producer used to send the messages
     * @throws Exception when sending, committing or sleeping fails
     */
    protected void sendLoop(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 0; i < messageCount || messageCount == 0; i++) {
            TextMessage message = session
                    .createTextMessage(createMessageText(i));

            producer.send(message);
            int c = incCount();

            if (verbose) {
                String msg = message.getText();
                if (msg.length() > 50) {
                    msg = msg.substring(0, 50) + "...";
                }
                System.out.println(c + "\t\t[" + this.getName()
                        + "] Sending message: '" + msg + "'");
            }

            if (transacted) {
                System.out
                        .println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> commit message <<<<<<<<<<<<<");
                session.commit();
            }

            if (sleepTime > 0) {
                Thread.sleep(sleepTime);
            }
        }
    }

    /**
     * Builds a message body of exactly {@code messageSize} characters: a
     * header with the index and the current date, cut off or padded with
     * spaces as needed.
     *
     * @param index sequence number placed into the header
     * @return the message text
     */
    private String createMessageText(int index) {
        StringBuilder buffer = new StringBuilder(messageSize);
        buffer.append("Message: ").append(index).append(" sent at: ").append(new Date());
        if (buffer.length() > messageSize) {
            return buffer.substring(0, messageSize);
        }
        while (buffer.length() < messageSize) {
            buffer.append(' ');
        }
        return buffer.toString();
    }

    public void setPersistent(boolean durable) {
        this.persistent = durable;
    }

    public void setMessageCount(int messageCount) {
        this.messageCount = messageCount;
    }

    public void setMessageSize(int messageSize) {
        this.messageSize = messageSize;
    }

    public void setPassword(String pwd) {
        this.password = pwd;
    }

    public void setSleepTime(long sleepTime) {
        this.sleepTime = sleepTime;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public void setTimeToLive(long timeToLive) {
        this.timeToLive = timeToLive;
    }

    /**
     * Sets the number of producer threads for the whole process; values
     * below 1 are raised to 1.
     *
     * @param parallelThreads requested thread count
     */
    public void setParallelThreads(int parallelThreads) {
        // Instance setter so the option parser can reach it; the value itself is static.
        ProducerTool.parallelThreads = Math.max(parallelThreads, 1);
    }

    public void setTopic(boolean topic) {
        this.topic = topic;
    }

    /** Inverse of {@link #setTopic(boolean)}: {@code true} selects a queue. */
    public void setQueue(boolean queue) {
        this.topic = !queue;
    }

    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void setVerbose(boolean verbose) {
        this.verbose = verbose;
    }

}

2. 采用fatjar插件, 将项目打包

如上图, 请特别注意, 要选中上图中绿色区域的选项, 这样可以确保程序启动时运行这个类.

如上图, 采用easy explore插件, 快速到达项目的目录中,如下图

启动一个windows下的控制台程序

输入如下命令, java -jar cn.iigrowing.www.mq01_fat.jar --url=tcp://localhost:61616 --topic=false --subject=TEST.FOO --persistent=false --message-count=20000000 --message-size=1000 --parallel-threads=1 --time-to-live=0 --sleep-time=0 --transacted=false --verbose=true --user= --password=

启动生产程序, 开始发送消息, 如下图

四.编写消费者程序

1. 创建一个java项目,如下图

2. 创建消费者类ConsumerTool

package cn.iigrowing.www.mq.consumer;

import java.io.IOException;

import java.util.Arrays;

import java.util.ArrayList;

import java.util.Iterator;

import javax.jms.Connection;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.ExceptionListener;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerTool extends Thread implements MessageListener,

ExceptionListener {

private boolean running;

private Session session;

private Destination destination;

private MessageProducer replyProducer;

private boolean pauseBeforeShutdown = false;

private boolean verbose = true;

private int maxiumMessages;

private static int parallelThreads = 1;

private String subject = "TOOL.DEFAULT";

private boolean topic;

private String user = ActiveMQConnection.DEFAULT_USER;

private String password = ActiveMQConnection.DEFAULT_PASSWORD;

private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

private boolean transacted;

private boolean durable;

private String clientId;

private int ackMode = Session.AUTO_ACKNOWLEDGE;

private String consumerName = "James";

private long sleepTime;

private long receiveTimeOut;

private long batch = 10; // Default batch size for CLIENT_ACKNOWLEDGEMENT or

// SESSION_TRANSACTED

private long messagesReceived = 0;

public static void main(String[] args) {

System.out.println(" --url=tcp://localhost:61616 --topic=false --subject=TEST.FOO --durable=false --maxium-messages=2000 --client-id=consumer1 --parallel-threads=1 --transacted=false --sleep-time=0 --verbose=true --ack-mode=AUTO_ACKNOWLEDGE --receive-time-out=0 --batch=10 --user= --password= ");

ArrayList<ConsumerTool> threads = new ArrayList();

ConsumerTool consumerTool = new ConsumerTool();

String[] unknown = CommandLineSupport.setOptions(consumerTool, args);

if (unknown.length > 0) {

System.out.println("Unknown options: " + Arrays.toString(unknown));

System.exit(-1);

}

consumerTool.showParameters();

for (int threadCount = 1; threadCount <= parallelThreads; threadCount++) {

consumerTool = new ConsumerTool();

CommandLineSupport.setOptions(consumerTool, args);

consumerTool.start();

threads.add(consumerTool);

}

while (true) {

Iterator<ConsumerTool> itr = threads.iterator();

int running = 0;

while (itr.hasNext()) {

ConsumerTool thread = itr.next();

if (thread.isAlive()) {

running++;

}

}

if (running <= 0) {

System.out.println("All threads completed their work");

break;

}

try {

Thread.sleep(1000);

} catch (Exception e) {

}

}

Iterator<ConsumerTool> itr = threads.iterator();

while (itr.hasNext()) {

ConsumerTool thread = itr.next();

}

consumerTool.showParameters();

}

public void showParameters() {

System.out.println("Connecting to URL: " + url + " (" + user + ":"

+ password + ")");

System.out.println("Consuming " + (topic ? "topic" : "queue") + ": "

+ subject);

System.out.println("Using a " + (durable ? "durable" : "non-durable")

+ " subscription");

System.out.println("Running " + parallelThreads + " parallel threads");

}

public void run() {

try {

running = true;

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(

user, password, url);

Connection connection = connectionFactory.createConnection();

if (durable && clientId != null && clientId.length() > 0

&& !"null".equals(clientId)) {

connection.setClientID(clientId);

}

connection.setExceptionListener(this);

connection.start();

session = connection.createSession(transacted, ackMode);

if (topic) {

destination = session.createTopic(subject);

} else {

destination = session.createQueue(subject);

}

replyProducer = session.createProducer(null);

replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

MessageConsumer consumer = null;

if (durable && topic) {

consumer = session.createDurableSubscriber((Topic) destination,

consumerName);

} else {

consumer = session.createConsumer(destination);

}

// consumer.receive();

if (maxiumMessages > 0) {

consumeMessagesAndClose(connection, session, consumer);

} else {

if (receiveTimeOut == 0) {

consumer.setMessageListener(this);

} else {

consumeMessagesAndClose(connection, session, consumer,

receiveTimeOut);

}

}

} catch (Exception e) {

System.out.println("[" + this.getName() + "] Caught: " + e);

e.printStackTrace();

}

}

static int total_count1 = 0;

public int incCount() {

synchronized (ConsumerTool.class) {

total_count1++;

return total_count1;

}

}

public int getCount() {

synchronized (ConsumerTool.class) {

return total_count1;

}

}

public void onMessage(Message message) {

messagesReceived++;

try {

int c = incCount(); // total_count++;

if (message instanceof TextMessage) {

TextMessage txtMsg = (TextMessage) message;

if (verbose) {

String msg = txtMsg.getText();

int length = msg.length();

if (length > 50) {

msg = msg.substring(0, 50) + "...";

}

System.out.println(c + "\t\t[" + this.getName()

+ "] Received: '" + msg + "' (length " + length

+ ")");

}

} else {

if (verbose) {

System.out.println(c + "\t\t[" + this.getName()

+ "] Received: '" + message + "'");

}

}

if (message.getJMSReplyTo() != null) {

replyProducer.send(

message.getJMSReplyTo(),

session.createTextMessage("Reply: "

+ message.getJMSMessageID()));

}

if (transacted) {

if ((messagesReceived % batch) == 0) {

System.out.println("Commiting transaction for last "

+ batch + " messages; messages so far = "

+ messagesReceived);

session.commit();

}

} else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {

if ((messagesReceived % batch) == 0) {

System.out.println("Acknowledging last " + batch

+ " messages; messages so far = "

+ messagesReceived);

message.acknowledge();

}

}

} catch (JMSException e) {

System.out.println("[" + this.getName() + "] Caught: " + e);

e.printStackTrace();

} finally {

if (sleepTime > 0) {

try {

Thread.sleep(sleepTime);

} catch (InterruptedException e) {

}

}

}

}

public synchronized void onException(JMSException ex) {

System.out.println("[" + this.getName()

+ "] JMS Exception occured. Shutting down client.");

running = false;

}

synchronized boolean isRunning() {

return running;

}

protected void consumeMessagesAndClose(Connection connection,

Session session, MessageConsumer consumer) throws JMSException,

IOException {

System.out.println("[" + this.getName()

+ "] We are about to wait until we consume: " + maxiumMessages

+ " message(s) then we will shutdown");

for (int i = 0; i < maxiumMessages && isRunning();) {

Message message = consumer.receive(1000);

if (message != null) {

i++;

onMessage(message);

}

}

System.out.println("[" + this.getName() + "] Closing connection");

consumer.close();

session.close();

connection.close();

if (pauseBeforeShutdown) {

System.out.println("[" + this.getName()

+ "] Press return to shut down");

System.in.read();

}

}

protected void consumeMessagesAndClose(Connection connection,

Session session, MessageConsumer consumer, long timeout)

throws JMSException, IOException {

System.out

.println("["

+ this.getName()

+ "] We will consume messages while they continue to be delivered within: "

+ timeout + " ms, and then we will shutdown");

Message message;

while ((message = consumer.receive(timeout)) != null) {

onMessage(message);

}

System.out.println("[" + this.getName() + "] Closing connection");

consumer.close();

session.close();

connection.close();

if (pauseBeforeShutdown) {

System.out.println("[" + this.getName()

+ "] Press return to shut down");

System.in.read();

}

}

public void setAckMode(String ackMode) {

if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {

this.ackMode = Session.CLIENT_ACKNOWLEDGE;

}

if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {

this.ackMode = Session.AUTO_ACKNOWLEDGE;

}

if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {

this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;

}

if ("SESSION_TRANSACTED".equals(ackMode)) {

this.ackMode = Session.SESSION_TRANSACTED;

}

}

public void setClientId(String clientID) {

this.clientId = clientID;

}

public void setConsumerName(String consumerName) {

this.consumerName = consumerName;

}

public void setDurable(boolean durable) {

this.durable = durable;

}

public void setMaxiumMessages(int maxiumMessages) {

this.maxiumMessages = maxiumMessages;

}

public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) {

this.pauseBeforeShutdown = pauseBeforeShutdown;

}

public void setPassword(String pwd) {

this.password = pwd;

}

public void setReceiveTimeOut(long receiveTimeOut) {

this.receiveTimeOut = receiveTimeOut;

}

public void setSleepTime(long sleepTime) {

this.sleepTime = sleepTime;

}

public void setSubject(String subject) {

this.subject = subject;

}

public void setParallelThreads(int parallelThreads) {

if (parallelThreads < 1) {

parallelThreads = 1;

}

this.parallelThreads = parallelThreads;

}

public void setTopic(boolean topic) {

this.topic = topic;

}

public void setQueue(boolean queue) {

this.topic = !queue;

}

public void setTransacted(boolean transacted) {

this.transacted = transacted;

}

public void setUrl(String url) {

this.url = url;

}

public void setUser(String user) {

this.user = user;

}

public void setVerbose(boolean verbose) {

this.verbose = verbose;

}

public void setBatch(long batch) {

this.batch = batch;

}

}

3. 打包项目形成一个可以运行的jar

4. 启动控制台程序(run命令行中输入cmd命令)

5. 运行如下脚本java -jar cn.iigrowing.www.consumer01.jar --url=tcp://localhost:61616 --topic=false --subject=TEST.FOO --durable=false --maxium-messages=200000000 --client-id=consumer1 --parallel-threads=1 --transacted=false --sleep-time=0 --verbose=true --ack-mode=AUTO_ACKNOWLEDGE --receive-time-out=0 --batch=10 --user= --password=

五.用消费者消费消息

1. 启动消费者程序, 让消费者程序接收100000条消息以后退出

2. 启动生产程序

3. 观察相关工作情况

程序下载地址

http://pan.baidu.com/s/1pJHcww7

发表评论