Clean up the durable subscription unsubscribe handling to be in line
with the AMQP JMS mapping spec and switch to the QPid 0.32-SNAPSHOT
build for now to allow us to track and other changes we might want to
feed back there before release.
This commit is contained in:
Timothy Bish 2015-02-27 12:18:49 -05:00
parent 5667e4ddcc
commit 7af7c0143f
3 changed files with 120 additions and 71 deletions

View File

@ -30,10 +30,12 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Destination;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempQueue;
@ -85,6 +87,7 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
@ -127,7 +130,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private static final Symbol COPY = Symbol.getSymbol("copy");
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
private final AmqpTransport amqpTransport;
private final AmqpWireFormat amqpWireFormat;
@ -333,9 +335,13 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
processSessionEvent(event.getSession());
break;
case LINK_REMOTE_OPEN:
case LINK_REMOTE_CLOSE:
processLinkOpen(event.getLink());
break;
case LINK_REMOTE_DETACH:
processLinkEvent(event.getLink());
processLinkDetach(event.getLink());
break;
case LINK_REMOTE_CLOSE:
processLinkClose(event.getLink());
break;
case LINK_FLOW:
processLinkFlow(event.getLink());
@ -387,11 +393,20 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
protected void processLinkEvent(Link link) throws Exception {
EndpointState remoteState = link.getRemoteState();
if (remoteState == EndpointState.ACTIVE) {
protected void processLinkOpen(Link link) throws Exception {
onLinkOpen(link);
} else if (remoteState == EndpointState.CLOSED) {
}
protected void processLinkDetach(Link link) throws Exception {
AmqpDeliveryListener context = (AmqpDeliveryListener) link.getContext();
if (context != null) {
context.onDetach();
}
link.detach();
link.free();
}
protected void processLinkClose(Link link) throws Exception {
AmqpDeliveryListener context = (AmqpDeliveryListener) link.getContext();
if (context != null) {
context.onClose();
@ -399,7 +414,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
link.close();
link.free();
}
}
protected void processSessionEvent(Session session) throws Exception {
EndpointState remoteState = session.getRemoteState();
@ -502,6 +516,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
abstract public void onDelivery(Delivery delivery) throws Exception;
public void onDetach() throws Exception {
}
public void onClose() throws Exception {
}
@ -673,6 +690,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
this.anonymous = anonymous;
}
@Override
public String toString() {
return "ProducerContext { producerId = " + producerId + ", destination = " + destination + " }";
}
@Override
protected void onMessage(final Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
if (!closed) {
@ -1026,6 +1048,29 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
@Override
public String toString() {
return "ConsumerContext { " + info + " }";
}
@Override
public void onDetach() throws Exception {
if (!closed) {
closed = true;
sender.setContext(null);
subscriptionsByConsumerId.remove(consumerId);
AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext();
if (session != null) {
session.consumers.remove(info.getConsumerId());
}
RemoveInfo removeCommand = new RemoveInfo(consumerId);
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand, null);
}
}
@Override
public void onClose() throws Exception {
if (!closed) {
@ -1041,6 +1086,15 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
RemoveInfo removeCommand = new RemoveInfo(consumerId);
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand, null);
if (info.isDurable()) {
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId);
rsi.setSubscriptionName(sender.getName());
rsi.setClientId(connectionInfo.getClientId());
sendToActiveMQ(rsi, null);
}
}
}
@ -1339,57 +1393,35 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
ActiveMQDestination dest;
ActiveMQDestination destination;
if (source == null) {
// Attempt to recover previous subscription
destination = lookupSubscription(sender.getName());
if (destination != null) {
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress("");
source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED);
source.setAddress(destination.getQualifiedName());
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
sender.setSource(source);
// Looks like durable sub removal.
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId);
rsi.setSubscriptionName(sender.getName());
rsi.setClientId(connectionInfo.getClientId());
consumerContext.closed = true;
sendToActiveMQ(rsi, new ResponseHandler() {
@Override
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
if (exception instanceof SecurityException) {
sender.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
} else if (exception instanceof InvalidDestinationException){
sender.setCondition(new ErrorCondition(AmqpError.NOT_FOUND, exception.getMessage()));
} else {
sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
}
consumerContext.closed = true;
sender.setSource(null);
sender.setCondition(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + sender.getName()));
sender.close();
sender.free();
} else {
sender.open();
}
pumpProtonToSocket();
}
});
return;
} else if (contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED)) {
consumerContext.closed = true;
sender.close();
pumpProtonToSocket();
return;
}
} else if (source.getDynamic()) {
// lets create a temp dest.
dest = createTempQueue();
destination = createTempQueue();
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(dest.getQualifiedName());
source.setAddress(destination.getQualifiedName());
source.setDynamic(true);
sender.setSource(source);
} else {
dest = createDestination(source);
destination = createDestination(source);
}
subscriptionsByConsumerId.put(id, consumerContext);
@ -1397,8 +1429,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
consumerContext.info = consumerInfo;
consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(dest);
consumerContext.destination = dest;
consumerInfo.setDestination(destination);
consumerContext.destination = destination;
int senderCredit = sender.getRemoteCredit();
if (prefetch != 0) {
// use the value configured on the transport connector
@ -1419,11 +1451,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
consumerContext.credit = senderCredit;
consumerInfo.setDispatchAsync(true);
if (source.getDistributionMode() == COPY && dest.isQueue()) {
if (source.getDistributionMode() == COPY && destination.isQueue()) {
consumerInfo.setBrowser(true);
}
if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
TerminusDurability.CONFIGURATION.equals(source.getDurable())) && dest.isTopic()) {
TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) {
consumerInfo.setSubscriptionName(sender.getName());
}
@ -1466,15 +1498,23 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
static private boolean contains(Symbol[] haystack, Symbol needle) {
if (haystack != null) {
for (Symbol capability : haystack) {
if (capability == needle) {
return true;
private ActiveMQDestination lookupSubscription(String subscriptionName) throws AmqpProtocolException {
ActiveMQDestination result = null;
RegionBroker regionBroker;
try {
regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
} catch (Exception e) {
throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e);
}
final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId());
if (subscription != null) {
result = subscription.getActiveMQDestination();
}
}
return false;
return result;
}
private ActiveMQDestination createTempQueue() {

View File

@ -378,6 +378,15 @@ public class TopicRegion extends AbstractRegion {
return inactiveDestinations;
}
public DurableTopicSubscription lookupSubscription(String subscriptionName, String clientId) {
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
if (durableSubscriptions.containsKey(key)) {
return durableSubscriptions.get(key);
}
return null;
}
public boolean isKeepDurableSubsActive() {
return keepDurableSubsActive;
}

View File

@ -105,7 +105,7 @@
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
<zookeeper-version>3.4.6</zookeeper-version>
<qpid-proton-version>0.8</qpid-proton-version>
<qpid-jms-version>0.30</qpid-jms-version>
<qpid-jms-version>0.32-SNAPSHOT</qpid-jms-version>
<regexp-version>1.3</regexp-version>
<rome-version>1.0</rome-version>
<saxon-version>9.5.1-2</saxon-version>