ARTEMIS-4108 Improving flush on large message fix

This commit is contained in:
Clebert Suconic 2022-12-06 16:07:09 -05:00
parent eb11b044af
commit 33567fca8d
1 changed files with 21 additions and 16 deletions

View File

@ -191,28 +191,33 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// If the draining is already running, then don't do anything // If the draining is already running, then don't do anything
if (draining.compareAndSet(false, true)) { if (draining.compareAndSet(false, true)) {
final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext(); final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
serverConsumer.forceDelivery(1, new Runnable() { flushDrain(serverConsumer, plugSender);
@Override
public void run() {
try {
connection.runNow(() -> {
if (pendingLargeMessage != null) {
afterLargeMessage = () -> drained(plugSender);
} else {
drained(plugSender);
}
});
} finally {
draining.set(false);
}
}
});
} }
} else { } else {
serverConsumer.receiveCredits(-1); serverConsumer.receiveCredits(-1);
} }
} }
private void flushDrain(ServerConsumerImpl serverConsumer, ProtonServerSenderContext plugSender) {
serverConsumer.forceDelivery(1, new Runnable() {
@Override
public void run() {
try {
connection.runNow(() -> {
if (pendingLargeMessage != null) {
// retry the flush after the large message is done
afterLargeMessage = () -> flushDrain(serverConsumer, plugSender);
} else {
drained(plugSender);
}
});
} finally {
draining.set(false);
}
}
});
}
private void drained(ProtonServerSenderContext sender) { private void drained(ProtonServerSenderContext sender) {
sender.reportDrained(); sender.reportDrained();
setupCredit(); setupCredit();