diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 48ada3be96..561175b015 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -96,8 +96,14 @@ public class TopicSubscription extends AbstractSubscription { } } if (maximumPendingMessages != 0) { + synchronized(matchedListMutex){ + while (matched.isFull()){ + matchedListMutex.wait(20); + } + matched.addMessageLast(node); + } synchronized (matchedListMutex) { - matched.addMessageLast(node); + // NOTE - be careful about the slaveBroker! if (maximumPendingMessages > 0) { // calculate the high water mark from which point we @@ -115,7 +121,7 @@ public class TopicSubscription extends AbstractSubscription { // only page in a 1000 at a time - else we could // blow da memory pageInSize = Math.max(1000, pageInSize); - LinkedList list = null; + LinkedList list = null; MessageReference[] oldMessages=null; synchronized(matched){ list = matched.pageInList(pageInSize); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 19225eb319..1b228a6bee 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -141,7 +141,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple destroyDiskList(); } - private void destroyDiskList() { + private void destroyDiskList() throws Exception { if (!isDiskListEmpty()) { Iterator iterator = diskList.iterator(); while (iterator.hasNext()) { @@ -149,7 +149,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple iterator.remove(); } diskList.clear(); - } + } + store.deleteListContainer(name, "TopicSubscription"); } public synchronized LinkedList pageInList(int maxItems) { @@ -318,10 +319,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple last=null; } - public synchronized boolean isFull() { - // we always have space - as we can persist to disk - return false; - } + public synchronized boolean isFull() { + + return super.isFull() + || (systemUsage != null && systemUsage.getTempUsage().isFull()); + + } public boolean hasMessagesBufferedToDeliver() { return !isEmpty(); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java new file mode 100644 index 0000000000..ffe3c3675c --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.File; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.kaha.Store; +import org.apache.activemq.store.amq.AMQPersistenceAdapter; +import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.StoreUsage; +import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.usage.TempUsage; +import org.apache.activemq.util.IOHelper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TempStorageBlockedBrokerTest { + + public boolean consumeAll = false; + public int deliveryMode = DeliveryMode.PERSISTENT; + + private static final Log LOG = LogFactory.getLog(TempStorageBlockedBrokerTest.class); + private static final int MESSAGES_COUNT = 1000; + private static byte[] buf = new byte[4 * 1024]; + private BrokerService broker; + AtomicInteger messagesSent = new AtomicInteger(0); + AtomicInteger messagesConsumed = new AtomicInteger(0); + + protected long messageReceiveTimeout = 10L; + + Destination destination = new ActiveMQTopic("FooTwo"); + + @Test + public void runProducerWithHungConsumer() throws Exception { + + final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage(); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61618"); + // ensure messages are spooled to disk for this consumer + ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); + prefetch.setTopicPrefetch(10); + factory.setPrefetchPolicy(prefetch); + Connection consumerConnection = factory.createConnection(); + consumerConnection.start(); + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(destination); + + final Connection producerConnection = factory.createConnection(); + producerConnection.start(); + + Thread producingThread = new Thread("Producing thread") { + public void run() { + try { + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + for (int idx = 0; idx < MESSAGES_COUNT; ++idx) { + Message message = session.createTextMessage(new String(buf) + idx); + + producer.send(message); + messagesSent.incrementAndGet(); + Thread.sleep(10); + LOG.info("Sent Message " + idx); + LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); + + } + producer.close(); + session.close(); + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + }; + producingThread.start(); + + int count = 0; + + Message m = null; + while ((m = consumer.receive(messageReceiveTimeout)) != null) { + count++; + LOG.info("Recieved Message (" + count + "):" + m); + messagesConsumed.incrementAndGet(); + try { + Thread.sleep(100); + } catch (Exception e) { + LOG.info("error sleeping"); + } + } + + LOG.info("Connection Timeout: Retrying"); + + // session.close(); + // consumerConnection.close(); + // + // consumerConnection2.start(); + // session2 = consumerConnection2.createSession(false, + // Session.AUTO_ACKNOWLEDGE); + // consumer = session2.createConsumer(destination); + + while ((m = consumer.receive(messageReceiveTimeout)) != null) { + count++; + LOG.info("Recieved Message (" + count + "):" + m); + messagesConsumed.incrementAndGet(); + try { + Thread.sleep(100); + } catch (Exception e) { + LOG.info("error sleeping"); + } + } + + LOG.info("consumer session closing: consumed count: " + count); + + consumerSession.close(); + + producingThread.join(); + + final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage(); + LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription); + // assertTrue("some temp store has been used", tempUsageBySubscription + // != origTempUsage); + + producerConnection.close(); + consumerConnection.close(); + + LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: " + + broker.getSystemUsage().getTempUsage().getUsage()); + + + assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), messagesSent.get(), MESSAGES_COUNT); + assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), + MESSAGES_COUNT); + } + + @Before + public void setUp() throws Exception { + + broker = new BrokerService(); + broker.setDataDirectory("target" + File.separator + "activemq-data"); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.setAdvisorySupport(false); + broker.setDeleteAllMessagesOnStartup(true); + + AMQPersistenceAdapter persistence = new AMQPersistenceAdapter(); + persistence.setSyncOnWrite(false); + File directory = new File("target" + File.separator + "activemq-data"); + persistence.setDirectory(directory); + File tmpDir = new File(directory, "tmp"); + IOHelper.deleteChildren(tmpDir); + Store tempStore = new org.apache.activemq.kaha.impl.KahaStore(tmpDir, "rw"); + + SystemUsage sysUsage = new SystemUsage("mySysUsage", persistence, tempStore); + MemoryUsage memUsage = new MemoryUsage(); + memUsage.setLimit((1024 * 1024)); + StoreUsage storeUsage = new StoreUsage(); + storeUsage.setLimit((1024 * 1024) * 38); + TempUsage tmpUsage = new TempUsage(); + tmpUsage.setLimit((1024 * 1024) * 38); + + PolicyEntry defaultPolicy = new PolicyEntry(); + // defaultPolicy.setTopic("FooTwo"); + defaultPolicy.setProducerFlowControl(false); + defaultPolicy.setMemoryLimit(10 * 1024); + + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(defaultPolicy); + + sysUsage.setMemoryUsage(memUsage); + sysUsage.setStoreUsage(storeUsage); + sysUsage.setTempUsage(tmpUsage); + + broker.setDestinationPolicy(policyMap); + broker.setSystemUsage(sysUsage); + broker.setTempDataStore(tempStore); + broker.setPersistenceAdapter(persistence); + + broker.addConnector("tcp://localhost:61618").setName("Default"); + broker.start(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + } + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/store/temp/TempStoreTest.java b/activemq-core/src/test/java/org/apache/activemq/store/temp/TempStoreTest.java new file mode 100644 index 0000000000..6b8e12ce9c --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/store/temp/TempStoreTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.temp; + +import java.io.File; +import java.util.LinkedList; +import java.util.List; +import junit.framework.TestCase; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.kahadb.util.IOHelper; + +public class TempStoreTest extends TestCase { + protected TempStore ts; + protected final LinkedList testList = new LinkedList(); + protected List list; + + public void testSize() throws Exception { + + this.list.addAll(testList); + + assertEquals(this.list.size(), this.testList.size()); + + } + + public void testAddFirst() throws Exception { + this.list.addAll(testList); + assertEquals(this.list.size(), this.testList.size()); + Message first = createMessage(10000); + this.list.add(0, first); + assertEquals(first, this.list.get(0)); + assertEquals(this.list.size(), this.testList.size() + 1); + } + + public void testAddLast() throws Exception { + this.list.addAll(testList); + assertEquals(this.list.size(), this.testList.size()); + Message last = createMessage(10000); + this.list.add(last); + assertEquals(last, this.list.get(this.list.size()-1)); + assertEquals(this.list.size(), this.testList.size() + 1); + } + + public void testRemoveFirst() throws Exception { + this.list.addAll(testList); + assertEquals(testList.get(0), this.list.remove(0)); + assertEquals(this.list.size(), testList.size() - 1); + System.err.println(this.list.get(0).getMessageId()); + for (int i =0; i < testList.size();i++) { + System.err.println(testList.get(i).getMessageId()); + } + for (int i = 1; i < testList.size(); i++) { + assertEquals(testList.get(i), this.list.get(i - 1)); + } + } + + private Message createMessage(int seq) { + ActiveMQMessage message = new ActiveMQMessage(); + message.setCommandId((short) 1); + message.setDestination(new ActiveMQQueue("queue")); + + message.setMessageId(new MessageId("c1:1:1", seq)); + return message; + + } + + protected void setUp() throws Exception { + super.setUp(); + this.ts = new TempStore(); + File directory = new File("target/test"); + this.ts.setDirectory(directory); + IOHelper.mkdirs(directory); + IOHelper.deleteChildren(directory); + + ts.start(); + for (int i = 0; i < 10; i++) { + + Message msg = createMessage(i); + testList.add(msg); + } + this.list = ts.getList(getName()); + } + + protected void tearDown() throws Exception { + testList.clear(); + if (ts != null) { + ts.stop(); + } + } +}