diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java index a1233e0aef..a668c4eb17 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java @@ -24,6 +24,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; @@ -43,13 +45,15 @@ import org.slf4j.LoggerFactory; /** * @org.apache.xbean.XBean */ -public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubscriptionAware { +public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubscriptionAware, + BrokerServiceAware { private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class); MemoryTransactionStore transactionStore; ConcurrentMap topics = new ConcurrentHashMap(); ConcurrentMap queues = new ConcurrentHashMap(); private boolean useExternalMessageReferences; + protected BrokerService brokerService; @Override public Set getDestinations() { @@ -118,7 +122,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubs @Override public TransactionStore createTransactionStore() throws IOException { if (transactionStore == null) { - transactionStore = new MemoryTransactionStore(this); + transactionStore = new MemoryTransactionStore(this, brokerService); } return transactionStore; } @@ -253,4 +257,9 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubs public boolean isPersistNoLocal() { return true; } + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java index abf8282435..62b57613ef 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -51,6 +52,7 @@ public class MemoryTransactionStore implements TransactionStore { protected ConcurrentMap inflightTransactions = new ConcurrentHashMap(); protected Map preparedTransactions = Collections.synchronizedMap(new LinkedHashMap()); protected final PersistenceAdapter persistenceAdapter; + protected final BrokerService brokerService; private boolean doingRecover; @@ -93,6 +95,13 @@ public class MemoryTransactionStore implements TransactionStore { */ public void commit() throws IOException { ConnectionContext ctx = new ConnectionContext(); + try { + if (brokerService != null) { + ctx.setBroker(brokerService.getBroker()); + } + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } persistenceAdapter.beginTransaction(ctx); try { @@ -134,8 +143,9 @@ public class MemoryTransactionStore implements TransactionStore { MessageStore getMessageStore(); } - public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) { + public MemoryTransactionStore(PersistenceAdapter persistenceAdapter, BrokerService brokerService) { this.persistenceAdapter = persistenceAdapter; + this.brokerService = brokerService; } public MessageStore proxy(MessageStore messageStore) { diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java index ccf7485013..32871696c4 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java @@ -51,7 +51,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) { - super(jdbcPersistenceAdapter); + super(jdbcPersistenceAdapter, jdbcPersistenceAdapter.getBrokerService()); } @Override @@ -163,6 +163,13 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { if (tx != null) { // undo prepare work ConnectionContext ctx = new ConnectionContext(); + try { + if (brokerService != null) { + ctx.setBroker(brokerService.getBroker()); + } + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } persistenceAdapter.beginTransaction(ctx); try { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java index 82f50cc801..624126e13a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java @@ -25,8 +25,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Enumeration; import java.util.HashSet; - -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -39,9 +37,7 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; -import javax.jms.TextMessage; import javax.jms.Topic; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; @@ -49,7 +45,6 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.NullMessageReference; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -76,21 +71,27 @@ public class AdvisoryTests { protected Connection connection; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected final boolean includeBodyForAdvisory; + protected final boolean persistent; protected final int EXPIRE_MESSAGE_PERIOD = 3000; - @Parameters(name = "includeBodyForAdvisory={0}") + @Parameters(name = "includeBodyForAdvisory={0}, persistent={1}") public static Collection data() { return Arrays.asList(new Object[][] { // Include the full body of the message - {true}, + {true, false}, // Don't include the full body of the message - {false} + {false, false}, + // Include the full body of the message + {true, true}, + // Don't include the full body of the message + {false, true} }); } - public AdvisoryTests(boolean includeBodyForAdvisory) { + public AdvisoryTests(boolean includeBodyForAdvisory, boolean persistent) { super(); this.includeBodyForAdvisory = includeBodyForAdvisory; + this.persistent = persistent; } @Test(timeout = 60000) @@ -172,45 +173,88 @@ public class AdvisoryTests { @Test(timeout = 60000) public void testQueueMessageDeliveryAdvisory() throws Exception { - testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic); + testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic, false); + } + + @Test(timeout = 60000) + public void testQueueMessageDeliveryAdvisoryTransacted() throws Exception { + testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic, true); } @Test(timeout = 60000) public void testQueueMessageDispatchedAdvisory() throws Exception { - testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic); + testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, false); + } + + @Test(timeout = 60000) + public void testQueueMessageDispatchedAdvisoryTransacted() throws Exception { + testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, true); } @Test(timeout = 60000) public void testQueueMessageDispatchedAdvisorySync() throws Exception { ((ActiveMQConnection)connection).setDispatchAsync(false); - testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic); + testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, false); + } + + @Test(timeout = 60000) + public void testQueueMessageDispatchedAdvisorySyncTransacted() throws Exception { + ((ActiveMQConnection)connection).setDispatchAsync(false); + testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, true); } @Test(timeout = 60000) public void testTopicMessageDeliveryAdvisory() throws Exception { - testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic); + testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic, false); + } + + @Test(timeout = 60000) + public void testTopicMessageDeliveryAdvisoryTransacted() throws Exception { + testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic, true); } @Test(timeout = 60000) public void testTopicMessageDispatchedAdvisory() throws Exception { - testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic); + testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, false); + } + + @Test(timeout = 60000) + public void testTopicMessageDispatchedAdvisoryTransacted() throws Exception { + testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, true); } @Test(timeout = 60000) public void testTopicMessageDispatchedAdvisorySync() throws Exception { ((ActiveMQConnection)connection).setDispatchAsync(false); - testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic); + testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, false); + } + + @Test(timeout = 60000) + public void testTopicMessageDispatchedAdvisorySyncTransacted() throws Exception { + ((ActiveMQConnection)connection).setDispatchAsync(false); + testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, true); } @Test(timeout = 60000) public void testDurableMessageDispatchedAdvisory() throws Exception { - testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic); + testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic, false); } @Test(timeout = 60000) public void testDurableMessageDispatchedAdvisorySync() throws Exception { ((ActiveMQConnection)connection).setDispatchAsync(false); - testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic); + testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic, false); + } + + @Test(timeout = 60000) + public void testDurableMessageDispatchedAdvisoryTransacted() throws Exception { + testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic, true); + } + + @Test(timeout = 60000) + public void testDurableMessageDispatchedAdvisorySyncTransacted() throws Exception { + ((ActiveMQConnection)connection).setDispatchAsync(false); + testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic, true); } @Test(timeout = 60000) @@ -243,8 +287,9 @@ public class AdvisoryTests { assertNull(msg); } - private void testMessageConsumerAdvisory(ActiveMQDestination dest, Function advisoryTopicSupplier) throws Exception { - Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + private void testMessageConsumerAdvisory(ActiveMQDestination dest, Function advisoryTopicSupplier, + boolean transacted) throws Exception { + Session s = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = s.createConsumer(dest); assertNotNull(consumer); @@ -256,15 +301,21 @@ public class AdvisoryTests { BytesMessage m = s.createBytesMessage(); m.writeBytes(new byte[1024]); producer.send(m); + if (transacted) { + s.commit(); + } Message msg = advisoryConsumer.receive(1000); assertNotNull(msg); + if (transacted) { + s.commit(); + } ActiveMQMessage message = (ActiveMQMessage) msg; ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); - //This should always be tcp:// because that is the transport that is used to connect even though - //the nio transport is the first one in the list - assertTrue(((String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)).startsWith("tcp://")); + //Could be either + String originBrokerUrl = (String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL); + assertTrue(originBrokerUrl.startsWith("tcp://") || originBrokerUrl.startsWith("nio://")); assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), dest.getQualifiedName()); //Make sure consumer id exists if dispatched advisory @@ -277,8 +328,9 @@ public class AdvisoryTests { assertIncludeBodyForAdvisory(payload); } - private void testDurableSubscriptionAdvisory(Function advisoryTopicSupplier) throws Exception { - Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + private void testDurableSubscriptionAdvisory(Function advisoryTopicSupplier, + boolean transacted) throws Exception { + Session s = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); Topic topic = s.createTopic(getClass().getName()); MessageConsumer consumer = s.createDurableSubscriber(topic, "sub"); assertNotNull(consumer); @@ -291,9 +343,15 @@ public class AdvisoryTests { BytesMessage m = s.createBytesMessage(); m.writeBytes(new byte[1024]); producer.send(m); + if (transacted) { + s.commit(); + } Message msg = advisoryConsumer.receive(1000); assertNotNull(msg); + if (transacted) { + s.commit(); + } ActiveMQMessage message = (ActiveMQMessage) msg; ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); @@ -632,7 +690,9 @@ public class AdvisoryTests { } protected void configureBroker(BrokerService answer) throws Exception { - answer.setPersistent(false); + answer.setPersistent(persistent); + answer.setDeleteAllMessagesOnStartup(true); + PolicyEntry policy = new PolicyEntry(); policy.setExpireMessagesPeriod(EXPIRE_MESSAGE_PERIOD); policy.setAdvisoryForFastProducers(true);