From 4f6326f4fb35867b6d83c624a947a4510d0f674f Mon Sep 17 00:00:00 2001 From: Robbie Gemmell Date: Fri, 15 Sep 2017 17:52:36 +0100 Subject: [PATCH] AMQ-6813: update tick deadline handling to account for potential to be negative due to using nanoTime derived values, plus other edge case protections (cherry picked from commit f82eccd2f504b59c2e98ba8273e28f4d7a2a8698) --- .../transport/amqp/protocol/AmqpConnection.java | 15 ++++++++++----- .../transport/amqp/client/AmqpConnection.java | 15 ++++++++------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index 57b2502fe4..dec1bc9b3c 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -251,12 +251,16 @@ public class AmqpConnection implements AmqpProtocolConverter { if (protonConnection.getLocalState() != EndpointState.CLOSED) { // Using nano time since it is not related to the wall clock, which may change long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - rescheduleAt = protonTransport.tick(now) - now; + long deadline = protonTransport.tick(now); pumpProtonToSocket(); if (protonTransport.isClosed()) { - rescheduleAt = 0; LOG.debug("Transport closed after inactivity check."); - throw new InactivityIOException("Channel was inactive for to long"); + throw new InactivityIOException("Channel was inactive for too long"); + } else { + if(deadline != 0) { + // caller treats 0 as no-work, ensure value is at least 1 as there was a deadline + rescheduleAt = Math.max(deadline - now, 1); + } } } @@ -835,8 +839,9 @@ public class AmqpConnection implements AmqpProtocolConverter { // Using nano time since it is not related to the wall clock, which may change long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); long nextIdleCheck = protonTransport.tick(now); - if (nextIdleCheck > 0) { - long delay = nextIdleCheck - now; + if (nextIdleCheck != 0) { + // monitor treats <= 0 as no work, ensure value is at least 1 as there was a deadline + long delay = Math.max(nextIdleCheck - now, 1); LOG.trace("Connection keep-alive processing starts in: {}", delay); monitor.startKeepAliveTask(delay); } else { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 1f3fe090bc..3fe3ab6f7d 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -587,7 +587,7 @@ public class AmqpConnection extends AmqpAbstractResource implements // Using nano time since it is not related to the wall clock, which may change long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); long initialKeepAliveDeadline = protonTransport.tick(initialNow); - if (initialKeepAliveDeadline > 0) { + if (initialKeepAliveDeadline != 0) { getScheduler().schedule(new Runnable() { @@ -598,15 +598,16 @@ public class AmqpConnection extends AmqpAbstractResource implements LOG.debug("Client performing next idle check"); // Using nano time since it is not related to the wall clock, which may change long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - long rescheduleAt = protonTransport.tick(now) - now; + long deadline = protonTransport.tick(now); + pumpToProtonTransport(); if (protonTransport.isClosed()) { LOG.debug("Transport closed after inactivity check."); - throw new InactivityIOException("Channel was inactive for to long"); - } - - if (rescheduleAt > 0) { - getScheduler().schedule(this, rescheduleAt, TimeUnit.MILLISECONDS); + throw new InactivityIOException("Channel was inactive for too long"); + } else { + if(deadline != 0) { + getScheduler().schedule(this, deadline - now, TimeUnit.MILLISECONDS); + } } } } catch (Exception e) {