ARTEMIS-724 Implement no-local consumer filter AMQP
This commit is contained in:
parent
c084293374
commit
553f2df745
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
|
|||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
|
@ -193,13 +194,13 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
|||
}
|
||||
|
||||
@Override
|
||||
public void createTemporaryQueue(String address, String queueName) throws Exception {
|
||||
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true);
|
||||
public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
|
||||
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createDurableQueue(String address, String queueName) throws Exception {
|
||||
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true);
|
||||
public void createDurableQueue(String address, String queueName, String filter) throws Exception {
|
||||
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -404,7 +405,10 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
|||
|
||||
private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
|
||||
try {
|
||||
|
||||
message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
|
||||
serverSession.send(message, false);
|
||||
|
||||
// FIXME Potential race here...
|
||||
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
|
||||
@Override
|
||||
|
@ -483,6 +487,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
|||
@Override
|
||||
public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
|
||||
|
||||
message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
|
||||
|
||||
ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext();
|
||||
|
||||
try {
|
||||
|
|
|
@ -40,9 +40,9 @@ public interface AMQPSessionCallback {
|
|||
|
||||
void createTemporaryQueue(String queueName) throws Exception;
|
||||
|
||||
void createTemporaryQueue(String address, String queueName) throws Exception;
|
||||
void createTemporaryQueue(String address, String queueName, String filter) throws Exception;
|
||||
|
||||
void createDurableQueue(String address, String queueName) throws Exception;
|
||||
void createDurableQueue(String address, String queueName, String filter) throws Exception;
|
||||
|
||||
void offerProducerCredit(String address, int credits, int threshold, Receiver receiver);
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.proton.plug.context.server;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||
import org.apache.activemq.artemis.selector.filter.FilterException;
|
||||
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
||||
import org.apache.qpid.proton.amqp.DescribedType;
|
||||
|
@ -40,6 +41,7 @@ import org.apache.qpid.proton.engine.Sender;
|
|||
import org.apache.qpid.proton.message.ProtonJMessage;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.proton.plug.AMQPSessionCallback;
|
||||
import org.proton.plug.AmqpSupport;
|
||||
import org.proton.plug.context.AbstractConnectionContext;
|
||||
import org.proton.plug.context.AbstractProtonContextSender;
|
||||
import org.proton.plug.context.AbstractProtonSessionContext;
|
||||
|
@ -114,6 +116,8 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
|
|||
|
||||
String selector = null;
|
||||
|
||||
String noLocalFilter = null;
|
||||
|
||||
/*
|
||||
* even tho the filter is a map it will only return a single filter unless a nolocal is also provided
|
||||
* */
|
||||
|
@ -130,6 +134,11 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
|
|||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
|
||||
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
|
||||
noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -138,11 +147,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
|
|||
* */
|
||||
boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
|
||||
|
||||
//filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
|
||||
|
||||
//if (filter != null) {
|
||||
//todo implement nolocal filter
|
||||
//}
|
||||
if (source == null) {
|
||||
// Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
|
||||
String clientId = connection.getRemoteContainer();
|
||||
|
@ -195,14 +199,14 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
|
|||
queue = clientId + ":" + pubId;
|
||||
boolean exists = sessionSPI.queueQuery(queue);
|
||||
if (!exists) {
|
||||
sessionSPI.createDurableQueue(source.getAddress(), queue);
|
||||
sessionSPI.createDurableQueue(source.getAddress(), queue, noLocalFilter);
|
||||
}
|
||||
}
|
||||
//otherwise we are a volatile subscription
|
||||
else {
|
||||
queue = java.util.UUID.randomUUID().toString();
|
||||
try {
|
||||
sessionSPI.createTemporaryQueue(source.getAddress(), queue);
|
||||
sessionSPI.createTemporaryQueue(source.getAddress(), queue, noLocalFilter);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
|
||||
|
|
|
@ -71,7 +71,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void createDurableQueue(String address, String queueName) throws Exception {
|
||||
public void createDurableQueue(String address, String queueName, String filter) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
|
@ -81,7 +81,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void createTemporaryQueue(String address, String queueName) throws Exception {
|
||||
public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue