Fix openwire unit tests

Fix JmsRedeliveredTest and add processing of RemoveSubscriptionInfo command
This commit is contained in:
Howard Gao 2015-08-03 15:11:25 +08:00 committed by Clebert Suconic
parent d9ba65c01c
commit 65e9ec01b8
3 changed files with 20 additions and 5 deletions

View File

@ -258,7 +258,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|| command.getClass() == MessageAck.class
|| command.getClass() == TransactionInfo.class
|| command.getClass() == DestinationInfo.class
|| command.getClass() == ShutdownInfo.class)
|| command.getClass() == ShutdownInfo.class
|| command.getClass() == RemoveSubscriptionInfo.class)
{
Response response = null;
@ -1706,9 +1707,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
}
@Override
public Response processRemoveSubscription(RemoveSubscriptionInfo arg0) throws Exception
public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception
{
throw new IllegalStateException("not implemented! ");
protocolManager.removeSubscription(subInfo);
return null;
}
@Override

View File

@ -57,6 +57,7 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
@ -856,4 +857,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
fireAdvisory(conn.getConext(), topic, advisoryMessage, consumer.getId());
}
}
public void removeSubscription(RemoveSubscriptionInfo subInfo) throws Exception
{
SimpleString subQueueName = new SimpleString(
org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(
true, subInfo.getClientId(), subInfo.getSubscriptionName()));
server.destroyQueue(subQueueName);
}
}

View File

@ -32,6 +32,7 @@ import javax.jms.Topic;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.vm.VMTransport;
import org.apache.activemq.util.Wait;
@ -426,8 +427,7 @@ public class JmsRedeliveredTest extends TestCase {
}
});
// whack the connection - like a rebalance or tcp drop
((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class).stop();
connection.close();
session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(queue);
@ -436,6 +436,7 @@ public class JmsRedeliveredTest extends TestCase {
msg.acknowledge();
assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
session.commit();
session.close();
keepBrokerAliveConnection.close();
}
@ -460,6 +461,7 @@ public class JmsRedeliveredTest extends TestCase {
assertNotNull(msg);
assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
session.commit();
session.close();
}
@ -483,6 +485,8 @@ public class JmsRedeliveredTest extends TestCase {
assertNotNull(msg);
assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
session.commit();
session.close();
}