mirror of
https://github.com/apache/activemq.git
synced 2025-02-07 02:29:21 +00:00
AMQ-6813: update tick deadline handling to account for potential to be negative due to using nanoTime derived values, plus other edge case protections
(cherry picked from commit f82eccd2f504b59c2e98ba8273e28f4d7a2a8698)
This commit is contained in:
parent
e1ac826ad0
commit
4f6326f4fb
@ -251,12 +251,16 @@ public class AmqpConnection implements AmqpProtocolConverter {
|
||||
if (protonConnection.getLocalState() != EndpointState.CLOSED) {
|
||||
// Using nano time since it is not related to the wall clock, which may change
|
||||
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
rescheduleAt = protonTransport.tick(now) - now;
|
||||
long deadline = protonTransport.tick(now);
|
||||
pumpProtonToSocket();
|
||||
if (protonTransport.isClosed()) {
|
||||
rescheduleAt = 0;
|
||||
LOG.debug("Transport closed after inactivity check.");
|
||||
throw new InactivityIOException("Channel was inactive for to long");
|
||||
throw new InactivityIOException("Channel was inactive for too long");
|
||||
} else {
|
||||
if(deadline != 0) {
|
||||
// caller treats 0 as no-work, ensure value is at least 1 as there was a deadline
|
||||
rescheduleAt = Math.max(deadline - now, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -835,8 +839,9 @@ public class AmqpConnection implements AmqpProtocolConverter {
|
||||
// Using nano time since it is not related to the wall clock, which may change
|
||||
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
long nextIdleCheck = protonTransport.tick(now);
|
||||
if (nextIdleCheck > 0) {
|
||||
long delay = nextIdleCheck - now;
|
||||
if (nextIdleCheck != 0) {
|
||||
// monitor treats <= 0 as no work, ensure value is at least 1 as there was a deadline
|
||||
long delay = Math.max(nextIdleCheck - now, 1);
|
||||
LOG.trace("Connection keep-alive processing starts in: {}", delay);
|
||||
monitor.startKeepAliveTask(delay);
|
||||
} else {
|
||||
|
@ -587,7 +587,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
||||
// Using nano time since it is not related to the wall clock, which may change
|
||||
long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
long initialKeepAliveDeadline = protonTransport.tick(initialNow);
|
||||
if (initialKeepAliveDeadline > 0) {
|
||||
if (initialKeepAliveDeadline != 0) {
|
||||
|
||||
getScheduler().schedule(new Runnable() {
|
||||
|
||||
@ -598,15 +598,16 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
||||
LOG.debug("Client performing next idle check");
|
||||
// Using nano time since it is not related to the wall clock, which may change
|
||||
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
long rescheduleAt = protonTransport.tick(now) - now;
|
||||
long deadline = protonTransport.tick(now);
|
||||
|
||||
pumpToProtonTransport();
|
||||
if (protonTransport.isClosed()) {
|
||||
LOG.debug("Transport closed after inactivity check.");
|
||||
throw new InactivityIOException("Channel was inactive for to long");
|
||||
}
|
||||
|
||||
if (rescheduleAt > 0) {
|
||||
getScheduler().schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
|
||||
throw new InactivityIOException("Channel was inactive for too long");
|
||||
} else {
|
||||
if(deadline != 0) {
|
||||
getScheduler().schedule(this, deadline - now, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user