From a0aa3583bc7f0a787471d894370809749cf2cbee Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Fri, 30 Oct 2015 16:28:53 +0000 Subject: [PATCH] AMQ-6031: use System.nanoTime() when deriving time to tick the transport with for idle-timeout handling (cherry picked from commit 037f91d61e0e632bc6b3b32c91b8cf58d8b673ae) --- .../transport/amqp/protocol/AmqpConnection.java | 14 ++++++++++---- .../transport/amqp/client/AmqpConnection.java | 12 ++++++++---- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index c04a61f32e..5eb7033159 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -39,6 +39,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.InvalidClientIDException; @@ -245,7 +246,9 @@ public class AmqpConnection implements AmqpProtocolConverter { LOG.trace("Performing connection:{} keep-alive processing", amqpTransport.getRemoteAddress()); if (protonConnection.getLocalState() != EndpointState.CLOSED) { - rescheduleAt = protonTransport.tick(System.currentTimeMillis()) - System.currentTimeMillis(); + // Using nano time since it is not related to the wall clock, which may change + long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + rescheduleAt = protonTransport.tick(now) - now; pumpProtonToSocket(); if (protonTransport.isClosed()) { rescheduleAt = 0; @@ -813,10 +816,13 @@ public class AmqpConnection implements AmqpProtocolConverter { // If either end has idle timeout requirements then the tick method // will give us a deadline on the next time we need to tick() in order // to meet those obligations. - long nextIdleCheck = protonTransport.tick(System.currentTimeMillis()); + // Using nano time since it is not related to the wall clock, which may change + long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + long nextIdleCheck = protonTransport.tick(now); if (nextIdleCheck > 0) { - LOG.trace("Connection keep-alive processing starts at: {}", new Date(nextIdleCheck)); - monitor.startKeepAliveTask(nextIdleCheck - System.currentTimeMillis()); + long delay = nextIdleCheck - now; + LOG.trace("Connection keep-alive processing starts in: {}", delay); + monitor.startKeepAliveTask(delay); } else { LOG.trace("Connection does not require keep-alive processing"); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 9c13e74334..d4cf4f2142 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -472,8 +472,10 @@ public class AmqpConnection extends AmqpAbstractResource implements if (!getEndpoint().getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { if (!isIdleProcessingDisabled()) { - long nextKeepAliveTime = protonTransport.tick(System.currentTimeMillis()); - if (nextKeepAliveTime > 0) { + // Using nano time since it is not related to the wall clock, which may change + long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + long initialKeepAliveDeadline = protonTransport.tick(initialNow); + if (initialKeepAliveDeadline > 0) { getScheduler().schedule(new Runnable() { @@ -482,7 +484,9 @@ public class AmqpConnection extends AmqpAbstractResource implements try { if (getEndpoint().getLocalState() != EndpointState.CLOSED) { LOG.debug("Client performing next idle check"); - long rescheduleAt = protonTransport.tick(System.currentTimeMillis()) - System.currentTimeMillis(); + // Using nano time since it is not related to the wall clock, which may change + long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + long rescheduleAt = protonTransport.tick(now) - now; pumpToProtonTransport(); if (protonTransport.isClosed()) { LOG.debug("Transport closed after inactivity check."); @@ -498,7 +502,7 @@ public class AmqpConnection extends AmqpAbstractResource implements fireClientException(e); } } - }, nextKeepAliveTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + }, initialKeepAliveDeadline - initialNow, TimeUnit.MILLISECONDS); } } super.doOpenCompletion();