diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java b/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java index 842c224eed..419e10b467 100755 --- a/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java +++ b/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java @@ -66,4 +66,15 @@ public class StoreUsage extends Usage { return super.getPercentUsage(); } } + + @Override + public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException { + if (parent != null) { + if (parent.waitForSpace(timeout, highWaterMark)) { + return true; + } + } + + return super.waitForSpace(timeout, highWaterMark); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usage/StoreUsageTest.java b/activemq-core/src/test/java/org/apache/activemq/usage/StoreUsageTest.java new file mode 100644 index 0000000000..30c4e3766d --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usage/StoreUsageTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.usage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.ProducerThread; +import org.apache.activemq.util.Wait; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Session; + +public class StoreUsageTest extends EmbeddedBrokerTestSupport { + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.getSystemUsage().getStoreUsage().setLimit(10 * 1024); + broker.deleteAllMessages(); + return broker; + } + + protected boolean isPersistent() { + return true; + } + + public void testJmx() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + Connection conn = factory.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = sess.createQueue(this.getClass().getName()); + final ProducerThread producer = new ProducerThread(sess, dest); + producer.start(); + + // wait for the producer to block + Thread.sleep(5000); + + broker.getAdminView().setStoreLimit(1024 * 1024); + + Thread.sleep(5000); + + Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return producer.getSentCount() == producer.getMessageCount(); + } + }, 5000); + + assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount()); + + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java b/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java index b9e9ba8ae2..84e4100e56 100644 --- a/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java +++ b/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java @@ -32,6 +32,7 @@ public class ProducerThread extends Thread { Destination dest; Session sess; int sleep = 0; + int sentCount = 0; public ProducerThread(Session sess, Destination dest) { this.dest = dest; @@ -42,9 +43,9 @@ public class ProducerThread extends Thread { MessageProducer producer = null; try { producer = sess.createProducer(dest); - for (int i = 0; i < messageCount; i++) { - producer.send(sess.createTextMessage("test message: " + i)); - LOG.info("Sent 'test message: " + i + "'"); + for (sentCount = 0; sentCount < messageCount; sentCount++) { + producer.send(sess.createTextMessage("test message: " + sentCount)); + LOG.info("Sent 'test message: " + sentCount + "'"); if (sleep > 0) { Thread.sleep(sleep); } @@ -70,4 +71,12 @@ public class ProducerThread extends Thread { public void setSleep(int sleep) { this.sleep = sleep; } + + public int getMessageCount() { + return messageCount; + } + + public int getSentCount() { + return sentCount; + } }