AMQ-6813: update tick deadline handling to account for potential to be negative due to using nanoTime derived values, plus other edge case protections

This commit is contained in:
Robbie Gemmell 2017-09-15 17:52:36 +01:00
parent 9326ccabac
commit f82eccd2f5
2 changed files with 18 additions and 12 deletions

View File

@ -251,12 +251,16 @@ public class AmqpConnection implements AmqpProtocolConverter {
if (protonConnection.getLocalState() != EndpointState.CLOSED) {
// Using nano time since it is not related to the wall clock, which may change
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
rescheduleAt = protonTransport.tick(now) - now;
long deadline = protonTransport.tick(now);
pumpProtonToSocket();
if (protonTransport.isClosed()) {
rescheduleAt = 0;
LOG.debug("Transport closed after inactivity check.");
throw new InactivityIOException("Channel was inactive for to long");
throw new InactivityIOException("Channel was inactive for too long");
} else {
if(deadline != 0) {
// caller treats 0 as no-work, ensure value is at least 1 as there was a deadline
rescheduleAt = Math.max(deadline - now, 1);
}
}
}
@ -835,8 +839,9 @@ public class AmqpConnection implements AmqpProtocolConverter {
// Using nano time since it is not related to the wall clock, which may change
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
long nextIdleCheck = protonTransport.tick(now);
if (nextIdleCheck > 0) {
long delay = nextIdleCheck - now;
if (nextIdleCheck != 0) {
// monitor treats <= 0 as no work, ensure value is at least 1 as there was a deadline
long delay = Math.max(nextIdleCheck - now, 1);
LOG.trace("Connection keep-alive processing starts in: {}", delay);
monitor.startKeepAliveTask(delay);
} else {

View File

@ -587,7 +587,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
// Using nano time since it is not related to the wall clock, which may change
long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
long initialKeepAliveDeadline = protonTransport.tick(initialNow);
if (initialKeepAliveDeadline > 0) {
if (initialKeepAliveDeadline != 0) {
getScheduler().schedule(new Runnable() {
@ -598,15 +598,16 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
LOG.debug("Client performing next idle check");
// Using nano time since it is not related to the wall clock, which may change
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
long rescheduleAt = protonTransport.tick(now) - now;
long deadline = protonTransport.tick(now);
pumpToProtonTransport();
if (protonTransport.isClosed()) {
LOG.debug("Transport closed after inactivity check.");
throw new InactivityIOException("Channel was inactive for to long");
}
if (rescheduleAt > 0) {
getScheduler().schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
throw new InactivityIOException("Channel was inactive for too long");
} else {
if(deadline != 0) {
getScheduler().schedule(this, deadline - now, TimeUnit.MILLISECONDS);
}
}
}
} catch (Exception e) {