AMQ-6031: use System.nanoTime() when deriving time to tick the transport with for idle-timeout handling

(cherry picked from commit 037f91d61e)
This commit is contained in:
Robert Gemmell 2015-10-30 16:28:53 +00:00
parent 547914cabc
commit a0aa3583bc
2 changed files with 18 additions and 8 deletions

View File

@ -39,6 +39,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidClientIDException; import javax.jms.InvalidClientIDException;
@ -245,7 +246,9 @@ public class AmqpConnection implements AmqpProtocolConverter {
LOG.trace("Performing connection:{} keep-alive processing", amqpTransport.getRemoteAddress()); LOG.trace("Performing connection:{} keep-alive processing", amqpTransport.getRemoteAddress());
if (protonConnection.getLocalState() != EndpointState.CLOSED) { if (protonConnection.getLocalState() != EndpointState.CLOSED) {
rescheduleAt = protonTransport.tick(System.currentTimeMillis()) - System.currentTimeMillis(); // Using nano time since it is not related to the wall clock, which may change
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
rescheduleAt = protonTransport.tick(now) - now;
pumpProtonToSocket(); pumpProtonToSocket();
if (protonTransport.isClosed()) { if (protonTransport.isClosed()) {
rescheduleAt = 0; rescheduleAt = 0;
@ -813,10 +816,13 @@ public class AmqpConnection implements AmqpProtocolConverter {
// If either end has idle timeout requirements then the tick method // If either end has idle timeout requirements then the tick method
// will give us a deadline on the next time we need to tick() in order // will give us a deadline on the next time we need to tick() in order
// to meet those obligations. // to meet those obligations.
long nextIdleCheck = protonTransport.tick(System.currentTimeMillis()); // Using nano time since it is not related to the wall clock, which may change
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
long nextIdleCheck = protonTransport.tick(now);
if (nextIdleCheck > 0) { if (nextIdleCheck > 0) {
LOG.trace("Connection keep-alive processing starts at: {}", new Date(nextIdleCheck)); long delay = nextIdleCheck - now;
monitor.startKeepAliveTask(nextIdleCheck - System.currentTimeMillis()); LOG.trace("Connection keep-alive processing starts in: {}", delay);
monitor.startKeepAliveTask(delay);
} else { } else {
LOG.trace("Connection does not require keep-alive processing"); LOG.trace("Connection does not require keep-alive processing");
} }

View File

@ -472,8 +472,10 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
if (!getEndpoint().getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { if (!getEndpoint().getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
if (!isIdleProcessingDisabled()) { if (!isIdleProcessingDisabled()) {
long nextKeepAliveTime = protonTransport.tick(System.currentTimeMillis()); // Using nano time since it is not related to the wall clock, which may change
if (nextKeepAliveTime > 0) { long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
long initialKeepAliveDeadline = protonTransport.tick(initialNow);
if (initialKeepAliveDeadline > 0) {
getScheduler().schedule(new Runnable() { getScheduler().schedule(new Runnable() {
@ -482,7 +484,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
try { try {
if (getEndpoint().getLocalState() != EndpointState.CLOSED) { if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
LOG.debug("Client performing next idle check"); LOG.debug("Client performing next idle check");
long rescheduleAt = protonTransport.tick(System.currentTimeMillis()) - System.currentTimeMillis(); // Using nano time since it is not related to the wall clock, which may change
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
long rescheduleAt = protonTransport.tick(now) - now;
pumpToProtonTransport(); pumpToProtonTransport();
if (protonTransport.isClosed()) { if (protonTransport.isClosed()) {
LOG.debug("Transport closed after inactivity check."); LOG.debug("Transport closed after inactivity check.");
@ -498,7 +502,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
fireClientException(e); fireClientException(e);
} }
} }
}, nextKeepAliveTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }, initialKeepAliveDeadline - initialNow, TimeUnit.MILLISECONDS);
} }
} }
super.doOpenCompletion(); super.doOpenCompletion();