#11932 make succeeded and failed in ICB final + introduce onSuccess

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2024-06-24 14:38:28 +02:00
parent 3091012393
commit 6d0bddc07a
No known key found for this signature in database
GPG Key ID: 3D146A4A1C58367E
18 changed files with 49 additions and 91 deletions

View File

@ -347,17 +347,10 @@ public class ContentDocs
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
// After every successful write, release the chunk. // After every successful write, release the chunk.
chunk.release(); chunk.release();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
super.failed(x);
} }
@Override @Override

View File

@ -347,17 +347,10 @@ public class ContentDocs
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
// After every successful write, release the chunk. // After every successful write, release the chunk.
chunk.release(); chunk.release();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
super.failed(x);
} }
@Override @Override

View File

@ -543,7 +543,7 @@ public abstract class HttpSender
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
boolean proceed = true; boolean proceed = true;
if (committed) if (committed)
@ -588,8 +588,6 @@ public abstract class HttpSender
// There was some concurrent error, terminate. // There was some concurrent error, terminate.
complete = true; complete = true;
} }
super.succeeded();
} }
@Override @Override

View File

@ -235,17 +235,9 @@ public class HttpSenderOverHTTP extends HttpSender
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
release(); release();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
release();
super.failed(x);
} }
@Override @Override
@ -259,6 +251,7 @@ public class HttpSenderOverHTTP extends HttpSender
protected void onCompleteFailure(Throwable cause) protected void onCompleteFailure(Throwable cause)
{ {
super.onCompleteFailure(cause); super.onCompleteFailure(cause);
release();
callback.failed(cause); callback.failed(cause);
} }

View File

@ -101,12 +101,11 @@ public class Flusher
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
if (active != null) if (active != null)
active.succeeded(); active.succeeded();
active = null; active = null;
super.succeeded();
} }
@Override @Override

View File

@ -294,7 +294,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Written {} buffers - entries processed/pending {}/{}: {}/{}", LOG.debug("Written {} buffers - entries processed/pending {}/{}: {}/{}",
@ -304,7 +304,6 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
processedEntries, processedEntries,
pendingEntries); pendingEntries);
finish(); finish();
super.succeeded();
} }
private void finish() private void finish()

View File

@ -515,17 +515,15 @@ public class RawHTTP2ProxyTest
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
frameInfo.callback.succeeded(); frameInfo.callback.succeeded();
super.succeeded();
} }
@Override @Override
public void failed(Throwable failure) protected void onCompleteFailure(Throwable cause)
{ {
frameInfo.callback.failed(failure); frameInfo.callback.failed(cause);
super.failed(failure);
} }
@Override @Override
@ -671,17 +669,15 @@ public class RawHTTP2ProxyTest
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
frameInfo.callback.succeeded(); frameInfo.callback.succeeded();
super.succeeded();
} }
@Override @Override
public void failed(Throwable failure) protected void onCompleteFailure(Throwable cause)
{ {
frameInfo.callback.failed(failure); frameInfo.callback.failed(cause);
super.failed(failure);
} }
private void offer(Stream stream, Frame frame, Callback callback) private void offer(Stream stream, Frame frame, Callback callback)

View File

@ -108,7 +108,7 @@ public class ControlFlusher extends IteratingCallback
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} on {}", entries, this); LOG.debug("succeeded to write {} on {}", entries, this);
@ -119,8 +119,6 @@ public class ControlFlusher extends IteratingCallback
entries.clear(); entries.clear();
invocationType = InvocationType.NON_BLOCKING; invocationType = InvocationType.NON_BLOCKING;
super.succeeded();
} }
@Override @Override

View File

@ -104,14 +104,12 @@ public class InstructionFlusher extends IteratingCallback
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} buffers on {}", accumulator.getByteBuffers().size(), this); LOG.debug("succeeded to write {} buffers on {}", accumulator.getByteBuffers().size(), this);
accumulator.release(); accumulator.release();
super.succeeded();
} }
@Override @Override

View File

@ -89,7 +89,7 @@ public class MessageFlusher extends IteratingCallback
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} on {}", entry, this); LOG.debug("succeeded to write {} on {}", entry, this);
@ -98,19 +98,17 @@ public class MessageFlusher extends IteratingCallback
entry.callback.succeeded(); entry.callback.succeeded();
entry = null; entry = null;
super.succeeded();
} }
@Override @Override
public void failed(Throwable x) protected void onCompleteFailure(Throwable cause)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("failed to write {} on {}", entry, this, x); LOG.debug("failed to write {} on {}", entry, this, cause);
accumulator.release(); accumulator.release();
entry.callback.failed(x); entry.callback.failed(cause);
entry = null; entry = null;
// Continue the iteration. // Continue the iteration.

View File

@ -287,13 +287,12 @@ public class BufferedContentSink implements Content.Sink
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
_buffer = null; _buffer = null;
Callback callback = _callback; Callback callback = _callback;
_callback = null; _callback = null;
callback.succeeded(); callback.succeeded();
super.succeeded();
} }
@Override @Override

View File

@ -372,17 +372,9 @@ public abstract class QuicConnection extends AbstractConnection
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
entry.callback.succeeded(); entry.callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
entry.callback.failed(x);
super.failed(x);
} }
@Override @Override
@ -394,10 +386,11 @@ public abstract class QuicConnection extends AbstractConnection
@Override @Override
protected void onCompleteFailure(Throwable cause) protected void onCompleteFailure(Throwable cause)
{ {
entry.callback.failed(cause);
QuicConnection.this.close(); QuicConnection.this.close();
} }
private class Entry private static class Entry
{ {
private final Callback callback; private final Callback callback;
private final SocketAddress address; private final SocketAddress address;

View File

@ -521,12 +521,11 @@ public abstract class QuicSession extends ContainerLifeCycle
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("written cipher bytes on {}", QuicSession.this); LOG.debug("written cipher bytes on {}", QuicSession.this);
cipherBuffer.release(); cipherBuffer.release();
super.succeeded();
} }
@Override @Override

View File

@ -760,17 +760,11 @@ public class ConnectHandler extends Handler.Wrapper
} }
@Override @Override
public void succeeded() protected void onSuccess()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Wrote {} bytes {}", filled, TunnelConnection.this); LOG.debug("Wrote {} bytes {}", filled, TunnelConnection.this);
buffer.release(); buffer.release();
super.succeeded();
}
@Override
protected void onCompleteSuccess()
{
} }
@Override @Override

View File

@ -237,7 +237,8 @@ public class CustomTransportTest
channels.put(channel.id, channel); channels.put(channel.id, channel);
// Register for read interest with the EndPoint. // Register for read interest with the EndPoint.
endPoint.fillInterested(new EndPointToChannelCallback(channel)); EndPointToChannelCallback endPointToChannelCallback = new EndPointToChannelCallback(channel);
endPoint.fillInterested(Callback.from(endPointToChannelCallback::iterate));
} }
// Called when there data to read from the Gateway on the given Channel. // Called when there data to read from the Gateway on the given Channel.
@ -322,18 +323,10 @@ public class CustomTransportTest
endPoint.fillInterested(this); endPoint.fillInterested(this);
return Action.IDLE; return Action.IDLE;
} }
channel.write(this, buffer); channel.write(Callback.from(this::iterate), buffer);
return Action.SCHEDULED; return Action.SCHEDULED;
} }
@Override
public void succeeded()
{
// There is data to read from the EndPoint.
// Iterate to read it and send it to the Gateway.
iterate();
}
@Override @Override
protected void onCompleteSuccess() protected void onCompleteSuccess()
{ {

View File

@ -167,6 +167,13 @@ public abstract class IteratingCallback implements Callback
*/ */
protected abstract Action process() throws Throwable; protected abstract Action process() throws Throwable;
/**
* Invoked when one task has completed successfully.
*/
protected void onSuccess()
{
}
/** /**
* Invoked when the overall task has completed successfully. * Invoked when the overall task has completed successfully.
* *
@ -239,6 +246,7 @@ public abstract class IteratingCallback implements Callback
boolean notifyCompleteSuccess = false; boolean notifyCompleteSuccess = false;
Throwable notifyCompleteFailure = null; Throwable notifyCompleteFailure = null;
boolean callOnSuccess = false;
// While we are processing // While we are processing
processing: processing:
while (true) while (true)
@ -247,6 +255,11 @@ public abstract class IteratingCallback implements Callback
Action action = null; Action action = null;
try try
{ {
if (callOnSuccess)
{
onSuccess();
callOnSuccess = false;
}
action = process(); action = process();
} }
catch (Throwable x) catch (Throwable x)
@ -309,6 +322,7 @@ public abstract class IteratingCallback implements Callback
throw new IllegalStateException(String.format("%s[action=%s]", this, action)); throw new IllegalStateException(String.format("%s[action=%s]", this, action));
// we lost the race, so we have to keep processing // we lost the race, so we have to keep processing
_state = State.PROCESSING; _state = State.PROCESSING;
callOnSuccess = true;
continue; continue;
} }
@ -374,7 +388,10 @@ public abstract class IteratingCallback implements Callback
} }
} }
if (process) if (process)
{
onSuccess();
processing(); processing();
}
} }
/** /**

View File

@ -192,10 +192,9 @@ public class AsyncProxyServlet extends ProxyServlet
} }
@Override @Override
public void failed(Throwable x) protected void onCompleteFailure(Throwable cause)
{ {
super.failed(x); onError(cause);
onError(x);
} }
} }

View File

@ -192,10 +192,9 @@ public class AsyncProxyServlet extends ProxyServlet
} }
@Override @Override
public void failed(Throwable x) protected void onCompleteFailure(Throwable cause)
{ {
super.failed(x); onError(cause);
onError(x);
} }
} }