This commit is contained in:
Timothy Bish 2017-09-22 11:55:25 -04:00
commit b893042d8d
2 changed files with 11 additions and 11 deletions

View File

@ -387,13 +387,13 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
* */ * */
if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
long nextKeepAliveTime = handler.tick(true); long nextKeepAliveTime = handler.tick(true);
if (nextKeepAliveTime > 0 && scheduledPool != null) { if (nextKeepAliveTime != 0 && scheduledPool != null) {
scheduledPool.schedule(new Runnable() { scheduledPool.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); long rescheduleAt = handler.tick(false);
if (rescheduleAt > 0) { if (rescheduleAt != 0) {
scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS); scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
} }
} }
}, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);

View File

@ -606,7 +606,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
// Using nano time since it is not related to the wall clock, which may change // Using nano time since it is not related to the wall clock, which may change
long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
long initialKeepAliveDeadline = protonTransport.tick(initialNow); long initialKeepAliveDeadline = protonTransport.tick(initialNow);
if (initialKeepAliveDeadline > 0) { if (initialKeepAliveDeadline != 0) {
getScheduler().schedule(new Runnable() { getScheduler().schedule(new Runnable() {
@ -617,15 +617,15 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
LOG.debug("Client performing next idle check"); LOG.debug("Client performing next idle check");
// Using nano time since it is not related to the wall clock, which may change // Using nano time since it is not related to the wall clock, which may change
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
long rescheduleAt = protonTransport.tick(now) - now; long deadline = protonTransport.tick(now);
pumpToProtonTransport(); pumpToProtonTransport();
if (protonTransport.isClosed()) { if (protonTransport.isClosed()) {
LOG.debug("Transport closed after inactivity check."); LOG.debug("Transport closed after inactivity check.");
throw new InactivityIOException("Channel was inactive for to long"); throw new InactivityIOException("Channel was inactive for too long");
} else {
if (deadline != 0) {
getScheduler().schedule(this, deadline - now, TimeUnit.MILLISECONDS);
} }
if (rescheduleAt > 0) {
getScheduler().schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
} }
} }
} catch (Exception e) { } catch (Exception e) {