* Separated read failures from write failures. * In this way it is possible to read even if the write side is failed and write even if the read side is failed. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
71354331e5
commit
8638d80bd5
|
@ -191,6 +191,7 @@ Failure listeners are invoked also in case of idle timeouts, in the following ca
|
|||
* There are no idle timeout listeners.
|
||||
|
||||
Failures reported to a failure listener are always fatal failures; see also xref:pg-arch-io-content-source[this section] about fatal versus transient failures.
|
||||
This means that it is not possible to read or write from a failure listener: the read returns a fatal failure chunk, and the write will immediately fail the write callback.
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
|
|
|
@ -15,6 +15,7 @@ package org.eclipse.jetty.server.internal;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.WritePendingException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
@ -111,17 +112,8 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
private long _committedContentLength = -1;
|
||||
private Runnable _onContentAvailable;
|
||||
private Predicate<TimeoutException> _onIdleTimeout;
|
||||
/**
|
||||
* Failure passed to {@link #onFailure(Throwable)}
|
||||
*/
|
||||
private Content.Chunk _failure;
|
||||
/**
|
||||
* Listener for {@link #onFailure(Throwable)} events
|
||||
*/
|
||||
private Content.Chunk _readFailure;
|
||||
private Consumer<Throwable> _onFailure;
|
||||
/**
|
||||
* Failure passed to {@link ChannelCallback#failed(Throwable)}
|
||||
*/
|
||||
private Throwable _callbackFailure;
|
||||
private Attributes _cache;
|
||||
|
||||
|
@ -158,7 +150,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
_committedContentLength = -1;
|
||||
_onContentAvailable = null;
|
||||
_onIdleTimeout = null;
|
||||
_failure = null;
|
||||
_readFailure = null;
|
||||
_onFailure = null;
|
||||
_callbackFailure = null;
|
||||
}
|
||||
|
@ -329,43 +321,43 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onIdleTimeout {}", this, t);
|
||||
|
||||
// if not already a failure,
|
||||
if (_failure == null)
|
||||
Runnable invokeOnContentAvailable = null;
|
||||
if (_readFailure == null)
|
||||
{
|
||||
// if we are currently demanding, take the onContentAvailable runnable to invoke below.
|
||||
Runnable invokeOnContentAvailable = _onContentAvailable;
|
||||
// If there is demand, take the onContentAvailable runnable to invoke below.
|
||||
invokeOnContentAvailable = _onContentAvailable;
|
||||
_onContentAvailable = null;
|
||||
|
||||
// If demand was in process, then arrange for the next read to return the idle timeout, if no other error
|
||||
// If there was demand, then arrange for the next read to return a transient chunk failure.
|
||||
if (invokeOnContentAvailable != null)
|
||||
_failure = Content.Chunk.from(t, false);
|
||||
_readFailure = Content.Chunk.from(t, false);
|
||||
}
|
||||
|
||||
// If a write call is in progress, take the writeCallback to fail below
|
||||
Runnable invokeWriteFailure = _response.lockedFailWrite(t);
|
||||
// If a write call is pending, take the writeCallback to fail below.
|
||||
Runnable invokeWriteFailure = _response.lockedFailWrite(t, false);
|
||||
|
||||
// If there was an IO operation, just deliver the idle timeout via them
|
||||
if (invokeOnContentAvailable != null || invokeWriteFailure != null)
|
||||
return _serializedInvoker.offer(invokeOnContentAvailable, invokeWriteFailure);
|
||||
// If there was a pending IO operation, deliver the idle timeout via them.
|
||||
if (invokeOnContentAvailable != null || invokeWriteFailure != null)
|
||||
return _serializedInvoker.offer(invokeOnContentAvailable, invokeWriteFailure);
|
||||
|
||||
// otherwise, if there is an idle timeout listener, we ask if if we should call onFailure or not
|
||||
Predicate<TimeoutException> onIdleTimeout = _onIdleTimeout;
|
||||
if (onIdleTimeout != null)
|
||||
// Otherwise, if there are idle timeout listeners, ask them whether we should call onFailure.
|
||||
Predicate<TimeoutException> onIdleTimeout = _onIdleTimeout;
|
||||
if (onIdleTimeout != null)
|
||||
{
|
||||
return _serializedInvoker.offer(() ->
|
||||
{
|
||||
return _serializedInvoker.offer(() ->
|
||||
if (onIdleTimeout.test(t))
|
||||
{
|
||||
if (onIdleTimeout.test(t))
|
||||
{
|
||||
// If the idle timeout listener(s) return true, then we call onFailure and run any task it returns.
|
||||
Runnable task = onFailure(t);
|
||||
if (task != null)
|
||||
task.run();
|
||||
}
|
||||
});
|
||||
}
|
||||
// If the idle timeout listener(s) return true, then we call onFailure and run any task it returns.
|
||||
Runnable task = onFailure(t);
|
||||
if (task != null)
|
||||
task.run();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// otherwise treat as a failure
|
||||
// Otherwise treat as a failure.
|
||||
return onFailure(t);
|
||||
}
|
||||
|
||||
|
@ -393,12 +385,6 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
_response = new ChannelResponse(_request);
|
||||
}
|
||||
|
||||
// Set the error to arrange for any subsequent reads, demands or writes to fail.
|
||||
if (_failure == null)
|
||||
_failure = Content.Chunk.from(x, true);
|
||||
else if (ExceptionUtil.areNotAssociated(_failure.getFailure(), x) && _failure.getFailure().getClass() != x.getClass())
|
||||
_failure.getFailure().addSuppressed(x);
|
||||
|
||||
// If not handled, then we just fail the request callback
|
||||
if (!_handled && _handling == null)
|
||||
{
|
||||
|
@ -406,12 +392,18 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
}
|
||||
else
|
||||
{
|
||||
// Set the failure to arrange for any subsequent reads or demands to fail.
|
||||
if (_readFailure == null)
|
||||
_readFailure = Content.Chunk.from(x, true);
|
||||
else
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(_readFailure.getFailure(), x);
|
||||
|
||||
// If there is demand, take the onContentAvailable runnable to invoke below.
|
||||
Runnable invokeOnContentAvailable = _onContentAvailable;
|
||||
_onContentAvailable = null;
|
||||
|
||||
// If a write call is in progress, take the writeCallback to fail below.
|
||||
Runnable invokeWriteFailure = _response.lockedFailWrite(x);
|
||||
Runnable invokeWriteFailure = _response.lockedFailWrite(x, true);
|
||||
|
||||
// Notify the failure listeners only once.
|
||||
Consumer<Throwable> onFailure = _onFailure;
|
||||
|
@ -850,8 +842,8 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
{
|
||||
HttpChannelState httpChannel = lockedGetHttpChannelState();
|
||||
|
||||
Content.Chunk error = httpChannel._failure;
|
||||
httpChannel._failure = Content.Chunk.next(error);
|
||||
Content.Chunk error = httpChannel._readFailure;
|
||||
httpChannel._readFailure = Content.Chunk.next(error);
|
||||
if (error != null)
|
||||
return error;
|
||||
|
||||
|
@ -898,7 +890,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("demand {}", httpChannelState);
|
||||
|
||||
error = httpChannelState._failure != null;
|
||||
error = httpChannelState._readFailure != null;
|
||||
if (!error)
|
||||
{
|
||||
if (httpChannelState._onContentAvailable != null)
|
||||
|
@ -936,7 +928,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
{
|
||||
HttpChannelState httpChannel = lockedGetHttpChannelState();
|
||||
|
||||
if (httpChannel._failure != null)
|
||||
if (httpChannel._readFailure != null)
|
||||
return;
|
||||
|
||||
if (httpChannel._onIdleTimeout == null)
|
||||
|
@ -963,7 +955,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
{
|
||||
HttpChannelState httpChannel = lockedGetHttpChannelState();
|
||||
|
||||
if (httpChannel._failure != null)
|
||||
if (httpChannel._readFailure != null)
|
||||
return;
|
||||
|
||||
if (httpChannel._onFailure == null)
|
||||
|
@ -1031,6 +1023,7 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
private long _contentBytesWritten;
|
||||
private Supplier<HttpFields> _trailers;
|
||||
private Callback _writeCallback;
|
||||
private Throwable _writeFailure;
|
||||
|
||||
private ChannelResponse(ChannelRequest request)
|
||||
{
|
||||
|
@ -1054,12 +1047,21 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
return _writeCallback != null;
|
||||
}
|
||||
|
||||
private Runnable lockedFailWrite(Throwable x)
|
||||
private Runnable lockedFailWrite(Throwable x, boolean fatal)
|
||||
{
|
||||
assert _request._lock.isHeldByCurrentThread();
|
||||
Callback writeCallback = _writeCallback;
|
||||
_writeCallback = null;
|
||||
return writeCallback == null ? null : () -> writeCallback.failed(x);
|
||||
if (writeCallback != null || fatal)
|
||||
{
|
||||
if (_writeFailure == null)
|
||||
_writeFailure = x;
|
||||
else
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(_writeFailure, x);
|
||||
}
|
||||
if (writeCallback == null)
|
||||
return null;
|
||||
return () -> writeCallback.failed(x);
|
||||
}
|
||||
|
||||
public long getContentBytesWritten()
|
||||
|
@ -1115,65 +1117,69 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
{
|
||||
long length = BufferUtil.length(content);
|
||||
|
||||
HttpChannelState httpChannelState;
|
||||
HttpChannelState httpChannel;
|
||||
HttpStream stream;
|
||||
Throwable failure;
|
||||
Throwable writeFailure;
|
||||
MetaData.Response responseMetaData = null;
|
||||
try (AutoLock ignored = _request._lock.lock())
|
||||
{
|
||||
httpChannelState = _request.lockedGetHttpChannelState();
|
||||
long committedContentLength = httpChannelState._committedContentLength;
|
||||
httpChannel = _request.lockedGetHttpChannelState();
|
||||
long totalWritten = _contentBytesWritten + length;
|
||||
long contentLength = committedContentLength >= 0 ? committedContentLength : getHeaders().getLongField(HttpHeader.CONTENT_LENGTH);
|
||||
writeFailure = _writeFailure;
|
||||
|
||||
if (_writeCallback != null)
|
||||
if (writeFailure == null)
|
||||
{
|
||||
failure = new IllegalStateException("write pending");
|
||||
}
|
||||
else
|
||||
{
|
||||
failure = getFailure(httpChannelState);
|
||||
if (failure == null && contentLength >= 0 && totalWritten != contentLength)
|
||||
if (_writeCallback != null)
|
||||
{
|
||||
// If the content length were not compatible with what was written, then we need to abort.
|
||||
String lengthError = null;
|
||||
if (totalWritten > contentLength)
|
||||
lengthError = "written %d > %d content-length";
|
||||
else if (last && !(totalWritten == 0 && HttpMethod.HEAD.is(_request.getMethod())))
|
||||
lengthError = "written %d < %d content-length";
|
||||
if (lengthError != null)
|
||||
writeFailure = new WritePendingException();
|
||||
}
|
||||
else
|
||||
{
|
||||
long committedContentLength = httpChannel._committedContentLength;
|
||||
long contentLength = committedContentLength >= 0 ? committedContentLength : getHeaders().getLongField(HttpHeader.CONTENT_LENGTH);
|
||||
|
||||
if (contentLength >= 0 && totalWritten != contentLength)
|
||||
{
|
||||
String message = lengthError.formatted(totalWritten, contentLength);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("fail {} {}", callback, message);
|
||||
failure = new IOException(message);
|
||||
// If the content length were not compatible with what was written, then we need to abort.
|
||||
String lengthError = null;
|
||||
if (totalWritten > contentLength)
|
||||
lengthError = "written %d > %d content-length";
|
||||
else if (last && !(totalWritten == 0 && HttpMethod.HEAD.is(_request.getMethod())))
|
||||
lengthError = "written %d < %d content-length";
|
||||
if (lengthError != null)
|
||||
{
|
||||
String message = lengthError.formatted(totalWritten, contentLength);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("fail {} {}", callback, message);
|
||||
writeFailure = new IOException(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If no failure by this point, we can try to switch to sending state.
|
||||
if (failure == null)
|
||||
failure = httpChannelState.lockedStreamSend(last, length);
|
||||
if (writeFailure == null)
|
||||
writeFailure = httpChannel.lockedStreamSend(last, length);
|
||||
|
||||
if (failure == NOTHING_TO_SEND)
|
||||
if (writeFailure == NOTHING_TO_SEND)
|
||||
{
|
||||
httpChannelState._serializedInvoker.run(callback::succeeded);
|
||||
httpChannel._serializedInvoker.run(callback::succeeded);
|
||||
return;
|
||||
}
|
||||
// Have we failed in some way?
|
||||
if (failure != null)
|
||||
if (writeFailure != null)
|
||||
{
|
||||
Throwable throwable = failure;
|
||||
httpChannelState._serializedInvoker.run(() -> callback.failed(throwable));
|
||||
Throwable failure = writeFailure;
|
||||
httpChannel._serializedInvoker.run(() -> callback.failed(failure));
|
||||
return;
|
||||
}
|
||||
|
||||
// No failure, do the actual stream send using the ChannelResponse as the callback.
|
||||
_writeCallback = callback;
|
||||
_contentBytesWritten = totalWritten;
|
||||
stream = httpChannelState._stream;
|
||||
stream = httpChannel._stream;
|
||||
if (_httpFields.commit())
|
||||
responseMetaData = lockedPrepareResponse(httpChannelState, last);
|
||||
responseMetaData = lockedPrepareResponse(httpChannel, last);
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -1181,12 +1187,6 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
stream.send(_request._metaData, responseMetaData, last, content, this);
|
||||
}
|
||||
|
||||
protected Throwable getFailure(HttpChannelState httpChannelState)
|
||||
{
|
||||
Content.Chunk failure = httpChannelState._failure;
|
||||
return failure == null ? null : failure.getFailure();
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the call to
|
||||
* {@link HttpStream#send(MetaData.Request, MetaData.Response, boolean, ByteBuffer, Callback)}
|
||||
|
@ -1199,14 +1199,13 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("write succeeded {}", this);
|
||||
// Called when an individual write succeeds.
|
||||
Callback callback;
|
||||
HttpChannelState httpChannel;
|
||||
try (AutoLock ignored = _request._lock.lock())
|
||||
{
|
||||
httpChannel = _request.lockedGetHttpChannelState();
|
||||
callback = _writeCallback;
|
||||
_writeCallback = null;
|
||||
httpChannel = _request.lockedGetHttpChannelState();
|
||||
httpChannel.lockedStreamSendCompleted(true);
|
||||
}
|
||||
if (callback != null)
|
||||
|
@ -1227,14 +1226,14 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("write failed {}", this, x);
|
||||
// Called when an individual write succeeds.
|
||||
Callback callback;
|
||||
HttpChannelState httpChannel;
|
||||
try (AutoLock ignored = _request._lock.lock())
|
||||
{
|
||||
httpChannel = _request.lockedGetHttpChannelState();
|
||||
_writeFailure = x;
|
||||
callback = _writeCallback;
|
||||
_writeCallback = null;
|
||||
httpChannel = _request.lockedGetHttpChannelState();
|
||||
httpChannel.lockedStreamSendCompleted(false);
|
||||
}
|
||||
if (callback != null)
|
||||
|
@ -1520,13 +1519,6 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
_status = HttpStatus.INTERNAL_SERVER_ERROR_500;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Throwable getFailure(HttpChannelState httpChannelState)
|
||||
{
|
||||
// we ignore channel failures so we can try to generate an error response.
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ResponseHttpFields getResponseHttpFields(HttpChannelState httpChannelState)
|
||||
{
|
||||
|
@ -1649,41 +1641,23 @@ public class HttpChannelState implements HttpChannel, Components
|
|||
protected void onError(Runnable task, Throwable failure)
|
||||
{
|
||||
ChannelRequest request;
|
||||
Content.Chunk error;
|
||||
boolean callbackCompleted;
|
||||
try (AutoLock ignore = _lock.lock())
|
||||
{
|
||||
callbackCompleted = _callbackCompleted;
|
||||
request = _request;
|
||||
error = _request == null ? null : _failure;
|
||||
}
|
||||
|
||||
if (request == null || callbackCompleted)
|
||||
{
|
||||
// It is too late to handle error, so just log it
|
||||
// It is too late to handle error.
|
||||
super.onError(task, failure);
|
||||
return;
|
||||
}
|
||||
else if (error == null)
|
||||
{
|
||||
// Try to fail the request, but we might lose a race.
|
||||
try
|
||||
{
|
||||
request._callback.failed(failure);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
ExceptionUtil.addSuppressedIfNotAssociated(failure, t);
|
||||
super.onError(task, failure);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// We are already in error, so we will not handle this one,
|
||||
// but we will add as suppressed if we have not seen it already.
|
||||
Throwable cause = error.getFailure();
|
||||
if (ExceptionUtil.areNotAssociated(cause, failure))
|
||||
error.getFailure().addSuppressed(failure);
|
||||
}
|
||||
|
||||
Runnable failureTask = onFailure(failure);
|
||||
if (failureTask != null)
|
||||
failureTask.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -506,6 +506,8 @@ public class HttpChannelTest
|
|||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
request.addFailureListener(callback::failed);
|
||||
|
||||
response.setStatus(200);
|
||||
response.getHeaders().put(HttpHeader.CONTENT_LENGTH, 10);
|
||||
response.write(false, null, Callback.from(() ->
|
||||
|
@ -1164,7 +1166,7 @@ public class HttpChannelTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testOnError() throws Exception
|
||||
public void testOnFailure() throws Exception
|
||||
{
|
||||
AtomicReference<Response> handling = new AtomicReference<>();
|
||||
AtomicReference<Throwable> error = new AtomicReference<>();
|
||||
|
@ -1193,24 +1195,24 @@ public class HttpChannelTest
|
|||
Runnable onRequest = channel.onRequest(request);
|
||||
onRequest.run();
|
||||
|
||||
// check we are handling
|
||||
// Check we are handling.
|
||||
assertNotNull(handling.get());
|
||||
assertThat(stream.isComplete(), is(false));
|
||||
assertThat(stream.getFailure(), nullValue());
|
||||
assertThat(stream.getResponse(), nullValue());
|
||||
|
||||
// failure happens
|
||||
// Failure happens.
|
||||
IOException failure = new IOException("Testing");
|
||||
Runnable onError = channel.onFailure(failure);
|
||||
assertNotNull(onError);
|
||||
Runnable onFailure = channel.onFailure(failure);
|
||||
assertNotNull(onFailure);
|
||||
|
||||
// onError not yet called
|
||||
// Failure listeners not yet called.
|
||||
assertThat(error.get(), nullValue());
|
||||
|
||||
// request still handling
|
||||
// Request still handling.
|
||||
assertFalse(stream.isComplete());
|
||||
|
||||
// but now we cannot read, demand nor write
|
||||
// Can read the failure.
|
||||
Request rq = handling.get().getRequest();
|
||||
Content.Chunk chunk = rq.read();
|
||||
assertTrue(chunk.isLast());
|
||||
|
@ -1218,30 +1220,30 @@ public class HttpChannelTest
|
|||
assertThat(chunk.getFailure(), sameInstance(failure));
|
||||
|
||||
CountDownLatch demand = new CountDownLatch(1);
|
||||
// Callback serialized until after onError task
|
||||
// Demand callback serialized until after onFailure listeners.
|
||||
rq.demand(demand::countDown);
|
||||
assertThat(demand.getCount(), is(1L));
|
||||
|
||||
FuturePromise<Throwable> callback = new FuturePromise<>();
|
||||
// Callback serialized until after onError task
|
||||
// Write callback serialized until after onFailure listeners.
|
||||
handling.get().write(false, null, Callback.from(() ->
|
||||
{}, callback::succeeded));
|
||||
assertFalse(callback.isDone());
|
||||
|
||||
// process error callback
|
||||
// Process onFailure task.
|
||||
try (StacklessLogging ignore = new StacklessLogging(Response.class))
|
||||
{
|
||||
onError.run();
|
||||
onFailure.run();
|
||||
}
|
||||
|
||||
// onError was called
|
||||
// onFailure listeners were called.
|
||||
assertThat(error.get(), sameInstance(failure));
|
||||
// demand callback was called
|
||||
// Demand callback was called.
|
||||
assertTrue(demand.await(5, TimeUnit.SECONDS));
|
||||
// write callback was failed
|
||||
// Write callback was failed.
|
||||
assertThat(callback.get(5, TimeUnit.SECONDS), sameInstance(failure));
|
||||
|
||||
// request completed handling
|
||||
// Request handling was completed.
|
||||
assertTrue(stream.isComplete());
|
||||
}
|
||||
|
||||
|
|
|
@ -208,7 +208,7 @@ public class HttpServerTestFixture
|
|||
response.setStatus(200);
|
||||
Content.Source.asString(request, StandardCharsets.UTF_8, Promise.from(
|
||||
s -> Content.Sink.write(response, true, "read %d%n" + s.length(), callback),
|
||||
t -> Content.Sink.write(response, true, String.format("caught %s%n", t), callback)
|
||||
callback::failed
|
||||
));
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,245 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.server;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpTester;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.io.QuietException;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertSame;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class ReadWriteFailuresTest
|
||||
{
|
||||
private Server server;
|
||||
private LocalConnector connector;
|
||||
|
||||
private void start(Handler handler) throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
connector = new LocalConnector(server);
|
||||
server.addConnector(connector);
|
||||
server.setHandler(handler);
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void destroy() throws Exception
|
||||
{
|
||||
LifeCycle.stop(server);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadFailureDoesNotImpactSubsequentWrite() throws Exception
|
||||
{
|
||||
long idleTimeout = 1000;
|
||||
String content = "no impact :)";
|
||||
start(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
// Upon idle timeout, the demand callback is invoked.
|
||||
request.demand(() ->
|
||||
{
|
||||
try
|
||||
{
|
||||
Content.Chunk chunk = request.read();
|
||||
assertTrue(Content.Chunk.isFailure(chunk, false));
|
||||
|
||||
response.setStatus(HttpStatus.ACCEPTED_202);
|
||||
Content.Sink.write(response, true, content, callback);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
callback.failed(x);
|
||||
}
|
||||
});
|
||||
return true;
|
||||
}
|
||||
});
|
||||
connector.setIdleTimeout(idleTimeout);
|
||||
|
||||
String request = """
|
||||
POST / HTTP/1.1
|
||||
Host: localhost
|
||||
Content-Length: 1
|
||||
|
||||
""";
|
||||
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(request, 5, TimeUnit.SECONDS));
|
||||
|
||||
assertEquals(HttpStatus.ACCEPTED_202, response.getStatus());
|
||||
assertEquals(content, response.getContent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteFailureDoesNotImpactSubsequentReads() throws Exception
|
||||
{
|
||||
String content = "0123456789";
|
||||
Throwable writeFailure = new IOException();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
start(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback) throws Exception
|
||||
{
|
||||
request.addHttpStreamWrapper(stream -> new HttpStream.Wrapper(stream)
|
||||
{
|
||||
@Override
|
||||
public void send(MetaData.Request request, MetaData.Response response, boolean last, ByteBuffer content, Callback callback)
|
||||
{
|
||||
callback.failed(writeFailure);
|
||||
}
|
||||
});
|
||||
|
||||
// First write must fail.
|
||||
Callback.Completable completable1 = new Callback.Completable();
|
||||
Content.Sink.write(response, false, "first_write", completable1);
|
||||
Throwable writeFailure1 = assertThrows(ExecutionException.class, () -> completable1.get(5, TimeUnit.SECONDS)).getCause();
|
||||
assertSame(writeFailure, writeFailure1);
|
||||
|
||||
// Try a second write, it should fail.
|
||||
Callback.Completable completable2 = new Callback.Completable();
|
||||
Content.Sink.write(response, false, "second_write", completable2);
|
||||
Throwable writeFailure2 = assertThrows(ExecutionException.class, () -> completable2.get(5, TimeUnit.SECONDS)).getCause();
|
||||
assertSame(writeFailure1, writeFailure2);
|
||||
|
||||
// Now try to read.
|
||||
String read = Content.Source.asString(request);
|
||||
assertEquals(content, read);
|
||||
|
||||
latch.countDown();
|
||||
|
||||
callback.succeeded();
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
String request = """
|
||||
POST / HTTP/1.1
|
||||
Host: localhost
|
||||
Content-Length: %d
|
||||
|
||||
%s
|
||||
""".formatted(content.length(), content);
|
||||
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest(request))
|
||||
{
|
||||
endPoint.waitUntilClosedOrIdleFor(5, TimeUnit.SECONDS);
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {false, true})
|
||||
public void testFailureFailsPendingWrite(boolean fatal) throws Exception
|
||||
{
|
||||
long idleTimeout = 1000;
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
start(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback) throws Exception
|
||||
{
|
||||
request.addHttpStreamWrapper(stream -> new HttpStream.Wrapper(stream)
|
||||
{
|
||||
@Override
|
||||
public void send(MetaData.Request request, MetaData.Response response, boolean last, ByteBuffer content, Callback callback)
|
||||
{
|
||||
// Do nothing to make the write pending.
|
||||
}
|
||||
});
|
||||
|
||||
request.addIdleTimeoutListener(x -> fatal);
|
||||
|
||||
Callback.Completable completable1 = new Callback.Completable();
|
||||
Content.Sink.write(response, true, "hello world", completable1);
|
||||
Throwable writeFailure1 = assertThrows(ExecutionException.class, () -> completable1.get(2 * idleTimeout, TimeUnit.MILLISECONDS)).getCause();
|
||||
|
||||
// Verify that further writes are failed.
|
||||
Callback.Completable completable2 = new Callback.Completable();
|
||||
Content.Sink.write(response, true, "hello world", completable2);
|
||||
Throwable writeFailure2 = assertThrows(ExecutionException.class, () -> completable2.get(5, TimeUnit.SECONDS)).getCause();
|
||||
assertSame(writeFailure1, writeFailure2);
|
||||
|
||||
latch.countDown();
|
||||
|
||||
callback.failed(writeFailure1);
|
||||
return true;
|
||||
}
|
||||
});
|
||||
connector.setIdleTimeout(idleTimeout);
|
||||
|
||||
String request = """
|
||||
POST / HTTP/1.1
|
||||
Host: localhost
|
||||
Content-Length: 1
|
||||
|
||||
""";
|
||||
try (LocalConnector.LocalEndPoint endPoint = connector.executeRequest(request))
|
||||
{
|
||||
endPoint.waitUntilClosedOrIdleFor(5, TimeUnit.SECONDS);
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDemandCallbackThrows() throws Exception
|
||||
{
|
||||
start(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, Response response, Callback callback)
|
||||
{
|
||||
request.addFailureListener(callback::failed);
|
||||
request.demand(() ->
|
||||
{
|
||||
// Results in a fatal failure, and failure listener is invoked.
|
||||
throw new QuietException.RuntimeException();
|
||||
});
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
String content = "hello world";
|
||||
String request = """
|
||||
POST / HTTP/1.1
|
||||
Host: localhost
|
||||
Content-Length: %d
|
||||
|
||||
%s
|
||||
""".formatted(content.length(), content);
|
||||
HttpTester.Response response = HttpTester.parseResponse(connector.getResponse(request, 5, TimeUnit.SECONDS));
|
||||
|
||||
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, response.getStatus());
|
||||
assertThat(response.getContent(), containsString("QuietException"));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue