This closes #740

This commit is contained in:
Clebert Suconic 2016-09-02 21:39:44 -04:00
commit 15e4799d4d
4 changed files with 246 additions and 22 deletions

View File

@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.advisory.AdvisorySupport;
@ -68,6 +69,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -122,6 +124,8 @@ import org.apache.activemq.wireformat.WireFormat;
*/
public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth {
private static final KeepAliveInfo PING = new KeepAliveInfo();
private final OpenWireProtocolManager protocolManager;
private boolean destroyed = false;
@ -167,6 +171,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
*/
private ServerSession internalSession;
private volatile long lastSent = -1;
private ConnectionEntry connectionEntry;
private boolean useKeepAlive;
private long maxInactivityDuration;
// TODO-NOW: check on why there are two connections created for every createConnection on the client.
public OpenWireConnection(Connection connection,
ActiveMQServer server,
@ -177,6 +186,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.server = server;
this.protocolManager = openWireProtocolManager;
this.wireFormat = wf;
this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration();
}
// SecurityAuth implementation
@ -216,6 +227,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return info;
}
//tells the connection that
//some bytes just sent
public void bufferSent() {
lastSent = System.currentTimeMillis();
}
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
super.bufferReceived(connectionID, buffer);
@ -226,18 +243,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();
// TODO: the server should send packets to the client based on the requested times
// the connection handles pings, negotiations directly.
// and delegate all other commands to manager.
if (command.getClass() == KeepAliveInfo.class) {
KeepAliveInfo info = (KeepAliveInfo) command;
info.setResponseRequired(false);
// if we don't respond to KeepAlive commands then the client will think the server is dead and timeout
// for some reason KeepAliveInfo.isResponseRequired() is always false
sendCommand(info);
}
else {
// ignore pings
if (command.getClass() != KeepAliveInfo.class) {
Response response = null;
try {
@ -345,16 +352,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
@Override
public boolean checkDataReceived() {
boolean res = dataReceived;
dataReceived = false;
return res;
public void flush() {
checkInactivity();
}
@Override
public void flush() {
private void checkInactivity() {
if (!this.useKeepAlive) {
return;
}
long dur = System.currentTimeMillis() - lastSent;
if (dur >= this.maxInactivityDuration / 2) {
this.sendCommand(PING);
}
}
private void callFailureListeners(final ActiveMQException me) {
@ -390,6 +400,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
synchronized (sendLock) {
getTransportConnection().write(buffer, false, false);
}
bufferSent();
}
catch (IOException e) {
throw e;
@ -508,6 +519,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
private void shutdown(boolean fail) {
if (fail) {
transportConnection.forceClose();
}
@ -521,6 +533,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
if (context == null || destroyed) {
return;
}
// Don't allow things to be added to the connection state while we
// are shutting down.
// is it necessary? even, do we need state at all?
@ -558,6 +571,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public void fail(ActiveMQException me, String message) {
if (me != null) {
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
}
@ -742,6 +756,25 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
public void setConnectionEntry(ConnectionEntry connectionEntry) {
this.connectionEntry = connectionEntry;
}
public void setUpTtl(final long inactivityDuration, final long inactivityDurationInitialDelay, final boolean useKeepAlive) {
this.useKeepAlive = useKeepAlive;
this.maxInactivityDuration = inactivityDuration;
protocolManager.getScheduledPool().schedule(new Runnable() {
@Override
public void run() {
if (inactivityDuration >= 0) {
connectionEntry.ttl = inactivityDuration;
}
}
}, inactivityDurationInitialDelay, TimeUnit.MILLISECONDS);
checkInactivity();
}
class SlowConsumerDetection implements SlowConsumerDetectionListener {
@Override
@ -1025,6 +1058,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
wireFormat.renegotiateWireFormat(command);
//throw back a brokerInfo here
protocolManager.sendBrokerInfo(OpenWireConnection.this);
protocolManager.setUpInactivityParams(OpenWireConnection.this, command);
return null;
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.InvalidClientIDException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@ -110,6 +111,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
private boolean updateClusterClients = false;
private boolean updateClusterClientsOnRemove = false;
//http://activemq.apache.org/activemq-inactivitymonitor.html
private long maxInactivityDuration = 30 * 1000L;
private long maxInactivityDurationInitalDelay = 10 * 1000L;
private boolean useKeepAlive = true;
private final OpenWireMessageConverter messageConverter;
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
@ -217,8 +223,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
OpenWireConnection owConn = new OpenWireConnection(connection, server, server.getExecutorFactory().getExecutor(), this, wf);
owConn.sendHandshake();
// TODO CLEBERT What is this constant here? we should get it from TTL initial pings
return new ConnectionEntry(owConn, null, System.currentTimeMillis(), 1 * 60 * 1000);
//first we setup ttl to -1
//then when negotiation, we handle real ttl and delay
ConnectionEntry entry = new ConnectionEntry(owConn, null, System.currentTimeMillis(), -1);
owConn.setConnectionEntry(entry);
return entry;
}
@Override
@ -475,6 +484,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
connection.dispatch(brokerInfo);
}
public void setUpInactivityParams(OpenWireConnection connection, WireFormatInfo command) throws IOException {
long inactivityDurationToUse = command.getMaxInactivityDuration() > this.maxInactivityDuration ? this.maxInactivityDuration : command.getMaxInactivityDuration();
long inactivityDurationInitialDelayToUse = command.getMaxInactivityDurationInitalDelay() > this.maxInactivityDurationInitalDelay ? this.maxInactivityDurationInitalDelay : command.getMaxInactivityDurationInitalDelay();
boolean useKeepAliveToUse = this.maxInactivityDuration == 0L ? false : this.useKeepAlive;
connection.setUpTtl(inactivityDurationToUse, inactivityDurationInitialDelayToUse, useKeepAliveToUse);
}
/**
* URI property
*/
@ -523,4 +539,30 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
this.brokerName = name;
}
public boolean isUseKeepAlive() {
return useKeepAlive;
}
@SuppressWarnings("unused")
public void setUseKeepAlive(boolean useKeepAlive) {
this.useKeepAlive = useKeepAlive;
}
public long getMaxInactivityDuration() {
return maxInactivityDuration;
}
public void setMaxInactivityDuration(long maxInactivityDuration) {
this.maxInactivityDuration = maxInactivityDuration;
}
@SuppressWarnings("unused")
public long getMaxInactivityDurationInitalDelay() {
return maxInactivityDurationInitalDelay;
}
@SuppressWarnings("unused")
public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
}
}

View File

@ -136,6 +136,34 @@ Currently we support Apache ActiveMQ Artemis clients that using standard JMS API
the future we will get more supports for some advanced, Apache ActiveMQ Artemis
specific features into Apache ActiveMQ Artemis.
### Connection Monitoring
OpenWire has a few paramters to control how each connection is monitored, they are:
* maxInactivityDuration:
It specifies the time (milliseconds) after which the connection is closed by the broker if no data was received.
Default value is 30000.
* maxInactivityDurationInitalDelay:
It specifies the maximum delay (milliseconds) before inactivity monitoring is started on the connection.
It can be useful if a broker is under load with many connections being created concurrently.
Default value is 10000.
* useInactivityMonitor:
A value of false disables the InactivityMonitor completely and connections will never time out.
By default it is enabled. On broker side you don't neet set this. Instead you can set the
connection-ttl to -1.
* useKeepAlive:
Whether or not to send a KeepAliveInfo on an idle connection to prevent it from timing out.
Enabled by default. Disabling the keep alive will still make connections time out if no data
was received on the connection for the specified amount of time.
Note at the beginning the InactivityMonitor negotiates the appropriate maxInactivityDuration and
maxInactivityDurationInitalDelay. The shortest duration is taken for the connection.
More details please see [ActiveMQ InactivityMonitor](http://activemq.apache.org/activemq-inactivitymonitor.html).
## MQTT
MQTT is a light weight, client to server, publish / subscribe messaging protocol. MQTT has been specifically

View File

@ -19,7 +19,9 @@ package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@ -30,14 +32,20 @@ import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@ -832,6 +840,45 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
}
/*
* This test create a consumer on a connection to consume
* messages slowly, so the connection stay for a longer time
* than its configured TTL without any user data (messages)
* coming from broker side. It tests the working of
* KeepAlive mechanism without which the test will fail.
*/
@Test
public void testSendReceiveUsingTtl() throws Exception {
String brokerUri = "failover://tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.maxInactivityDuration=10000&wireFormat.maxInactivityDurationInitalDelay=5000";
ActiveMQConnectionFactory testFactory = new ActiveMQConnectionFactory(brokerUri);
Connection sendConnection = testFactory.createConnection();
System.out.println("created send connection: " + sendConnection);
Connection receiveConnection = testFactory.createConnection();
System.out.println("created receive connection: " + receiveConnection);
try {
final int nMsg = 20;
final long delay = 2L;
AsyncConsumer consumer = new AsyncConsumer(queueName, receiveConnection, Session.CLIENT_ACKNOWLEDGE, delay, nMsg);
Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = sendSession.createQueue(queueName);
MessageProducer producer = sendSession.createProducer(queue);
for (int i = 0; i < nMsg; i++) {
producer.send(sendSession.createTextMessage("testXX" + i));
}
consumer.waitFor(nMsg * delay * 2);
}
finally {
sendConnection.close();
receiveConnection.close();
}
}
@Test
public void testCommitCloseConsumerBefore() throws Exception {
testCommitCloseConsumer(true);
@ -1080,4 +1127,77 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
private void checkQueueEmpty(String qName) {
PostOffice po = server.getPostOffice();
LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString("jms.queue." + qName));
try {
//waiting for last ack to finish
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
assertEquals(0L, binding.getQueue().getMessageCount());
}
private class AsyncConsumer {
private List<Message> messages = new ArrayList<>();
private CountDownLatch latch = new CountDownLatch(1);
private int nMsgs;
private String queueName;
private MessageConsumer consumer;
AsyncConsumer(String queueName,
Connection receiveConnection,
final int ackMode,
final long delay,
final int expectedMsgs) throws JMSException {
this.queueName = queueName;
this.nMsgs = expectedMsgs;
Session session = receiveConnection.createSession(false, ackMode);
Queue queue = session.createQueue(queueName);
consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("received : " + message);
messages.add(message);
if (messages.size() < expectedMsgs) {
//delay
try {
TimeUnit.SECONDS.sleep(delay);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
try {
message.acknowledge();
}
catch (JMSException e) {
System.err.println("Failed to acknowledge " + message);
e.printStackTrace();
}
}
if (messages.size() == expectedMsgs) {
latch.countDown();
}
}
});
receiveConnection.start();
}
public void waitFor(long timeout) throws TimeoutException, InterruptedException, JMSException {
boolean result = latch.await(timeout, TimeUnit.SECONDS);
assertTrue(result);
//check queue empty
checkQueueEmpty(queueName);
//then check messages still the size and no dup.
assertEquals(nMsgs, messages.size());
}
}
}