This commit is contained in:
Clebert Suconic 2017-12-20 15:59:36 -05:00
commit bd8ec582b1
3 changed files with 3 additions and 9 deletions

View File

@ -95,7 +95,8 @@ public class AMQConsumer {
} else { } else {
preAck = true; preAck = true;
} }
String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + info.getClientId() + "'"; String id = info.getClientId() != null ? info.getClientId() : this.getId().getConnectionId();
String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + id + "'";
if (selector == null) { if (selector == null) {
selector = new SimpleString(noLocalSelector); selector = new SimpleString(noLocalSelector);
} else { } else {

View File

@ -317,11 +317,7 @@ public class AMQSession implements SessionCallback {
org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend); org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend);
if (connection.isNoLocal()) { originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId());
//Note: advisory messages are dealt with in
//OpenWireProtocolManager#fireAdvisory
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getConnectionId().getValue());
}
/* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did /* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
* not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to * not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to

View File

@ -26,7 +26,6 @@ import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.JMSTestBase; import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
@ -147,7 +146,6 @@ public class NoLocalSubscriberTest extends JMSTestBase {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(topic); MessageProducer messageProducer = session.createProducer(topic);
TextMessage textMessage = session.createTextMessage("M3"); TextMessage textMessage = session.createTextMessage("M3");
textMessage.setStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), clientID);
messageProducer.send(textMessage); messageProducer.send(textMessage);
connection.close(); connection.close();
} }
@ -161,7 +159,6 @@ public class NoLocalSubscriberTest extends JMSTestBase {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(topic); MessageProducer messageProducer = session.createProducer(topic);
TextMessage textMessage = session.createTextMessage("M4"); TextMessage textMessage = session.createTextMessage("M4");
textMessage.setStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), clientID + "_different");
messageProducer.send(textMessage); messageProducer.send(textMessage);
connection.close(); connection.close();
} }