ARTEMIS-1434: update handling of tick deadline values
- account for potential to be negative due to using nanoTime derived values - add some other edge case protections to avoid task ceasing in error
This commit is contained in:
parent
8160be0087
commit
1a077348e2
|
@ -387,13 +387,13 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
* */
|
||||
if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
|
||||
long nextKeepAliveTime = handler.tick(true);
|
||||
if (nextKeepAliveTime > 0 && scheduledPool != null) {
|
||||
if (nextKeepAliveTime != 0 && scheduledPool != null) {
|
||||
scheduledPool.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
|
||||
if (rescheduleAt > 0) {
|
||||
scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
|
||||
long rescheduleAt = handler.tick(false);
|
||||
if (rescheduleAt != 0) {
|
||||
scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
}, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
|
||||
|
|
|
@ -606,7 +606,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
// Using nano time since it is not related to the wall clock, which may change
|
||||
long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
long initialKeepAliveDeadline = protonTransport.tick(initialNow);
|
||||
if (initialKeepAliveDeadline > 0) {
|
||||
if (initialKeepAliveDeadline != 0) {
|
||||
|
||||
getScheduler().schedule(new Runnable() {
|
||||
|
||||
|
@ -617,15 +617,15 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
|||
LOG.debug("Client performing next idle check");
|
||||
// Using nano time since it is not related to the wall clock, which may change
|
||||
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
long rescheduleAt = protonTransport.tick(now) - now;
|
||||
long deadline = protonTransport.tick(now);
|
||||
pumpToProtonTransport();
|
||||
if (protonTransport.isClosed()) {
|
||||
LOG.debug("Transport closed after inactivity check.");
|
||||
throw new InactivityIOException("Channel was inactive for to long");
|
||||
}
|
||||
|
||||
if (rescheduleAt > 0) {
|
||||
getScheduler().schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
|
||||
throw new InactivityIOException("Channel was inactive for too long");
|
||||
} else {
|
||||
if (deadline != 0) {
|
||||
getScheduler().schedule(this, deadline - now, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
|
Loading…
Reference in New Issue