mirror of https://github.com/apache/activemq.git
AMQ-6031: use System.nanoTime() when deriving time to tick the transport with for idle-timeout handling
This commit is contained in:
parent
c5506f5b17
commit
037f91d61e
|
@ -39,6 +39,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.jms.InvalidClientIDException;
|
||||
|
@ -245,7 +246,9 @@ public class AmqpConnection implements AmqpProtocolConverter {
|
|||
LOG.trace("Performing connection:{} keep-alive processing", amqpTransport.getRemoteAddress());
|
||||
|
||||
if (protonConnection.getLocalState() != EndpointState.CLOSED) {
|
||||
rescheduleAt = protonTransport.tick(System.currentTimeMillis()) - System.currentTimeMillis();
|
||||
// Using nano time since it is not related to the wall clock, which may change
|
||||
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
rescheduleAt = protonTransport.tick(now) - now;
|
||||
pumpProtonToSocket();
|
||||
if (protonTransport.isClosed()) {
|
||||
rescheduleAt = 0;
|
||||
|
@ -813,10 +816,13 @@ public class AmqpConnection implements AmqpProtocolConverter {
|
|||
// If either end has idle timeout requirements then the tick method
|
||||
// will give us a deadline on the next time we need to tick() in order
|
||||
// to meet those obligations.
|
||||
long nextIdleCheck = protonTransport.tick(System.currentTimeMillis());
|
||||
// Using nano time since it is not related to the wall clock, which may change
|
||||
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
long nextIdleCheck = protonTransport.tick(now);
|
||||
if (nextIdleCheck > 0) {
|
||||
LOG.trace("Connection keep-alive processing starts at: {}", new Date(nextIdleCheck));
|
||||
monitor.startKeepAliveTask(nextIdleCheck - System.currentTimeMillis());
|
||||
long delay = nextIdleCheck - now;
|
||||
LOG.trace("Connection keep-alive processing starts in: {}", delay);
|
||||
monitor.startKeepAliveTask(delay);
|
||||
} else {
|
||||
LOG.trace("Connection does not require keep-alive processing");
|
||||
}
|
||||
|
|
|
@ -472,8 +472,10 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
if (!getEndpoint().getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
|
||||
|
||||
if (!isIdleProcessingDisabled()) {
|
||||
long nextKeepAliveTime = protonTransport.tick(System.currentTimeMillis());
|
||||
if (nextKeepAliveTime > 0) {
|
||||
// Using nano time since it is not related to the wall clock, which may change
|
||||
long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
long initialKeepAliveDeadline = protonTransport.tick(initialNow);
|
||||
if (initialKeepAliveDeadline > 0) {
|
||||
|
||||
getScheduler().schedule(new Runnable() {
|
||||
|
||||
|
@ -482,7 +484,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
try {
|
||||
if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
|
||||
LOG.debug("Client performing next idle check");
|
||||
long rescheduleAt = protonTransport.tick(System.currentTimeMillis()) - System.currentTimeMillis();
|
||||
// Using nano time since it is not related to the wall clock, which may change
|
||||
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
long rescheduleAt = protonTransport.tick(now) - now;
|
||||
pumpToProtonTransport();
|
||||
if (protonTransport.isClosed()) {
|
||||
LOG.debug("Transport closed after inactivity check.");
|
||||
|
@ -498,7 +502,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
fireClientException(e);
|
||||
}
|
||||
}
|
||||
}, nextKeepAliveTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
|
||||
}, initialKeepAliveDeadline - initialNow, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
super.doOpenCompletion();
|
||||
|
|
Loading…
Reference in New Issue