From 53b29a206bda45d6ea5aaaeaddb46e4fa0ef45ad Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 13 Apr 2012 11:47:11 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3805 - duplicate dispatch to durable sub with concurrent send transaction commit and activate. fixed up the use of audit through an activate/deactivate such that duplicate dispatch is suppressed at source in this case git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1325722 13f79535-47bb-0310-9956-ffa450edef68 --- .../region/DurableTopicSubscription.java | 5 +- .../cursors/AbstractPendingMessageCursor.java | 1 - .../region/cursors/AbstractStoreCursor.java | 1 + .../cursors/StoreDurableSubscriberCursor.java | 3 +- .../region/cursors/TopicStorePrefetch.java | 2 +- ...ncurrentCommitActivateNoDuplicateTest.java | 942 ++++++++++++++++++ ...leSubscriberWithNetworkDisconnectTest.java | 2 +- .../DurableSubscriptionOfflineTest.java | 213 +++- .../apache/kahadb/index/BTreeIndexTest.java | 203 +++- 9 files changed, 1355 insertions(+), 17 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 4f4d1ee482..86b63eb64f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -163,7 +163,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } public void deactivate(boolean keepDurableSubsActive) throws Exception { - LOG.debug("Deactivating " + this); + LOG.debug("Deactivating keepActive=" + keepDurableSubsActive + ", " + this); active.set(false); offlineTimestamp.set(System.currentTimeMillis()); this.usageManager.getMemoryUsage().removeUsageListener(this); @@ -187,9 +187,10 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } else { redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1)); } - if (keepDurableSubsActive&& pending.isTransient()) { + if (keepDurableSubsActive && pending.isTransient()) { synchronized (pending) { pending.addMessageFirst(node); + pending.rollback(node.getMessageId()); } } else { node.decrementReferenceCount(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index caa2d5ecbd..115d3b04bd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -64,7 +64,6 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs public synchronized void stop() throws Exception { started=false; - audit=null; gc(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 359b0d36a9..7546d4f7b0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -243,6 +243,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i batchList.clear(); clearIterator(false); batchResetNeeded = true; + // wonder do we need to determine size here, it may change before restart resetSize(); setCacheEnabled(false); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 842dbd2988..51c8a044ea 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -93,16 +93,15 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { if (isStarted()) { if (subscription.isKeepDurableSubsActive()) { super.gc(); - super.getMessageAudit().clear(); for (PendingMessageCursor tsp : storePrefetches) { tsp.gc(); - tsp.getMessageAudit().clear(); } } else { super.stop(); for (PendingMessageCursor tsp : storePrefetches) { tsp.stop(); } + getMessageAudit().clear(); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index 42aa1f8e2c..4d06ecd10c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -119,6 +119,6 @@ class TopicStorePrefetch extends AbstractStoreCursor { @Override public String toString() { - return "TopicStorePrefetch(" + clientId + "," + subscriberName + ")" + super.toString(); + return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString(); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java new file mode 100644 index 0000000000..78020e48ca --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java @@ -0,0 +1,942 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR ONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.io.File; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.Vector; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageConsumer; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.amq.AMQPersistenceAdapter; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.ThreadTracker; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertTrue; + +public class DurableSubProcessConcurrentCommitActivateNoDuplicateTest { + private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.class); + public static final long RUNTIME = 5 * 60 * 1000; + + public static final int SERVER_SLEEP = 500; // max + public static final int CARGO_SIZE = 600; // max + + public static final int MAX_CLIENTS = 2; + public static final Random CLIENT_LIFETIME = new Random(30 * 1000, 2 * 60 * 1000); + public static final Random CLIENT_ONLINE = new Random(30 * 1000, 40 * 1000); + public static final Random CLIENT_OFFLINE = new Random(1 * 1000, 10 * 1000); + + public static final int CLIENT_OFFLINE_DURING_COMMIT = 2; // random(x) == x + + public static final Persistence PERSISTENT_ADAPTER = Persistence.KAHADB; + + public static final long BROKER_RESTART = -2 * 60 * 1000; + + public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true; + public static final boolean CHECK_REDELIVERY = true; + + private BrokerService broker; + private ActiveMQTopic topic; + + private ClientManager clientManager; + private Server server; + private HouseKeeper houseKeeper; + + private final ReentrantReadWriteLock processLock = new ReentrantReadWriteLock( + true); + private int restartCount = 0; + private final AtomicInteger onlineCount = new AtomicInteger(0); + static final Vector exceptions = new Vector(); + + // long form of test that found https://issues.apache.org/jira/browse/AMQ-3805 + @Ignore ("short version in org.apache.activemq.usecases.DurableSubscriptionOfflineTest.testNoDuplicateOnConcurrentSendTranCommitAndActivate") + @Test + public void testProcess() { + try { + server.start(); + clientManager.start(); + + if (ALLOW_SUBSCRIPTION_ABANDONMENT) + houseKeeper.start(); + + if (BROKER_RESTART <= 0) + Thread.sleep(RUNTIME); + else { + long end = System.currentTimeMillis() + RUNTIME; + + while (true) { + long now = System.currentTimeMillis(); + if (now > end) + break; + + now = end - now; + now = now < BROKER_RESTART ? now : BROKER_RESTART; + Thread.sleep(now); + + restartBroker(); + } + } + } catch (Throwable e) { + exit("ProcessTest.testProcess failed.", e); + } + + //allow the clients to unsubscribe before finishing + clientManager.setEnd(true); + try { + Thread.sleep(600000); + } catch (InterruptedException e) { + exit("ProcessTest.testProcess failed.", e); + } + + + processLock.writeLock().lock(); + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + LOG.info("DONE."); + } + + private void restartBroker() throws Exception { + LOG.info("Broker restart: waiting for components."); + + processLock.writeLock().lock(); + try { + destroyBroker(); + startBroker(false); + + restartCount++; + LOG.info("Broker restarted. count: " + restartCount); + } finally { + processLock.writeLock().unlock(); + } + } + + /** + * Creates batch of messages in a transaction periodically. The last message + * in the transaction is always a special message what contains info about + * the whole transaction. + *

+ * Notifies the clients about the created messages also. + */ + final class Server extends Thread { + + final String url = "vm://" + + DurableSubProcessConcurrentCommitActivateNoDuplicateTest.getName() + + "?" + + "jms.redeliveryPolicy.maximumRedeliveries=2&jms.redeliveryPolicy.initialRedeliveryDelay=500&" + + "jms.producerWindowSize=20971520&jms.prefetchPolicy.all=100&" + + "jms.copyMessageOnSend=false&jms.disableTimeStampsByDefault=false&" + + "jms.alwaysSyncSend=true&jms.dispatchAsync=false&" + + "jms.watchTopicAdvisories=false&" + + "waitForStart=200&create=false"; + final ConnectionFactory cf = new ActiveMQConnectionFactory(url); + + final Object sendMutex = new Object(); + final String[] cargos = new String[500]; + + int transRover = 0; + int messageRover = 0; + public volatile int committingTransaction = -1; + + public Server() { + super("Server"); + setPriority(Thread.MIN_PRIORITY); + setDaemon(true); + } + + @Override + public void run() { + try { + while (true) { + + Thread.sleep(1000); + + processLock.readLock().lock(); + try { + send(); + } finally { + processLock.readLock().unlock(); + } + } + } catch (Throwable e) { + exit("Server.run failed", e); + } + } + + public void send() throws JMSException { + // do not create new clients now + // ToDo: Test this case later. + synchronized (sendMutex) { + int trans = ++transRover; + boolean relevantTrans = true; //random(2) > 1; + ClientType clientType = relevantTrans ? ClientType + .randomClientType() : null; // sends this types + //int count = random(500, 700); + int count = 1000; + + LOG.info("Sending Trans[id=" + trans + ", count=" + + count + ", clientType=" + clientType + ", firstID=" + (messageRover+1) + "]"); + + Connection con = cf.createConnection(); + Session sess = con + .createSession(true, Session.SESSION_TRANSACTED); + MessageProducer prod = sess.createProducer(null); + + for (int i = 0; i < count; i++) { + Message message = sess.createMessage(); + message.setIntProperty("ID", ++messageRover); + message.setIntProperty("TRANS", trans); + String type = clientType != null ? clientType + .randomMessageType() : ClientType + .randomNonRelevantMessageType(); + message.setStringProperty("TYPE", type); + + if (CARGO_SIZE > 0) + message.setStringProperty("CARGO", + getCargo(random(CARGO_SIZE))); + + prod.send(topic, message); + clientManager.onServerMessage(message); + } + + Message message = sess.createMessage(); + message.setIntProperty("ID", ++messageRover); + message.setIntProperty("TRANS", trans); + message.setBooleanProperty("COMMIT", true); + message.setBooleanProperty("RELEVANT", relevantTrans); + prod.send(topic, message); + clientManager.onServerMessage(message); + + committingTransaction = trans; + sess.commit(); + committingTransaction = -1; + + LOG.info("Committed Trans[id=" + trans + ", count=" + + count + ", clientType=" + clientType + "], ID=" + messageRover); + + sess.close(); + con.close(); + } + } + + private String getCargo(int length) { + if (length == 0) + return null; + + if (length < cargos.length) { + String result = cargos[length]; + if (result == null) { + result = getCargoImpl(length); + cargos[length] = result; + } + return result; + } + return getCargoImpl(length); + } + + private String getCargoImpl(int length) { + StringBuilder sb = new StringBuilder(length); + for (int i = length; --i >= 0;) { + sb.append('a'); + } + return sb.toString(); + } + } + + /** + * Clients listen on different messages in the topic. The 'TYPE' property + * helps the client to select the proper messages. + */ + private enum ClientType { + A("a", "b", "c"), B("c", "d", "e"), C("d", "e", "f"), D("g", "h"); + + public final String[] messageTypes; + public final HashSet messageTypeSet; + public final String selector; + + ClientType(String... messageTypes) { + this.messageTypes = messageTypes; + messageTypeSet = new HashSet(Arrays.asList(messageTypes)); + + StringBuilder sb = new StringBuilder("TYPE in ("); + for (int i = 0; i < messageTypes.length; i++) { + if (i > 0) + sb.append(", "); + sb.append('\'').append(messageTypes[i]).append('\''); + } + sb.append(')'); + selector = sb.toString(); + } + + public static ClientType randomClientType() { + return values()[DurableSubProcessConcurrentCommitActivateNoDuplicateTest + .random(values().length - 1)]; + } + + public final String randomMessageType() { + return messageTypes[DurableSubProcessConcurrentCommitActivateNoDuplicateTest + .random(messageTypes.length - 1)]; + } + + public static String randomNonRelevantMessageType() { + return Integer + .toString(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.random(20)); + } + + public final boolean isRelevant(String messageType) { + return messageTypeSet.contains(messageType); + } + + @Override + public final String toString() { + return this.name() /* + '[' + selector + ']' */; + } + } + + /** + * Creates new cliens. + */ + private final class ClientManager extends Thread { + + private int clientRover = 0; + + private final CopyOnWriteArrayList clients = new CopyOnWriteArrayList(); + + private boolean end; + + public ClientManager() { + super("ClientManager"); + setDaemon(true); + } + + public synchronized void setEnd(boolean end) { + this.end = end; + + } + + @Override + public void run() { + try { + while (true) { + if (clients.size() < MAX_CLIENTS && !end) { + processLock.readLock().lock(); + try { + createNewClient(); + } finally { + processLock.readLock().unlock(); + } + } + + int size = clients.size(); + //sleepRandom(1000, 4000); + Thread.sleep(100); + } + } catch (Throwable e) { + exit("ClientManager.run failed.", e); + } + } + + private void createNewClient() throws JMSException { + ClientType type = ClientType.randomClientType(); + + Client client; + synchronized (server.sendMutex) { + client = new Client(++clientRover, type, CLIENT_LIFETIME, + CLIENT_ONLINE, CLIENT_OFFLINE); + clients.add(client); + } + client.start(); + + LOG.info(client.toString() + " created. " + this); + } + + public void removeClient(Client client) { + clients.remove(client); + } + + public void onServerMessage(Message message) throws JMSException { + for (Client client : clients) { + client.onServerMessage(message); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("ClientManager[count="); + sb.append(clients.size()); + sb.append(", clients="); + boolean sep = false; + for (Client client : clients) { + if (sep) + sb.append(", "); + else + sep = true; + sb.append(client.toString()); + } + sb.append(']'); + return sb.toString(); + } + } + + /** + * Consumes massages from a durable subscription. Goes online/offline + * periodically. Checks the incoming messages against the sent messages of + * the server. + */ + private final class Client extends Thread { + + String url = "failover:(tcp://localhost:61656?wireFormat.maxInactivityDuration=0)?" + + "jms.watchTopicAdvisories=false&" + + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&" + + "jms.producerWindowSize=20971520&" + + "jms.copyMessageOnSend=false&" + + "jms.sendAcksAsync=false&" + + "initialReconnectDelay=100&maxReconnectDelay=30000&" + + "useExponentialBackOff=true"; + final ConnectionFactory cf = new ActiveMQConnectionFactory(url); + + public static final String SUBSCRIPTION_NAME = "subscription"; + + private final int id; + private final String conClientId; + + private final Random lifetime; + private final Random online; + private final Random offline; + + private final ClientType clientType; + private final String selector; + + private final ConcurrentLinkedQueue waitingList = new ConcurrentLinkedQueue(); + private final HashSet processed = CHECK_REDELIVERY ? new HashSet( + 10000) : null; + + private ActiveMQMessageConsumer consumer = null; + + public Client(int id, ClientType clientType, Random lifetime, + Random online, Random offline) throws JMSException { + super("Client" + id); + setDaemon(true); + + this.id = id; + conClientId = "cli" + id; + this.clientType = clientType; + selector = "(COMMIT = true and RELEVANT = true) or " + + clientType.selector; + + this.lifetime = lifetime; + this.online = online; + this.offline = offline; + + subscribe(); + } + + @Override + public void run() { + long end = System.currentTimeMillis() + 60000; + try { + boolean sleep = false; + while (true) { + long max = end - System.currentTimeMillis(); + if (max <= 0) + break; + + /* + if (sleep) + offline.sleepRandom(); + else + sleep = true; + */ + + Thread.sleep(100); + + processLock.readLock().lock(); + onlineCount.incrementAndGet(); + try { + process(online.next()); + } finally { + onlineCount.decrementAndGet(); + processLock.readLock().unlock(); + } + } + + if (!ALLOW_SUBSCRIPTION_ABANDONMENT || random(1) > 0) + unsubscribe(); + else { + LOG.info("Client abandon the subscription. " + + this); + + // housekeeper should sweep these abandoned subscriptions + houseKeeper.abandonedSubscriptions.add(conClientId); + } + } catch (Throwable e) { + exit(toString() + " failed.", e); + } + + clientManager.removeClient(this); + LOG.info(toString() + " DONE."); + } + + private void process(long millis) throws JMSException { + //long end = System.currentTimeMillis() + millis; + long end = System.currentTimeMillis() + 200; + long hardEnd = end + 20000; // wait to finish the transaction. + boolean inTransaction = false; + int transCount = 0; + + Connection con = openConnection(); + Session sess = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = (ActiveMQMessageConsumer) sess.createDurableSubscriber(topic, + SUBSCRIPTION_NAME, selector, false); + LOG.info(toString() + " ONLINE."); + try { + do { + long max = end - System.currentTimeMillis(); + if (max <= 0) { + if (!inTransaction) { + LOG.info(toString() + " done after no work!"); + break; + } + + max = hardEnd - System.currentTimeMillis(); + if (max <= 0) + exit("" + this + + " failed: Transaction is not finished."); + } + + Message message = consumer.receive(max); + if (message == null) + continue; + + onClientMessage(message); + + if (message.propertyExists("COMMIT")) { + message.acknowledge(); // CLIENT_ACKNOWLEDGE + + int trans = message.getIntProperty("TRANS"); + LOG.info("Received Trans[id=" + + trans + ", count=" + + transCount + "] in " + this + "."); + + inTransaction = false; + transCount = 0; + + int committing = server.committingTransaction; + if (committing == trans) { + LOG.info("Going offline during transaction commit. messageID=" + message.getIntProperty("ID")); + break; + } + } else { + inTransaction = true; + transCount++; + if (1 == transCount) { + LOG.info("In Trans[id=" + message.getIntProperty("TRANS") + "] first ID=" + message.getIntProperty("ID")); + } + } + } while (true); + } finally { + sess.close(); + con.close(); + + LOG.info(toString() + " OFFLINE."); + + // Check if the messages are in the waiting + // list for long time. + Message topMessage = waitingList.peek(); + if (topMessage != null) + checkDeliveryTime(topMessage); + } + } + + public void onServerMessage(Message message) throws JMSException { + if (Boolean.TRUE.equals(message.getObjectProperty("COMMIT"))) { + if (Boolean.TRUE.equals(message.getObjectProperty("RELEVANT"))) + waitingList.add(message); + } else { + String messageType = message.getStringProperty("TYPE"); + if (clientType.isRelevant(messageType)) + waitingList.add(message); + } + } + + public void onClientMessage(Message message) { + Message serverMessage = waitingList.poll(); + try { + Integer receivedId = (Integer) message.getObjectProperty("ID"); + if (processed != null && processed.contains(receivedId)) + LOG.info("! Message has been processed before. " + + this + " redeliveredFlag=" + message.getJMSRedelivered() + ", message = " + message); + + if (serverMessage == null) + exit("" + + this + + " failed: There is no next server message, but received: " + + message); + + Integer serverId = (Integer) serverMessage + .getObjectProperty("ID"); + if (receivedId == null || serverId == null) + exit("" + this + " failed: message ID not found.\r\n" + + " received: " + message + "\r\n" + " server: " + + serverMessage); + + if (!serverId.equals(receivedId)) { + StringBuilder missingList = new StringBuilder(); + Object lastTrans = null; + int transCount = 0; + Message nextServerMessage = serverMessage; + do { + Integer nextServerId = (Integer) nextServerMessage.getObjectProperty("ID"); + if (nextServerId.equals(receivedId)) { + if (lastTrans != null) + missingList.append("Missing TRANS=").append(lastTrans).append(", size=").append(transCount).append("\r\n"); + break; + } + + Object trans = nextServerMessage.getObjectProperty("TRANS"); + if (!trans.equals(lastTrans)) { + if (lastTrans != null) + missingList.append("Missing TRANS=").append(lastTrans).append(", size=").append(transCount).append("\r\n"); + lastTrans = trans; + transCount = 1; + } + else + transCount++; + } while ((nextServerMessage = waitingList.poll()) != null); + + exit("Missing messages!\r\n" + missingList + + "Received message: " + message + "\r\n" + + "Expected message: " + serverMessage); + } + + checkDeliveryTime(message); + + if (processed != null) + processed.add(receivedId); + } catch (Throwable e) { + exit("" + this + ".onClientMessage failed.\r\n" + " received: " + + message + "\r\n" + " server: " + serverMessage, e); + } + } + + /** + * Checks if the message was not delivered fast enough. + */ + public void checkDeliveryTime(Message message) throws JMSException { + long creation = message.getJMSTimestamp(); + long min = System.currentTimeMillis() - (offline.max + online.min) + * (BROKER_RESTART > 0 ? 4 : 1); + + if (false && min > creation) { + SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss.SSS"); + exit("" + this + ".checkDeliveryTime failed. Message time: " + + df.format(new Date(creation)) + ", min: " + + df.format(new Date(min)) + "\r\n" + message); + } + } + + private Connection openConnection() throws JMSException { + Connection con = cf.createConnection(); + con.setClientID(conClientId); + ((ActiveMQConnection) con).setCloseTimeout(60000); + con.start(); + return con; + } + + private void subscribe() throws JMSException { + processLock.readLock().lock(); + try { + Connection con = openConnection(); + Session session = con + .createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, + true); + session.close(); + con.close(); + } + finally { + processLock.readLock().unlock(); + } + } + + private void unsubscribe() throws JMSException { + processLock.readLock().lock(); + LOG.info("Unsubscribe: " + this); + try { + Connection con = openConnection(); + Session session = con + .createSession(false, Session.AUTO_ACKNOWLEDGE); + session.unsubscribe(SUBSCRIPTION_NAME); + session.close(); + con.close(); + } + finally { + processLock.readLock().unlock(); + } + } + + @Override + public String toString() { + return "Client[id=" + id + ", type=" + clientType + "] consumerId=" + (consumer != null ? consumer.getConsumerId() : "null"); + } + } + + /** + * Sweeps out not-used durable subscriptions. + */ + private final class HouseKeeper extends Thread { + + private HouseKeeper() { + super("HouseKeeper"); + setDaemon(true); + } + + public final CopyOnWriteArrayList abandonedSubscriptions = new CopyOnWriteArrayList(); + + @Override + public void run() { + while (true) { + try { + Thread.sleep(3 * 60 * 1000); + + processLock.readLock().lock(); + try { + sweep(); + } finally { + processLock.readLock().unlock(); + } + } catch (InterruptedException ex) { + break; + } catch (Throwable e) { + Exception log = new Exception("HouseKeeper failed.", e); + log.printStackTrace(); + } + } + } + + private void sweep() throws Exception { + LOG.info("Housekeeper sweeping."); + + int closed = 0; + ArrayList sweeped = new ArrayList(); + try { + for (String clientId : abandonedSubscriptions) { + LOG.info("Sweeping out subscription of " + + clientId + "."); + broker.getAdminView().destroyDurableSubscriber(clientId, + Client.SUBSCRIPTION_NAME); + sweeped.add(clientId); + closed++; + } + } catch (Exception ignored) { + LOG.info("Ex on destroy sub " + ignored); + } finally { + abandonedSubscriptions.removeAll(sweeped); + } + + LOG.info("Housekeeper sweeped out " + closed + + " subscriptions."); + } + } + + public static int random(int max) { + return (int) (Math.random() * (max + 1)); + } + + public static int random(int min, int max) { + return random(max - min) + min; + } + + public static void sleepRandom(int maxMillis) throws InterruptedException { + Thread.sleep(random(maxMillis)); + } + + public static void sleepRandom(int minMillis, int maxMillis) + throws InterruptedException { + Thread.sleep(random(minMillis, maxMillis)); + } + + public static final class Random { + + final int min; + final int max; + + Random(int min, int max) { + this.min = min; + this.max = max; + } + + public int next() { + return random(min, max); + } + + public void sleepRandom() throws InterruptedException { + DurableSubProcessConcurrentCommitActivateNoDuplicateTest.sleepRandom(min, max); + } + } + + public static void exit(String message) { + exit(message, null); + } + + public static void exit(String message, Throwable e) { + Throwable cause = new RuntimeException(message, e); + LOG.error(message, cause); + exceptions.add(cause); + ThreadTracker.result(); + //fail(cause.toString()); + System.exit(-9); + } + + @Before + public void setUp() throws Exception { + topic = new ActiveMQTopic("TopicT"); + startBroker(); + + clientManager = new ClientManager(); + server = new Server(); + houseKeeper = new HouseKeeper(); + + } + + @After + public void tearDown() throws Exception { + destroyBroker(); + } + + private enum Persistence { + MEMORY, AMQ, KAHA, KAHADB + } + + private void startBroker() throws Exception { + startBroker(true); + } + + private void startBroker(boolean deleteAllMessages) throws Exception { + if (broker != null) + return; + + broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")"); + broker.setBrokerName(getName()); + broker.setAdvisorySupport(false); + broker.setDeleteAllMessagesOnStartup(deleteAllMessages); + + switch (PERSISTENT_ADAPTER) { + case MEMORY: + broker.setPersistent(false); + break; + + case AMQ: + File amqData = new File("activemq-data/" + getName() + "-amq"); + if (deleteAllMessages) + delete(amqData); + + broker.setPersistent(true); + AMQPersistenceAdapter amq = new AMQPersistenceAdapter(); + amq.setDirectory(amqData); + broker.setPersistenceAdapter(amq); + break; + + case KAHA: + File kahaData = new File("activemq-data/" + getName() + "-kaha"); + if (deleteAllMessages) + delete(kahaData); + + broker.setPersistent(true); + KahaPersistenceAdapter kaha = new KahaPersistenceAdapter(); + kaha.setDirectory(kahaData); + broker.setPersistenceAdapter(kaha); + break; + + case KAHADB: + File kahadbData = new File("activemq-data/" + getName() + "-kahadb"); + if (deleteAllMessages) + delete(kahadbData); + + broker.setPersistent(true); + KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter(); + kahadb.setDirectory(kahadbData); + kahadb.setJournalMaxFileLength(5 * 1024 * 1024); + broker.setPersistenceAdapter(kahadb); + break; + } + + broker.addConnector("tcp://localhost:61656"); + + broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024); + broker.getSystemUsage().getTempUsage().setLimit(256 * 1024 * 1024); + broker.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 1024); + + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setMaxAuditDepth(20000); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + broker.start(); + } + + protected static String getName() { + return "DurableSubProcessWithRestartTest"; + } + + private static boolean delete(File path) { + if (path == null) + return true; + + if (path.isDirectory()) { + for (File file : path.listFiles()) { + delete(file); + } + } + return path.delete(); + } + + private void destroyBroker() throws Exception { + if (broker == null) + return; + + broker.stop(); + broker = null; + } +} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java index e042b30ef4..147d29566d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java @@ -214,7 +214,7 @@ public class DurableSubscriberWithNetworkDisconnectTest extends JmsMultipleBroke } if (failover) { - options += "?maxReconnectAttempts=1)"; + options += "?maxReconnectAttempts=0)"; } options += "?useExponentialBackOff=" + exponentialBackOff; diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index 64c1f28081..db50f1d619 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -16,7 +16,12 @@ */ package org.apache.activemq.usecases; +import java.util.Random; +import java.util.HashSet; import java.util.Vector; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -33,9 +38,8 @@ import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.MessageId; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.util.Wait; @@ -51,7 +55,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp public boolean keepDurableSubsActive = true; private BrokerService broker; private ActiveMQTopic topic; - private Vector exceptions = new Vector(); + private Vector exceptions = new Vector(); protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true)); @@ -962,6 +966,95 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals("offline consumer got all", sent, listener.count); } + public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception { + final int messageCount = 1000; + Connection con = null; + Session session = null; + final int numConsumers = 10; + for (int i = 0; i <= numConsumers; i++) { + con = createConnection("cli" + i); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", null, true); + session.close(); + con.close(); + } + + class CheckForDupsClient implements Runnable { + HashSet ids = new HashSet(); + final int id; + + public CheckForDupsClient(int id) { + this.id = id; + } + + @Override + public void run() { + try { + Connection con = createConnection("cli" + id); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + for (int j=0;j<2;j++) { + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true); + for (int i = 0; i < messageCount/2; i++) { + Message message = consumer.receive(4000); + assertNotNull(message); + long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId(); + assertTrue("ID=" + id + " not a duplicate: " + producerSequenceId, ids.add(producerSequenceId)); + } + consumer.close(); + } + + // verify no duplicates left + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true); + Message message = consumer.receive(4000); + if (message != null) { + long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId(); + assertTrue("ID=" + id + " not a duplicate: " + producerSequenceId, ids.add(producerSequenceId)); + } + assertNull(message); + + + session.close(); + con.close(); + } catch (Throwable e) { + e.printStackTrace(); + exceptions.add(e); + } + } + } + + + final String payLoad = new String(new byte[1000]); + con = createConnection(); + final Session sendSession = con.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = sendSession.createProducer(topic); + for (int i = 0; i < messageCount; i++) { + producer.send(sendSession.createTextMessage(payLoad)); + } + + ExecutorService executorService = Executors.newCachedThreadPool(); + + // concurrent commit and activate + executorService.execute(new Runnable() { + @Override + public void run() { + try { + sendSession.commit(); + } catch (JMSException e) { + e.printStackTrace(); + exceptions.add(e); + } + } + }); + for (int i = 0; i < numConsumers; i++) { + executorService.execute(new CheckForDupsClient(i)); + } + + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.MINUTES); + con.close(); + + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception { // create offline subs 1 @@ -1292,6 +1385,120 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp } } + + public void testRedeliveryFlag() throws Exception { + + Connection con; + Session session; + final int numClients = 2; + for (int i=0; i index = ((BTreeIndex)this.index); this.index.load(tx); tx.commit(); @@ -140,6 +143,8 @@ public class BTreeIndexTest extends IndexTestSupport { this.index.load(tx); tx.commit(); + exerciseAnotherIndex(tx); + // BTree should iterate it in sorted order. int counter=0; for (Iterator> i = index.iterator(tx); i.hasNext();) { @@ -189,18 +194,77 @@ public class BTreeIndexTest extends IndexTestSupport { createPageFileAndIndex(100); BTreeIndex index = ((BTreeIndex)this.index); this.index.load(tx); + + long id = tx.allocate().getPageId(); tx.commit(); - final int count = 4000; - doInsert(count); + BTreeIndex sindex = new BTreeIndex(pf, id); + sindex.setKeyMarshaller(StringMarshaller.INSTANCE); + sindex.setValueMarshaller(StringMarshaller.INSTANCE); + sindex.load(tx); + + tx.commit(); + + final int count = 5000; + + String payload = new String(new byte[2]); + for (int i = 0; i < count; i++) { + index.put(tx, key(i), (long)i); + sindex.put(tx, key(i), String.valueOf(i) + payload); + tx.commit(); + } + Random rand = new Random(System.currentTimeMillis()); int i = 0, prev = 0; - while (!index.isEmpty(tx)) { + while (!index.isEmpty(tx) || !sindex.isEmpty(tx)) { prev = i; i = rand.nextInt(count); try { index.remove(tx, key(i)); + sindex.remove(tx, key(i)); + } catch (Exception e) { + e.printStackTrace(); + fail("unexpected exception on " + i + ", prev: " + prev + ", ex: " + e); + } + } + } + + public void testRandomAddRemove() throws Exception { + + createPageFileAndIndex(1024); + BTreeIndex index = ((BTreeIndex)this.index); + this.index.load(tx); + + long id = tx.allocate().getPageId(); + tx.commit(); + + BTreeIndex sindex = new BTreeIndex(pf, id); + sindex.setKeyMarshaller(StringMarshaller.INSTANCE); + sindex.setValueMarshaller(StringMarshaller.INSTANCE); + sindex.load(tx); + + tx.commit(); + + Random rand = new Random(System.currentTimeMillis()); + final int count = 50000; + + String payload = new String(new byte[200]); + for (int i = 0; i < count; i++) { + int insertIndex = rand.nextInt(count); + index.put(tx, key(insertIndex), (long)insertIndex); + sindex.put(tx, key(insertIndex), String.valueOf(insertIndex) + payload); + tx.commit(); + } + + + int i = 0, prev = 0; + while (!index.isEmpty(tx) || !sindex.isEmpty(tx)) { + prev = i; + i = rand.nextInt(count); + try { + index.remove(tx, key(i)); + sindex.remove(tx, key(i)); } catch (Exception e) { e.printStackTrace(); fail("unexpected exception on " + i + ", prev: " + prev + ", ex: " + e); @@ -219,13 +283,46 @@ public class BTreeIndexTest extends IndexTestSupport { index.remove(tx, key(3697)); index.remove(tx, key(1566)); + + tx.commit(); + index.clear(tx); + tx.commit(); + + doInsert(count); + + Iterator> iterator = index.iterator(tx, key(1345)); + while (iterator.hasNext()) { + Map.Entry val = iterator.next(); + } + + doRemoveBackwards(666); + Map.Entry first = index.getFirst(tx); + assertEquals(first.getValue(), Long.valueOf(666L)); + + for (int i=0; i<2000; i++) { + Map.Entry last = index.getLast(tx); + index.remove(tx, last.getKey()); + tx.commit(); + } + + exerciseAnotherIndex(tx); + + iterator = index.iterator(tx, key(100)); + while (iterator.hasNext()) { + Map.Entry val = iterator.next(); + } + + Map.Entry last = index.getLast(tx); + assertEquals(last.getValue(), Long.valueOf(1999L)); + index.clear(tx); + assertNull(index.getLast(tx)); } public void testLargeValue() throws Exception { //System.setProperty("maxKahaDBTxSize", "" + (1024*1024*1024)); pf = new PageFile(directory, getClass().getName()); pf.setPageSize(4*1024); - pf.setEnablePageCaching(false); + //pf.setEnablePageCaching(false); pf.load(); tx = pf.tx(); long id = tx.allocate().getPageId(); @@ -262,6 +359,7 @@ public class BTreeIndexTest extends IndexTestSupport { tx.commit(); tx = pf.tx(); for (long i=0; i test = new ListIndex(pf, id); + test.setKeyMarshaller(StringMarshaller.INSTANCE); + test.setValueMarshaller(StringMarshaller.INSTANCE); + test.load(tx); + tx.commit(); + + final int count = 10000; + + String payload = new String(new byte[1]); + for (int i = 0; i < count; i++) { + test.put(tx, key(i), String.valueOf(i) + payload); + } + tx.commit(); + + test.clear(tx); + tx.commit(); + } + + public void testListIndexConsistancyOverTime() throws Exception { + + final int NUM_ITERATIONS = 50; + + pf = new PageFile(directory, getClass().getName()); + pf.setPageSize(4*1024); + //pf.setEnablePageCaching(false); + pf.setWriteBatchSize(1); + pf.load(); + tx = pf.tx(); + long id = tx.allocate().getPageId(); + + ListIndex test = new ListIndex(pf, id); + test.setKeyMarshaller(StringMarshaller.INSTANCE); + test.setValueMarshaller(StringMarshaller.INSTANCE); + test.load(tx); + tx.commit(); + + int expectedListEntries = 0; + int nextSequenceId = 0; + + LOG.info("Loading up the ListIndex with "+NUM_ITERATIONS+" entires and sparsely populating the sequences."); + + for (int i = 0; i < NUM_ITERATIONS; ++i) { + test.add(tx, String.valueOf(expectedListEntries++), new String("AA")); + + for (int j = 0; j < expectedListEntries; j++) { + + String sequenceSet = test.get(tx, String.valueOf(j)); + + int startSequenceId = nextSequenceId; + for (int ix = 0; ix < NUM_ITERATIONS; ix++) { + sequenceSet.concat(String.valueOf(nextSequenceId++)); + test.put(tx, String.valueOf(j), sequenceSet); + } + + sequenceSet = test.get(tx, String.valueOf(j)); + + for (int ix = 0; ix < NUM_ITERATIONS - 1; ix++) { + //sequenceSet.remove(startSequenceId++); + test.put(tx, String.valueOf(j), String.valueOf(j)); + } + } + } + + exerciseAnotherIndex(tx); + LOG.info("Checking if Index has the expected number of entries."); + + for (int i = 0; i < NUM_ITERATIONS; ++i) { + assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i))); + assertNotNull("List contents of Key["+i+"] should not be null", test.get(tx, String.valueOf(i))); + } + + LOG.info("Index has the expected number of entries."); + + assertEquals(expectedListEntries, test.size()); + + for (int i = 0; i < NUM_ITERATIONS; ++i) { + LOG.debug("Size of ListIndex before removal of entry ["+i+"] is: " + test.size()); + + assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i))); + assertNotNull("List contents of Key["+i+"] should not be null", test.remove(tx, String.valueOf(i))); + LOG.debug("Size of ListIndex after removal of entry ["+i+"] is: " + test.size()); + + assertEquals(expectedListEntries - (i + 1), test.size()); + } + } + void doInsertReverse(int count) throws Exception { for (int i = count-1; i >= 0; i--) { index.put(tx, key(i), (long)i);