AMQ-9175 - Properly set broker on ConnectionContext inside

MemoryTransactionStores

This fixes a NPE when using the messageDelivered advisory and
transactions
This commit is contained in:
Christopher L. Shannon (cshannon) 2022-11-21 11:02:12 -05:00
parent 56d89303bd
commit f83c5f1ba1
4 changed files with 115 additions and 29 deletions

View File

@ -24,6 +24,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -43,13 +45,15 @@ import org.slf4j.LoggerFactory;
/** /**
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*/ */
public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubscriptionAware { public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubscriptionAware,
BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class); private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
MemoryTransactionStore transactionStore; MemoryTransactionStore transactionStore;
ConcurrentMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>(); ConcurrentMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
ConcurrentMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>(); ConcurrentMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
private boolean useExternalMessageReferences; private boolean useExternalMessageReferences;
protected BrokerService brokerService;
@Override @Override
public Set<ActiveMQDestination> getDestinations() { public Set<ActiveMQDestination> getDestinations() {
@ -118,7 +122,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubs
@Override @Override
public TransactionStore createTransactionStore() throws IOException { public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) { if (transactionStore == null) {
transactionStore = new MemoryTransactionStore(this); transactionStore = new MemoryTransactionStore(this, brokerService);
} }
return transactionStore; return transactionStore;
} }
@ -253,4 +257,9 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubs
public boolean isPersistNoLocal() { public boolean isPersistNoLocal() {
return true; return true;
} }
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
} }

View File

@ -26,6 +26,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
@ -51,6 +52,7 @@ public class MemoryTransactionStore implements TransactionStore {
protected ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); protected ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
protected Map<TransactionId, Tx> preparedTransactions = Collections.synchronizedMap(new LinkedHashMap<TransactionId, Tx>()); protected Map<TransactionId, Tx> preparedTransactions = Collections.synchronizedMap(new LinkedHashMap<TransactionId, Tx>());
protected final PersistenceAdapter persistenceAdapter; protected final PersistenceAdapter persistenceAdapter;
protected final BrokerService brokerService;
private boolean doingRecover; private boolean doingRecover;
@ -93,6 +95,13 @@ public class MemoryTransactionStore implements TransactionStore {
*/ */
public void commit() throws IOException { public void commit() throws IOException {
ConnectionContext ctx = new ConnectionContext(); ConnectionContext ctx = new ConnectionContext();
try {
if (brokerService != null) {
ctx.setBroker(brokerService.getBroker());
}
} catch (Exception e) {
throw new IOException(e.getMessage(), e);
}
persistenceAdapter.beginTransaction(ctx); persistenceAdapter.beginTransaction(ctx);
try { try {
@ -134,8 +143,9 @@ public class MemoryTransactionStore implements TransactionStore {
MessageStore getMessageStore(); MessageStore getMessageStore();
} }
public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) { public MemoryTransactionStore(PersistenceAdapter persistenceAdapter, BrokerService brokerService) {
this.persistenceAdapter = persistenceAdapter; this.persistenceAdapter = persistenceAdapter;
this.brokerService = brokerService;
} }
public MessageStore proxy(MessageStore messageStore) { public MessageStore proxy(MessageStore messageStore) {

View File

@ -51,7 +51,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) { public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
super(jdbcPersistenceAdapter); super(jdbcPersistenceAdapter, jdbcPersistenceAdapter.getBrokerService());
} }
@Override @Override
@ -163,6 +163,13 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
if (tx != null) { if (tx != null) {
// undo prepare work // undo prepare work
ConnectionContext ctx = new ConnectionContext(); ConnectionContext ctx = new ConnectionContext();
try {
if (brokerService != null) {
ctx.setBroker(brokerService.getBroker());
}
} catch (Exception e) {
throw new IOException(e.getMessage(), e);
}
persistenceAdapter.beginTransaction(ctx); persistenceAdapter.beginTransaction(ctx);
try { try {

View File

@ -25,8 +25,6 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.HashSet; import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function; import java.util.function.Function;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
@ -39,9 +37,7 @@ import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.QueueBrowser; import javax.jms.QueueBrowser;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.ActiveMQPrefetchPolicy;
@ -49,7 +45,6 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.NullMessageReference;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
@ -76,21 +71,27 @@ public class AdvisoryTests {
protected Connection connection; protected Connection connection;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected final boolean includeBodyForAdvisory; protected final boolean includeBodyForAdvisory;
protected final boolean persistent;
protected final int EXPIRE_MESSAGE_PERIOD = 3000; protected final int EXPIRE_MESSAGE_PERIOD = 3000;
@Parameters(name = "includeBodyForAdvisory={0}") @Parameters(name = "includeBodyForAdvisory={0}, persistent={1}")
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { return Arrays.asList(new Object[][] {
// Include the full body of the message // Include the full body of the message
{true}, {true, false},
// Don't include the full body of the message // Don't include the full body of the message
{false} {false, false},
// Include the full body of the message
{true, true},
// Don't include the full body of the message
{false, true}
}); });
} }
public AdvisoryTests(boolean includeBodyForAdvisory) { public AdvisoryTests(boolean includeBodyForAdvisory, boolean persistent) {
super(); super();
this.includeBodyForAdvisory = includeBodyForAdvisory; this.includeBodyForAdvisory = includeBodyForAdvisory;
this.persistent = persistent;
} }
@Test(timeout = 60000) @Test(timeout = 60000)
@ -172,45 +173,88 @@ public class AdvisoryTests {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testQueueMessageDeliveryAdvisory() throws Exception { public void testQueueMessageDeliveryAdvisory() throws Exception {
testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic); testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic, false);
}
@Test(timeout = 60000)
public void testQueueMessageDeliveryAdvisoryTransacted() throws Exception {
testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic, true);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testQueueMessageDispatchedAdvisory() throws Exception { public void testQueueMessageDispatchedAdvisory() throws Exception {
testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic); testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, false);
}
@Test(timeout = 60000)
public void testQueueMessageDispatchedAdvisoryTransacted() throws Exception {
testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, true);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testQueueMessageDispatchedAdvisorySync() throws Exception { public void testQueueMessageDispatchedAdvisorySync() throws Exception {
((ActiveMQConnection)connection).setDispatchAsync(false); ((ActiveMQConnection)connection).setDispatchAsync(false);
testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic); testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, false);
}
@Test(timeout = 60000)
public void testQueueMessageDispatchedAdvisorySyncTransacted() throws Exception {
((ActiveMQConnection)connection).setDispatchAsync(false);
testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, true);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testTopicMessageDeliveryAdvisory() throws Exception { public void testTopicMessageDeliveryAdvisory() throws Exception {
testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic); testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic, false);
}
@Test(timeout = 60000)
public void testTopicMessageDeliveryAdvisoryTransacted() throws Exception {
testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic, true);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testTopicMessageDispatchedAdvisory() throws Exception { public void testTopicMessageDispatchedAdvisory() throws Exception {
testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic); testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, false);
}
@Test(timeout = 60000)
public void testTopicMessageDispatchedAdvisoryTransacted() throws Exception {
testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, true);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testTopicMessageDispatchedAdvisorySync() throws Exception { public void testTopicMessageDispatchedAdvisorySync() throws Exception {
((ActiveMQConnection)connection).setDispatchAsync(false); ((ActiveMQConnection)connection).setDispatchAsync(false);
testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic); testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, false);
}
@Test(timeout = 60000)
public void testTopicMessageDispatchedAdvisorySyncTransacted() throws Exception {
((ActiveMQConnection)connection).setDispatchAsync(false);
testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic, true);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testDurableMessageDispatchedAdvisory() throws Exception { public void testDurableMessageDispatchedAdvisory() throws Exception {
testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic); testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic, false);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testDurableMessageDispatchedAdvisorySync() throws Exception { public void testDurableMessageDispatchedAdvisorySync() throws Exception {
((ActiveMQConnection)connection).setDispatchAsync(false); ((ActiveMQConnection)connection).setDispatchAsync(false);
testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic); testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic, false);
}
@Test(timeout = 60000)
public void testDurableMessageDispatchedAdvisoryTransacted() throws Exception {
testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic, true);
}
@Test(timeout = 60000)
public void testDurableMessageDispatchedAdvisorySyncTransacted() throws Exception {
((ActiveMQConnection)connection).setDispatchAsync(false);
testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic, true);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
@ -243,8 +287,9 @@ public class AdvisoryTests {
assertNull(msg); assertNull(msg);
} }
private void testMessageConsumerAdvisory(ActiveMQDestination dest, Function<ActiveMQDestination, Topic> advisoryTopicSupplier) throws Exception { private void testMessageConsumerAdvisory(ActiveMQDestination dest, Function<ActiveMQDestination, Topic> advisoryTopicSupplier,
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); boolean transacted) throws Exception {
Session s = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = s.createConsumer(dest); MessageConsumer consumer = s.createConsumer(dest);
assertNotNull(consumer); assertNotNull(consumer);
@ -256,15 +301,21 @@ public class AdvisoryTests {
BytesMessage m = s.createBytesMessage(); BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]); m.writeBytes(new byte[1024]);
producer.send(m); producer.send(m);
if (transacted) {
s.commit();
}
Message msg = advisoryConsumer.receive(1000); Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg); assertNotNull(msg);
if (transacted) {
s.commit();
}
ActiveMQMessage message = (ActiveMQMessage) msg; ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//This should always be tcp:// because that is the transport that is used to connect even though //Could be either
//the nio transport is the first one in the list String originBrokerUrl = (String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL);
assertTrue(((String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)).startsWith("tcp://")); assertTrue(originBrokerUrl.startsWith("tcp://") || originBrokerUrl.startsWith("nio://"));
assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), dest.getQualifiedName()); assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), dest.getQualifiedName());
//Make sure consumer id exists if dispatched advisory //Make sure consumer id exists if dispatched advisory
@ -277,8 +328,9 @@ public class AdvisoryTests {
assertIncludeBodyForAdvisory(payload); assertIncludeBodyForAdvisory(payload);
} }
private void testDurableSubscriptionAdvisory(Function<ActiveMQDestination, Topic> advisoryTopicSupplier) throws Exception { private void testDurableSubscriptionAdvisory(Function<ActiveMQDestination, Topic> advisoryTopicSupplier,
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); boolean transacted) throws Exception {
Session s = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
Topic topic = s.createTopic(getClass().getName()); Topic topic = s.createTopic(getClass().getName());
MessageConsumer consumer = s.createDurableSubscriber(topic, "sub"); MessageConsumer consumer = s.createDurableSubscriber(topic, "sub");
assertNotNull(consumer); assertNotNull(consumer);
@ -291,9 +343,15 @@ public class AdvisoryTests {
BytesMessage m = s.createBytesMessage(); BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]); m.writeBytes(new byte[1024]);
producer.send(m); producer.send(m);
if (transacted) {
s.commit();
}
Message msg = advisoryConsumer.receive(1000); Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg); assertNotNull(msg);
if (transacted) {
s.commit();
}
ActiveMQMessage message = (ActiveMQMessage) msg; ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
@ -632,7 +690,9 @@ public class AdvisoryTests {
} }
protected void configureBroker(BrokerService answer) throws Exception { protected void configureBroker(BrokerService answer) throws Exception {
answer.setPersistent(false); answer.setPersistent(persistent);
answer.setDeleteAllMessagesOnStartup(true);
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setExpireMessagesPeriod(EXPIRE_MESSAGE_PERIOD); policy.setExpireMessagesPeriod(EXPIRE_MESSAGE_PERIOD);
policy.setAdvisoryForFastProducers(true); policy.setAdvisoryForFastProducers(true);