ARTEMIS-1486 Core client should be notified if consumer is closed on broker side
This commit is contained in:
parent
8703d9d51d
commit
61ce7a7454
|
@ -1621,6 +1621,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
for (ServerConsumer serverConsumer : serverConsumers) {
|
||||
if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
|
||||
serverConsumer.close(true);
|
||||
serverConsumer.disconnect();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,6 +57,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||
|
@ -1978,6 +1980,36 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
Assert.assertEquals("myconn2", managementControl.getConnectorServices()[0]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseConsumer() throws Exception {
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
SimpleString name = RandomUtil.randomSimpleString();
|
||||
boolean durable = true;
|
||||
|
||||
ActiveMQServerControl serverControl = createManagementControl();
|
||||
|
||||
checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
|
||||
serverControl.createAddress(address.toString(), "ANYCAST");
|
||||
serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
|
||||
|
||||
ServerLocator receiveLocator = createInVMNonHALocator();
|
||||
ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
|
||||
ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
|
||||
final ClientConsumer consumer = receiveClientSession.createConsumer(name);
|
||||
final ClientProducer producer = receiveClientSession.createProducer(name);
|
||||
|
||||
ServerSession ss = server.getSessions().iterator().next();
|
||||
ServerConsumer sc = ss.getServerConsumers().iterator().next();
|
||||
|
||||
producer.send(receiveClientSession.createMessage(true));
|
||||
consumer.receive(1000);
|
||||
|
||||
Assert.assertFalse(consumer.isClosed());
|
||||
serverControl.closeConsumerWithID(((ClientSessionImpl)receiveClientSession).getName(), Long.toString(sc.sequentialID()));
|
||||
Wait.waitFor(() -> consumer.isClosed(), 1000, 100);
|
||||
Assert.assertTrue(consumer.isClosed());
|
||||
}
|
||||
|
||||
protected void scaleDown(ScaleDownHandler handler) throws Exception {
|
||||
SimpleString address = new SimpleString("testQueue");
|
||||
HashMap<String, Object> params = new HashMap<>();
|
||||
|
|
Loading…
Reference in New Issue