mirror of https://github.com/apache/activemq.git
additional test for: https://issues.apache.org/jira/browse/AMQ-3775
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1302641 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0af0021d0b
commit
13e5777efb
|
@ -0,0 +1,401 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR ONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class DurableSubProcessMultiRestartTest {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessMultiRestartTest.class);
|
||||
|
||||
public static final long RUNTIME = 1 * 60 * 1000;
|
||||
|
||||
private BrokerService broker;
|
||||
private ActiveMQTopic topic;
|
||||
|
||||
private final ReentrantReadWriteLock processLock = new ReentrantReadWriteLock(true);
|
||||
|
||||
private int restartCount = 0;
|
||||
private final int SUBSCRIPTION_ID = 1;
|
||||
|
||||
static final Vector<Throwable> exceptions = new Vector<Throwable>();
|
||||
|
||||
/**
|
||||
* The test creates a durable subscriber and producer with a broker that is
|
||||
* continually restarted.
|
||||
*
|
||||
* Producer creates a message every .5 seconds -creates a new connection for
|
||||
* each message
|
||||
*
|
||||
* durable subscriber - comes online for 10 seconds, - then goes offline for
|
||||
* a "moment" - repeats the cycle
|
||||
*
|
||||
* approx every 10 seconds the broker restarts. Subscriber and Producer
|
||||
* connections will be closed BEFORE the restart.
|
||||
*
|
||||
* The Durable subscriber is "unsubscribed" before the the end of the test.
|
||||
*
|
||||
* checks for number of kahaDB files left on filesystem.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testProcess() throws Exception {
|
||||
|
||||
DurableSubscriber durableSubscriber = new DurableSubscriber(SUBSCRIPTION_ID);
|
||||
MsgProducer msgProducer = new MsgProducer();
|
||||
|
||||
try {
|
||||
// register the subscription & start messages
|
||||
durableSubscriber.start();
|
||||
msgProducer.start();
|
||||
|
||||
long endTime = System.currentTimeMillis() + RUNTIME;
|
||||
|
||||
while (endTime > System.currentTimeMillis()) {
|
||||
Thread.sleep(10000);
|
||||
restartBroker();
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
exit("ProcessTest.testProcess failed.", e);
|
||||
}
|
||||
|
||||
// wait for threads to finish
|
||||
try {
|
||||
msgProducer.join();
|
||||
durableSubscriber.join();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace(System.out);
|
||||
}
|
||||
|
||||
// restart broker one last time
|
||||
restartBroker();
|
||||
|
||||
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
|
||||
|
||||
final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
|
||||
assertTrue("only one journal file should be left: " + pa.getStore().getJournal().getFileMap().size(),
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return pa.getStore().getJournal().getFileMap().size() == 1;
|
||||
}
|
||||
}, TimeUnit.MINUTES.toMillis(3))
|
||||
);
|
||||
|
||||
LOG.info("DONE.");
|
||||
}
|
||||
|
||||
private void restartBroker() throws Exception {
|
||||
LOG.info("Broker restart: waiting for components.");
|
||||
|
||||
processLock.writeLock().lock();
|
||||
try {
|
||||
destroyBroker();
|
||||
startBroker(false);
|
||||
|
||||
restartCount++;
|
||||
LOG.info("Broker restarted. count: " + restartCount);
|
||||
} finally {
|
||||
processLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Producers messages
|
||||
*
|
||||
*/
|
||||
final class MsgProducer extends Thread {
|
||||
|
||||
String url = "vm://" + DurableSubProcessMultiRestartTest.getName();
|
||||
|
||||
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
|
||||
|
||||
private long msgCount;
|
||||
int messageRover = 0;
|
||||
|
||||
public MsgProducer() {
|
||||
super("MsgProducer");
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
long endTime = RUNTIME + System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
while (endTime > System.currentTimeMillis()) {
|
||||
|
||||
Thread.sleep(500);
|
||||
|
||||
processLock.readLock().lock();
|
||||
try {
|
||||
send();
|
||||
} finally {
|
||||
processLock.readLock().unlock();
|
||||
}
|
||||
LOG.info("MsgProducer msgCount=" + msgCount);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
exit("Server.run failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void send() throws JMSException {
|
||||
|
||||
LOG.info("Sending ... ");
|
||||
|
||||
Connection con = cf.createConnection();
|
||||
|
||||
Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
MessageProducer prod = sess.createProducer(null);
|
||||
Message message = sess.createMessage();
|
||||
message.setIntProperty("ID", ++messageRover);
|
||||
message.setBooleanProperty("COMMIT", true);
|
||||
prod.send(topic, message);
|
||||
|
||||
msgCount++;
|
||||
LOG.info("Message Sent.");
|
||||
|
||||
sess.close();
|
||||
con.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Consumes massages from a durable subscription. Goes online/offline
|
||||
* periodically.
|
||||
*/
|
||||
private final class DurableSubscriber extends Thread {
|
||||
|
||||
String url = "tcp://localhost:61656";
|
||||
|
||||
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
|
||||
|
||||
public static final String SUBSCRIPTION_NAME = "subscription";
|
||||
|
||||
private final int id;
|
||||
private final String conClientId;
|
||||
private long msgCount;
|
||||
|
||||
public DurableSubscriber(int id) throws JMSException {
|
||||
super("DurableSubscriber" + id);
|
||||
setDaemon(true);
|
||||
|
||||
this.id = id;
|
||||
conClientId = "cli" + id;
|
||||
|
||||
subscribe();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
long end = System.currentTimeMillis() + RUNTIME;
|
||||
|
||||
try {
|
||||
|
||||
// while (true) {
|
||||
while (end > System.currentTimeMillis()) {
|
||||
|
||||
processLock.readLock().lock();
|
||||
try {
|
||||
process(5000);
|
||||
} finally {
|
||||
processLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
unsubscribe();
|
||||
|
||||
} catch (Throwable e) {
|
||||
exit(toString() + " failed.", e);
|
||||
}
|
||||
|
||||
LOG.info(toString() + " DONE. MsgCout=" + msgCount);
|
||||
}
|
||||
|
||||
private void process(long duration) throws JMSException {
|
||||
LOG.info(toString() + " ONLINE.");
|
||||
|
||||
Connection con = openConnection();
|
||||
Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
MessageConsumer consumer = sess.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
|
||||
|
||||
long end = System.currentTimeMillis() + duration;
|
||||
|
||||
try {
|
||||
while (end > System.currentTimeMillis()) {
|
||||
Message message = consumer.receive(100);
|
||||
if (message != null) {
|
||||
LOG.info(toString() + "received message...");
|
||||
msgCount++;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
sess.close();
|
||||
con.close();
|
||||
LOG.info(toString() + " OFFLINE.");
|
||||
}
|
||||
}
|
||||
|
||||
private Connection openConnection() throws JMSException {
|
||||
Connection con = cf.createConnection();
|
||||
con.setClientID(conClientId);
|
||||
con.start();
|
||||
return con;
|
||||
}
|
||||
|
||||
private void subscribe() throws JMSException {
|
||||
Connection con = openConnection();
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
|
||||
LOG.info(toString() + " SUBSCRIBED");
|
||||
|
||||
session.close();
|
||||
con.close();
|
||||
}
|
||||
|
||||
private void unsubscribe() throws JMSException {
|
||||
Connection con = openConnection();
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.unsubscribe(SUBSCRIPTION_NAME);
|
||||
LOG.info(toString() + " UNSUBSCRIBED");
|
||||
|
||||
session.close();
|
||||
con.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DurableSubscriber[id=" + id + "]";
|
||||
}
|
||||
}
|
||||
|
||||
// -------- helper methods -----------
|
||||
|
||||
public static void exit(String message) {
|
||||
exit(message, null);
|
||||
}
|
||||
|
||||
public static void exit(String message, Throwable e) {
|
||||
Throwable cause = new RuntimeException(message, e);
|
||||
LOG.error(message, cause);
|
||||
exceptions.add(cause);
|
||||
fail(cause.toString());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
topic = new ActiveMQTopic("TopicT");
|
||||
startBroker();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
destroyBroker();
|
||||
}
|
||||
|
||||
private void startBroker() throws Exception {
|
||||
startBroker(true);
|
||||
}
|
||||
|
||||
private void startBroker(boolean deleteAllMessages) throws Exception {
|
||||
if (broker != null)
|
||||
return;
|
||||
|
||||
broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
|
||||
broker.setBrokerName(getName());
|
||||
broker.setAdvisorySupport(false);
|
||||
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||
|
||||
broker.setKeepDurableSubsActive(true);
|
||||
|
||||
File kahadbData = new File("activemq-data/" + getName() + "-kahadb");
|
||||
if (deleteAllMessages)
|
||||
delete(kahadbData);
|
||||
|
||||
broker.setPersistent(true);
|
||||
KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
|
||||
kahadb.setDirectory(kahadbData);
|
||||
kahadb.setJournalMaxFileLength(20 * 1024);
|
||||
broker.setPersistenceAdapter(kahadb);
|
||||
|
||||
broker.addConnector("tcp://localhost:61656");
|
||||
|
||||
broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
|
||||
broker.getSystemUsage().getTempUsage().setLimit(256 * 1024 * 1024);
|
||||
broker.getSystemUsage().getStoreUsage().setLimit(256 * 1024 * 1024);
|
||||
|
||||
broker.start();
|
||||
}
|
||||
|
||||
protected static String getName() {
|
||||
return "DurableSubProcessMultiRestartTest";
|
||||
}
|
||||
|
||||
private static boolean delete(File path) {
|
||||
if (path == null)
|
||||
return true;
|
||||
|
||||
if (path.isDirectory()) {
|
||||
for (File file : path.listFiles()) {
|
||||
delete(file);
|
||||
}
|
||||
}
|
||||
return path.delete();
|
||||
}
|
||||
|
||||
private void destroyBroker() throws Exception {
|
||||
if (broker == null)
|
||||
return;
|
||||
|
||||
broker.stop();
|
||||
broker = null;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue