diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index cf984c6c66..67bbdb6953 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -18,9 +18,33 @@ package org.apache.activemq.store.kahadb; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.*; -import java.util.*; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.Stack; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -32,7 +56,18 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.store.kahadb.data.*; +import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; +import org.apache.activemq.store.kahadb.data.KahaCommitCommand; +import org.apache.activemq.store.kahadb.data.KahaDestination; +import org.apache.activemq.store.kahadb.data.KahaEntryType; +import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; +import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; +import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; +import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; +import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; +import org.apache.activemq.store.kahadb.data.KahaTraceCommand; +import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.util.Callback; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.ServiceStopper; @@ -46,7 +81,17 @@ import org.apache.kahadb.journal.Location; import org.apache.kahadb.page.Page; import org.apache.kahadb.page.PageFile; import org.apache.kahadb.page.Transaction; -import org.apache.kahadb.util.*; +import org.apache.kahadb.util.ByteSequence; +import org.apache.kahadb.util.DataByteArrayInputStream; +import org.apache.kahadb.util.DataByteArrayOutputStream; +import org.apache.kahadb.util.LocationMarshaller; +import org.apache.kahadb.util.LockFile; +import org.apache.kahadb.util.LongMarshaller; +import org.apache.kahadb.util.Marshaller; +import org.apache.kahadb.util.Sequence; +import org.apache.kahadb.util.SequenceSet; +import org.apache.kahadb.util.StringMarshaller; +import org.apache.kahadb.util.VariableMarshaller; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1284,6 +1329,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.subscriptionAcks.remove(tx, subscriptionKey); sd.subscriptionCache.remove(subscriptionKey); removeAckLocationsForSub(tx, sd, subscriptionKey); + + if (sd.subscriptions.isEmpty(tx)) { + sd.messageIdIndex.clear(tx); + sd.locationIndex.clear(tx); + } } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayTest.java new file mode 100644 index 0000000000..d3d9f9e29c --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayTest.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR ONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DurableSubSelectorDelayTest { + + private static final Logger LOG = LoggerFactory.getLogger(DurableSubSelectorDelayTest.class); + + public static final long RUNTIME = 3 * 60 * 1000; + + private BrokerService broker; + private ActiveMQTopic topic; + + @Test + public void testProcess() throws Exception { + + MsgProducer msgProducer = new MsgProducer(); + msgProducer.start(); + + DurableSubscriber subscribers[] = new DurableSubscriber[10]; + + for (int i = 0; i < subscribers.length; i++) { + subscribers[i] = new DurableSubscriber(i); + subscribers[i].process(); + } + + // wait for server to finish + msgProducer.join(); + + for (int j = 0; j < subscribers.length; j++) { + + LOG.info("Unsubscribing subscriber " + subscribers[j]); + + // broker.getAdminView().destroyDurableSubscriber(clientID, + // Client.SUBSCRIPTION_NAME); + + subscribers[j].unsubscribe(); + } + + // allow the clean up thread time to run + TimeUnit.MINUTES.sleep(2); + + final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + assertTrue("only one journal file should be left ", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return pa.getStore().getJournal().getFileMap().size() == 1; + } + }, TimeUnit.MINUTES.toMillis(2))); + + LOG.info("DONE."); + } + + /** + * Message Producer + */ + final class MsgProducer extends Thread { + + final String url = "vm://" + + DurableSubSelectorDelayTest.getName(); + + final ConnectionFactory cf = new ActiveMQConnectionFactory(url); + + int transRover = 0; + int messageRover = 0; + + public MsgProducer() { + super("MsgProducer"); + setDaemon(true); + } + + @Override + public void run() { + long endTime = RUNTIME + System.currentTimeMillis(); + + try { + while (endTime > System.currentTimeMillis()) { + Thread.sleep(400); + send(); + } + } catch (Throwable e) { + e.printStackTrace(System.out); + throw new RuntimeException(e); + } + } + + public void send() throws JMSException { + + int trans = ++transRover; + boolean relevantTrans = true; + int count = 40; + + LOG.info("Sending Trans[id=" + trans + ", count=" + + count + "]"); + + Connection con = cf.createConnection(); + + Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer prod = sess.createProducer(null); + + for (int i = 0; i < count; i++) { + Message message = sess.createMessage(); + message.setIntProperty("ID", ++messageRover); + message.setIntProperty("TRANS", trans); + message.setBooleanProperty("RELEVANT", false); + prod.send(topic, message); + } + + Message message = sess.createMessage(); + message.setIntProperty("ID", ++messageRover); + message.setIntProperty("TRANS", trans); + message.setBooleanProperty("COMMIT", true); + message.setBooleanProperty("RELEVANT", relevantTrans); + prod.send(topic, message); + + LOG.info("Committed Trans[id=" + trans + ", count=" + + count + "], ID=" + messageRover); + + sess.close(); + con.close(); + } + } + + /** + * Consumes massages from a durable subscription. Goes online/offline + * periodically. Checks the incoming messages against the sent messages of + * the server. + */ + private final class DurableSubscriber { + + String url = "tcp://localhost:61656"; + + final ConnectionFactory cf = new ActiveMQConnectionFactory(url); + + private final String subName ; + + private final int id; + private final String conClientId; + private final String selector; + + public DurableSubscriber(int id) throws JMSException { + this.id = id; + conClientId = "cli" + id; + subName = "subscription"+ id; + selector ="RELEVANT = true"; + } + + private void process() throws JMSException { + long end = System.currentTimeMillis() + 20000; + int transCount = 0; + + LOG.info(toString() + " ONLINE."); + Connection con = openConnection(); + + Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + MessageConsumer consumer = sess.createDurableSubscriber(topic, + subName, selector, false); + + //MessageConsumer consumer = sess.createDurableSubscriber(topic,SUBSCRIPTION_NAME); + try { + do { + long max = end - System.currentTimeMillis(); + + if (max <= 0) { + break; + } + + Message message = consumer.receive(max); + if (message == null) + continue; + + LOG.info("Received Trans[id=" + + message.getIntProperty("TRANS") + ", count=" + + transCount + "] in " + this + "."); + + } while (true); + } finally { + sess.close(); + con.close(); + + LOG.info(toString() + " OFFLINE."); + } + } + + private Connection openConnection() throws JMSException { + Connection con = cf.createConnection(); + con.setClientID(conClientId); + con.start(); + return con; + } + + private void unsubscribe() throws JMSException { + Connection con = openConnection(); + Session session = con + .createSession(false, Session.AUTO_ACKNOWLEDGE); + session.unsubscribe(subName); + session.close(); + con.close(); + } + + @Override + public String toString() { + return "DurableSubscriber[id=" + id + "]"; + } + } + + @Before + public void setUp() throws Exception { + topic = new ActiveMQTopic("TopicT"); + startBroker(); + } + + @After + public void tearDown() throws Exception { + destroyBroker(); + } + + private void startBroker() throws Exception { + startBroker(true); + } + + private void startBroker(boolean deleteAllMessages) throws Exception { + if (broker != null) + return; + + broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")"); + broker.setBrokerName(getName()); + broker.setAdvisorySupport(false); + broker.setDeleteAllMessagesOnStartup(deleteAllMessages); + + File kahadbData = new File("activemq-data/" + getName() + "-kahadb"); + if (deleteAllMessages) + delete(kahadbData); + + broker.setPersistent(true); + KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter(); + kahadb.setDirectory(kahadbData); + kahadb.setJournalMaxFileLength( 500 * 1024); + broker.setPersistenceAdapter(kahadb); + + broker.addConnector("tcp://localhost:61656"); + + broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024); + broker.getSystemUsage().getTempUsage().setLimit(256 * 1024 * 1024); + broker.getSystemUsage().getStoreUsage().setLimit(256 * 1024 * 1024); + + broker.start(); + } + + protected static String getName() { + return "DurableSubSelectorDelayTest"; + } + + private static boolean delete(File path) { + if (path == null) + return true; + + if (path.isDirectory()) { + for (File file : path.listFiles()) { + delete(file); + } + } + return path.delete(); + } + + private void destroyBroker() throws Exception { + if (broker == null) + return; + + broker.stop(); + broker = null; + } +}