[AMQ-6547] revert mod from AMQ-3143 such that waitForSpace respects child usage, fix test and validate mKahadb blocking send

This commit is contained in:
gtully 2017-02-14 11:33:50 +00:00
parent f5baebb001
commit fad50812af
3 changed files with 102 additions and 19 deletions

View File

@ -78,16 +78,6 @@ public class StoreUsage extends PercentLimitUsage<StoreUsage> {
}
}
@Override
public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException {
if (parent != null) {
if (parent.waitForSpace(timeout, highWaterMark)) {
return true;
}
}
return super.waitForSpace(timeout, highWaterMark);
}
@Override
protected void updateLimitBasedOnPercent() {

View File

@ -21,6 +21,8 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
@ -32,8 +34,14 @@ import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.*;
@ -160,6 +168,72 @@ public class MKahaDBStoreLimitTest {
}
@Test
public void testExplicitAdapterBlockingProducer() throws Exception {
MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
kahaStore.setJournalMaxFileLength(1024*8);
kahaStore.setIndexDirectory(new File(IOHelper.getDefaultDataDirectory()));
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
StoreUsage storeUsage = new StoreUsage();
storeUsage.setLimit(40*1024);
filtered.setUsage(storeUsage);
filtered.setDestination(queueA);
filtered.setPersistenceAdapter(kahaStore);
List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
stores.add(filtered);
persistenceAdapter.setFilteredPersistenceAdapters(stores);
BrokerService brokerService = createBroker(persistenceAdapter);
brokerService.start();
final AtomicBoolean done = new AtomicBoolean();
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(new Runnable() {
@Override
public void run() {
try {
produceMessages(queueA, 20);
done.set(true);
} catch (Exception ignored) {
}
}
});
assertTrue("some messages got to dest", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
BaseDestination baseDestinationA = (BaseDestination) broker.getRegionBroker().getDestinationMap().get(queueA);
return baseDestinationA != null && baseDestinationA.getDestinationStatistics().getMessages().getCount() > 4l;
}
}));
BaseDestination baseDestinationA = (BaseDestination) broker.getRegionBroker().getDestinationMap().get(queueA);
// loop till producer stalled
long enqueues = 0l;
do {
enqueues = baseDestinationA.getDestinationStatistics().getEnqueues().getCount();
LOG.info("Dest Enqueues: " + enqueues);
TimeUnit.MILLISECONDS.sleep(500);
} while (enqueues != baseDestinationA.getDestinationStatistics().getEnqueues().getCount());
assertFalse("expect producer to block", done.get());
LOG.info("Store global u: " + broker.getSystemUsage().getStoreUsage().getUsage() + ", %:" + broker.getSystemUsage().getStoreUsage().getPercentUsage());
assertTrue("some usage", broker.getSystemUsage().getStoreUsage().getUsage() > 0);
LOG.info("Store A u: " + baseDestinationA.getSystemUsage().getStoreUsage().getUsage() + ", %: " + baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage());
assertTrue("limited store has more % usage than parent", baseDestinationA.getSystemUsage().getStoreUsage().getPercentUsage() > broker.getSystemUsage().getStoreUsage().getPercentUsage());
executor.shutdownNow();
}
private void consume(Destination queue) throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
@ -180,7 +254,7 @@ public class MKahaDBStoreLimitTest {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(new byte[2*1024]);
bytesMessage.writeBytes(new byte[1*1024]);
for (int i = 0; i < count; ++i) {
producer.send(bytesMessage);
}

View File

@ -20,21 +20,24 @@ package org.apache.activemq.usage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Session;
import java.util.concurrent.TimeUnit;
public class StoreUsageTest extends EmbeddedBrokerTestSupport {
final int WAIT_TIME_MILLS = 20*1000;
private static final Logger LOG = LoggerFactory.getLogger(StoreUsageTest.class);
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
broker.getSystemUsage().getStoreUsage().setLimit(10 * 1024);
broker.getSystemUsage().getStoreUsage().setLimit(34 * 1024);
broker.deleteAllMessages();
return broker;
}
@ -52,20 +55,36 @@ public class StoreUsageTest extends EmbeddedBrokerTestSupport {
final ProducerThread producer = new ProducerThread(sess, dest);
producer.start();
assertTrue("some messages sent", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
BaseDestination baseDestination = (BaseDestination) broker.getRegionBroker().getDestinationMap().get(dest);
return baseDestination != null && baseDestination.getDestinationStatistics().getEnqueues().getCount() > 0;
}
}));
BaseDestination baseDestination = (BaseDestination) broker.getRegionBroker().getDestinationMap().get(dest);
LOG.info("Sent u: " + baseDestination.getDestinationStatistics().getEnqueues());
// wait for the producer to block
Thread.sleep(WAIT_TIME_MILLS / 2);
int sent = 0;
do {
sent = producer.getSentCount();
TimeUnit.SECONDS.sleep(1);
LOG.info("Sent: " + sent);
} while (sent != producer.getSentCount());
LOG.info("Increasing limit! enqueues: " + baseDestination.getDestinationStatistics().getEnqueues().getCount());
broker.getAdminView().setStoreLimit(1024 * 1024);
Thread.sleep(WAIT_TIME_MILLS);
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return producer.getSentCount() == producer.getMessageCount();
}
}, WAIT_TIME_MILLS * 2);
});
assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
assertEquals("Producer sent all messages", producer.getMessageCount(), producer.getSentCount());
assertEquals("Enqueues match sent", producer.getSentCount(), baseDestination.getDestinationStatistics().getEnqueues().getCount());
}
}