diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index aeb0f2b7f0..b66b802ffd 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.advisory.AdvisorySupport; @@ -68,6 +69,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; +import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -122,6 +124,8 @@ import org.apache.activemq.wireformat.WireFormat; */ public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth { + private static final KeepAliveInfo PING = new KeepAliveInfo(); + private final OpenWireProtocolManager protocolManager; private boolean destroyed = false; @@ -167,6 +171,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se */ private ServerSession internalSession; + private volatile long lastSent = -1; + private ConnectionEntry connectionEntry; + private boolean useKeepAlive; + private long maxInactivityDuration; + // TODO-NOW: check on why there are two connections created for every createConnection on the client. public OpenWireConnection(Connection connection, ActiveMQServer server, @@ -177,6 +186,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se this.server = server; this.protocolManager = openWireProtocolManager; this.wireFormat = wf; + this.useKeepAlive = openWireProtocolManager.isUseKeepAlive(); + this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration(); } // SecurityAuth implementation @@ -216,6 +227,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return info; } + //tells the connection that + //some bytes just sent + public void bufferSent() { + lastSent = System.currentTimeMillis(); + } + @Override public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) { super.bufferReceived(connectionID, buffer); @@ -226,18 +243,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se boolean responseRequired = command.isResponseRequired(); int commandId = command.getCommandId(); - // TODO: the server should send packets to the client based on the requested times - - // the connection handles pings, negotiations directly. - // and delegate all other commands to manager. - if (command.getClass() == KeepAliveInfo.class) { - KeepAliveInfo info = (KeepAliveInfo) command; - info.setResponseRequired(false); - // if we don't respond to KeepAlive commands then the client will think the server is dead and timeout - // for some reason KeepAliveInfo.isResponseRequired() is always false - sendCommand(info); - } - else { + // ignore pings + if (command.getClass() != KeepAliveInfo.class) { Response response = null; try { @@ -345,16 +352,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } @Override - public boolean checkDataReceived() { - boolean res = dataReceived; - - dataReceived = false; - - return res; + public void flush() { + checkInactivity(); } - @Override - public void flush() { + private void checkInactivity() { + if (!this.useKeepAlive) { + return; + } + + long dur = System.currentTimeMillis() - lastSent; + if (dur >= this.maxInactivityDuration / 2) { + this.sendCommand(PING); + } } private void callFailureListeners(final ActiveMQException me) { @@ -390,6 +400,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se synchronized (sendLock) { getTransportConnection().write(buffer, false, false); } + bufferSent(); } catch (IOException e) { throw e; @@ -508,6 +519,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } private void shutdown(boolean fail) { + if (fail) { transportConnection.forceClose(); } @@ -521,6 +533,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se if (context == null || destroyed) { return; } + // Don't allow things to be added to the connection state while we // are shutting down. // is it necessary? even, do we need state at all? @@ -558,6 +571,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public void fail(ActiveMQException me, String message) { + if (me != null) { ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); } @@ -742,6 +756,25 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } + public void setConnectionEntry(ConnectionEntry connectionEntry) { + this.connectionEntry = connectionEntry; + } + + public void setUpTtl(final long inactivityDuration, final long inactivityDurationInitialDelay, final boolean useKeepAlive) { + this.useKeepAlive = useKeepAlive; + this.maxInactivityDuration = inactivityDuration; + + protocolManager.getScheduledPool().schedule(new Runnable() { + @Override + public void run() { + if (inactivityDuration >= 0) { + connectionEntry.ttl = inactivityDuration; + } + } + }, inactivityDurationInitialDelay, TimeUnit.MILLISECONDS); + checkInactivity(); + } + class SlowConsumerDetection implements SlowConsumerDetectionListener { @Override @@ -1025,6 +1058,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se wireFormat.renegotiateWireFormat(command); //throw back a brokerInfo here protocolManager.sendBrokerInfo(OpenWireConnection.this); + protocolManager.setUpInactivityParams(OpenWireConnection.this, command); return null; } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index d8dd639986..8497970690 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol.openwire; import javax.jms.InvalidClientIDException; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -110,6 +111,11 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl private boolean updateClusterClients = false; private boolean updateClusterClientsOnRemove = false; + //http://activemq.apache.org/activemq-inactivitymonitor.html + private long maxInactivityDuration = 30 * 1000L; + private long maxInactivityDurationInitalDelay = 10 * 1000L; + private boolean useKeepAlive = true; + private final OpenWireMessageConverter messageConverter; public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { @@ -217,8 +223,11 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl OpenWireConnection owConn = new OpenWireConnection(connection, server, server.getExecutorFactory().getExecutor(), this, wf); owConn.sendHandshake(); - // TODO CLEBERT What is this constant here? we should get it from TTL initial pings - return new ConnectionEntry(owConn, null, System.currentTimeMillis(), 1 * 60 * 1000); + //first we setup ttl to -1 + //then when negotiation, we handle real ttl and delay + ConnectionEntry entry = new ConnectionEntry(owConn, null, System.currentTimeMillis(), -1); + owConn.setConnectionEntry(entry); + return entry; } @Override @@ -475,6 +484,13 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl connection.dispatch(brokerInfo); } + public void setUpInactivityParams(OpenWireConnection connection, WireFormatInfo command) throws IOException { + long inactivityDurationToUse = command.getMaxInactivityDuration() > this.maxInactivityDuration ? this.maxInactivityDuration : command.getMaxInactivityDuration(); + long inactivityDurationInitialDelayToUse = command.getMaxInactivityDurationInitalDelay() > this.maxInactivityDurationInitalDelay ? this.maxInactivityDurationInitalDelay : command.getMaxInactivityDurationInitalDelay(); + boolean useKeepAliveToUse = this.maxInactivityDuration == 0L ? false : this.useKeepAlive; + connection.setUpTtl(inactivityDurationToUse, inactivityDurationInitialDelayToUse, useKeepAliveToUse); + } + /** * URI property */ @@ -523,4 +539,30 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl this.brokerName = name; } + public boolean isUseKeepAlive() { + return useKeepAlive; + } + + @SuppressWarnings("unused") + public void setUseKeepAlive(boolean useKeepAlive) { + this.useKeepAlive = useKeepAlive; + } + + public long getMaxInactivityDuration() { + return maxInactivityDuration; + } + + public void setMaxInactivityDuration(long maxInactivityDuration) { + this.maxInactivityDuration = maxInactivityDuration; + } + + @SuppressWarnings("unused") + public long getMaxInactivityDurationInitalDelay() { + return maxInactivityDurationInitalDelay; + } + + @SuppressWarnings("unused") + public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) { + this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; + } } diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md index 35d0550245..c5b32a1c75 100644 --- a/docs/user-manual/en/protocols-interoperability.md +++ b/docs/user-manual/en/protocols-interoperability.md @@ -136,6 +136,34 @@ Currently we support Apache ActiveMQ Artemis clients that using standard JMS API the future we will get more supports for some advanced, Apache ActiveMQ Artemis specific features into Apache ActiveMQ Artemis. +### Connection Monitoring + +OpenWire has a few paramters to control how each connection is monitored, they are: + +* maxInactivityDuration: +It specifies the time (milliseconds) after which the connection is closed by the broker if no data was received. +Default value is 30000. + +* maxInactivityDurationInitalDelay: +It specifies the maximum delay (milliseconds) before inactivity monitoring is started on the connection. +It can be useful if a broker is under load with many connections being created concurrently. +Default value is 10000. + +* useInactivityMonitor: +A value of false disables the InactivityMonitor completely and connections will never time out. +By default it is enabled. On broker side you don't neet set this. Instead you can set the +connection-ttl to -1. + +* useKeepAlive: +Whether or not to send a KeepAliveInfo on an idle connection to prevent it from timing out. +Enabled by default. Disabling the keep alive will still make connections time out if no data +was received on the connection for the specified amount of time. + +Note at the beginning the InactivityMonitor negotiates the appropriate maxInactivityDuration and +maxInactivityDurationInitalDelay. The shortest duration is taken for the connection. + +More details please see [ActiveMQ InactivityMonitor](http://activemq.apache.org/activemq-inactivitymonitor.html). + ## MQTT MQTT is a light weight, client to server, publish / subscribe messaging protocol. MQTT has been specifically diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 82d82425c2..3c2d847a8f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -19,7 +19,9 @@ package org.apache.activemq.artemis.tests.integration.openwire; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; @@ -30,14 +32,20 @@ import javax.jms.XAConnection; import javax.jms.XASession; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -832,6 +840,45 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } } + /* + * This test create a consumer on a connection to consume + * messages slowly, so the connection stay for a longer time + * than its configured TTL without any user data (messages) + * coming from broker side. It tests the working of + * KeepAlive mechanism without which the test will fail. + */ + @Test + public void testSendReceiveUsingTtl() throws Exception { + String brokerUri = "failover://tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.maxInactivityDuration=10000&wireFormat.maxInactivityDurationInitalDelay=5000"; + ActiveMQConnectionFactory testFactory = new ActiveMQConnectionFactory(brokerUri); + + Connection sendConnection = testFactory.createConnection(); + System.out.println("created send connection: " + sendConnection); + Connection receiveConnection = testFactory.createConnection(); + System.out.println("created receive connection: " + receiveConnection); + + try { + final int nMsg = 20; + final long delay = 2L; + + AsyncConsumer consumer = new AsyncConsumer(queueName, receiveConnection, Session.CLIENT_ACKNOWLEDGE, delay, nMsg); + + Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = sendSession.createQueue(queueName); + + MessageProducer producer = sendSession.createProducer(queue); + for (int i = 0; i < nMsg; i++) { + producer.send(sendSession.createTextMessage("testXX" + i)); + } + + consumer.waitFor(nMsg * delay * 2); + } + finally { + sendConnection.close(); + receiveConnection.close(); + } + } + @Test public void testCommitCloseConsumerBefore() throws Exception { testCommitCloseConsumer(true); @@ -1080,4 +1127,77 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } + private void checkQueueEmpty(String qName) { + PostOffice po = server.getPostOffice(); + LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString("jms.queue." + qName)); + try { + //waiting for last ack to finish + Thread.sleep(1000); + } + catch (InterruptedException e) { + } + assertEquals(0L, binding.getQueue().getMessageCount()); + } + + private class AsyncConsumer { + + private List messages = new ArrayList<>(); + private CountDownLatch latch = new CountDownLatch(1); + private int nMsgs; + private String queueName; + + private MessageConsumer consumer; + + AsyncConsumer(String queueName, + Connection receiveConnection, + final int ackMode, + final long delay, + final int expectedMsgs) throws JMSException { + this.queueName = queueName; + this.nMsgs = expectedMsgs; + Session session = receiveConnection.createSession(false, ackMode); + Queue queue = session.createQueue(queueName); + consumer = session.createConsumer(queue); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + System.out.println("received : " + message); + + messages.add(message); + + if (messages.size() < expectedMsgs) { + //delay + try { + TimeUnit.SECONDS.sleep(delay); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + if (ackMode == Session.CLIENT_ACKNOWLEDGE) { + try { + message.acknowledge(); + } + catch (JMSException e) { + System.err.println("Failed to acknowledge " + message); + e.printStackTrace(); + } + } + if (messages.size() == expectedMsgs) { + latch.countDown(); + } + } + }); + receiveConnection.start(); + } + + public void waitFor(long timeout) throws TimeoutException, InterruptedException, JMSException { + boolean result = latch.await(timeout, TimeUnit.SECONDS); + assertTrue(result); + //check queue empty + checkQueueEmpty(queueName); + //then check messages still the size and no dup. + assertEquals(nMsgs, messages.size()); + } + } }