Support for temporary topic delete
This commit is contained in:
Timothy Bish 2015-03-03 15:55:28 -05:00
parent 2ec586f267
commit ad57cc6fcb
2 changed files with 89 additions and 33 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -38,6 +39,7 @@ import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.Command;
@ -517,12 +519,21 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
static abstract class AmqpDeliveryListener {
protected ActiveMQDestination destination;
protected List<Runnable> closeActions = new ArrayList<Runnable>();
abstract public void onDelivery(Delivery delivery) throws Exception;
public void onDetach() throws Exception {
}
public void onClose() throws Exception {
for (Runnable action : closeActions) {
action.run();
}
closeActions.clear();
}
public void drainCheck() {
@ -531,6 +542,18 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
abstract void doCommit() throws Exception;
abstract void doRollback() throws Exception;
public void addCloseAction(Runnable action) {
closeActions.add(action);
}
public ActiveMQDestination getDestination() {
return destination;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
}
}
private void onConnectionOpen() throws AmqpProtocolException {
@ -683,14 +706,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
class ProducerContext extends BaseProducerContext {
private final ProducerId producerId;
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final ActiveMQDestination destination;
private boolean closed;
private final boolean anonymous;
private boolean anonymous;
public ProducerContext(ProducerId producerId, ActiveMQDestination destination, boolean anonymous) {
public ProducerContext(ProducerId producerId) {
this.producerId = producerId;
this.destination = destination;
this.anonymous = anonymous;
}
@Override
@ -797,6 +817,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
if (!closed) {
sendToActiveMQ(new RemoveInfo(producerId), null);
}
super.onClose();
}
public void close() {
@ -914,6 +936,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
// Client is producing to this receiver object
org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
int flow = producerCredit;
try {
if (remoteTarget instanceof Coordinator) {
pumpProtonToSocket();
@ -924,29 +947,35 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} else {
Target target = (Target) remoteTarget;
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
final ProducerContext producerContext = new ProducerContext(producerId);
ActiveMQDestination destination = null;
boolean anonymous = false;
String targetNodeName = target.getAddress();
if ((targetNodeName == null || targetNodeName.length() == 0) && !target.getDynamic()) {
anonymous = true;
producerContext.anonymous = true;
} else if (target.getDynamic()) {
destination = createTemporaryDestination(receiver, target.getCapabilities());
Target actualTarget = new Target();
actualTarget.setAddress(destination.getQualifiedName());
actualTarget.setDynamic(true);
receiver.setTarget(actualTarget);
producerContext.addCloseAction(new Runnable() {
@Override
public void run() {
deleteTemporaryDestination((ActiveMQTempDestination) producerContext.getDestination());
}
});
} else {
destination = createDestination(remoteTarget);
}
final ProducerContext producerContext = new ProducerContext(producerId, destination, anonymous);
receiver.setContext(producerContext);
receiver.flow(flow);
ProducerInfo producerInfo = new ProducerInfo(producerId);
producerInfo.setDestination(destination);
producerContext.setDestination(destination);
sendToActiveMQ(producerInfo, new ResponseHandler() {
@Override
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
@ -1005,7 +1034,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private boolean closed;
public ConsumerInfo info;
private boolean endOfBrowse = false;
public ActiveMQDestination destination;
public int credit;
public int consumerPrefetch = 0;
private long lastDeliveredSequenceId;
@ -1068,28 +1096,32 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
@Override
public void onClose() throws Exception {
if (!closed) {
closed = true;
sender.setContext(null);
subscriptionsByConsumerId.remove(consumerId);
try {
if (!closed) {
closed = true;
sender.setContext(null);
subscriptionsByConsumerId.remove(consumerId);
AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext();
if (session != null) {
session.consumers.remove(info.getConsumerId());
}
RemoveInfo removeCommand = new RemoveInfo(consumerId);
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand, null);
if (info.isDurable()) {
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId);
rsi.setSubscriptionName(sender.getName());
rsi.setClientId(connectionInfo.getClientId());
sendToActiveMQ(rsi, null);
AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext();
if (session != null) {
session.consumers.remove(info.getConsumerId());
}
RemoveInfo removeCommand = new RemoveInfo(consumerId);
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand, null);
if (info.isDurable()) {
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId);
rsi.setSubscriptionName(sender.getName());
rsi.setClientId(connectionInfo.getClientId());
sendToActiveMQ(rsi, null);
}
}
} finally {
super.onClose();
}
}
@ -1415,6 +1447,13 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
source.setAddress(destination.getQualifiedName());
source.setDynamic(true);
sender.setSource(source);
consumerContext.addCloseAction(new Runnable() {
@Override
public void run() {
deleteTemporaryDestination((ActiveMQTempDestination) consumerContext.getDestination());
}
});
} else {
destination = createDestination(source);
}
@ -1425,7 +1464,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(destination);
consumerContext.destination = destination;
consumerContext.setDestination(destination);
int senderCredit = sender.getRemoteCredit();
if (prefetch != 0) {
// use the value configured on the transport connector
@ -1551,6 +1590,24 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
return rc;
}
private void deleteTemporaryDestination(ActiveMQTempDestination destination) {
DestinationInfo info = new DestinationInfo();
info.setConnectionId(connectionId);
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
info.setDestination(destination);
sendToActiveMQ(info, new ResponseHandler() {
@Override
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
LOG.debug("Error during temp destination removeal: {}", exception.getMessage());
}
}
});
}
private boolean contains(Symbol[] symbols, Symbol key) {
if (symbols == null) {
return false;

View File

@ -1067,7 +1067,6 @@ public class JMSClientTest extends JMSClientTestSupport {
}
}
@Ignore("Broker cannot currently tell if it should delete a temp destination")
@Test(timeout=30000)
public void testDeleteTemporaryQueue() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
@ -1112,7 +1111,7 @@ public class JMSClientTest extends JMSClientTestSupport {
}
}
@Ignore("Broker cannot currently tell if it should delete a temp destination")
@Ignore("Legacy QPid client does not support creation of TemporaryTopics correctly")
@Test(timeout=30000)
public void testDeleteTemporaryTopic() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();