Convert to JUnit 4 while fixing sporadic failure due to the expiry
setting being left at default of 30 seconds, which on a fast machine
means the expiry tests sometime finish before the task kicks in.
This commit is contained in:
Timothy Bish 2015-05-07 18:30:02 -04:00
parent 188434c6ee
commit c10e6fa8f0
2 changed files with 71 additions and 52 deletions

View File

@ -16,6 +16,10 @@
*/ */
package org.apache.activemq.advisory; package org.apache.activemq.advisory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -31,8 +35,6 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue; import javax.jms.TemporaryQueue;
import javax.jms.Topic; import javax.jms.Topic;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -41,16 +43,21 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class AdvisoryTempDestinationTests extends TestCase { public class AdvisoryTempDestinationTests {
protected static final int MESSAGE_COUNT = 2000; protected static final int MESSAGE_COUNT = 2000;
protected static final int EXPIRE_MESSAGE_PERIOD = 10000;
protected BrokerService broker; protected BrokerService broker;
protected Connection connection; protected Connection connection;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected int topicCount; protected int topicCount;
@Test(timeout = 60000)
public void testNoSlowConsumerAdvisory() throws Exception { public void testNoSlowConsumerAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = s.createTemporaryQueue(); TemporaryQueue queue = s.createTemporaryQueue();
@ -60,8 +67,8 @@ public class AdvisoryTempDestinationTests extends TestCase {
public void onMessage(Message message) { public void onMessage(Message message) {
} }
}); });
Topic advisoryTopic = AdvisorySupport
.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue); Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer // start throwing messages at the consumer
@ -75,14 +82,14 @@ public class AdvisoryTempDestinationTests extends TestCase {
assertNull(msg); assertNull(msg);
} }
@Test(timeout = 60000)
public void testSlowConsumerAdvisory() throws Exception { public void testSlowConsumerAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = s.createTemporaryQueue(); TemporaryQueue queue = s.createTemporaryQueue();
MessageConsumer consumer = s.createConsumer(queue); MessageConsumer consumer = s.createConsumer(queue);
assertNotNull(consumer); assertNotNull(consumer);
Topic advisoryTopic = AdvisorySupport Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer // start throwing messages at the consumer
@ -96,6 +103,7 @@ public class AdvisoryTempDestinationTests extends TestCase {
assertNotNull(msg); assertNotNull(msg);
} }
@Test(timeout = 60000)
public void testMessageDeliveryAdvisory() throws Exception { public void testMessageDeliveryAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = s.createTemporaryQueue(); TemporaryQueue queue = s.createTemporaryQueue();
@ -104,7 +112,7 @@ public class AdvisoryTempDestinationTests extends TestCase {
Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue); Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer // start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue); MessageProducer producer = s.createProducer(queue);
BytesMessage m = s.createBytesMessage(); BytesMessage m = s.createBytesMessage();
@ -115,6 +123,7 @@ public class AdvisoryTempDestinationTests extends TestCase {
assertNotNull(msg); assertNotNull(msg);
} }
@Test(timeout = 60000)
public void testTempMessageConsumedAdvisory() throws Exception { public void testTempMessageConsumedAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = s.createTemporaryQueue(); TemporaryQueue queue = s.createTemporaryQueue();
@ -122,7 +131,7 @@ public class AdvisoryTempDestinationTests extends TestCase {
Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue); Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer // start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue); MessageProducer producer = s.createProducer(queue);
BytesMessage m = s.createBytesMessage(); BytesMessage m = s.createBytesMessage();
@ -141,6 +150,7 @@ public class AdvisoryTempDestinationTests extends TestCase {
assertEquals(originalId, id); assertEquals(originalId, id);
} }
@Test(timeout = 60000)
public void testMessageExpiredAdvisory() throws Exception { public void testMessageExpiredAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName()); Queue queue = s.createQueue(getClass().getName());
@ -149,7 +159,7 @@ public class AdvisoryTempDestinationTests extends TestCase {
Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue); Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer // start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue); MessageProducer producer = s.createProducer(queue);
producer.setTimeToLive(1); producer.setTimeToLive(1);
for (int i = 0; i < MESSAGE_COUNT; i++) { for (int i = 0; i < MESSAGE_COUNT; i++) {
@ -158,34 +168,30 @@ public class AdvisoryTempDestinationTests extends TestCase {
producer.send(m); producer.send(m);
} }
Message msg = advisoryConsumer.receive(5000); Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD);
assertNotNull(msg); assertNotNull(msg);
} }
@Override @Before
protected void setUp() throws Exception { public void setUp() throws Exception {
if (broker == null) { if (broker == null) {
broker = createBroker(); broker = createBroker();
} }
ConnectionFactory factory = createConnectionFactory(); ConnectionFactory factory = createConnectionFactory();
connection = factory.createConnection(); connection = factory.createConnection();
connection.start(); connection.start();
super.setUp();
} }
@Override @After
protected void tearDown() throws Exception { public void tearDown() throws Exception {
super.tearDown();
connection.close(); connection.close();
if (broker != null) { if (broker != null) {
broker.stop(); broker.stop();
} }
} }
protected ActiveMQConnectionFactory createConnectionFactory() protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
throws Exception { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
return cf; return cf;
} }
@ -218,6 +224,7 @@ public class AdvisoryTempDestinationTests extends TestCase {
private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) { private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) {
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setExpireMessagesPeriod(EXPIRE_MESSAGE_PERIOD);
policy.setAdvisoryForFastProducers(true); policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true); policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDelivery(true);

View File

@ -16,6 +16,10 @@
*/ */
package org.apache.activemq.advisory; package org.apache.activemq.advisory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
@ -27,29 +31,34 @@ import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.Topic; import javax.jms.Topic;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.*; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic; import org.junit.After;
import org.apache.activemq.util.Wait; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/** /**
* * Test for advisory messages sent under the right circumstances.
*/ */
public class AdvisoryTests extends TestCase { public class AdvisoryTests {
protected static final int MESSAGE_COUNT = 2000; protected static final int MESSAGE_COUNT = 2000;
protected BrokerService broker; protected BrokerService broker;
protected Connection connection; protected Connection connection;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected int topicCount; protected int topicCount;
protected final int EXPIRE_MESSAGE_PERIOD = 10000;
@Test(timeout = 60000)
public void testNoSlowConsumerAdvisory() throws Exception { public void testNoSlowConsumerAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName()); Queue queue = s.createQueue(getClass().getName());
@ -59,8 +68,8 @@ public class AdvisoryTests extends TestCase {
public void onMessage(Message message) { public void onMessage(Message message) {
} }
}); });
Topic advisoryTopic = AdvisorySupport
.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue); Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer // start throwing messages at the consumer
@ -74,14 +83,14 @@ public class AdvisoryTests extends TestCase {
assertNull(msg); assertNull(msg);
} }
@Test(timeout = 60000)
public void testSlowConsumerAdvisory() throws Exception { public void testSlowConsumerAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName()); Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue); MessageConsumer consumer = s.createConsumer(queue);
assertNotNull(consumer); assertNotNull(consumer);
Topic advisoryTopic = AdvisorySupport Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer // start throwing messages at the consumer
@ -95,6 +104,7 @@ public class AdvisoryTests extends TestCase {
assertNotNull(msg); assertNotNull(msg);
} }
@Test(timeout = 60000)
public void testMessageDeliveryAdvisory() throws Exception { public void testMessageDeliveryAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName()); Queue queue = s.createQueue(getClass().getName());
@ -103,7 +113,7 @@ public class AdvisoryTests extends TestCase {
Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue); Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer // start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue); MessageProducer producer = s.createProducer(queue);
BytesMessage m = s.createBytesMessage(); BytesMessage m = s.createBytesMessage();
@ -114,6 +124,7 @@ public class AdvisoryTests extends TestCase {
assertNotNull(msg); assertNotNull(msg);
} }
@Test(timeout = 60000)
public void testMessageConsumedAdvisory() throws Exception { public void testMessageConsumedAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName()); Queue queue = s.createQueue(getClass().getName());
@ -121,7 +132,7 @@ public class AdvisoryTests extends TestCase {
Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue); Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer // start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue); MessageProducer producer = s.createProducer(queue);
BytesMessage m = s.createBytesMessage(); BytesMessage m = s.createBytesMessage();
@ -140,6 +151,7 @@ public class AdvisoryTests extends TestCase {
assertEquals(originalId, id); assertEquals(originalId, id);
} }
@Test(timeout = 60000)
public void testMessageExpiredAdvisory() throws Exception { public void testMessageExpiredAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName()); Queue queue = s.createQueue(getClass().getName());
@ -148,7 +160,7 @@ public class AdvisoryTests extends TestCase {
Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue); Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer // start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue); MessageProducer producer = s.createProducer(queue);
producer.setTimeToLive(1); producer.setTimeToLive(1);
for (int i = 0; i < MESSAGE_COUNT; i++) { for (int i = 0; i < MESSAGE_COUNT; i++) {
@ -157,33 +169,36 @@ public class AdvisoryTests extends TestCase {
producer.send(m); producer.send(m);
} }
Message msg = advisoryConsumer.receive(2000); Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD);
assertNotNull(msg); assertNotNull(msg);
} }
@Test(timeout = 60000)
public void testMessageDLQd() throws Exception { public void testMessageDLQd() throws Exception {
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy(); ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
policy.setTopicPrefetch(2); policy.setTopicPrefetch(2);
((ActiveMQConnection)connection).setPrefetchPolicy(policy); ((ActiveMQConnection) connection).setPrefetchPolicy(policy);
Session s = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session s = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Topic topic = s.createTopic(getClass().getName()); Topic topic = s.createTopic(getClass().getName());
Topic advisoryTopic = s.createTopic(">"); Topic advisoryTopic = s.createTopic(">");
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); s.createConsumer(advisoryTopic);
} }
MessageProducer producer = s.createProducer(topic); MessageProducer producer = s.createProducer(topic);
int count = 10; int count = 10;
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
BytesMessage m = s.createBytesMessage(); BytesMessage m = s.createBytesMessage();
producer.send(m); producer.send(m);
} }
// we should get here without StackOverflow // we should get here without StackOverflow
} }
public void xtestMessageDiscardedAdvisory() throws Exception { @Ignore
@Test(timeout = 60000)
public void testMessageDiscardedAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = s.createTopic(getClass().getName()); Topic topic = s.createTopic(getClass().getName());
MessageConsumer consumer = s.createConsumer(topic); MessageConsumer consumer = s.createConsumer(topic);
@ -191,7 +206,7 @@ public class AdvisoryTests extends TestCase {
Topic advisoryTopic = AdvisorySupport.getMessageDiscardedAdvisoryTopic((ActiveMQDestination) topic); Topic advisoryTopic = AdvisorySupport.getMessageDiscardedAdvisoryTopic((ActiveMQDestination) topic);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer // start throwing messages at the consumer
MessageProducer producer = s.createProducer(topic); MessageProducer producer = s.createProducer(topic);
int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2); int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2);
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
@ -203,30 +218,26 @@ public class AdvisoryTests extends TestCase {
assertNotNull(msg); assertNotNull(msg);
} }
@Override @Before
protected void setUp() throws Exception { public void setUp() throws Exception {
if (broker == null) { if (broker == null) {
broker = createBroker(); broker = createBroker();
} }
ConnectionFactory factory = createConnectionFactory(); ConnectionFactory factory = createConnectionFactory();
connection = factory.createConnection(); connection = factory.createConnection();
connection.start(); connection.start();
super.setUp();
} }
@Override @After
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
super.tearDown();
connection.close(); connection.close();
if (broker != null) { if (broker != null) {
broker.stop(); broker.stop();
} }
} }
protected ActiveMQConnectionFactory createConnectionFactory() protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
throws Exception { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
return cf; return cf;
} }
@ -240,6 +251,7 @@ public class AdvisoryTests extends TestCase {
protected void configureBroker(BrokerService answer) throws Exception { protected void configureBroker(BrokerService answer) throws Exception {
answer.setPersistent(false); answer.setPersistent(false);
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setExpireMessagesPeriod(EXPIRE_MESSAGE_PERIOD);
policy.setAdvisoryForFastProducers(true); policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true); policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDelivery(true);