Fix IndexerWorkerClient#fetchChannelData when response has data and error. (#15084)

* Fix IndexerWorkerClient#fetchChannelData when response has data and error.

When a channel data response from a worker includes some data and then
some I/O error, then when the call is retried, we will re-read the set
of data that was read by the previous connection and add it to the
local channel again. This causes the local channel to become corrupted.
The patch fixes this case by skipping data that has already been read.
This commit is contained in:
Gian Merlino 2023-10-08 22:42:28 -07:00 committed by GitHub
parent c7d0615af3
commit c483cb863d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 126 additions and 11 deletions

View File

@ -21,6 +21,7 @@ package org.apache.druid.frame.file;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.http.client.response.ClientResponse;
@ -49,11 +50,20 @@ public class FrameFileHttpResponseHandler implements HttpResponseHandler<FrameFi
public static final String HEADER_LAST_FETCH_NAME = "X-Druid-Frame-Last-Fetch";
public static final String HEADER_LAST_FETCH_VALUE = "yes";
/**
* Channel to write to.
*/
private final ReadableByteChunksFrameChannel channel;
/**
* Starting offset for this handler.
*/
private final long startOffset;
public FrameFileHttpResponseHandler(final ReadableByteChunksFrameChannel channel)
{
this.channel = Preconditions.checkNotNull(channel, "channel");
this.startOffset = channel.getBytesAdded();
}
@Override
@ -114,8 +124,21 @@ public class FrameFileHttpResponseHandler implements HttpResponseHandler<FrameFi
return ClientResponse.finished(clientResponseObj);
}
final byte[] chunk = new byte[content.readableBytes()];
content.getBytes(content.readerIndex(), chunk);
final byte[] chunk;
final int chunkSize = content.readableBytes();
// Potentially skip some of this chunk, if the relevant bytes have already been read by the handler. This can
// happen if a request reads some data, then fails with a retryable I/O error, and then is retried. The retry
// will re-read some data that has already been added to the channel, so we need to skip it.
final long readByThisHandler = channel.getBytesAdded() - startOffset;
final long readByThisRequest = clientResponseObj.getBytesRead(); // Prior to the current chunk
final long toSkip = readByThisHandler - readByThisRequest;
if (toSkip < 0) {
throw DruidException.defensive("Expected toSkip[%d] to be nonnegative", toSkip);
} else if (toSkip < chunkSize) { // When toSkip >= chunkSize, we skip the entire chunk and do not toucn the channel
chunk = new byte[chunkSize - (int) toSkip];
content.getBytes(content.readerIndex() + (int) toSkip, chunk);
try {
final ListenableFuture<?> backpressureFuture = channel.addChunk(chunk);
@ -123,13 +146,14 @@ public class FrameFileHttpResponseHandler implements HttpResponseHandler<FrameFi
if (backpressureFuture != null) {
clientResponseObj.setBackpressureFuture(backpressureFuture);
}
clientResponseObj.addBytesRead(chunk.length);
}
catch (Exception e) {
clientResponseObj.exceptionCaught(e);
}
}
// Call addBytesRead even if we skipped some or all of the chunk, because that lets us know when to stop skipping.
clientResponseObj.addBytesRead(chunkSize);
return ClientResponse.unfinished(clientResponseObj);
}
}

View File

@ -74,6 +74,14 @@ public class FrameFilePartialFetch
return exceptionCaught != null;
}
/**
* Number of bytes read so far by this request.
*/
public long getBytesRead()
{
return bytesRead;
}
/**
* Future that resolves when it is a good time to request the next chunk of the frame file.
*
@ -105,6 +113,9 @@ public class FrameFilePartialFetch
}
}
/**
* Increment the value returned by {@link #getBytesRead()}. Called whenever a chunk of data is read from the response.
*/
void addBytesRead(final long n)
{
bytesRead += n;

View File

@ -346,6 +346,86 @@ public class FrameFileHttpResponseHandlerTest extends InitializedNullHandlingTes
);
}
@Test
public void testCaughtExceptionDuringChunkedResponseRetryWithSameHandler() throws Exception
{
// Split file into 12 chunks after the first 100 bytes.
final int firstPart = 100;
final int chunkSize = Ints.checkedCast(LongMath.divide(file.length() - firstPart, 12, RoundingMode.CEILING));
final byte[] allBytes = Files.readAllBytes(file.toPath());
// Add firstPart and be done.
ClientResponse<FrameFilePartialFetch> response = handler.done(
handler.handleResponse(
makeResponse(HttpResponseStatus.OK, byteSlice(allBytes, 0, firstPart)),
null
)
);
Assert.assertEquals(firstPart, channel.getBytesAdded());
Assert.assertTrue(response.isFinished());
// Add first quarter after firstPart using a new handler.
handler = new FrameFileHttpResponseHandler(channel);
response = handler.handleResponse(
makeResponse(HttpResponseStatus.OK, byteSlice(allBytes, firstPart, chunkSize * 3)),
null
);
// Set an exception.
handler.exceptionCaught(response, new ISE("Oh no!"));
// Add another chunk after the exception is caught (this can happen in real life!). We expect it to be ignored.
response = handler.handleChunk(
response,
makeChunk(byteSlice(allBytes, firstPart + chunkSize * 3, chunkSize * 3)),
2
);
// Verify that the exception handler was called.
Assert.assertTrue(response.getObj().isExceptionCaught());
final Throwable e = response.getObj().getExceptionCaught();
MatcherAssert.assertThat(e, CoreMatchers.instanceOf(IllegalStateException.class));
MatcherAssert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Oh no!")));
// Retry connection with the same handler and same initial offset firstPart (don't recreate handler), but now use
// thirds instead of quarters as chunks. (ServiceClientImpl would retry from the same offset with the same handler
// if the exception is retryable.)
response = handler.handleResponse(
makeResponse(HttpResponseStatus.OK, byteSlice(allBytes, firstPart, chunkSize * 4)),
null
);
Assert.assertEquals(firstPart + chunkSize * 4L, channel.getBytesAdded());
Assert.assertFalse(response.isFinished());
// Send the rest of the data.
response = handler.handleChunk(
response,
makeChunk(byteSlice(allBytes, firstPart + chunkSize * 4, chunkSize * 4)),
1
);
Assert.assertEquals(firstPart + chunkSize * 8L, channel.getBytesAdded());
response = handler.handleChunk(
response,
makeChunk(byteSlice(allBytes, firstPart + chunkSize * 8, chunkSize * 4)),
2
);
response = handler.done(response);
Assert.assertTrue(response.isFinished());
Assert.assertFalse(response.getObj().isExceptionCaught());
// Verify channel.
Assert.assertEquals(allBytes.length, channel.getBytesAdded());
channel.doneWriting();
FrameTestUtil.assertRowsEqual(
FrameTestUtil.readRowsFromAdapter(adapter, null, false),
FrameTestUtil.readRowsFromFrameChannel(channel, FrameReader.create(adapter.getRowSignature()))
);
}
private static HttpResponse makeResponse(final HttpResponseStatus status, final byte[] content)
{
final ByteBufferBackedChannelBuffer channelBuffer = new ByteBufferBackedChannelBuffer(ByteBuffer.wrap(content));