ARTEMIS-980 Openwire can't send message to temp destination
When a producer sends a messages to a temp destination created from another connection, it fails. The reason behind it is that the producer's connection didn't receive the advisory message (notification) from broker about this temp destination, and it will throw an exception if it doesn't know this temp destination. The fix is send the advisory to the client so that it knows this destination.
This commit is contained in:
parent
e77d64078b
commit
b84a7f3e25
|
@ -731,6 +731,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
}
|
||||
}
|
||||
|
||||
if (dest.isTemporary()) {
|
||||
//Openwire needs to store the DestinationInfo in order to send
|
||||
//Advisory messages to clients
|
||||
this.state.addTempDestination(info);
|
||||
}
|
||||
|
||||
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
|
||||
AMQConnectionContext context = getContext();
|
||||
DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);
|
||||
|
@ -773,6 +779,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
|
||||
ss.addConsumer(info);
|
||||
amqSession.start();
|
||||
|
||||
if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
|
||||
//advisory for temp destinations
|
||||
if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) {
|
||||
// Replay the temporary destinations.
|
||||
List<DestinationInfo> tmpDests = this.protocolManager.getTemporaryDestinations();
|
||||
for (DestinationInfo di : tmpDests) {
|
||||
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(di.getDestination());
|
||||
String originalConnectionId = di.getConnectionId().getValue();
|
||||
protocolManager.fireAdvisory(context, topic, di, info.getConsumerId(), originalConnectionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -816,6 +835,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
@Override
|
||||
public void tempQueueDeleted(SimpleString bindingName) {
|
||||
ActiveMQDestination dest = new ActiveMQTempQueue(bindingName.toString());
|
||||
state.removeTempDestination(dest);
|
||||
|
||||
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
|
||||
AMQConnectionContext context = getContext();
|
||||
|
@ -846,6 +866,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
this.noLocal = noLocal;
|
||||
}
|
||||
|
||||
public List<DestinationInfo> getTemporaryDestinations() {
|
||||
return state.getTempDestinations();
|
||||
}
|
||||
|
||||
class SlowConsumerDetection implements SlowConsumerDetectionListener {
|
||||
|
||||
@Override
|
||||
|
@ -856,7 +880,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
||||
try {
|
||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, amqConsumer.getId().toString());
|
||||
protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId());
|
||||
protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId(), null);
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn("Error during method invocation", e);
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire;
|
|||
|
||||
import javax.jms.InvalidClientIDException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
|
@ -46,6 +47,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
|||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
||||
|
@ -64,6 +66,7 @@ import org.apache.activemq.command.Command;
|
|||
import org.apache.activemq.command.ConnectionControl;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.DestinationInfo;
|
||||
import org.apache.activemq.command.MessageDispatch;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.command.ProducerId;
|
||||
|
@ -333,7 +336,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
|||
}
|
||||
|
||||
public void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
|
||||
this.fireAdvisory(context, topic, copy, null);
|
||||
this.fireAdvisory(context, topic, copy, null, null);
|
||||
}
|
||||
|
||||
public BrokerId getBrokerId() {
|
||||
|
@ -350,8 +353,14 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
|||
public void fireAdvisory(AMQConnectionContext context,
|
||||
ActiveMQTopic topic,
|
||||
Command command,
|
||||
ConsumerId targetConsumerId) throws Exception {
|
||||
ConsumerId targetConsumerId,
|
||||
String originalConnectionId) throws Exception {
|
||||
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
||||
|
||||
if (originalConnectionId == null) {
|
||||
originalConnectionId = context.getConnectionId().getValue();
|
||||
}
|
||||
advisoryMessage.setStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), originalConnectionId);
|
||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
|
||||
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
|
||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
|
||||
|
@ -581,4 +590,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
|||
public Map<SimpleString, RoutingType> getPrefixes() {
|
||||
return prefixes;
|
||||
}
|
||||
|
||||
public List<DestinationInfo> getTemporaryDestinations() {
|
||||
List<DestinationInfo> total = new ArrayList<>();
|
||||
for (OpenWireConnection connection : connections) {
|
||||
total.addAll(connection.getTemporaryDestinations());
|
||||
}
|
||||
return total;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -215,7 +215,9 @@ public class AMQConsumer {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (session.getConnection().isNoLocal()) {
|
||||
if (session.getConnection().isNoLocal() || session.isInternal()) {
|
||||
//internal session always delivers messages to noLocal advisory consumers
|
||||
//so we need to remove this property too.
|
||||
message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
|
||||
}
|
||||
dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this);
|
||||
|
|
|
@ -298,8 +298,9 @@ public class AMQSession implements SessionCallback {
|
|||
|
||||
ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
|
||||
|
||||
if (connection.isNoLocal() || sessInfo.getSessionId().getValue() == -1) {
|
||||
//Internal session is used to send advisory messages, which are always noLocal
|
||||
if (connection.isNoLocal()) {
|
||||
//Note: advisory messages are dealt with in
|
||||
//OpenWireProtocolManager#fireAdvisory
|
||||
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getConnectionId().getValue());
|
||||
}
|
||||
|
||||
|
@ -447,4 +448,8 @@ public class AMQSession implements SessionCallback {
|
|||
public OpenWireConnection getConnection() {
|
||||
return connection;
|
||||
}
|
||||
|
||||
public boolean isInternal() {
|
||||
return sessInfo.getSessionId().getValue() == -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,12 +27,17 @@ import javax.jms.MessageConsumer;
|
|||
import javax.jms.MessageListener;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.QueueReceiver;
|
||||
import javax.jms.QueueSender;
|
||||
import javax.jms.QueueSession;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import javax.jms.TopicPublisher;
|
||||
import javax.jms.TopicSession;
|
||||
import javax.jms.TopicSubscriber;
|
||||
import javax.jms.XAConnection;
|
||||
import javax.jms.XASession;
|
||||
import javax.transaction.xa.XAResource;
|
||||
|
@ -45,6 +50,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQSession;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -492,6 +498,80 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
|||
session.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTempTopicDelete() throws Exception {
|
||||
connection.start();
|
||||
TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
TemporaryTopic tempTopic = topicSession.createTemporaryTopic();
|
||||
|
||||
ActiveMQConnection newConn = (ActiveMQConnection) factory.createConnection();
|
||||
|
||||
try {
|
||||
TopicSession newTopicSession = newConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
TopicPublisher publisher = newTopicSession.createPublisher(tempTopic);
|
||||
|
||||
TextMessage msg = newTopicSession.createTextMessage("Test Message");
|
||||
|
||||
publisher.publish(msg);
|
||||
|
||||
try {
|
||||
TopicSubscriber consumer = newTopicSession.createSubscriber(tempTopic);
|
||||
fail("should have gotten exception but got consumer: " + consumer);
|
||||
} catch (JMSException ex) {
|
||||
//correct
|
||||
}
|
||||
|
||||
connection.close();
|
||||
|
||||
try {
|
||||
Message newMsg = newTopicSession.createMessage();
|
||||
publisher.publish(newMsg);
|
||||
} catch (JMSException e) {
|
||||
//ok
|
||||
}
|
||||
|
||||
} finally {
|
||||
newConn.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTempQueueDelete() throws Exception {
|
||||
connection.start();
|
||||
QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
TemporaryQueue tempQueue = queueSession.createTemporaryQueue();
|
||||
|
||||
ActiveMQConnection newConn = (ActiveMQConnection) factory.createConnection();
|
||||
try {
|
||||
QueueSession newQueueSession = newConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
QueueSender queueSender = newQueueSession.createSender(tempQueue);
|
||||
|
||||
Message msg = queueSession.createMessage();
|
||||
queueSender.send(msg);
|
||||
|
||||
try {
|
||||
QueueReceiver consumer = newQueueSession.createReceiver(tempQueue);
|
||||
fail("should have gotten exception but got consumer: " + consumer);
|
||||
} catch (JMSException ex) {
|
||||
//correct
|
||||
}
|
||||
|
||||
connection.close();
|
||||
|
||||
try {
|
||||
Message newMsg = newQueueSession.createMessage();
|
||||
queueSender.send(newMsg);
|
||||
} catch (JMSException e) {
|
||||
//ok
|
||||
}
|
||||
|
||||
} finally {
|
||||
newConn.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleTempTopic() throws Exception {
|
||||
connection.start();
|
||||
|
|
Loading…
Reference in New Issue