diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 6c6bff312b..674055b04a 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -258,7 +258,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor || command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class || command.getClass() == DestinationInfo.class - || command.getClass() == ShutdownInfo.class) + || command.getClass() == ShutdownInfo.class + || command.getClass() == RemoveSubscriptionInfo.class) { Response response = null; @@ -1706,9 +1707,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor } @Override - public Response processRemoveSubscription(RemoveSubscriptionInfo arg0) throws Exception + public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception { - throw new IllegalStateException("not implemented! "); + protocolManager.removeSubscription(subInfo); + return null; } @Override diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index fcdff057a4..8e7d31cd4a 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -57,6 +57,7 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; @@ -856,4 +857,12 @@ public class OpenWireProtocolManager implements ProtocolManager, No fireAdvisory(conn.getConext(), topic, advisoryMessage, consumer.getId()); } } + + public void removeSubscription(RemoveSubscriptionInfo subInfo) throws Exception + { + SimpleString subQueueName = new SimpleString( + org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription( + true, subInfo.getClientId(), subInfo.getSubscriptionName())); + server.destroyQueue(subQueueName); + } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java index 8a2cd62576..07e8eebce9 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java @@ -32,6 +32,7 @@ import javax.jms.Topic; import junit.framework.Test; import junit.framework.TestCase; import junit.framework.TestSuite; +import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.transport.vm.VMTransport; import org.apache.activemq.util.Wait; @@ -426,8 +427,7 @@ public class JmsRedeliveredTest extends TestCase { } }); - // whack the connection - like a rebalance or tcp drop - ((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class).stop(); + connection.close(); session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE); MessageConsumer messageConsumer = session.createConsumer(queue); @@ -436,6 +436,7 @@ public class JmsRedeliveredTest extends TestCase { msg.acknowledge(); assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); + session.commit(); session.close(); keepBrokerAliveConnection.close(); } @@ -460,6 +461,7 @@ public class JmsRedeliveredTest extends TestCase { assertNotNull(msg); assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); + session.commit(); session.close(); } @@ -483,6 +485,8 @@ public class JmsRedeliveredTest extends TestCase { assertNotNull(msg); assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); + + session.commit(); session.close(); }