This closes #763

This commit is contained in:
Clebert Suconic 2016-09-09 09:49:12 -04:00
commit a18ad27841
4 changed files with 25 additions and 15 deletions

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@ -193,13 +194,13 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
@Override
public void createTemporaryQueue(String address, String queueName) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true);
public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
}
@Override
public void createDurableQueue(String address, String queueName) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true);
public void createDurableQueue(String address, String queueName, String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
}
@Override
@ -404,7 +405,10 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
try {
message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
serverSession.send(message, false);
// FIXME Potential race here...
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
@ -483,6 +487,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
@Override
public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext();
try {

View File

@ -40,9 +40,9 @@ public interface AMQPSessionCallback {
void createTemporaryQueue(String queueName) throws Exception;
void createTemporaryQueue(String address, String queueName) throws Exception;
void createTemporaryQueue(String address, String queueName, String filter) throws Exception;
void createDurableQueue(String address, String queueName) throws Exception;
void createDurableQueue(String address, String queueName, String filter) throws Exception;
void offerProducerCredit(String address, int credits, int threshold, Receiver receiver);

View File

@ -18,6 +18,7 @@ package org.proton.plug.context.server;
import java.util.Map;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.qpid.proton.amqp.DescribedType;
@ -40,6 +41,7 @@ import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.ProtonJMessage;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.AmqpSupport;
import org.proton.plug.context.AbstractConnectionContext;
import org.proton.plug.context.AbstractProtonContextSender;
import org.proton.plug.context.AbstractProtonSessionContext;
@ -114,6 +116,8 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
String selector = null;
String noLocalFilter = null;
/*
* even tho the filter is a map it will only return a single filter unless a nolocal is also provided
* */
@ -130,6 +134,11 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
return;
}
}
if (findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
}
}
/*
@ -138,11 +147,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
* */
boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
//filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
//if (filter != null) {
//todo implement nolocal filter
//}
if (source == null) {
// Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
String clientId = connection.getRemoteContainer();
@ -195,14 +199,14 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
queue = clientId + ":" + pubId;
boolean exists = sessionSPI.queueQuery(queue);
if (!exists) {
sessionSPI.createDurableQueue(source.getAddress(), queue);
sessionSPI.createDurableQueue(source.getAddress(), queue, noLocalFilter);
}
}
//otherwise we are a volatile subscription
else {
queue = java.util.UUID.randomUUID().toString();
try {
sessionSPI.createTemporaryQueue(source.getAddress(), queue);
sessionSPI.createTemporaryQueue(source.getAddress(), queue, noLocalFilter);
}
catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());

View File

@ -71,7 +71,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
}
@Override
public void createDurableQueue(String address, String queueName) throws Exception {
public void createDurableQueue(String address, String queueName, String filter) throws Exception {
}
@ -81,7 +81,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
}
@Override
public void createTemporaryQueue(String address, String queueName) throws Exception {
public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
}