Merge pull request #11950 from jetty/experiment/jetty-12.0.x/11932-icb-subclasses

Make succeeded and failed in ICB final + introduce onSuccess
This commit is contained in:
Joakim Erdfelt 2024-06-27 12:00:27 -05:00 committed by GitHub
commit f89e3b8aee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 123 additions and 114 deletions

View File

@ -347,17 +347,10 @@ public class ContentDocs
}
@Override
public void succeeded()
protected void onSuccess()
{
// After every successful write, release the chunk.
chunk.release();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
super.failed(x);
}
@Override

View File

@ -347,17 +347,10 @@ public class ContentDocs
}
@Override
public void succeeded()
protected void onSuccess()
{
// After every successful write, release the chunk.
chunk.release();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
super.failed(x);
}
@Override

View File

@ -543,7 +543,7 @@ public abstract class HttpSender
}
@Override
public void succeeded()
protected void onSuccess()
{
boolean proceed = true;
if (committed)
@ -588,8 +588,6 @@ public abstract class HttpSender
// There was some concurrent error, terminate.
complete = true;
}
super.succeeded();
}
@Override

View File

@ -235,17 +235,9 @@ public class HttpSenderOverHTTP extends HttpSender
}
@Override
public void succeeded()
protected void onSuccess()
{
release();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
release();
super.failed(x);
}
@Override
@ -259,6 +251,7 @@ public class HttpSenderOverHTTP extends HttpSender
protected void onCompleteFailure(Throwable cause)
{
super.onCompleteFailure(cause);
release();
callback.failed(cause);
}

View File

@ -101,12 +101,11 @@ public class Flusher
}
@Override
public void succeeded()
protected void onSuccess()
{
if (active != null)
active.succeeded();
active = null;
super.succeeded();
}
@Override

View File

@ -294,7 +294,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
}
@Override
public void succeeded()
protected void onSuccess()
{
if (LOG.isDebugEnabled())
LOG.debug("Written {} buffers - entries processed/pending {}/{}: {}/{}",
@ -304,7 +304,6 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
processedEntries,
pendingEntries);
finish();
super.succeeded();
}
private void finish()

View File

@ -515,17 +515,15 @@ public class RawHTTP2ProxyTest
}
@Override
public void succeeded()
protected void onSuccess()
{
frameInfo.callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable failure)
protected void onCompleteFailure(Throwable cause)
{
frameInfo.callback.failed(failure);
super.failed(failure);
frameInfo.callback.failed(cause);
}
@Override
@ -671,17 +669,15 @@ public class RawHTTP2ProxyTest
}
@Override
public void succeeded()
protected void onSuccess()
{
frameInfo.callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable failure)
protected void onCompleteFailure(Throwable cause)
{
frameInfo.callback.failed(failure);
super.failed(failure);
frameInfo.callback.failed(cause);
}
private void offer(Stream stream, Frame frame, Callback callback)

View File

@ -108,7 +108,7 @@ public class ControlFlusher extends IteratingCallback
}
@Override
public void succeeded()
protected void onSuccess()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} on {}", entries, this);
@ -119,8 +119,6 @@ public class ControlFlusher extends IteratingCallback
entries.clear();
invocationType = InvocationType.NON_BLOCKING;
super.succeeded();
}
@Override

View File

@ -104,14 +104,12 @@ public class InstructionFlusher extends IteratingCallback
}
@Override
public void succeeded()
protected void onSuccess()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} buffers on {}", accumulator.getByteBuffers().size(), this);
accumulator.release();
super.succeeded();
}
@Override

View File

@ -75,7 +75,7 @@ public class MessageFlusher extends IteratingCallback
return Action.SCHEDULED;
}
int generated = generator.generate(accumulator, entry.endPoint.getStreamId(), frame, this::failed);
int generated = generator.generate(accumulator, entry.endPoint.getStreamId(), frame, this::onGenerateFailure);
if (generated < 0)
return Action.SCHEDULED;
@ -88,33 +88,48 @@ public class MessageFlusher extends IteratingCallback
return Action.SCHEDULED;
}
private void onGenerateFailure(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to generate {} on {}", entry, this, cause);
accumulator.release();
entry.callback.failed(cause);
entry = null;
// Continue the iteration.
succeeded();
}
@Override
public void succeeded()
protected void onSuccess()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} on {}", entry, this);
accumulator.release();
if (entry != null)
{
entry.callback.succeeded();
entry = null;
super.succeeded();
}
}
@Override
public void failed(Throwable x)
protected void onCompleteFailure(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to write {} on {}", entry, this, x);
LOG.debug("failed to write {} on {}", entry, this, cause);
accumulator.release();
entry.callback.failed(x);
if (entry != null)
{
entry.callback.failed(cause);
entry = null;
// Continue the iteration.
super.succeeded();
}
}
@Override

View File

@ -287,13 +287,12 @@ public class BufferedContentSink implements Content.Sink
}
@Override
public void succeeded()
protected void onSuccess()
{
_buffer = null;
Callback callback = _callback;
_callback = null;
callback.succeeded();
super.succeeded();
}
@Override

View File

@ -17,11 +17,12 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.EventListener;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@ -39,7 +40,6 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
@ -344,26 +344,19 @@ public abstract class QuicConnection extends AbstractConnection
private class Flusher extends IteratingCallback
{
private final AutoLock lock = new AutoLock();
private final ArrayDeque<Entry> queue = new ArrayDeque<>();
private final Queue<Entry> queue = new ConcurrentLinkedQueue<>();
private Entry entry;
public void offer(Callback callback, SocketAddress address, ByteBuffer[] buffers)
{
try (AutoLock l = lock.lock())
{
queue.offer(new Entry(callback, address, buffers));
}
iterate();
}
@Override
protected Action process()
{
try (AutoLock l = lock.lock())
{
entry = queue.poll();
}
if (entry == null)
return Action.IDLE;
@ -372,17 +365,9 @@ public abstract class QuicConnection extends AbstractConnection
}
@Override
public void succeeded()
protected void onSuccess()
{
entry.callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
entry.callback.failed(x);
super.failed(x);
}
@Override
@ -394,10 +379,11 @@ public abstract class QuicConnection extends AbstractConnection
@Override
protected void onCompleteFailure(Throwable cause)
{
entry.callback.failed(cause);
QuicConnection.this.close();
}
private class Entry
private static class Entry
{
private final Callback callback;
private final SocketAddress address;

View File

@ -521,12 +521,11 @@ public abstract class QuicSession extends ContainerLifeCycle
}
@Override
public void succeeded()
protected void onSuccess()
{
if (LOG.isDebugEnabled())
LOG.debug("written cipher bytes on {}", QuicSession.this);
cipherBuffer.release();
super.succeeded();
}
@Override

View File

@ -760,17 +760,11 @@ public class ConnectHandler extends Handler.Wrapper
}
@Override
public void succeeded()
protected void onSuccess()
{
if (LOG.isDebugEnabled())
LOG.debug("Wrote {} bytes {}", filled, TunnelConnection.this);
buffer.release();
super.succeeded();
}
@Override
protected void onCompleteSuccess()
{
}
@Override

View File

@ -237,7 +237,8 @@ public class CustomTransportTest
channels.put(channel.id, channel);
// Register for read interest with the EndPoint.
endPoint.fillInterested(new EndPointToChannelCallback(channel));
EndPointToChannelCallback endPointToChannelCallback = new EndPointToChannelCallback(channel);
endPoint.fillInterested(Callback.from(endPointToChannelCallback::iterate));
}
// Called when there data to read from the Gateway on the given Channel.
@ -322,18 +323,10 @@ public class CustomTransportTest
endPoint.fillInterested(this);
return Action.IDLE;
}
channel.write(this, buffer);
channel.write(Callback.from(this::iterate), buffer);
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
// There is data to read from the EndPoint.
// Iterate to read it and send it to the Gateway.
iterate();
}
@Override
protected void onCompleteSuccess()
{

View File

@ -167,6 +167,18 @@ public abstract class IteratingCallback implements Callback
*/
protected abstract Action process() throws Throwable;
/**
* Invoked when one task has completed successfully, either by the
* caller thread or by the processing thread. This invocation is
* always serialized w.r.t the execution of {@link #process()}.
* <p>
* This method is not invoked when a call to {@link #abort(Throwable)}
* is made before the {@link #succeeded()} callback happens.
*/
protected void onSuccess()
{
}
/**
* Invoked when the overall task has completed successfully.
*
@ -255,6 +267,7 @@ public abstract class IteratingCallback implements Callback
// Fall through to possibly invoke onCompleteFailure().
}
boolean callOnSuccess = false;
// acted on the action we have just received
try (AutoLock ignored = _lock.lock())
{
@ -305,6 +318,7 @@ public abstract class IteratingCallback implements Callback
case CALLED:
{
callOnSuccess = true;
if (action != Action.SCHEDULED)
throw new IllegalStateException(String.format("%s[action=%s]", this, action));
// we lost the race, so we have to keep processing
@ -327,6 +341,11 @@ public abstract class IteratingCallback implements Callback
throw new IllegalStateException(String.format("%s[action=%s]", this, action));
}
}
finally
{
if (callOnSuccess)
onSuccess();
}
}
if (notifyCompleteSuccess)
@ -338,8 +357,11 @@ public abstract class IteratingCallback implements Callback
/**
* Method to invoke when the asynchronous sub-task succeeds.
* <p>
* Subclasses that override this method must always remember
* to call {@code super.succeeded()}.
* This method should be considered final for all practical purposes.
* <p>
* Eventually, {@link #onSuccess()} is
* called, either by the caller thread or by the processing
* thread.
*/
@Override
public void succeeded()
@ -374,16 +396,18 @@ public abstract class IteratingCallback implements Callback
}
}
if (process)
{
onSuccess();
processing();
}
}
/**
* Method to invoke when the asynchronous sub-task fails,
* or to fail the overall asynchronous task and therefore
* terminate the iteration.
* <p>
* Subclasses that override this method must always remember
* to call {@code super.failed(Throwable)}.
* This method should be considered final for all practical purposes.
* <p>
* Eventually, {@link #onCompleteFailure(Throwable)} is
* called, either by the caller thread or by the processing

View File

@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class IteratingCallbackTest
@ -435,4 +436,28 @@ public class IteratingCallbackTest
assertEquals(1, count.get());
}
@Test
public void testOnSuccessCalledDespiteISE() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
IteratingCallback icb = new IteratingCallback()
{
@Override
protected Action process()
{
succeeded();
return Action.IDLE; // illegal action
}
@Override
protected void onSuccess()
{
latch.countDown();
}
};
assertThrows(IllegalStateException.class, icb::iterate);
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -192,10 +192,9 @@ public class AsyncProxyServlet extends ProxyServlet
}
@Override
public void failed(Throwable x)
protected void onCompleteFailure(Throwable cause)
{
super.failed(x);
onError(x);
onError(cause);
}
}

View File

@ -131,6 +131,9 @@
<configuration>
<debug>${it.debug}</debug>
<addTestClassPath>true</addTestClassPath>
<extraArtifacts>
<extraArtifact>org.eclipse.jetty:jetty-client:${project.version}</extraArtifact>
</extraArtifacts>
<junitPackageName>org.eclipse.jetty.maven.its.ee10.runner</junitPackageName>
<scriptVariables>
<maven.dependency.plugin.version>${maven.dependency.plugin.version}</maven.dependency.plugin.version>

View File

@ -135,6 +135,9 @@
<configuration>
<debug>${it.debug}</debug>
<addTestClassPath>true</addTestClassPath>
<extraArtifacts>
<extraArtifact>org.eclipse.jetty:jetty-client:${project.version}</extraArtifact>
</extraArtifacts>
<junitPackageName>org.eclipse.jetty.maven.its.ee8.runner</junitPackageName>
<scriptVariables>
<maven.dependency.plugin.version>${maven.dependency.plugin.version}</maven.dependency.plugin.version>

View File

@ -192,10 +192,9 @@ public class AsyncProxyServlet extends ProxyServlet
}
@Override
public void failed(Throwable x)
protected void onCompleteFailure(Throwable cause)
{
super.failed(x);
onError(x);
onError(cause);
}
}

View File

@ -133,6 +133,9 @@
<configuration>
<debug>${it.debug}</debug>
<addTestClassPath>true</addTestClassPath>
<extraArtifacts>
<extraArtifact>org.eclipse.jetty:jetty-client:${project.version}</extraArtifact>
</extraArtifacts>
<junitPackageName>org.eclipse.jetty.maven.its.ee9.runner</junitPackageName>
<scriptVariables>
<maven.dependency.plugin.version>${maven.dependency.plugin.version}</maven.dependency.plugin.version>