mirror of https://github.com/apache/activemq.git
reduce test duration from crazy 30 minutes, validate temp store being used
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1244974 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
08ae7770bb
commit
cc7ea7f31f
|
@ -37,14 +37,21 @@ import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
|
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.util.Wait;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class UnlimitedEnqueueTest {
|
public class UnlimitedEnqueueTest {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(UnlimitedEnqueueTest.class);
|
||||||
BrokerService brokerService = null;
|
BrokerService brokerService = null;
|
||||||
final long numMessages = 50000;
|
final long numMessages = 50000;
|
||||||
final long numThreads = 10;
|
final long numThreads = 10;
|
||||||
|
final int payLoadSize = 100*1024;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEnqueueIsOnlyLimitedByDisk() throws Exception {
|
public void testEnqueueIsOnlyLimitedByDisk() throws Exception {
|
||||||
|
@ -53,8 +60,14 @@ public class UnlimitedEnqueueTest {
|
||||||
executor.execute(new Producer(numMessages/numThreads));
|
executor.execute(new Producer(numMessages/numThreads));
|
||||||
}
|
}
|
||||||
|
|
||||||
executor.shutdown();
|
assertTrue("Temp Store is filling ", Wait.waitFor(new Wait.Condition(){
|
||||||
executor.awaitTermination(30*60, TimeUnit.SECONDS);
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
LOG.info("Temp Usage, " + brokerService.getSystemUsage().getTempUsage() + ", full=" + brokerService.getSystemUsage().getTempUsage().isFull() + ", % usage: " + brokerService.getSystemUsage().getTempUsage().getPercentUsage());
|
||||||
|
return brokerService.getSystemUsage().getTempUsage().getPercentUsage() > 1;
|
||||||
|
}
|
||||||
|
}, TimeUnit.MINUTES.toMillis(4)));
|
||||||
|
executor.shutdownNow();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -65,6 +78,8 @@ public class UnlimitedEnqueueTest {
|
||||||
|
|
||||||
// optional, reduce the usage limit so that spooling will occur faster
|
// optional, reduce the usage limit so that spooling will occur faster
|
||||||
brokerService.getSystemUsage().getMemoryUsage().setLimit(10 * 1024 * 1024);
|
brokerService.getSystemUsage().getMemoryUsage().setLimit(10 * 1024 * 1024);
|
||||||
|
brokerService.getSystemUsage().getTempUsage().setLimit(numMessages * payLoadSize * 2);
|
||||||
|
|
||||||
PolicyMap policyMap = new PolicyMap();
|
PolicyMap policyMap = new PolicyMap();
|
||||||
List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
|
List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
|
||||||
PolicyEntry policy = new PolicyEntry();
|
PolicyEntry policy = new PolicyEntry();
|
||||||
|
@ -99,13 +114,13 @@ public class UnlimitedEnqueueTest {
|
||||||
try {
|
try {
|
||||||
Connection conn = factory.createConnection();
|
Connection conn = factory.createConnection();
|
||||||
conn.start();
|
conn.start();
|
||||||
|
byte[] bytes = new byte[payLoadSize];
|
||||||
for (int i = 0; i < numberOfMessages; i++) {
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
|
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
|
||||||
Destination destination = session.createQueue("test-queue");
|
Destination destination = session.createQueue("test-queue");
|
||||||
MessageProducer producer = session.createProducer(destination);
|
MessageProducer producer = session.createProducer(destination);
|
||||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
BytesMessage message = session.createBytesMessage();
|
BytesMessage message = session.createBytesMessage();
|
||||||
byte[] bytes = new byte[1024*10];
|
|
||||||
message.writeBytes(bytes);
|
message.writeBytes(bytes);
|
||||||
try {
|
try {
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
@ -115,7 +130,7 @@ public class UnlimitedEnqueueTest {
|
||||||
session.close();
|
session.close();
|
||||||
}
|
}
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
throw new RuntimeException(e);
|
// expect interrupted exception on shutdownNow
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue