This commit is contained in:
Clebert Suconic 2017-02-21 22:21:27 -05:00
commit dfdc4ff750
5 changed files with 134 additions and 6 deletions

View File

@ -731,6 +731,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
if (dest.isTemporary()) {
//Openwire needs to store the DestinationInfo in order to send
//Advisory messages to clients
this.state.addTempDestination(info);
}
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
AMQConnectionContext context = getContext();
DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);
@ -773,6 +779,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
ss.addConsumer(info);
amqSession.start();
if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
//advisory for temp destinations
if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) {
// Replay the temporary destinations.
List<DestinationInfo> tmpDests = this.protocolManager.getTemporaryDestinations();
for (DestinationInfo di : tmpDests) {
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(di.getDestination());
String originalConnectionId = di.getConnectionId().getValue();
protocolManager.fireAdvisory(context, topic, di, info.getConsumerId(), originalConnectionId);
}
}
}
}
}
@ -816,6 +835,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public void tempQueueDeleted(SimpleString bindingName) {
ActiveMQDestination dest = new ActiveMQTempQueue(bindingName.toString());
state.removeTempDestination(dest);
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
AMQConnectionContext context = getContext();
@ -846,6 +866,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.noLocal = noLocal;
}
public List<DestinationInfo> getTemporaryDestinations() {
return state.getTempDestinations();
}
class SlowConsumerDetection implements SlowConsumerDetectionListener {
@Override
@ -856,7 +880,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
try {
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, amqConsumer.getId().toString());
protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId());
protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId(), null);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn("Error during method invocation", e);
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.InvalidClientIDException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@ -46,6 +47,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@ -64,6 +66,7 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
@ -333,7 +336,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
public void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
this.fireAdvisory(context, topic, copy, null);
this.fireAdvisory(context, topic, copy, null, null);
}
public BrokerId getBrokerId() {
@ -350,8 +353,14 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
public void fireAdvisory(AMQConnectionContext context,
ActiveMQTopic topic,
Command command,
ConsumerId targetConsumerId) throws Exception {
ConsumerId targetConsumerId,
String originalConnectionId) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
if (originalConnectionId == null) {
originalConnectionId = context.getConnectionId().getValue();
}
advisoryMessage.setStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), originalConnectionId);
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
@ -581,4 +590,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
public Map<SimpleString, RoutingType> getPrefixes() {
return prefixes;
}
public List<DestinationInfo> getTemporaryDestinations() {
List<DestinationInfo> total = new ArrayList<>();
for (OpenWireConnection connection : connections) {
total.addAll(connection.getTemporaryDestinations());
}
return total;
}
}

View File

@ -215,7 +215,9 @@ public class AMQConsumer {
return 0;
}
if (session.getConnection().isNoLocal()) {
if (session.getConnection().isNoLocal() || session.isInternal()) {
//internal session always delivers messages to noLocal advisory consumers
//so we need to remove this property too.
message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
}
dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this);

View File

@ -298,8 +298,9 @@ public class AMQSession implements SessionCallback {
ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
if (connection.isNoLocal() || sessInfo.getSessionId().getValue() == -1) {
//Internal session is used to send advisory messages, which are always noLocal
if (connection.isNoLocal()) {
//Note: advisory messages are dealt with in
//OpenWireProtocolManager#fireAdvisory
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getConnectionId().getValue());
}
@ -447,4 +448,8 @@ public class AMQSession implements SessionCallback {
public OpenWireConnection getConnection() {
return connection;
}
public boolean isInternal() {
return sessInfo.getSessionId().getValue() == -1;
}
}

View File

@ -27,12 +27,17 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
@ -45,6 +50,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -492,6 +498,80 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
session.close();
}
@Test
public void testTempTopicDelete() throws Exception {
connection.start();
TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic tempTopic = topicSession.createTemporaryTopic();
ActiveMQConnection newConn = (ActiveMQConnection) factory.createConnection();
try {
TopicSession newTopicSession = newConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicPublisher publisher = newTopicSession.createPublisher(tempTopic);
TextMessage msg = newTopicSession.createTextMessage("Test Message");
publisher.publish(msg);
try {
TopicSubscriber consumer = newTopicSession.createSubscriber(tempTopic);
fail("should have gotten exception but got consumer: " + consumer);
} catch (JMSException ex) {
//correct
}
connection.close();
try {
Message newMsg = newTopicSession.createMessage();
publisher.publish(newMsg);
} catch (JMSException e) {
//ok
}
} finally {
newConn.close();
}
}
@Test
public void testTempQueueDelete() throws Exception {
connection.start();
QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = queueSession.createTemporaryQueue();
ActiveMQConnection newConn = (ActiveMQConnection) factory.createConnection();
try {
QueueSession newQueueSession = newConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender queueSender = newQueueSession.createSender(tempQueue);
Message msg = queueSession.createMessage();
queueSender.send(msg);
try {
QueueReceiver consumer = newQueueSession.createReceiver(tempQueue);
fail("should have gotten exception but got consumer: " + consumer);
} catch (JMSException ex) {
//correct
}
connection.close();
try {
Message newMsg = newQueueSession.createMessage();
queueSender.send(newMsg);
} catch (JMSException e) {
//ok
}
} finally {
newConn.close();
}
}
@Test
public void testSimpleTempTopic() throws Exception {
connection.start();