ARTEMIS-706 No Keep Alives from Broker
This commit is contained in:
parent
ea293fc5bd
commit
61747acfd1
|
@ -31,6 +31,7 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.activemq.advisory.AdvisorySupport;
|
import org.apache.activemq.advisory.AdvisorySupport;
|
||||||
|
@ -68,6 +69,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
|
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
|
||||||
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
|
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
|
@ -122,6 +124,8 @@ import org.apache.activemq.wireformat.WireFormat;
|
||||||
*/
|
*/
|
||||||
public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth {
|
public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth {
|
||||||
|
|
||||||
|
private static final KeepAliveInfo PING = new KeepAliveInfo();
|
||||||
|
|
||||||
private final OpenWireProtocolManager protocolManager;
|
private final OpenWireProtocolManager protocolManager;
|
||||||
|
|
||||||
private boolean destroyed = false;
|
private boolean destroyed = false;
|
||||||
|
@ -167,6 +171,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
*/
|
*/
|
||||||
private ServerSession internalSession;
|
private ServerSession internalSession;
|
||||||
|
|
||||||
|
private volatile long lastSent = -1;
|
||||||
|
private ConnectionEntry connectionEntry;
|
||||||
|
private boolean useKeepAlive;
|
||||||
|
private long maxInactivityDuration;
|
||||||
|
|
||||||
// TODO-NOW: check on why there are two connections created for every createConnection on the client.
|
// TODO-NOW: check on why there are two connections created for every createConnection on the client.
|
||||||
public OpenWireConnection(Connection connection,
|
public OpenWireConnection(Connection connection,
|
||||||
ActiveMQServer server,
|
ActiveMQServer server,
|
||||||
|
@ -177,6 +186,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.protocolManager = openWireProtocolManager;
|
this.protocolManager = openWireProtocolManager;
|
||||||
this.wireFormat = wf;
|
this.wireFormat = wf;
|
||||||
|
this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
|
||||||
|
this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration();
|
||||||
}
|
}
|
||||||
|
|
||||||
// SecurityAuth implementation
|
// SecurityAuth implementation
|
||||||
|
@ -216,6 +227,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//tells the connection that
|
||||||
|
//some bytes just sent
|
||||||
|
public void bufferSent() {
|
||||||
|
lastSent = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
|
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
|
||||||
super.bufferReceived(connectionID, buffer);
|
super.bufferReceived(connectionID, buffer);
|
||||||
|
@ -226,18 +243,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
boolean responseRequired = command.isResponseRequired();
|
boolean responseRequired = command.isResponseRequired();
|
||||||
int commandId = command.getCommandId();
|
int commandId = command.getCommandId();
|
||||||
|
|
||||||
// TODO: the server should send packets to the client based on the requested times
|
// ignore pings
|
||||||
|
if (command.getClass() != KeepAliveInfo.class) {
|
||||||
// the connection handles pings, negotiations directly.
|
|
||||||
// and delegate all other commands to manager.
|
|
||||||
if (command.getClass() == KeepAliveInfo.class) {
|
|
||||||
KeepAliveInfo info = (KeepAliveInfo) command;
|
|
||||||
info.setResponseRequired(false);
|
|
||||||
// if we don't respond to KeepAlive commands then the client will think the server is dead and timeout
|
|
||||||
// for some reason KeepAliveInfo.isResponseRequired() is always false
|
|
||||||
sendCommand(info);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
Response response = null;
|
Response response = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -345,16 +352,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean checkDataReceived() {
|
public void flush() {
|
||||||
boolean res = dataReceived;
|
checkInactivity();
|
||||||
|
|
||||||
dataReceived = false;
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private void checkInactivity() {
|
||||||
public void flush() {
|
if (!this.useKeepAlive) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
long dur = System.currentTimeMillis() - lastSent;
|
||||||
|
if (dur >= this.maxInactivityDuration / 2) {
|
||||||
|
this.sendCommand(PING);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void callFailureListeners(final ActiveMQException me) {
|
private void callFailureListeners(final ActiveMQException me) {
|
||||||
|
@ -390,6 +400,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
synchronized (sendLock) {
|
synchronized (sendLock) {
|
||||||
getTransportConnection().write(buffer, false, false);
|
getTransportConnection().write(buffer, false, false);
|
||||||
}
|
}
|
||||||
|
bufferSent();
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -508,6 +519,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
}
|
}
|
||||||
|
|
||||||
private void shutdown(boolean fail) {
|
private void shutdown(boolean fail) {
|
||||||
|
|
||||||
if (fail) {
|
if (fail) {
|
||||||
transportConnection.forceClose();
|
transportConnection.forceClose();
|
||||||
}
|
}
|
||||||
|
@ -521,6 +533,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
if (context == null || destroyed) {
|
if (context == null || destroyed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't allow things to be added to the connection state while we
|
// Don't allow things to be added to the connection state while we
|
||||||
// are shutting down.
|
// are shutting down.
|
||||||
// is it necessary? even, do we need state at all?
|
// is it necessary? even, do we need state at all?
|
||||||
|
@ -558,6 +571,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void fail(ActiveMQException me, String message) {
|
public void fail(ActiveMQException me, String message) {
|
||||||
|
|
||||||
if (me != null) {
|
if (me != null) {
|
||||||
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
|
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
|
||||||
}
|
}
|
||||||
|
@ -742,6 +756,25 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setConnectionEntry(ConnectionEntry connectionEntry) {
|
||||||
|
this.connectionEntry = connectionEntry;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUpTtl(final long inactivityDuration, final long inactivityDurationInitialDelay, final boolean useKeepAlive) {
|
||||||
|
this.useKeepAlive = useKeepAlive;
|
||||||
|
this.maxInactivityDuration = inactivityDuration;
|
||||||
|
|
||||||
|
protocolManager.getScheduledPool().schedule(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (inactivityDuration >= 0) {
|
||||||
|
connectionEntry.ttl = inactivityDuration;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, inactivityDurationInitialDelay, TimeUnit.MILLISECONDS);
|
||||||
|
checkInactivity();
|
||||||
|
}
|
||||||
|
|
||||||
class SlowConsumerDetection implements SlowConsumerDetectionListener {
|
class SlowConsumerDetection implements SlowConsumerDetectionListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1025,6 +1058,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
wireFormat.renegotiateWireFormat(command);
|
wireFormat.renegotiateWireFormat(command);
|
||||||
//throw back a brokerInfo here
|
//throw back a brokerInfo here
|
||||||
protocolManager.sendBrokerInfo(OpenWireConnection.this);
|
protocolManager.sendBrokerInfo(OpenWireConnection.this);
|
||||||
|
protocolManager.setUpInactivityParams(OpenWireConnection.this, command);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.artemis.core.protocol.openwire;
|
package org.apache.activemq.artemis.core.protocol.openwire;
|
||||||
|
|
||||||
import javax.jms.InvalidClientIDException;
|
import javax.jms.InvalidClientIDException;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -110,6 +111,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
||||||
private boolean updateClusterClients = false;
|
private boolean updateClusterClients = false;
|
||||||
private boolean updateClusterClientsOnRemove = false;
|
private boolean updateClusterClientsOnRemove = false;
|
||||||
|
|
||||||
|
//http://activemq.apache.org/activemq-inactivitymonitor.html
|
||||||
|
private long maxInactivityDuration = 30 * 1000L;
|
||||||
|
private long maxInactivityDurationInitalDelay = 10 * 1000L;
|
||||||
|
private boolean useKeepAlive = true;
|
||||||
|
|
||||||
private final OpenWireMessageConverter messageConverter;
|
private final OpenWireMessageConverter messageConverter;
|
||||||
|
|
||||||
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
|
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
|
||||||
|
@ -217,8 +223,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
||||||
OpenWireConnection owConn = new OpenWireConnection(connection, server, server.getExecutorFactory().getExecutor(), this, wf);
|
OpenWireConnection owConn = new OpenWireConnection(connection, server, server.getExecutorFactory().getExecutor(), this, wf);
|
||||||
owConn.sendHandshake();
|
owConn.sendHandshake();
|
||||||
|
|
||||||
// TODO CLEBERT What is this constant here? we should get it from TTL initial pings
|
//first we setup ttl to -1
|
||||||
return new ConnectionEntry(owConn, null, System.currentTimeMillis(), 1 * 60 * 1000);
|
//then when negotiation, we handle real ttl and delay
|
||||||
|
ConnectionEntry entry = new ConnectionEntry(owConn, null, System.currentTimeMillis(), -1);
|
||||||
|
owConn.setConnectionEntry(entry);
|
||||||
|
return entry;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -475,6 +484,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
||||||
connection.dispatch(brokerInfo);
|
connection.dispatch(brokerInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setUpInactivityParams(OpenWireConnection connection, WireFormatInfo command) throws IOException {
|
||||||
|
long inactivityDurationToUse = command.getMaxInactivityDuration() > this.maxInactivityDuration ? this.maxInactivityDuration : command.getMaxInactivityDuration();
|
||||||
|
long inactivityDurationInitialDelayToUse = command.getMaxInactivityDurationInitalDelay() > this.maxInactivityDurationInitalDelay ? this.maxInactivityDurationInitalDelay : command.getMaxInactivityDurationInitalDelay();
|
||||||
|
boolean useKeepAliveToUse = this.maxInactivityDuration == 0L ? false : this.useKeepAlive;
|
||||||
|
connection.setUpTtl(inactivityDurationToUse, inactivityDurationInitialDelayToUse, useKeepAliveToUse);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* URI property
|
* URI property
|
||||||
*/
|
*/
|
||||||
|
@ -523,4 +539,30 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
||||||
this.brokerName = name;
|
this.brokerName = name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isUseKeepAlive() {
|
||||||
|
return useKeepAlive;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
public void setUseKeepAlive(boolean useKeepAlive) {
|
||||||
|
this.useKeepAlive = useKeepAlive;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxInactivityDuration() {
|
||||||
|
return maxInactivityDuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxInactivityDuration(long maxInactivityDuration) {
|
||||||
|
this.maxInactivityDuration = maxInactivityDuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
public long getMaxInactivityDurationInitalDelay() {
|
||||||
|
return maxInactivityDurationInitalDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
|
||||||
|
this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -136,6 +136,34 @@ Currently we support Apache ActiveMQ Artemis clients that using standard JMS API
|
||||||
the future we will get more supports for some advanced, Apache ActiveMQ Artemis
|
the future we will get more supports for some advanced, Apache ActiveMQ Artemis
|
||||||
specific features into Apache ActiveMQ Artemis.
|
specific features into Apache ActiveMQ Artemis.
|
||||||
|
|
||||||
|
### Connection Monitoring
|
||||||
|
|
||||||
|
OpenWire has a few paramters to control how each connection is monitored, they are:
|
||||||
|
|
||||||
|
* maxInactivityDuration:
|
||||||
|
It specifies the time (milliseconds) after which the connection is closed by the broker if no data was received.
|
||||||
|
Default value is 30000.
|
||||||
|
|
||||||
|
* maxInactivityDurationInitalDelay:
|
||||||
|
It specifies the maximum delay (milliseconds) before inactivity monitoring is started on the connection.
|
||||||
|
It can be useful if a broker is under load with many connections being created concurrently.
|
||||||
|
Default value is 10000.
|
||||||
|
|
||||||
|
* useInactivityMonitor:
|
||||||
|
A value of false disables the InactivityMonitor completely and connections will never time out.
|
||||||
|
By default it is enabled. On broker side you don't neet set this. Instead you can set the
|
||||||
|
connection-ttl to -1.
|
||||||
|
|
||||||
|
* useKeepAlive:
|
||||||
|
Whether or not to send a KeepAliveInfo on an idle connection to prevent it from timing out.
|
||||||
|
Enabled by default. Disabling the keep alive will still make connections time out if no data
|
||||||
|
was received on the connection for the specified amount of time.
|
||||||
|
|
||||||
|
Note at the beginning the InactivityMonitor negotiates the appropriate maxInactivityDuration and
|
||||||
|
maxInactivityDurationInitalDelay. The shortest duration is taken for the connection.
|
||||||
|
|
||||||
|
More details please see [ActiveMQ InactivityMonitor](http://activemq.apache.org/activemq-inactivitymonitor.html).
|
||||||
|
|
||||||
## MQTT
|
## MQTT
|
||||||
|
|
||||||
MQTT is a light weight, client to server, publish / subscribe messaging protocol. MQTT has been specifically
|
MQTT is a light weight, client to server, publish / subscribe messaging protocol. MQTT has been specifically
|
||||||
|
|
|
@ -19,7 +19,9 @@ package org.apache.activemq.artemis.tests.integration.openwire;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
@ -30,14 +32,20 @@ import javax.jms.XAConnection;
|
||||||
import javax.jms.XASession;
|
import javax.jms.XASession;
|
||||||
import javax.transaction.xa.XAResource;
|
import javax.transaction.xa.XAResource;
|
||||||
import javax.transaction.xa.Xid;
|
import javax.transaction.xa.Xid;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.ActiveMQSession;
|
import org.apache.activemq.ActiveMQSession;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
@ -832,6 +840,45 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This test create a consumer on a connection to consume
|
||||||
|
* messages slowly, so the connection stay for a longer time
|
||||||
|
* than its configured TTL without any user data (messages)
|
||||||
|
* coming from broker side. It tests the working of
|
||||||
|
* KeepAlive mechanism without which the test will fail.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSendReceiveUsingTtl() throws Exception {
|
||||||
|
String brokerUri = "failover://tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.maxInactivityDuration=10000&wireFormat.maxInactivityDurationInitalDelay=5000";
|
||||||
|
ActiveMQConnectionFactory testFactory = new ActiveMQConnectionFactory(brokerUri);
|
||||||
|
|
||||||
|
Connection sendConnection = testFactory.createConnection();
|
||||||
|
System.out.println("created send connection: " + sendConnection);
|
||||||
|
Connection receiveConnection = testFactory.createConnection();
|
||||||
|
System.out.println("created receive connection: " + receiveConnection);
|
||||||
|
|
||||||
|
try {
|
||||||
|
final int nMsg = 20;
|
||||||
|
final long delay = 2L;
|
||||||
|
|
||||||
|
AsyncConsumer consumer = new AsyncConsumer(queueName, receiveConnection, Session.CLIENT_ACKNOWLEDGE, delay, nMsg);
|
||||||
|
|
||||||
|
Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue = sendSession.createQueue(queueName);
|
||||||
|
|
||||||
|
MessageProducer producer = sendSession.createProducer(queue);
|
||||||
|
for (int i = 0; i < nMsg; i++) {
|
||||||
|
producer.send(sendSession.createTextMessage("testXX" + i));
|
||||||
|
}
|
||||||
|
|
||||||
|
consumer.waitFor(nMsg * delay * 2);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
sendConnection.close();
|
||||||
|
receiveConnection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCommitCloseConsumerBefore() throws Exception {
|
public void testCommitCloseConsumerBefore() throws Exception {
|
||||||
testCommitCloseConsumer(true);
|
testCommitCloseConsumer(true);
|
||||||
|
@ -1080,4 +1127,77 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkQueueEmpty(String qName) {
|
||||||
|
PostOffice po = server.getPostOffice();
|
||||||
|
LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString("jms.queue." + qName));
|
||||||
|
try {
|
||||||
|
//waiting for last ack to finish
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
assertEquals(0L, binding.getQueue().getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
private class AsyncConsumer {
|
||||||
|
|
||||||
|
private List<Message> messages = new ArrayList<>();
|
||||||
|
private CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
private int nMsgs;
|
||||||
|
private String queueName;
|
||||||
|
|
||||||
|
private MessageConsumer consumer;
|
||||||
|
|
||||||
|
AsyncConsumer(String queueName,
|
||||||
|
Connection receiveConnection,
|
||||||
|
final int ackMode,
|
||||||
|
final long delay,
|
||||||
|
final int expectedMsgs) throws JMSException {
|
||||||
|
this.queueName = queueName;
|
||||||
|
this.nMsgs = expectedMsgs;
|
||||||
|
Session session = receiveConnection.createSession(false, ackMode);
|
||||||
|
Queue queue = session.createQueue(queueName);
|
||||||
|
consumer = session.createConsumer(queue);
|
||||||
|
consumer.setMessageListener(new MessageListener() {
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
System.out.println("received : " + message);
|
||||||
|
|
||||||
|
messages.add(message);
|
||||||
|
|
||||||
|
if (messages.size() < expectedMsgs) {
|
||||||
|
//delay
|
||||||
|
try {
|
||||||
|
TimeUnit.SECONDS.sleep(delay);
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
|
||||||
|
try {
|
||||||
|
message.acknowledge();
|
||||||
|
}
|
||||||
|
catch (JMSException e) {
|
||||||
|
System.err.println("Failed to acknowledge " + message);
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (messages.size() == expectedMsgs) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
receiveConnection.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void waitFor(long timeout) throws TimeoutException, InterruptedException, JMSException {
|
||||||
|
boolean result = latch.await(timeout, TimeUnit.SECONDS);
|
||||||
|
assertTrue(result);
|
||||||
|
//check queue empty
|
||||||
|
checkQueueEmpty(queueName);
|
||||||
|
//then check messages still the size and no dup.
|
||||||
|
assertEquals(nMsgs, messages.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue