diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java index a1233e0aef..a668c4eb17 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java @@ -24,6 +24,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; @@ -43,13 +45,15 @@ import org.slf4j.LoggerFactory; /** * @org.apache.xbean.XBean */ -public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubscriptionAware { +public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubscriptionAware, + BrokerServiceAware { private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class); MemoryTransactionStore transactionStore; ConcurrentMap topics = new ConcurrentHashMap(); ConcurrentMap queues = new ConcurrentHashMap(); private boolean useExternalMessageReferences; + protected BrokerService brokerService; @Override public Set getDestinations() { @@ -118,7 +122,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubs @Override public TransactionStore createTransactionStore() throws IOException { if (transactionStore == null) { - transactionStore = new MemoryTransactionStore(this); + transactionStore = new MemoryTransactionStore(this, brokerService); } return transactionStore; } @@ -253,4 +257,9 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubs public boolean isPersistNoLocal() { return true; } + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java index abf8282435..62b57613ef 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -51,6 +52,7 @@ public class MemoryTransactionStore implements TransactionStore { protected ConcurrentMap inflightTransactions = new ConcurrentHashMap(); protected Map preparedTransactions = Collections.synchronizedMap(new LinkedHashMap()); protected final PersistenceAdapter persistenceAdapter; + protected final BrokerService brokerService; private boolean doingRecover; @@ -93,6 +95,13 @@ public class MemoryTransactionStore implements TransactionStore { */ public void commit() throws IOException { ConnectionContext ctx = new ConnectionContext(); + try { + if (brokerService != null) { + ctx.setBroker(brokerService.getBroker()); + } + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } persistenceAdapter.beginTransaction(ctx); try { @@ -134,8 +143,9 @@ public class MemoryTransactionStore implements TransactionStore { MessageStore getMessageStore(); } - public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) { + public MemoryTransactionStore(PersistenceAdapter persistenceAdapter, BrokerService brokerService) { this.persistenceAdapter = persistenceAdapter; + this.brokerService = brokerService; } public MessageStore proxy(MessageStore messageStore) { diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java index ccf7485013..32871696c4 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java @@ -51,7 +51,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) { - super(jdbcPersistenceAdapter); + super(jdbcPersistenceAdapter, jdbcPersistenceAdapter.getBrokerService()); } @Override @@ -163,6 +163,13 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { if (tx != null) { // undo prepare work ConnectionContext ctx = new ConnectionContext(); + try { + if (brokerService != null) { + ctx.setBroker(brokerService.getBroker()); + } + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } persistenceAdapter.beginTransaction(ctx); try { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java index 497856505c..9207fb0b41 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java @@ -23,9 +23,9 @@ import static org.junit.Assert.assertTrue; import java.util.Arrays; import java.util.Collection; +import java.util.Enumeration; import java.util.HashSet; - -import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -35,9 +35,9 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; +import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.Topic; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; @@ -45,7 +45,6 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.NullMessageReference; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -71,24 +70,28 @@ public class AdvisoryTests { protected BrokerService broker; protected Connection connection; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; - protected int topicCount; protected final boolean includeBodyForAdvisory; + protected final boolean persistent; protected final int EXPIRE_MESSAGE_PERIOD = 3000; - - @Parameters(name = "includeBodyForAdvisory={0}") + @Parameters(name = "includeBodyForAdvisory={0}, persistent={1}") public static Collection data() { return Arrays.asList(new Object[][] { - // Include the full body of the message - {true}, - // Don't include the full body of the message - {false} + // Include the full body of the message + {true, false}, + // Don't include the full body of the message + {false, false}, + // Include the full body of the message + {true, true}, + // Don't include the full body of the message + {false, true} }); } - public AdvisoryTests(boolean includeBodyForAdvisory) { + public AdvisoryTests(boolean includeBodyForAdvisory, boolean persistent) { super(); this.includeBodyForAdvisory = includeBodyForAdvisory; + this.persistent = persistent; } @Test(timeout = 60000) @@ -169,30 +172,92 @@ public class AdvisoryTests { } @Test(timeout = 60000) - public void testMessageDeliveryAdvisory() throws Exception { - Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = s.createQueue(getClass().getName()); - MessageConsumer consumer = s.createConsumer(queue); + public void testQueueMessageDeliveryAdvisory() throws Exception { + testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic, false); + } + + @Test(timeout = 60000) + public void testQueueMessageDeliveryAdvisoryTransacted() throws Exception { + testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic, true); + } + + @Test(timeout = 60000) + public void testTopicMessageDeliveryAdvisory() throws Exception { + testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic, false); + } + + @Test(timeout = 60000) + public void testTopicMessageDeliveryAdvisoryTransacted() throws Exception { + testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic, true); + } + + private void testMessageConsumerAdvisory(ActiveMQDestination dest, Function advisoryTopicSupplier, + boolean transacted) throws Exception { + Session s = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = s.createConsumer(dest); assertNotNull(consumer); - Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue); + Topic advisoryTopic = advisoryTopicSupplier.apply(dest); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); // start throwing messages at the consumer - MessageProducer producer = s.createProducer(queue); + MessageProducer producer = s.createProducer(dest); BytesMessage m = s.createBytesMessage(); m.writeBytes(new byte[1024]); producer.send(m); + if (transacted) { + s.commit(); + } Message msg = advisoryConsumer.receive(1000); assertNotNull(msg); + if (transacted) { + s.commit(); + } + ActiveMQMessage message = (ActiveMQMessage) msg; + ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); + + //Could be either + String originBrokerUrl = (String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL); + assertTrue(originBrokerUrl.startsWith("tcp://") || originBrokerUrl.startsWith("nio://")); + assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), dest.getQualifiedName()); + + //Add assertion to make sure body is included for advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); + } + + private void testDurableSubscriptionAdvisory(Function advisoryTopicSupplier, + boolean transacted) throws Exception { + Session s = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + Topic topic = s.createTopic(getClass().getName()); + MessageConsumer consumer = s.createDurableSubscriber(topic, "sub"); + assertNotNull(consumer); + + Topic advisoryTopic = advisoryTopicSupplier.apply((ActiveMQDestination) topic); + MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); + // start throwing messages at the consumer + MessageProducer producer = s.createProducer(topic); + + BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); + producer.send(m); + if (transacted) { + s.commit(); + } + + Message msg = advisoryConsumer.receive(1000); + assertNotNull(msg); + if (transacted) { + s.commit(); + } ActiveMQMessage message = (ActiveMQMessage) msg; ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); //This should always be tcp:// because that is the transport that is used to connect even though //the nio transport is the first one in the list assertTrue(((String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)).startsWith("tcp://")); - assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), ((ActiveMQDestination) queue).getQualifiedName()); + assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), ((ActiveMQDestination) topic).getQualifiedName()); //Add assertion to make sure body is included for advisory topics //when includeBodyForAdvisory is true @@ -524,7 +589,9 @@ public class AdvisoryTests { } protected void configureBroker(BrokerService answer) throws Exception { - answer.setPersistent(false); + answer.setPersistent(persistent); + answer.setDeleteAllMessagesOnStartup(true); + PolicyEntry policy = new PolicyEntry(); policy.setExpireMessagesPeriod(EXPIRE_MESSAGE_PERIOD); policy.setAdvisoryForFastProducers(true);