Revert "ARTEMIS-4314 Small Tweak: using executor directly if no delay"

This reverts commit c6a82ff95e.
This commit is contained in:
Gary Tully 2023-06-22 15:40:45 +01:00
parent a6fea084ed
commit a76e9f81d2
1 changed files with 15 additions and 21 deletions

View File

@ -208,29 +208,23 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
}
private void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) {
Runnable runnable = () -> {
if (clientConsumer != null) {
if (0L == handle.getMessageCount()) {
flow(handle.getCreditWindow());
pendingPullCredit.set(handle.getCreditWindow());
} else {
if (0 == delay) {
clientConsumer.resetIfSlowConsumer();
pendingPullCredit.set(0);
scheduledExecutorService.schedule(() -> {
// use queue executor to sync on message count metric
handle.getExecutor().execute(() -> {
if (clientConsumer != null) {
if (0L == handle.getMessageCount()) {
flow(handle.getCreditWindow());
pendingPullCredit.set(handle.getCreditWindow());
} else {
if (0 == delay) {
clientConsumer.resetIfSlowConsumer();
pendingPullCredit.set(0);
}
scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
}
scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
}
}
};
if (delay == 0) { // if delay==0 just use the executor directly
handle.getExecutor().execute(runnable);
} else {
scheduledExecutorService.schedule(() -> {
handle.getExecutor().execute(runnable);
}, delay, TimeUnit.SECONDS);
}
});
}, delay, TimeUnit.SECONDS);
}
private void flow(int creditWindow) {