Error handling improvements for frame channels. (#12895)

* Error handling improvements for frame channels.

Two changes:

1) Send errors down in-memory channels (BlockingQueueFrameChannel) on
   failure. This ensures that in situations where a chain of processors
   has been set up on a single machine, all processors see the root
   cause error. In particular, this means the final processor in the
   chain reports the root cause error, which ensures that someone with
   a handle to the final processor will get the proper error.

2) Update FrameFileHttpResponseHandler to expect that the final fetch,
   rather than being simply empty, is empty and also carries a special header.
   This ensures that the handler is able to tell the difference between
   an empty fetch due to being at EOF, and an empty fetch due to a
   truncated HTTP response (after the 200 OK and headers are sent down,
   but before any content appears).

* Fix tests, imports.

* Checkstyle!
This commit is contained in:
Gian Merlino 2022-08-14 23:01:55 -07:00 committed by GitHub
parent b26ab678b9
commit 846345669d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 87 additions and 29 deletions

View File

@ -20,7 +20,6 @@
package org.apache.druid.java.util.common;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import javax.annotation.Nullable;
import java.util.Objects;
@ -87,7 +86,8 @@ public class Either<L, R>
if (isValue()) {
return value;
} else if (error instanceof Throwable) {
Throwables.propagateIfPossible((Throwable) error);
// Always wrap Throwable, even if we could throw it directly, to provide additional context
// about where the exception happened (we want the current stack frame in the trace).
throw new RuntimeException((Throwable) error);
} else {
throw new RuntimeException(error.toString());

View File

@ -96,8 +96,9 @@ public class EitherTest
MatcherAssert.assertThat(either.error(), CoreMatchers.instanceOf(AssertionError.class));
MatcherAssert.assertThat(either.error().getMessage(), CoreMatchers.equalTo("oh no"));
final AssertionError e = Assert.assertThrows(AssertionError.class, either::valueOrThrow);
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.equalTo("oh no"));
final RuntimeException e = Assert.assertThrows(RuntimeException.class, either::valueOrThrow);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(AssertionError.class));
MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.equalTo("oh no"));
// Test toString.
Assert.assertEquals("Error[java.lang.AssertionError: oh no]", either.toString());

View File

@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Optional;
@ -162,12 +163,12 @@ public class BlockingQueueFrameChannel
}
@Override
public void fail()
public void fail(@Nullable Throwable cause)
{
synchronized (lock) {
queue.clear();
if (!queue.offer(Optional.of(Either.error(new ISE("Aborted"))))) {
if (!queue.offer(Optional.of(Either.error(cause != null ? cause : new ISE("Failed"))))) {
// If this happens, it's a bug, potentially due to incorrectly using this class with multiple writers.
throw new ISE("Could not write error to channel");
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.frame.channel;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.frame.Frame;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
@ -54,16 +55,20 @@ public interface WritableFrameChannel extends Closeable
/**
* Called prior to {@link #close()} if the writer has failed. Must be followed by a call to {@link #close()}.
*
* @param cause optional cause of failure. Used by the in-memory channel {@link BlockingQueueFrameChannel.Writable}
* to propagate exceptions to downstream processors. Most other channels ignore the provided cause.
*/
void fail() throws IOException;
void fail(@Nullable Throwable cause) throws IOException;
/**
* Finish writing to this channel.
*
* When this method is called without {@link #fail()} having previously been called, the writer is understood to have
* completed successfully.
* When this method is called without {@link #fail(Throwable)} having previously been called, the writer is
* understood to have completed successfully.
*
* After calling this method, no additional calls to {@link #write}, {@link #fail()}, or this method are permitted.
* After calling this method, no additional calls to {@link #write}, {@link #fail(Throwable)}, or this method
* are permitted.
*/
@Override
void close() throws IOException;

View File

@ -23,6 +23,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.frame.file.FrameFileWriter;
import javax.annotation.Nullable;
import java.io.IOException;
/**
@ -44,8 +45,9 @@ public class WritableFrameFileChannel implements WritableFrameChannel
}
@Override
public void fail() throws IOException
public void fail(@Nullable Throwable cause) throws IOException
{
// Cause is ignored when writing to frame files. Readers can tell the file is truncated, but they won't know why.
writer.abort();
}

View File

@ -40,9 +40,15 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
* to back off from issuing the next request, if appropriate. However: the handler does not implement backpressure
* through the {@link HttpResponseHandler.TrafficCop} mechanism. Therefore, it is important that each request retrieve
* a modest amount of data.
*
* The last fetch must be empty (zero content bytes) and must have the header {@link #HEADER_LAST_FETCH_NAME} set to
* {@link #HEADER_LAST_FETCH_VALUE}. Under these conditions, {@link FrameFilePartialFetch#isLastFetch()} returns true.
*/
public class FrameFileHttpResponseHandler implements HttpResponseHandler<FrameFilePartialFetch, FrameFilePartialFetch>
{
public static final String HEADER_LAST_FETCH_NAME = "X-Druid-Frame-Last-Fetch";
public static final String HEADER_LAST_FETCH_VALUE = "yes";
private final ReadableByteChunksFrameChannel channel;
public FrameFileHttpResponseHandler(final ReadableByteChunksFrameChannel channel)
@ -53,14 +59,17 @@ public class FrameFileHttpResponseHandler implements HttpResponseHandler<FrameFi
@Override
public ClientResponse<FrameFilePartialFetch> handleResponse(final HttpResponse response, final TrafficCop trafficCop)
{
final ClientResponse<FrameFilePartialFetch> clientResponse = ClientResponse.unfinished(new FrameFilePartialFetch());
if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
// Note: if the error body is chunked, we will discard all future chunks due to setting exceptionCaught here.
// This is OK because we don't need the body; just the HTTP status code.
final ClientResponse<FrameFilePartialFetch> clientResponse =
ClientResponse.unfinished(new FrameFilePartialFetch(false));
exceptionCaught(clientResponse, new ISE("Server for [%s] returned [%s]", channel.getId(), response.getStatus()));
return clientResponse;
} else {
final boolean lastFetchHeaderSet = HEADER_LAST_FETCH_VALUE.equals(response.headers().get(HEADER_LAST_FETCH_NAME));
final ClientResponse<FrameFilePartialFetch> clientResponse =
ClientResponse.unfinished(new FrameFilePartialFetch(lastFetchHeaderSet));
return response(clientResponse, response.getContent());
}
}

View File

@ -33,6 +33,7 @@ import javax.annotation.Nullable;
*/
public class FrameFilePartialFetch
{
private final boolean lastFetchHeaderSet;
private long bytesRead;
@Nullable
@ -41,13 +42,14 @@ public class FrameFilePartialFetch
@Nullable
private ListenableFuture<?> backpressureFuture;
FrameFilePartialFetch()
FrameFilePartialFetch(boolean lastFetchHeaderSet)
{
this.lastFetchHeaderSet = lastFetchHeaderSet;
}
public boolean isEmptyFetch()
public boolean isLastFetch()
{
return exceptionCaught == null && bytesRead == 0L;
return exceptionCaught == null && lastFetchHeaderSet && bytesRead == 0L;
}
/**

View File

@ -50,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@ -317,7 +318,7 @@ public class FrameProcessorExecutor
{
for (final WritableFrameChannel outputChannel : outputChannels) {
try {
outputChannel.fail();
outputChannel.fail(e);
}
catch (Throwable e1) {
e.addSuppressed(e1);
@ -535,7 +536,7 @@ public class FrameProcessorExecutor
// Fail all output channels prior to calling cleanup.
for (final WritableFrameChannel outputChannel : processor.outputChannels()) {
try {
outputChannel.fail();
outputChannel.fail(new CancellationException("Canceled"));
}
catch (Throwable e) {
log.debug(e, "Exception encountered while marking output channel failed for processor [%s]", processor);

View File

@ -119,14 +119,14 @@ public class FrameFileHttpResponseHandlerTest extends InitializedNullHandlingTes
Assert.assertFalse(response1.isFinished());
Assert.assertTrue(response1.isContinueReading());
Assert.assertFalse(response1.getObj().isExceptionCaught());
Assert.assertFalse(response1.getObj().isEmptyFetch());
Assert.assertFalse(response1.getObj().isLastFetch());
final ClientResponse<FrameFilePartialFetch> response2 = handler.done(response1);
Assert.assertTrue(response2.isFinished());
Assert.assertTrue(response2.isContinueReading());
Assert.assertFalse(response2.getObj().isExceptionCaught());
Assert.assertFalse(response2.getObj().isEmptyFetch());
Assert.assertFalse(response2.getObj().isLastFetch());
final ListenableFuture<?> backpressureFuture = response2.getObj().backpressureFuture();
Assert.assertFalse(backpressureFuture.isDone());
@ -143,7 +143,7 @@ public class FrameFileHttpResponseHandlerTest extends InitializedNullHandlingTes
}
@Test
public void testEmptyResponse()
public void testEmptyResponseWithoutLastFetchHeader()
{
final ClientResponse<FrameFilePartialFetch> response1 = handler.handleResponse(
makeResponse(HttpResponseStatus.OK, ByteArrays.EMPTY_ARRAY),
@ -153,14 +153,42 @@ public class FrameFileHttpResponseHandlerTest extends InitializedNullHandlingTes
Assert.assertFalse(response1.isFinished());
Assert.assertTrue(response1.isContinueReading());
Assert.assertFalse(response1.getObj().isExceptionCaught());
Assert.assertTrue(response1.getObj().isEmptyFetch());
Assert.assertFalse(response1.getObj().isLastFetch());
final ClientResponse<FrameFilePartialFetch> response2 = handler.done(response1);
Assert.assertTrue(response2.isFinished());
Assert.assertTrue(response2.isContinueReading());
Assert.assertFalse(response2.getObj().isExceptionCaught());
Assert.assertTrue(response2.getObj().isEmptyFetch());
Assert.assertFalse(response2.getObj().isLastFetch());
Assert.assertTrue(response2.getObj().backpressureFuture().isDone());
}
/**
 * Verifies that an empty 200 OK response carrying the last-fetch header is reported as the last fetch.
 *
 * The response is built with zero content bytes and with {@link FrameFileHttpResponseHandler#HEADER_LAST_FETCH_NAME}
 * set to {@link FrameFileHttpResponseHandler#HEADER_LAST_FETCH_VALUE}. The test asserts that
 * {@link FrameFilePartialFetch#isLastFetch()} is true both right after {@code handleResponse} and after
 * {@code done}, that no exception was recorded, and that the backpressure future is already done.
 */
@Test
public void testEmptyResponseWithLastFetchHeader()
{
// Empty body plus the last-fetch header.
final HttpResponse serverResponse = makeResponse(HttpResponseStatus.OK, ByteArrays.EMPTY_ARRAY);
serverResponse.headers().set(
FrameFileHttpResponseHandler.HEADER_LAST_FETCH_NAME,
FrameFileHttpResponseHandler.HEADER_LAST_FETCH_VALUE
);
// A null TrafficCop is passed to the handler in this test.
final ClientResponse<FrameFilePartialFetch> response1 = handler.handleResponse(
serverResponse,
null
);
// After the initial response: not finished yet, no exception recorded, and flagged as the last fetch.
Assert.assertFalse(response1.isFinished());
Assert.assertTrue(response1.isContinueReading());
Assert.assertFalse(response1.getObj().isExceptionCaught());
Assert.assertTrue(response1.getObj().isLastFetch());
// After done(): finished, still flagged as the last fetch, and the backpressure future is already done.
final ClientResponse<FrameFilePartialFetch> response2 = handler.done(response1);
Assert.assertTrue(response2.isFinished());
Assert.assertTrue(response2.isContinueReading());
Assert.assertFalse(response2.getObj().isExceptionCaught());
Assert.assertTrue(response2.getObj().isLastFetch());
Assert.assertTrue(response2.getObj().backpressureFuture().isDone());
}
@ -186,7 +214,7 @@ public class FrameFileHttpResponseHandlerTest extends InitializedNullHandlingTes
Assert.assertFalse(response.isFinished());
Assert.assertFalse(response.getObj().isExceptionCaught());
Assert.assertFalse(response.getObj().isEmptyFetch());
Assert.assertFalse(response.getObj().isLastFetch());
}
final ClientResponse<FrameFilePartialFetch> finalResponse = handler.done(response);
@ -194,7 +222,7 @@ public class FrameFileHttpResponseHandlerTest extends InitializedNullHandlingTes
Assert.assertTrue(finalResponse.isFinished());
Assert.assertTrue(finalResponse.isContinueReading());
Assert.assertFalse(response.getObj().isExceptionCaught());
Assert.assertFalse(response.getObj().isEmptyFetch());
Assert.assertFalse(response.getObj().isLastFetch());
final ListenableFuture<?> backpressureFuture = response.getObj().backpressureFuture();
Assert.assertFalse(backpressureFuture.isDone());

View File

@ -195,18 +195,23 @@ public class FrameProcessorExecutorTest
);
MatcherAssert.assertThat(
e.getCause(),
e.getCause().getCause(),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("failure!"))
);
final ReadableFrameChannel outReadableChannel = outChannel.readable();
Assert.assertTrue(outReadableChannel.canRead());
Assert.assertThrows(
IllegalStateException.class,
final RuntimeException readException = Assert.assertThrows(
RuntimeException.class,
outReadableChannel::read
);
MatcherAssert.assertThat(
readException.getCause(),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("failure!"))
);
Assert.assertTrue(outReadableChannel.isFinished()); // Finished now that we read the error
}

View File

@ -216,7 +216,11 @@ public class RunAllFullyWidgetTest extends FrameProcessorExecutorTest.BaseFrameP
final ExecutionException e = Assert.assertThrows(ExecutionException.class, future::get);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RuntimeException.class));
MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("failure!")));
MatcherAssert.assertThat(e.getCause().getCause(), CoreMatchers.instanceOf(RuntimeException.class));
MatcherAssert.assertThat(
e.getCause().getCause(),
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("failure!"))
);
}
@Test