This commit is contained in:
Martyn Taylor 2017-11-02 11:19:35 +00:00
commit 5997e21ec7
2 changed files with 33 additions and 0 deletions

View File

@ -1621,6 +1621,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
for (ServerConsumer serverConsumer : serverConsumers) {
if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
serverConsumer.close(true);
serverConsumer.disconnect();
return true;
}
}

View File

@ -57,6 +57,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
@ -1978,6 +1980,36 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
Assert.assertEquals("myconn2", managementControl.getConnectorServices()[0]);
}
@Test
public void testCloseConsumer() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString name = RandomUtil.randomSimpleString();
boolean durable = true;
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
serverControl.createAddress(address.toString(), "ANYCAST");
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
ServerLocator receiveLocator = createInVMNonHALocator();
ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
final ClientConsumer consumer = receiveClientSession.createConsumer(name);
final ClientProducer producer = receiveClientSession.createProducer(name);
ServerSession ss = server.getSessions().iterator().next();
ServerConsumer sc = ss.getServerConsumers().iterator().next();
producer.send(receiveClientSession.createMessage(true));
consumer.receive(1000);
Assert.assertFalse(consumer.isClosed());
serverControl.closeConsumerWithID(((ClientSessionImpl)receiveClientSession).getName(), Long.toString(sc.sequentialID()));
Wait.waitFor(() -> consumer.isClosed(), 1000, 100);
Assert.assertTrue(consumer.isClosed());
}
protected void scaleDown(ScaleDownHandler handler) throws Exception {
SimpleString address = new SimpleString("testQueue");
HashMap<String, Object> params = new HashMap<>();