mirror of https://github.com/apache/activemq.git
Updating tests to check prioritized messages as well
This commit is contained in:
parent
cbad8babe5
commit
485fcafcdf
|
@ -20,6 +20,8 @@ import static org.junit.Assert.assertEquals;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import javax.jms.Connection;
|
||||
|
@ -48,6 +50,7 @@ import org.apache.activemq.util.Wait.Condition;
|
|||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -67,6 +70,25 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
|
|||
protected String defaultQueueName = "test.queue";
|
||||
protected String defaultTopicName = "test.topic";
|
||||
protected static int maxMessageSize = 1000;
|
||||
protected boolean prioritizedMessages;
|
||||
|
||||
@Parameters(name="prioritizedMessages={0}")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
// use priority messages
|
||||
{true},
|
||||
// don't use priority messages
|
||||
{false}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param prioritizedMessages
|
||||
*/
|
||||
public AbstractPendingMessageCursorTest(boolean prioritizedMessages) {
|
||||
super();
|
||||
this.prioritizedMessages = prioritizedMessages;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void startBroker() throws Exception {
|
||||
|
@ -86,6 +108,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
|
|||
PolicyEntry policy = new PolicyEntry();
|
||||
policy.setTopicPrefetch(100);
|
||||
policy.setDurableTopicPrefetch(100);
|
||||
policy.setPrioritizedMessages(isPrioritizedMessages());
|
||||
PolicyMap pMap = new PolicyMap();
|
||||
pMap.setDefaultEntry(policy);
|
||||
broker.setDestinationPolicy(pMap);
|
||||
|
@ -114,6 +137,10 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
|
|||
|
||||
protected abstract void initPersistence(BrokerService brokerService) throws IOException;
|
||||
|
||||
protected boolean isPrioritizedMessages() {
|
||||
return prioritizedMessages;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueueMessageSize() throws Exception {
|
||||
AtomicLong publishedMessageSize = new AtomicLong();
|
||||
|
|
|
@ -33,6 +33,8 @@ import org.apache.commons.io.FileUtils;
|
|||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -42,6 +44,7 @@ import org.slf4j.LoggerFactory;
|
|||
* AMQ-5923
|
||||
*
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class KahaDBPendingMessageCursorTest extends
|
||||
AbstractPendingMessageCursorTest {
|
||||
protected static final Logger LOG = LoggerFactory
|
||||
|
@ -50,6 +53,13 @@ public class KahaDBPendingMessageCursorTest extends
|
|||
@Rule
|
||||
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
|
||||
|
||||
/**
|
||||
* @param prioritizedMessages
|
||||
*/
|
||||
public KahaDBPendingMessageCursorTest(boolean prioritizedMessages) {
|
||||
super(prioritizedMessages);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setUpBroker(boolean clearDataDir) throws Exception {
|
||||
if (clearDataDir && dataFileDir.getRoot().exists())
|
||||
|
@ -159,4 +169,5 @@ public class KahaDBPendingMessageCursorTest extends
|
|||
verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
|
||||
verifyStoreStats(topic, 0, 0);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.activemq.broker.BrokerService;
|
|||
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
|
||||
import org.apache.activemq.util.SubscriptionKey;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -36,10 +38,16 @@ import org.slf4j.LoggerFactory;
|
|||
* AMQ-5748
|
||||
*
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursorTest {
|
||||
protected static final Logger LOG = LoggerFactory
|
||||
.getLogger(MemoryPendingMessageCursorTest.class);
|
||||
|
||||
|
||||
public MemoryPendingMessageCursorTest(boolean prioritizedMessages) {
|
||||
super(prioritizedMessages);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initPersistence(BrokerService brokerService) throws IOException {
|
||||
broker.setPersistent(false);
|
||||
|
|
|
@ -34,6 +34,13 @@ import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
|
|||
public class MultiKahaDBPendingMessageCursorTest extends
|
||||
KahaDBPendingMessageCursorTest {
|
||||
|
||||
/**
|
||||
* @param prioritizedMessages
|
||||
*/
|
||||
public MultiKahaDBPendingMessageCursorTest(boolean prioritizedMessages) {
|
||||
super(prioritizedMessages);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initPersistence(BrokerService brokerService)
|
||||
throws IOException {
|
||||
|
|
Loading…
Reference in New Issue