From e02dfdeaadb710e1662f8576566330dd3d1ba7e5 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 19 Jan 2012 13:33:19 +0000 Subject: [PATCH] move test to KahaDb and add simple validation of sendFailIfNoSpace and temp usage for non persistent messages. Producer stops on the exception, consumer picks up all the messages git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1233367 13f79535-47bb-0310-9956-ffa450edef68 --- .../bugs/TempStorageBlockedBrokerTest.java | 94 ++++++++++++------- 1 file changed, 61 insertions(+), 33 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java index 110d4849c0..463f4c3af9 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.bugs; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.io.File; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -29,29 +26,26 @@ import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.ResourceAllocationException; import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.store.amq.AMQPersistenceAdapter; -import org.apache.activemq.store.kahadb.plist.PListStore; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.StoreUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.TempUsage; -import org.apache.activemq.util.IOHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -public class TempStorageBlockedBrokerTest { +public class TempStorageBlockedBrokerTest extends TestSupport { - public boolean consumeAll = false; public int deliveryMode = DeliveryMode.PERSISTENT; private static final Logger LOG = LoggerFactory.getLogger(TempStorageBlockedBrokerTest.class); @@ -61,12 +55,11 @@ public class TempStorageBlockedBrokerTest { AtomicInteger messagesSent = new AtomicInteger(0); AtomicInteger messagesConsumed = new AtomicInteger(0); - protected long messageReceiveTimeout = 10L; + protected long messageReceiveTimeout = 10000L; Destination destination = new ActiveMQTopic("FooTwo"); - @Test - public void runProducerWithHungConsumer() throws Exception { + public void testRunProducerWithHungConsumer() throws Exception { final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage(); @@ -135,7 +128,7 @@ public class TempStorageBlockedBrokerTest { while ((m = consumer.receive(messageReceiveTimeout)) != null) { count++; - if (count != 0 && count%10 == 0) { + if (count != 0 && count%100 == 0) { LOG.info("Recieved Message (" + count + "):" + m); } messagesConsumed.incrementAndGet(); @@ -154,8 +147,6 @@ public class TempStorageBlockedBrokerTest { final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage(); LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription); - // assertTrue("some temp store has been used", tempUsageBySubscription - // != origTempUsage); producerConnection.close(); consumerConnection.close(); @@ -173,7 +164,56 @@ public class TempStorageBlockedBrokerTest { MESSAGES_COUNT); } - @Before + public void testFillTempAndConsume() throws Exception { + + broker.getSystemUsage().setSendFailIfNoSpace(true); + destination = new ActiveMQQueue("Foo"); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61618"); + final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection(); + // so we can easily catch the ResourceAllocationException on send + producerConnection.setAlwaysSyncSend(true); + producerConnection.start(); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + try { + while (true) { + Message message = session.createTextMessage(new String(buf) + messagesSent.toString()); + producer.send(message); + messagesSent.incrementAndGet(); + if (messagesSent.get() % 100 == 0) { + LOG.info("Sent Message " + messagesSent.get()); + LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); + } + } + } catch (ResourceAllocationException ex) { + LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get()); + } + + // consume all sent + Connection consumerConnection = factory.createConnection(); + consumerConnection.start(); + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(destination); + + + while (consumer.receive(messageReceiveTimeout) != null) { + messagesConsumed.incrementAndGet(); + if (messagesConsumed.get() % 1000 == 0) { + LOG.info("received Message " + messagesConsumed.get()); + LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); + } + } + + assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), + messagesSent.get()); + } + + @Override public void setUp() throws Exception { broker = new BrokerService(); @@ -183,18 +223,8 @@ public class TempStorageBlockedBrokerTest { broker.setAdvisorySupport(false); broker.setDeleteAllMessagesOnStartup(true); - AMQPersistenceAdapter persistence = new AMQPersistenceAdapter(); - persistence.setSyncOnWrite(false); - File directory = new File("target" + File.separator + "activemq-data"); - persistence.setDirectory(directory); - File tmpDir = new File(directory, "tmp"); - IOHelper.deleteChildren(tmpDir); - PListStore tempStore = new PListStore(); - tempStore.setDirectory(tmpDir); - tempStore.setJournalMaxFileLength(50*1024); - tempStore.start(); - - SystemUsage sysUsage = new SystemUsage("mySysUsage", persistence, tempStore); + setDefaultPersistenceAdapter(broker); + SystemUsage sysUsage = broker.getSystemUsage(); MemoryUsage memUsage = new MemoryUsage(); memUsage.setLimit((1024 * 1024)); StoreUsage storeUsage = new StoreUsage(); @@ -216,14 +246,12 @@ public class TempStorageBlockedBrokerTest { broker.setDestinationPolicy(policyMap); broker.setSystemUsage(sysUsage); - broker.setTempDataStore(tempStore); - broker.setPersistenceAdapter(persistence); broker.addConnector("tcp://localhost:61618").setName("Default"); broker.start(); } - @After + @Override public void tearDown() throws Exception { if (broker != null) { broker.stop();