diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 107df8a617..c3ac671980 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; @@ -193,13 +194,13 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se } @Override - public void createTemporaryQueue(String address, String queueName) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true); + public void createTemporaryQueue(String address, String queueName, String filter) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true); } @Override - public void createDurableQueue(String address, String queueName) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true); + public void createDurableQueue(String address, String queueName, String filter) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true); } @Override @@ -404,7 +405,10 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception { try { + + message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer()); serverSession.send(message, false); + // FIXME Potential race here... manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { @Override @@ -483,6 +487,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se @Override public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) { + message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString()); + ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext(); try { diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java index 459931886c..8406431201 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java @@ -40,9 +40,9 @@ public interface AMQPSessionCallback { void createTemporaryQueue(String queueName) throws Exception; - void createTemporaryQueue(String address, String queueName) throws Exception; + void createTemporaryQueue(String address, String queueName, String filter) throws Exception; - void createDurableQueue(String address, String queueName) throws Exception; + void createDurableQueue(String address, String queueName, String filter) throws Exception; void offerProducerCredit(String address, int credits, int threshold, Receiver receiver); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java index 564c9ba5af..78b1668967 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java @@ -18,6 +18,7 @@ package org.proton.plug.context.server; import java.util.Map; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; import org.apache.qpid.proton.amqp.DescribedType; @@ -40,6 +41,7 @@ import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.message.ProtonJMessage; import org.jboss.logging.Logger; import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.AmqpSupport; import org.proton.plug.context.AbstractConnectionContext; import org.proton.plug.context.AbstractProtonContextSender; import org.proton.plug.context.AbstractProtonSessionContext; @@ -114,6 +116,8 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple String selector = null; + String noLocalFilter = null; + /* * even tho the filter is a map it will only return a single filter unless a nolocal is also provided * */ @@ -130,6 +134,11 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple return; } } + + if (findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) { + String remoteContainerId = sender.getSession().getConnection().getRemoteContainer(); + noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'"; + } } /* @@ -138,11 +147,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple * */ boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source); - //filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); - - //if (filter != null) { - //todo implement nolocal filter - //} if (source == null) { // Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue String clientId = connection.getRemoteContainer(); @@ -195,14 +199,14 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple queue = clientId + ":" + pubId; boolean exists = sessionSPI.queueQuery(queue); if (!exists) { - sessionSPI.createDurableQueue(source.getAddress(), queue); + sessionSPI.createDurableQueue(source.getAddress(), queue, noLocalFilter); } } //otherwise we are a volatile subscription else { queue = java.util.UUID.randomUUID().toString(); try { - sessionSPI.createTemporaryQueue(source.getAddress(), queue); + sessionSPI.createTemporaryQueue(source.getAddress(), queue, noLocalFilter); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java index 7397612ce0..0701b17c28 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java @@ -71,7 +71,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback { } @Override - public void createDurableQueue(String address, String queueName) throws Exception { + public void createDurableQueue(String address, String queueName, String filter) throws Exception { } @@ -81,7 +81,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback { } @Override - public void createTemporaryQueue(String address, String queueName) throws Exception { + public void createTemporaryQueue(String address, String queueName, String filter) throws Exception { }