This closes #773

This commit is contained in:
Clebert Suconic 2016-09-13 14:10:26 -04:00
commit 17430b9be1
4 changed files with 95 additions and 64 deletions

View File

@ -16,11 +16,14 @@
*/ */
package org.apache.activemq.artemis.core.protocol.proton.plug; package org.apache.activemq.artemis.core.protocol.proton.plug;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
@ -58,6 +61,7 @@ import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.AMQPSessionContext; import org.proton.plug.AMQPSessionContext;
import org.proton.plug.SASLResult; import org.proton.plug.SASLResult;
import org.proton.plug.context.ProtonPlugSender; import org.proton.plug.context.ProtonPlugSender;
import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.proton.plug.sasl.PlainSASLResult; import org.proton.plug.sasl.PlainSASLResult;
@ -204,56 +208,39 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
} }
@Override @Override
public boolean queueQuery(String queueName) throws Exception { public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception {
boolean queryResult = false; QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
QueueQueryResult queueQuery = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateJmsQueues() && autoCreate) {
try {
if (queueQuery.isExists()) {
queryResult = true;
}
else {
if (queueQuery.isAutoCreateJmsQueues()) {
serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true); serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true);
queryResult = true; queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateJmsQueues(), true);
} }
else { catch (ActiveMQQueueExistsException e) {
queryResult = false; // The queue may have been created by another thread in the mean time. Catch and do nothing.
} }
} }
return queueQueryResult;
return queryResult;
} }
@Override @Override
public boolean bindingQuery(String address) throws Exception { public boolean bindingQuery(String address) throws Exception {
boolean queryResult = false;
BindingQueryResult queueQuery = serverSession.executeBindingQuery(SimpleString.toSimpleString(address)); BindingQueryResult queueQuery = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
return queueQuery.isExists();
if (queueQuery.isExists()) {
queryResult = true;
}
else {
if (queueQuery.isAutoCreateJmsQueues()) {
serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true);
queryResult = true;
}
else {
queryResult = false;
}
}
return queryResult;
} }
@Override @Override
public void closeSender(final Object brokerConsumer) throws Exception { public void closeSender(final Object brokerConsumer) throws Exception {
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
final CountDownLatch latch = new CountDownLatch(1);
Runnable runnable = new Runnable() { Runnable runnable = new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
((ServerConsumer) brokerConsumer).close(false); consumer.close(false);
latch.countDown();
} }
catch (Exception e) { catch (Exception e) {
} }
@ -271,6 +258,13 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
else { else {
runnable.run(); runnable.run();
} }
try {
latch.await(10, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
}
} }
@Override @Override
@ -459,8 +453,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
} }
@Override @Override
public void deleteQueue(String address) throws Exception { public void deleteQueue(String queueName) throws Exception {
manager.getServer().destroyQueue(new SimpleString(address)); manager.getServer().destroyQueue(new SimpleString(queueName));
} }
private void resetContext() { private void resetContext() {
@ -540,5 +534,4 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
return false; return false;
} }
} }
} }

View File

@ -17,6 +17,7 @@
package org.proton.plug; package org.proton.plug;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Receiver;
@ -48,7 +49,16 @@ public interface AMQPSessionCallback {
void deleteQueue(String address) throws Exception; void deleteQueue(String address) throws Exception;
boolean queueQuery(String queueName) throws Exception; /**
* Returns true if a queue is found with matching name, if autoCreate=true and autoCreateJMSQueues is switched on then
* this method will auto create the queue, with name=queueName, address=queueName, filter=null.
*
* @param queueName
* @param autoCreate
* @return
* @throws Exception
*/
QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception;
boolean bindingQuery(String address) throws Exception; boolean bindingQuery(String address) throws Exception;

View File

@ -17,7 +17,10 @@
package org.proton.plug.context.server; package org.proton.plug.context.server;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser; import org.apache.activemq.artemis.selector.impl.SelectorParser;
@ -47,6 +50,7 @@ import org.proton.plug.context.AbstractProtonContextSender;
import org.proton.plug.context.AbstractProtonSessionContext; import org.proton.plug.context.AbstractProtonSessionContext;
import org.proton.plug.context.ProtonPlugSender; import org.proton.plug.context.ProtonPlugSender;
import org.proton.plug.exceptions.ActiveMQAMQPException; import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.exceptions.ActiveMQAMQPIllegalStateException;
import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException; import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
import org.proton.plug.exceptions.ActiveMQAMQPNotFoundException; import org.proton.plug.exceptions.ActiveMQAMQPNotFoundException;
import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle; import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
@ -116,8 +120,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
String selector = null; String selector = null;
String noLocalFilter = null;
/* /*
* even tho the filter is a map it will only return a single filter unless a nolocal is also provided * even tho the filter is a map it will only return a single filter unless a nolocal is also provided
* */ * */
@ -134,11 +136,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
return; return;
} }
} }
if (findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
}
} }
/* /*
@ -147,12 +144,25 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
* */ * */
boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source); boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
if (isPubSub) {
if (findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
if (selector != null) {
selector += " AND " + noLocalFilter;
}
else {
selector = noLocalFilter;
}
}
}
if (source == null) { if (source == null) {
// Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue // Attempt to recover a previous subscription happens when a link reattach happens on a subscription queue
String clientId = connection.getRemoteContainer(); String clientId = connection.getRemoteContainer();
String pubId = sender.getName(); String pubId = sender.getName();
queue = clientId + ":" + pubId; queue = clientId + ":" + pubId;
boolean exists = sessionSPI.queueQuery(queue); boolean exists = sessionSPI.queueQuery(queue, false).isExists();
/* /*
* If it exists then we know it is a subscription so we set the capabilities on the source so we can delete on a * If it exists then we know it is a subscription so we set the capabilities on the source so we can delete on a
@ -188,8 +198,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
else { else {
//if not dynamic then we use the targets address as the address to forward the messages to, however there has to //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
//be a queue bound to it so we nee to check this. //be a queue bound to it so we nee to check this.
if (isPubSub) { if (isPubSub) {
// if we are a subscription and durable create a durable queue using the container id and link name // if we are a subscription and durable create a durable queue using the container id and link name
if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
@ -197,23 +205,39 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
String clientId = connection.getRemoteContainer(); String clientId = connection.getRemoteContainer();
String pubId = sender.getName(); String pubId = sender.getName();
queue = clientId + ":" + pubId; queue = clientId + ":" + pubId;
boolean exists = sessionSPI.queueQuery(queue); QueueQueryResult result = sessionSPI.queueQuery(queue, false);
if (!exists) {
sessionSPI.createDurableQueue(source.getAddress(), queue, noLocalFilter); if (result.isExists()) {
// If a client reattaches to a durable subscription with a different no-local filter value, selector
// or address then we must recreate the queue (JMS semantics).
if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) ||
(sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
}
else {
throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
}
}
} }
else {
sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
}
source.setAddress(queue);
} }
//otherwise we are a volatile subscription //otherwise we are a volatile subscription
else { else {
queue = java.util.UUID.randomUUID().toString(); queue = java.util.UUID.randomUUID().toString();
try { try {
sessionSPI.createTemporaryQueue(source.getAddress(), queue, noLocalFilter); sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
} }
catch (Exception e) { catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
} }
source.setAddress(queue); source.setAddress(queue);
} }
} }
else { else {
queue = source.getAddress(); queue = source.getAddress();
@ -223,7 +247,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
} }
try { try {
if (!sessionSPI.queueQuery(queue)) { if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
} }
} }
@ -237,7 +261,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
try { try {
brokerConsumer = sessionSPI.createSender(this, queue, selector, browseOnly); brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly);
} }
catch (Exception e) { catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
@ -250,7 +274,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix); return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
} }
/* /*
* close the session * close the session
* */ * */
@ -276,20 +299,23 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
sessionSPI.closeSender(brokerConsumer); sessionSPI.closeSender(brokerConsumer);
//if this is a link close rather than a connection close or detach, we need to delete any durable resources for //if this is a link close rather than a connection close or detach, we need to delete any durable resources for
// say pub subs // say pub subs
if (remoteLinkClose ) { if (remoteLinkClose) {
Source source = (Source)sender.getSource(); Source source = (Source) sender.getSource();
if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) { if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) {
String address = source.getAddress(); String queueName = source.getAddress();
boolean exists = sessionSPI.queueQuery(address); QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
if (exists) { if (result.isExists() && source.getDynamic()) {
sessionSPI.deleteQueue(address); sessionSPI.deleteQueue(queueName);
} }
else { else {
String clientId = connection.getRemoteContainer(); String clientId = connection.getRemoteContainer();
String pubId = sender.getName(); String pubId = sender.getName();
String queue = clientId + ":" + pubId; String queue = clientId + ":" + pubId;
exists = sessionSPI.queueQuery(queue); result = sessionSPI.queueQuery(queue, false);
if (exists) { if (result.isExists()) {
if (result.getConsumerCount() > 0) {
System.out.println("error");
}
sessionSPI.deleteQueue(queue); sessionSPI.deleteQueue(queue);
} }
} }

View File

@ -21,6 +21,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Receiver;
@ -100,8 +102,8 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
} }
@Override @Override
public boolean queueQuery(String queueName) { public QueueQueryResult queueQuery(String queueName, boolean autoCreate) {
return true; return new QueueQueryResult(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), false, false, null, 0, 0, false);
} }
@Override @Override