ARTEMIS-4314 Small Tweak: using executor directly if no delay

This commit is contained in:
Clebert Suconic 2023-06-16 16:13:49 -04:00 committed by clebertsuconic
parent 9b5dbf4265
commit c6a82ff95e
1 changed files with 21 additions and 15 deletions

View File

@ -208,23 +208,29 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
}
private void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) {
scheduledExecutorService.schedule(() -> {
// use queue executor to sync on message count metric
handle.getExecutor().execute(() -> {
if (clientConsumer != null) {
if (0L == handle.getMessageCount()) {
flow(handle.getCreditWindow());
pendingPullCredit.set(handle.getCreditWindow());
} else {
if (0 == delay) {
clientConsumer.resetIfSlowConsumer();
pendingPullCredit.set(0);
}
scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
Runnable runnable = () -> {
if (clientConsumer != null) {
if (0L == handle.getMessageCount()) {
flow(handle.getCreditWindow());
pendingPullCredit.set(handle.getCreditWindow());
} else {
if (0 == delay) {
clientConsumer.resetIfSlowConsumer();
pendingPullCredit.set(0);
}
scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
}
});
}, delay, TimeUnit.SECONDS);
}
};
if (delay == 0) { // if delay==0 just use the executor directly
handle.getExecutor().execute(runnable);
} else {
scheduledExecutorService.schedule(() -> {
handle.getExecutor().execute(runnable);
}, delay, TimeUnit.SECONDS);
}
}
private void flow(int creditWindow) {