ensure that non-persistent messages are cleaned up from temp storage when the Queue is deleted.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1302977 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-03-20 16:26:07 +00:00
parent 1d7b69d55c
commit 926fdd7e92
3 changed files with 269 additions and 16 deletions

View File

@ -26,8 +26,6 @@ import org.slf4j.LoggerFactory;
/** /**
* Store based Cursor for Queues * Store based Cursor for Queues
*
*
*/ */
public class StoreQueueCursor extends AbstractPendingMessageCursor { public class StoreQueueCursor extends AbstractPendingMessageCursor {
@ -42,7 +40,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
/** /**
* Construct * Construct
* @param broker * @param broker
* @param queue * @param queue
*/ */
public StoreQueueCursor(Broker broker,Queue queue) { public StoreQueueCursor(Broker broker,Queue queue) {
@ -78,6 +76,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
public synchronized void stop() throws Exception { public synchronized void stop() throws Exception {
started = false; started = false;
if (nonPersistent != null) { if (nonPersistent != null) {
nonPersistent.clear();
nonPersistent.stop(); nonPersistent.stop();
nonPersistent.gc(); nonPersistent.gc();
} }
@ -101,7 +100,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
} }
} }
} }
public synchronized void addMessageFirst(MessageReference node) throws Exception { public synchronized void addMessageFirst(MessageReference node) throws Exception {
if (node != null) { if (node != null) {
Message msg = node.getMessage(); Message msg = node.getMessage();
@ -155,9 +154,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
public synchronized void reset() { public synchronized void reset() {
nonPersistent.reset(); nonPersistent.reset();
persistent.reset(); persistent.reset();
pendingCount = persistent.size() + nonPersistent.size(); pendingCount = persistent.size() + nonPersistent.size();
} }
public void release() { public void release() {
nonPersistent.release(); nonPersistent.release();
persistent.release(); persistent.release();
@ -179,7 +178,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
/** /**
* Informs the Broker if the subscription needs to intervention to recover * Informs the Broker if the subscription needs to intervention to recover
* it's state e.g. DurableTopicSubscriber may do * it's state e.g. DurableTopicSubscriber may do
* *
* @see org.apache.activemq.broker.region.cursors.PendingMessageCursor * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
* @return true if recovery required * @return true if recovery required
*/ */
@ -208,8 +207,8 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
} }
super.setMaxBatchSize(maxBatchSize); super.setMaxBatchSize(maxBatchSize);
} }
public void setMaxProducersToAudit(int maxProducersToAudit) { public void setMaxProducersToAudit(int maxProducersToAudit) {
super.setMaxProducersToAudit(maxProducersToAudit); super.setMaxProducersToAudit(maxProducersToAudit);
if (persistent != null) { if (persistent != null) {
@ -229,7 +228,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
nonPersistent.setMaxAuditDepth(maxAuditDepth); nonPersistent.setMaxAuditDepth(maxAuditDepth);
} }
} }
public void setEnableAudit(boolean enableAudit) { public void setEnableAudit(boolean enableAudit) {
super.setEnableAudit(enableAudit); super.setEnableAudit(enableAudit);
if (persistent != null) { if (persistent != null) {
@ -239,7 +238,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
nonPersistent.setEnableAudit(enableAudit); nonPersistent.setEnableAudit(enableAudit);
} }
} }
@Override @Override
public void setUseCache(boolean useCache) { public void setUseCache(boolean useCache) {
super.setUseCache(useCache); super.setUseCache(useCache);
@ -250,7 +249,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
nonPersistent.setUseCache(useCache); nonPersistent.setUseCache(useCache);
} }
} }
@Override @Override
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
@ -191,6 +192,10 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
} }
} }
public Journal getJournal() {
return this.journal;
}
public File getDirectory() { public File getDirectory() {
return directory; return directory;
} }
@ -354,9 +359,9 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
public void run() { public void run() {
try { try {
if (isStopping()) { if (isStopping()) {
return; return;
} }
final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId(); final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId();
final Set<Integer> candidates = journal.getFileMap().keySet(); final Set<Integer> candidates = journal.getFileMap().keySet();
LOG.trace("Full gc candidate set:" + candidates); LOG.trace("Full gc candidate set:" + candidates);
@ -370,7 +375,7 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
List<PList> plists = null; List<PList> plists = null;
synchronized (indexLock) { synchronized (indexLock) {
synchronized (this) { synchronized (this) {
plists = new ArrayList(persistentLists.values()); plists = new ArrayList<PList>(persistentLists.values());
} }
} }
for (PList list : plists) { for (PList list : plists) {
@ -481,5 +486,4 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
return "PListStore:[" + path + " ]"; return "PListStore:[" + path + " ]";
} }
} }

View File

@ -0,0 +1,250 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TempStoreDataCleanupTest {
private static final Logger LOG = LoggerFactory.getLogger(TempStoreDataCleanupTest.class);
private static final String QUEUE_NAME = TempStoreDataCleanupTest.class.getName() + "Queue";
private final String str = new String(
"QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR");
private BrokerService broker;
private String connectionUri;
private ExecutorService pool;
private String queueName;
private Random r = new Random();
@Before
public void setUp() throws Exception {
broker = new BrokerService();
broker.setDataDirectory("target" + File.separator + "activemq-data");
broker.setPersistent(true);
broker.setUseJmx(true);
broker.setDedicatedTaskRunner(false);
broker.setAdvisorySupport(false);
broker.setDeleteAllMessagesOnStartup(true);
SharedDeadLetterStrategy strategy = new SharedDeadLetterStrategy();
strategy.setProcessExpired(false);
strategy.setProcessNonPersistent(false);
PolicyEntry defaultPolicy = new PolicyEntry();
defaultPolicy.setQueue(">");
defaultPolicy.setOptimizedDispatch(true);
defaultPolicy.setDeadLetterStrategy(strategy);
defaultPolicy.setMemoryLimit(9000000);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(defaultPolicy);
broker.setDestinationPolicy(policyMap);
broker.getSystemUsage().getMemoryUsage().setLimit(300000000L);
broker.addConnector("tcp://localhost:0").setName("Default");
broker.start();
broker.waitUntilStarted();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
pool = Executors.newFixedThreadPool(10);
}
@After
public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
if (pool != null) {
pool.shutdown();
}
}
@Test
public void testIt() throws Exception {
for (int i = 0; i < 2; i++) {
LOG.info("Started the test iteration: " + i + " using queueName = " + queueName);
queueName = QUEUE_NAME + i;
final CountDownLatch latch = new CountDownLatch(11);
pool.execute(new Runnable() {
@Override
public void run() {
receiveAndDiscard100messages(latch);
}
});
for (int j = 0; j < 10; j++) {
pool.execute(new Runnable() {
@Override
public void run() {
send10000messages(latch);
}
});
}
LOG.info("Waiting on the send / receive latch");
latch.await(5, TimeUnit.MINUTES);
LOG.info("Resumed");
destroyQueue();
TimeUnit.SECONDS.sleep(2);
}
final PListStore pa = broker.getTempDataStore();
assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(),
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return pa.getJournal().getFileMap().size() == 1;
}
}, TimeUnit.MINUTES.toMillis(3))
);
}
public void destroyQueue() {
try {
Broker broker = this.broker.getBroker();
if (!broker.isStopped()) {
LOG.info("Removing: " + queueName);
broker.removeDestination(this.broker.getAdminConnectionContext(), new ActiveMQQueue(queueName), 10);
}
} catch (Exception e) {
LOG.warn("Got an error while removing the test queue", e);
}
}
private void send10000messages(CountDownLatch latch) {
ActiveMQConnection activeMQConnection = null;
try {
activeMQConnection = createConnection(null);
Session session = activeMQConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session
.createQueue(queueName));
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
activeMQConnection.start();
for (int i = 0; i < 10000; i++) {
TextMessage textMessage = session.createTextMessage();
textMessage.setText(generateBody(1000));
textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(textMessage);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
producer.close();
} catch (JMSException e) {
LOG.warn("Got an error while sending the messages", e);
} finally {
if (activeMQConnection != null) {
try {
activeMQConnection.close();
} catch (JMSException e) {
}
}
}
latch.countDown();
}
private void receiveAndDiscard100messages(CountDownLatch latch) {
ActiveMQConnection activeMQConnection = null;
try {
activeMQConnection = createConnection(null);
Session session = activeMQConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(
session.createQueue(queueName));
activeMQConnection.start();
for (int i = 0; i < 100; i++) {
messageConsumer.receive();
}
messageConsumer.close();
LOG.info("Created and disconnected");
} catch (JMSException e) {
LOG.warn("Got an error while receiving the messages", e);
} finally {
if (activeMQConnection != null) {
try {
activeMQConnection.close();
} catch (JMSException e) {
}
}
}
latch.countDown();
}
private ActiveMQConnection createConnection(String id) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
if (id != null) {
factory.setClientID(id);
}
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
return connection;
}
private String generateBody(int length) {
StringBuilder sb = new StringBuilder();
int te = 0;
for (int i = 1; i <= length; i++) {
te = r.nextInt(62);
sb.append(str.charAt(te));
}
return sb.toString();
}
}