ARTEMIS-1283 Fix delay on drained response
On completion of drain the response is not flushed and the client can wait a few seconds before another broker task flushes the work. Flush the connection after updating the linked as being drained. Also perform the work with the connection lock held to prevent conccurent update of proton state.
This commit is contained in:
parent
3e48cd7787
commit
22b8076b71
|
@ -129,7 +129,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
plugSender.getSender().drained();
|
||||
plugSender.reportDrained();
|
||||
} finally {
|
||||
draining.set(false);
|
||||
}
|
||||
|
|
|
@ -775,4 +775,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
return queue;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update link state to reflect that the previous drain attempt has completed.
|
||||
*/
|
||||
public void reportDrained() {
|
||||
connection.lock();
|
||||
try {
|
||||
sender.drained();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
|
||||
connection.flush();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -579,10 +579,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
forceDelivery(sequence, r);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
r.run();
|
||||
return;
|
||||
}
|
||||
}
|
||||
r.run();
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.errorSendingForcedDelivery(e);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue