Eagerly fetching remote s3 files leading to out of disk (OOD) (#13981)

* Eagerly fetching remote s3 files leading to OOD.
This commit is contained in:
Karan Kumar 2023-04-03 14:10:37 +05:30 committed by GitHub
parent 518698a952
commit 217b0f6832
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 39 additions and 29 deletions

View File

@ -26,7 +26,6 @@ import org.apache.druid.frame.channel.ReadableInputStreamFrameChannel;
import org.apache.druid.frame.util.DurableStorageUtils; import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
@ -97,18 +96,15 @@ public class DurableStorageInputChannelFactory implements InputChannelFactory
workerNumber, workerNumber,
remotePartitionPath remotePartitionPath
); );
RetryUtils.retry(() -> { if (!storageConnector.pathExists(remotePartitionPath)) {
if (!storageConnector.pathExists(remotePartitionPath)) { throw new ISE(
throw new ISE( "Could not find remote outputs of stage [%d] partition [%d] for worker [%d] at the path [%s]",
"Could not find remote outputs of stage [%d] partition [%d] for worker [%d] at the path [%s]", stageId.getStageNumber(),
stageId.getStageNumber(), partitionNumber,
partitionNumber, workerNumber,
workerNumber, remotePartitionPath
remotePartitionPath );
); }
}
return Boolean.TRUE;
}, (throwable) -> true, 10);
final InputStream inputStream = storageConnector.read(remotePartitionPath); final InputStream inputStream = storageConnector.read(remotePartitionPath);
return ReadableInputStreamFrameChannel.open( return ReadableInputStreamFrameChannel.open(

View File

@ -39,7 +39,6 @@ import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.MappedByteBufferHandler; import org.apache.druid.java.util.common.MappedByteBufferHandler;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
@ -142,12 +141,9 @@ public class DurableStorageOutputChannelFactory implements OutputChannelFactory
ArenaMemoryAllocator.createOnHeap(frameSize), ArenaMemoryAllocator.createOnHeap(frameSize),
() -> { () -> {
try { try {
RetryUtils.retry(() -> { if (!storageConnector.pathExists(fileName)) {
if (!storageConnector.pathExists(fileName)) { throw new ISE("File does not exist : %s", fileName);
throw new ISE("File does not exist : %s", fileName); }
}
return Boolean.TRUE;
}, (throwable) -> true, 10);
} }
catch (Exception exception) { catch (Exception exception) {
throw new RuntimeException(exception); throw new RuntimeException(exception);

View File

@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Predicates; import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import org.apache.commons.io.input.NullInputStream;
import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.RetryingInputStream; import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
@ -150,6 +151,7 @@ public class S3StorageConnector implements StorageConnector
// build a sequence input stream from chunks // build a sequence input stream from chunks
return new SequenceInputStream(new Enumeration<InputStream>() return new SequenceInputStream(new Enumeration<InputStream>()
{ {
boolean initStream = false;
@Override @Override
public boolean hasMoreElements() public boolean hasMoreElements()
{ {
@ -166,6 +168,12 @@ public class S3StorageConnector implements StorageConnector
@Override @Override
public InputStream nextElement() public InputStream nextElement()
{ {
// since Sequence input stream calls nextElement in the constructor, we start chunking as soon as we call read.
// to avoid that we pass a nullInputStream for the first iteration.
if (!initStream) {
initStream = true;
return new NullInputStream();
}
File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString()); File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString());
// in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE // in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE
long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1; long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;

View File

@ -33,7 +33,7 @@ import java.util.concurrent.ThreadLocalRandom;
/** /**
* Channel backed by an {@link InputStream}. * Channel backed by an {@link InputStream}.
* * <p>
* Frame channels are expected to be nonblocking, but InputStreams cannot be read in nonblocking fashion. * Frame channels are expected to be nonblocking, but InputStreams cannot be read in nonblocking fashion.
* This implementation deals with that by using an {@link ExecutorService} to read from the stream in a * This implementation deals with that by using an {@link ExecutorService} to read from the stream in a
* separate thread. * separate thread.
@ -56,6 +56,8 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
@GuardedBy("lock") @GuardedBy("lock")
private boolean inputStreamError = false; private boolean inputStreamError = false;
private boolean isStarted = false;
private volatile boolean keepReading = true; private volatile boolean keepReading = true;
private final Object readMonitor = new Object(); private final Object readMonitor = new Object();
@ -96,20 +98,18 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
boolean framesOnly boolean framesOnly
) )
{ {
final ReadableInputStreamFrameChannel channel = new ReadableInputStreamFrameChannel( return new ReadableInputStreamFrameChannel(
inputStream, inputStream,
ReadableByteChunksFrameChannel.create(id, framesOnly), ReadableByteChunksFrameChannel.create(id, framesOnly),
executorService executorService
); );
channel.startReading();
return channel;
} }
@Override @Override
public boolean isFinished() public boolean isFinished()
{ {
synchronized (lock) { synchronized (lock) {
startReading();
return delegate.isFinished(); return delegate.isFinished();
} }
} }
@ -118,6 +118,7 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
public boolean canRead() public boolean canRead()
{ {
synchronized (lock) { synchronized (lock) {
startReading();
return delegate.canRead(); return delegate.canRead();
} }
} }
@ -126,6 +127,7 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
public Frame read() public Frame read()
{ {
synchronized (lock) { synchronized (lock) {
startReading();
return delegate.read(); return delegate.read();
} }
} }
@ -134,6 +136,7 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
public ListenableFuture<?> readabilityFuture() public ListenableFuture<?> readabilityFuture()
{ {
synchronized (lock) { synchronized (lock) {
startReading();
return delegate.readabilityFuture(); return delegate.readabilityFuture();
} }
} }
@ -150,6 +153,12 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
private void startReading() private void startReading()
{ {
// the task to the executor service is submitted only once.
if (isStarted) {
return;
}
isStarted = true;
executorService.submit(() -> { executorService.submit(() -> {
int nTry = 1; int nTry = 1;
while (true) { while (true) {
@ -168,7 +177,7 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
++nTry; ++nTry;
} }
catch (InterruptedException e) { catch (InterruptedException e) {
// close inputstream anyway if the thread interrups // close input stream anyway if the thread interrupts
IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(inputStream);
throw new ISE(e, Thread.currentThread().getName() + "interrupted"); throw new ISE(e, Thread.currentThread().getName() + "interrupted");
} }
@ -187,6 +196,8 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
if (bytesRead == -1) { if (bytesRead == -1) {
inputStreamFinished = true; inputStreamFinished = true;
delegate.doneWriting(); delegate.doneWriting();
// eagerly release input stream resources since everything is read.
IOUtils.closeQuietly(inputStream);
break; break;
} else { } else {
ListenableFuture<?> backpressureFuture = delegate.addChunk(Arrays.copyOfRange(buffer, 0, bytesRead)); ListenableFuture<?> backpressureFuture = delegate.addChunk(Arrays.copyOfRange(buffer, 0, bytesRead));
@ -233,8 +244,7 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
private static long nextRetrySleepMillis(final int nTry) private static long nextRetrySleepMillis(final int nTry)
{ {
final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * ThreadLocalRandom.current().nextGaussian(), 0), 2); final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * ThreadLocalRandom.current().nextGaussian(), 0), 2);
final long sleepMillis = (long) (Math.min(MAX_SLEEP_MILLIS, BASE_SLEEP_MILLIS * Math.pow(2, nTry - 1)) return (long) (Math.min(MAX_SLEEP_MILLIS, BASE_SLEEP_MILLIS * Math.pow(2, nTry - 1))
* fuzzyMultiplier); * fuzzyMultiplier);
return sleepMillis;
} }
} }