Support retrying for PrefetchableTextFilesFirehoseFactory when prefetch is disabled (#5162)

* Add RetryingInputStream

* Remove unnecessary exception declaration

* Fix PrefetchableTextFilesFirehoseFactoryTest

* Fix retrying on connection reset

* Fix start offset

* Fix checkstyle

* Fix connection reset check

* Address comments

* Fix compile

* Address comments

* Address comments
Jihoon Son 2018-01-11 01:37:19 +09:00 committed by Roman Leventov
parent 7b8b0a96d6
commit 5d0619f5ce
41 changed files with 1055 additions and 460 deletions
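In short: when both caching and prefetch are disabled, the firehose now reads each object through the new RetryingInputStream, which counts the bytes already delivered and, on a connection reset (or any error matching the factory's retry condition), re-opens the object at that offset instead of failing the task. A distilled sketch of the idea follows; Opener and ResumingReader are simplified stand-ins for the committed ObjectOpenFunction and RetryingInputStream, and the sketch omits the backoff wait and the generic retry condition:

import com.google.common.io.CountingInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;

interface Opener
{
  InputStream open(long startOffset) throws IOException;
}

class ResumingReader
{
  private final Opener opener;
  private final int maxRetry;
  private CountingInputStream in;
  private long startOffset;

  ResumingReader(Opener opener, int maxRetry) throws IOException
  {
    this.opener = opener;
    this.maxRetry = maxRetry;
    this.in = new CountingInputStream(opener.open(0));
  }

  int read() throws IOException
  {
    for (int nTry = 0; nTry < maxRetry; nTry++) {
      try {
        return in.read();
      }
      catch (SocketException e) {
        startOffset += in.getCount(); // bytes already handed to the caller
        in.close();
        in = new CountingInputStream(opener.open(startOffset)); // ranged re-open
      }
    }
    return in.read(); // final attempt; let any exception propagate
  }
}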

View File

@ -25,6 +25,7 @@ import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import io.druid.guice.annotations.ExtensionPoint;
import io.druid.java.util.common.parsers.ParseException;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
@ -67,7 +68,7 @@ public interface FirehoseFactory<T extends InputRowParser>
* @param parser an input row parser
* @param temporaryDirectory a directory where temporary files are stored
*/
default Firehose connect(T parser, File temporaryDirectory) throws IOException, ParseException
default Firehose connect(T parser, @Nullable File temporaryDirectory) throws IOException, ParseException
{
return connect(parser);
}

View File

@ -106,8 +106,6 @@ public abstract class AbstractTextFilesFirehoseFactory<T>
* @param object an object to be read
*
* @return an input stream for the object
*
* @throws IOException
*/
protected abstract InputStream openObjectStream(T object) throws IOException;
@ -117,8 +115,7 @@ public abstract class AbstractTextFilesFirehoseFactory<T>
*
* @param object an input object
* @param stream a stream for the object
* @return
* @throws IOException
* @return a wrapped input stream
*/
protected abstract InputStream wrapObjectStream(T object, InputStream stream) throws IOException;
}

View File

@ -19,11 +19,16 @@
package io.druid.data.input.impl.prefetch;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import org.apache.commons.io.IOUtils;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
@ -54,6 +59,8 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
private final CacheManager<T> cacheManager;
private final List<T> objects;
private final ExecutorService fetchExecutor;
@Nullable
private final File temporaryDirectory;
// A roughly max size of total fetched objects, but the actual fetched size can be bigger. The reason is our current
@ -80,6 +87,7 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
private final AtomicLong fetchedBytes = new AtomicLong(0);
private final ObjectOpenFunction<T> openObjectFunction;
private final Predicate<Throwable> retryCondition;
private final byte[] buffer;
private Future<Void> fetchFuture;
@ -94,12 +102,13 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
CacheManager<T> cacheManager,
List<T> objects,
ExecutorService fetchExecutor,
File temporaryDirectory,
@Nullable File temporaryDirectory,
long maxFetchCapacityBytes,
long prefetchTriggerBytes,
long fetchTimeout,
int maxFetchRetry,
ObjectOpenFunction<T> openObjectFunction
ObjectOpenFunction<T> openObjectFunction,
Predicate<Throwable> retryCondition
)
{
this.cacheManager = cacheManager;
@ -111,6 +120,7 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
this.fetchTimeout = fetchTimeout;
this.maxFetchRetry = maxFetchRetry;
this.openObjectFunction = openObjectFunction;
this.retryCondition = retryCondition;
this.buffer = new byte[BUFFER_SIZE];
this.prefetchEnabled = maxFetchCapacityBytes > 0;
@ -120,6 +130,10 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
this.fetchedFiles.addAll(cacheManager.getFiles());
this.nextFetchIndex = fetchedFiles.size();
if (cacheManager.isEnabled() || prefetchEnabled) {
Preconditions.checkNotNull(temporaryDirectory, "temporaryDirectory");
}
if (prefetchEnabled) {
fetchIfNeeded(0L);
}
@ -155,7 +169,7 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
final T object = objects.get(nextFetchIndex);
LOG.info("Fetching [%d]th object[%s], fetchedBytes[%d]", nextFetchIndex, object, fetchedBytes.get());
final File outFile = File.createTempFile(FETCH_FILE_PREFIX, null, temporaryDirectory);
fetchedBytes.addAndGet(download(object, outFile, 0));
fetchedBytes.addAndGet(download(object, outFile));
fetchedFiles.put(new FetchedFile<>(object, outFile, getFileCloser(outFile, fetchedBytes)));
}
}
@ -166,26 +180,27 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
*
* @param object an object to be downloaded
* @param outFile a file in which the object data is stored
* @param tryCount current retry count
*
* @return number of downloaded bytes
*/
private long download(T object, File outFile, int tryCount) throws IOException
private long download(T object, File outFile) throws IOException
{
try (final InputStream is = openObjectFunction.open(object);
final OutputStream os = new FileOutputStream(outFile)) {
return IOUtils.copyLarge(is, os, buffer);
try {
return RetryUtils.retry(
() -> {
try (final InputStream is = openObjectFunction.open(object);
final OutputStream os = new FileOutputStream(outFile)) {
return IOUtils.copyLarge(is, os, buffer);
}
},
retryCondition,
outFile::delete,
maxFetchRetry + 1,
StringUtils.format("Failed to download object[%s]", object)
);
}
catch (IOException e) {
final int nextTry = tryCount + 1;
if (!Thread.currentThread().isInterrupted() && nextTry < maxFetchRetry) {
LOG.error(e, "Failed to download object[%s], retrying (%d of %d)", object, nextTry, maxFetchRetry);
outFile.delete();
return download(object, outFile, nextTry);
} else {
LOG.error(e, "Failed to download object[%s], retries exhausted, aborting", object);
throw e;
}
catch (Exception e) {
throw new IOException(e);
}
}
@ -289,7 +304,11 @@ public class Fetcher<T> implements Iterator<OpenedObject<T>>
final T object = objects.get(nextFetchIndex);
LOG.info("Reading [%d]th object[%s]", nextFetchIndex, object);
nextFetchIndex++;
return new OpenedObject<>(object, openObjectFunction.open(object), getNoopCloser());
return new OpenedObject<>(
object,
new RetryingInputStream<>(object, openObjectFunction, retryCondition, maxFetchRetry),
getNoopCloser()
);
}
}
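The download() change above replaces hand-rolled retry recursion with RetryUtils, which gains a cleanup hook so the partially written file is deleted between tries. A usage sketch of the overload Fetcher now calls; the IOException predicate stands in for the store-specific retry condition, and the message string is simplified from the committed StringUtils.format() call:

final long bytesFetched = RetryUtils.retry(
    () -> {                                  // a RetryUtils.Task<Long>
      try (final InputStream is = openObjectFunction.open(object);
           final OutputStream os = new FileOutputStream(outFile)) {
        return IOUtils.copyLarge(is, os);
      }
    },
    t -> t instanceof IOException,           // retryCondition
    outFile::delete,                         // cleanup run after each failed try
    maxFetchRetry + 1,                       // total tries, not retries
    "Failed to download object"              // failure message prefix
);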

View File

@ -25,4 +25,6 @@ import java.io.InputStream;
interface ObjectOpenFunction<T>
{
InputStream open(T object) throws IOException;
InputStream open(T object, long start) throws IOException;
}
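The new two-argument open() is what makes resuming possible: implementations must honor a start offset. A minimal file-backed implementation, mirroring the one in RetryingInputStreamTest further down, might be:

final ObjectOpenFunction<File> openFunction = new ObjectOpenFunction<File>()
{
  @Override
  public InputStream open(File object) throws IOException
  {
    return new FileInputStream(object);
  }

  @Override
  public InputStream open(File object, long start) throws IOException
  {
    final FileInputStream in = new FileInputStream(object);
    Preconditions.checkState(in.skip(start) == start, "failed to skip to offset[%s]", start);
    return in;
  }
};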

View File

@ -22,6 +22,7 @@ package io.druid.data.input.impl.prefetch;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.Firehose;
import io.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
@ -32,6 +33,7 @@ import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.logger.Logger;
import org.apache.commons.io.LineIterator;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
@ -159,22 +161,25 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
}
@Override
public Firehose connect(StringInputRowParser firehoseParser, File temporaryDirectory) throws IOException
public Firehose connect(StringInputRowParser firehoseParser, @Nullable File temporaryDirectory) throws IOException
{
if (!cacheManager.isEnabled() && maxFetchCapacityBytes == 0) {
return super.connect(firehoseParser, temporaryDirectory);
}
if (objects == null) {
objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "objects"));
}
Preconditions.checkState(temporaryDirectory.exists(), "temporaryDirectory[%s] does not exist", temporaryDirectory);
Preconditions.checkState(
temporaryDirectory.isDirectory(),
"temporaryDirectory[%s] is not a directory",
temporaryDirectory
);
if (cacheManager.isEnabled() || maxFetchCapacityBytes > 0) {
Preconditions.checkNotNull(temporaryDirectory, "temporaryDirectory");
Preconditions.checkArgument(
temporaryDirectory.exists(),
"temporaryDirectory[%s] does not exist",
temporaryDirectory
);
Preconditions.checkArgument(
temporaryDirectory.isDirectory(),
"temporaryDirectory[%s] is not a directory",
temporaryDirectory
);
}
LOG.info("Create a new firehose for [%d] objects", objects.size());
@ -189,7 +194,21 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
prefetchTriggerBytes,
fetchTimeout,
maxFetchRetry,
this::openObjectStream
new ObjectOpenFunction<T>()
{
@Override
public InputStream open(T object) throws IOException
{
return openObjectStream(object);
}
@Override
public InputStream open(T object, long start) throws IOException
{
return openObjectStream(object, start);
}
},
getRetryCondition()
);
return new FileIteratingFirehose(
@ -240,6 +259,23 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
);
}
/**
* Returns a predicate describing retry conditions. {@link Fetcher} and {@link RetryingInputStream} will retry on
* errors satisfying this condition.
*/
protected abstract Predicate<Throwable> getRetryCondition();
/**
* Open an input stream from the given object. If the object is compressed, this method should return a byte stream
* as it is compressed. The object compression should be handled in {@link #wrapObjectStream(Object, InputStream)}.
*
* @param object an object to be read
* @param start start offset
*
* @return an input stream for the object
*/
protected abstract InputStream openObjectStream(T object, long start) throws IOException;
/**
* This class calls the {@link Closeable#close()} method of the resourceCloser when it is closed.
*/
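Concrete factories now implement both getRetryCondition() and the ranged openObjectStream(). A hedged sketch of the pair for a store without a native range API; the skip-based offset handling matches the Azure factory below, and the IOException predicate matches the test factory:

@Override
protected Predicate<Throwable> getRetryCondition()
{
  // Store-specific in practice, e.g. S3Utils.S3RETRY or AzureUtils.AZURE_RETRY.
  return e -> e instanceof IOException;
}

@Override
protected InputStream openObjectStream(T object, long start) throws IOException
{
  // Emulate a ranged read by skipping when the client offers no range API.
  final InputStream in = openObjectStream(object);
  final long skipped = in.skip(start);
  Preconditions.checkState(skipped == start, "start offset was [%s] but [%s] bytes were skipped", start, skipped);
  return in;
}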

View File

@ -0,0 +1,184 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.impl.prefetch;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.io.CountingInputStream;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.logger.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
/**
* This class is used by {@link Fetcher} when prefetch is disabled. It is responsible for re-opening the underlying
* input stream for the input object on socket connection resets, as well as on errors satisfying the given
* {@link #retryCondition}.
*
* @param <T> object type
*/
class RetryingInputStream<T> extends InputStream
{
private static final Logger log = new Logger(RetryingInputStream.class);
private final T object;
private final ObjectOpenFunction<T> objectOpenFunction;
private final Predicate<Throwable> retryCondition;
private final int maxRetry;
private CountingInputStream delegate;
private long startOffset;
RetryingInputStream(
T object,
ObjectOpenFunction<T> objectOpenFunction,
Predicate<Throwable> retryCondition,
int maxRetry
) throws IOException
{
this.object = object;
this.objectOpenFunction = objectOpenFunction;
this.retryCondition = retryCondition;
this.maxRetry = maxRetry;
this.delegate = new CountingInputStream(objectOpenFunction.open(object));
}
private boolean isConnectionReset(Throwable t)
{
return (t instanceof SocketException && (t.getMessage() != null && t.getMessage().contains("Connection reset"))) ||
(t.getCause() != null && isConnectionReset(t.getCause()));
}
private void waitOrThrow(Throwable t, int nTry) throws IOException
{
final boolean isConnectionReset = isConnectionReset(t);
if (isConnectionReset || retryCondition.apply(t)) {
if (isConnectionReset) {
// Re-open the input stream on connection reset
startOffset += delegate.getCount();
try {
delegate.close();
}
catch (IOException e) {
// ignore this exception
log.warn(e, "Error while closing the delegate input stream");
}
}
try {
// Wait for the next try
RetryUtils.awaitNextRetry(t, null, nTry + 1, maxRetry, false);
if (isConnectionReset) {
log.info("retrying from offset[%d]", startOffset);
delegate = new CountingInputStream(objectOpenFunction.open(object, startOffset));
}
}
catch (InterruptedException | IOException e) {
t.addSuppressed(e);
throwAsIOException(t);
}
} else {
throwAsIOException(t);
}
}
private static void throwAsIOException(Throwable t) throws IOException
{
Throwables.propagateIfInstanceOf(t, IOException.class);
throw new IOException(t);
}
@Override
public int read() throws IOException
{
for (int nTry = 0; nTry < maxRetry; nTry++) {
try {
return delegate.read();
}
catch (Throwable t) {
waitOrThrow(t, nTry);
}
}
return delegate.read();
}
@Override
public int read(byte b[]) throws IOException
{
for (int nTry = 0; nTry < maxRetry; nTry++) {
try {
return delegate.read(b);
}
catch (Throwable t) {
waitOrThrow(t, nTry);
}
}
return delegate.read(b);
}
@Override
public int read(byte b[], int off, int len) throws IOException
{
for (int nTry = 0; nTry < maxRetry; nTry++) {
try {
return delegate.read(b, off, len);
}
catch (Throwable t) {
waitOrThrow(t, nTry);
}
}
return delegate.read(b, off, len);
}
@Override
public long skip(long n) throws IOException
{
for (int nTry = 0; nTry < maxRetry; nTry++) {
try {
return delegate.skip(n);
}
catch (Throwable t) {
waitOrThrow(t, nTry);
}
}
return delegate.skip(n);
}
@Override
public int available() throws IOException
{
for (int nTry = 0; nTry < maxRetry; nTry++) {
try {
return delegate.available();
}
catch (Throwable t) {
waitOrThrow(t, nTry);
}
}
return delegate.available();
}
@Override
public void close() throws IOException
{
delegate.close();
}
}
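A usage sketch, modeled on RetryingInputStreamTest below: the retrying stream wraps the raw object stream and composes with ordinary decorators such as a gzip decoder. openFunction and maxRetry are assumed to be in scope:

final InputStream retryingStream = new RetryingInputStream<>(
    object,                         // the input object, e.g. a File
    openFunction,                   // an ObjectOpenFunction<T>
    t -> t instanceof IOException,  // retry condition
    maxRetry
);
final DataInputStream in = new DataInputStream(new GZIPInputStream(retryingStream));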

View File

@ -21,6 +21,7 @@ package io.druid.data.input.impl.prefetch;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.io.CountingOutputStream;
import io.druid.data.input.Firehose;
@ -30,6 +31,7 @@ import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
@ -47,6 +49,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
@ -187,6 +190,25 @@ public class PrefetchableTextFilesFirehoseFactoryTest
assertNumRemainingCacheFiles(firehoseTmpDir, 0);
}
@Test
public void testWithoutCacheAndFetchAgainstConnectionReset() throws IOException
{
final TestPrefetchableTextFilesFirehoseFactory factory =
TestPrefetchableTextFilesFirehoseFactory.withConnectionResets(TEST_DIR, 0, 0, 2);
final List<Row> rows = new ArrayList<>();
final File firehoseTmpDir = createFirehoseTmpDir("testWithoutCacheAndFetch");
try (Firehose firehose = factory.connect(parser, firehoseTmpDir)) {
while (firehose.hasMore()) {
rows.add(firehose.nextRow());
}
}
Assert.assertEquals(0, factory.getCacheManager().getTotalCachedBytes());
assertResult(rows);
assertNumRemainingCacheFiles(firehoseTmpDir, 0);
}
@Test
public void testWithoutCache() throws IOException
{
@ -377,10 +399,10 @@ public class PrefetchableTextFilesFirehoseFactoryTest
static class TestPrefetchableTextFilesFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<File>
{
private static final long defaultTimeout = 1000;
private final long sleepMillis;
private final File baseDir;
private int openExceptionCount;
private int numOpenExceptions;
private int maxConnectionResets;
static TestPrefetchableTextFilesFirehoseFactory with(File baseDir, long cacheCapacity, long fetchCapacity)
{
@ -389,9 +411,9 @@ public class PrefetchableTextFilesFirehoseFactoryTest
1024,
cacheCapacity,
fetchCapacity,
defaultTimeout,
3,
0,
0,
0
);
}
@ -403,9 +425,9 @@ public class PrefetchableTextFilesFirehoseFactoryTest
1024,
2048,
2048,
defaultTimeout,
3,
0,
0,
0
);
}
@ -417,9 +439,28 @@ public class PrefetchableTextFilesFirehoseFactoryTest
1024,
2048,
2048,
defaultTimeout,
3,
count,
0,
0
);
}
static TestPrefetchableTextFilesFirehoseFactory withConnectionResets(
File baseDir,
long cacheCapacity,
long fetchCapacity,
int numConnectionResets
)
{
return new TestPrefetchableTextFilesFirehoseFactory(
baseDir,
fetchCapacity / 2,
cacheCapacity,
fetchCapacity,
3,
0,
numConnectionResets,
0
);
}
@ -434,18 +475,54 @@ public class PrefetchableTextFilesFirehoseFactoryTest
100,
3,
0,
0,
ms
);
}
public TestPrefetchableTextFilesFirehoseFactory(
private static long computeTimeout(int maxRetry)
{
// See RetryUtils.nextRetrySleepMillis()
final double maxFuzzyMultiplier = 2.;
return (long) Math.min(
RetryUtils.MAX_SLEEP_MILLIS,
RetryUtils.BASE_SLEEP_MILLIS * Math.pow(2, maxRetry - 1) * maxFuzzyMultiplier
);
}
TestPrefetchableTextFilesFirehoseFactory(
File baseDir,
long prefetchTriggerThreshold,
long maxCacheCapacityBytes,
long maxFetchCapacityBytes,
int maxRetry,
int numOpenExceptions,
int numConnectionResets,
long sleepMillis
)
{
this(
baseDir,
prefetchTriggerThreshold,
maxCacheCapacityBytes,
maxFetchCapacityBytes,
computeTimeout(maxRetry),
maxRetry,
numOpenExceptions,
numConnectionResets,
sleepMillis
);
}
TestPrefetchableTextFilesFirehoseFactory(
File baseDir,
long prefetchTriggerThreshold,
long maxCacheCapacityBytes,
long maxFetchCapacityBytes,
long timeout,
int maxRetry,
int openExceptionCount,
int numOpenExceptions,
int maxConnectionResets,
long sleepMillis
)
{
@ -456,7 +533,8 @@ public class PrefetchableTextFilesFirehoseFactoryTest
timeout,
maxRetry
);
this.openExceptionCount = openExceptionCount;
this.numOpenExceptions = numOpenExceptions;
this.maxConnectionResets = maxConnectionResets;
this.sleepMillis = sleepMillis;
this.baseDir = baseDir;
}
@ -474,8 +552,8 @@ public class PrefetchableTextFilesFirehoseFactoryTest
@Override
protected InputStream openObjectStream(File object) throws IOException
{
if (openExceptionCount > 0) {
openExceptionCount--;
if (numOpenExceptions > 0) {
numOpenExceptions--;
throw new IOException("Exception for retry test");
}
if (sleepMillis > 0) {
@ -486,7 +564,9 @@ public class PrefetchableTextFilesFirehoseFactoryTest
throw new RuntimeException(e);
}
}
return FileUtils.openInputStream(object);
return maxConnectionResets > 0 ?
new TestInputStream(FileUtils.openInputStream(object), maxConnectionResets) :
FileUtils.openInputStream(object);
}
@Override
@ -494,5 +574,76 @@ public class PrefetchableTextFilesFirehoseFactoryTest
{
return stream;
}
@Override
protected Predicate<Throwable> getRetryCondition()
{
return e -> e instanceof IOException;
}
@Override
protected InputStream openObjectStream(File object, long start) throws IOException
{
if (numOpenExceptions > 0) {
numOpenExceptions--;
throw new IOException("Exception for retry test");
}
if (sleepMillis > 0) {
try {
Thread.sleep(sleepMillis);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
final InputStream in = FileUtils.openInputStream(object);
in.skip(start);
return maxConnectionResets > 0 ? new TestInputStream(in, maxConnectionResets) : in;
}
private int readCount;
private int numConnectionResets;
private class TestInputStream extends InputStream
{
private static final int NUM_READ_COUNTS_BEFORE_ERROR = 10;
private final InputStream delegate;
private final int maxConnectionResets;
TestInputStream(
InputStream delegate,
int maxConnectionResets
)
{
this.delegate = delegate;
this.maxConnectionResets = maxConnectionResets;
}
@Override
public int read() throws IOException
{
if (readCount++ % NUM_READ_COUNTS_BEFORE_ERROR == 0) {
if (numConnectionResets++ < maxConnectionResets) {
// Simulate connection reset
throw new SocketException("Test Connection reset");
}
}
return delegate.read();
}
@Override
public int read(byte b[], int off, int len) throws IOException
{
if (readCount++ % NUM_READ_COUNTS_BEFORE_ERROR == 0) {
if (numConnectionResets++ < maxConnectionResets) {
// Simulate connection reset
throw new SocketException("Test Connection reset");
}
}
return delegate.read(b, off, len);
}
}
}
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.impl.prefetch;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class RetryingInputStreamTest
{
private static final int MAX_RETRY = 5;
private static final int MAX_ERROR = 4;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File testFile;
private DataInputStream inputStream;
@Before
public void setup() throws IOException
{
testFile = temporaryFolder.newFile();
try (FileOutputStream fis = new FileOutputStream(testFile);
GZIPOutputStream gis = new GZIPOutputStream(fis);
DataOutputStream dis = new DataOutputStream(gis)) {
for (int i = 0; i < 10000; i++) {
dis.writeInt(i);
}
}
throwError = false;
final InputStream retryingInputStream = new RetryingInputStream<>(
testFile,
new ObjectOpenFunction<File>()
{
@Override
public InputStream open(File object) throws IOException
{
return new TestInputStream(new FileInputStream(object));
}
@Override
public InputStream open(File object, long start) throws IOException
{
final FileInputStream fis = new FileInputStream(object);
Preconditions.checkState(fis.skip(start) == start);
return new TestInputStream(fis);
}
},
e -> e instanceof IOException,
MAX_RETRY
);
inputStream = new DataInputStream(new GZIPInputStream(retryingInputStream));
throwError = true;
}
@After
public void teardown() throws IOException
{
inputStream.close();
FileUtils.forceDelete(testFile);
}
@Test
public void testReadRetry() throws IOException
{
for (int i = 0; i < 10000; i++) {
Assert.assertEquals(i, inputStream.readInt());
}
}
private boolean throwError = true;
private int errorCount = 0;
private class TestInputStream extends InputStream
{
private final InputStream delegate;
TestInputStream(InputStream delegate)
{
this.delegate = delegate;
}
@Override
public int read() throws IOException
{
return delegate.read();
}
@Override
public int read(byte b[], int off, int len) throws IOException
{
if (throwError) {
throwError = false;
errorCount++;
if (errorCount % 2 == 0) {
throw new IOException("test retry");
} else {
delegate.close();
throw new SocketException("Test Connection reset");
}
} else {
throwError = errorCount < MAX_ERROR;
return delegate.read(b, off, len);
}
}
}
}

View File

@ -22,10 +22,13 @@ package io.druid.firehose.azure;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import io.druid.java.util.common.CompressionUtils;
import io.druid.storage.azure.AzureByteSource;
import io.druid.storage.azure.AzureStorage;
import io.druid.storage.azure.AzureUtils;
import java.io.IOException;
import java.io.InputStream;
@ -75,6 +78,16 @@ public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFi
return makeByteSource(azureStorage, object).openStream();
}
@Override
protected InputStream openObjectStream(AzureBlob object, long start) throws IOException
{
// BlobInputStream.skip() moves the next read offset instead of skipping the first 'start' bytes.
final InputStream in = openObjectStream(object);
final long skip = in.skip(start);
Preconditions.checkState(skip == start, "start offset was [%s] but [%s] bytes were skipped", start, skip);
return in;
}
@Override
protected InputStream wrapObjectStream(AzureBlob object, InputStream stream) throws IOException
{
@ -124,4 +137,10 @@ public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFi
getMaxFetchRetry()
);
}
@Override
protected Predicate<Throwable> getRetryCondition()
{
return AzureUtils.AZURE_RETRY;
}
}

View File

@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.microsoft.azure.storage.StorageException;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
@ -40,7 +39,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
public class AzureDataSegmentPusher implements DataSegmentPusher
{
@ -149,14 +147,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
final Map<String, String> azurePaths = getAzurePaths(segment);
return AzureUtils.retryAzureOperation(
new Callable<DataSegment>()
{
@Override
public DataSegment call() throws Exception
{
return uploadDataSegment(segment, version, size, outFile, descFile, azurePaths, replaceExisting);
}
},
() -> uploadDataSegment(segment, version, size, outFile, descFile, azurePaths, replaceExisting),
config.getMaxTries()
);
}

View File

@ -33,7 +33,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.concurrent.Callable;
public class AzureTaskLogs implements TaskLogs
{
@ -58,7 +57,7 @@ public class AzureTaskLogs implements TaskLogs
try {
AzureUtils.retryAzureOperation(
(Callable<Void>) () -> {
() -> {
azureStorage.uploadBlob(logFile, config.getContainer(), taskKey, true);
return null;
},

View File

@ -21,12 +21,11 @@ package io.druid.storage.azure;
import com.google.common.base.Predicate;
import com.microsoft.azure.storage.StorageException;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.RetryUtils.Task;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.Callable;
public class AzureUtils
{
@ -53,7 +52,7 @@ public class AzureUtils
}
};
public static <T> T retryAzureOperation(Callable<T> f, int maxTries) throws Exception
public static <T> T retryAzureOperation(Task<T> f, int maxTries) throws Exception
{
return RetryUtils.retry(f, AZURE_RETRY, maxTries);
}

View File

@ -35,7 +35,6 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Callable;
/**
* Cassandra Segment Puller
@ -77,20 +76,15 @@ public class CassandraDataSegmentPuller extends CassandraStorage implements Data
final FileUtils.FileCopyResult localResult;
try {
localResult = RetryUtils.retry(
new Callable<FileUtils.FileCopyResult>()
{
@Override
public FileUtils.FileCopyResult call() throws Exception
{
try (OutputStream os = new FileOutputStream(tmpFile)) {
ChunkedStorage
.newReader(indexStorage, key, os)
.withBatchSize(BATCH_SIZE)
.withConcurrencyLevel(CONCURRENCY)
.call();
}
return new FileUtils.FileCopyResult(tmpFile);
() -> {
try (OutputStream os = new FileOutputStream(tmpFile)) {
ChunkedStorage
.newReader(indexStorage, key, os)
.withBatchSize(BATCH_SIZE)
.withConcurrencyLevel(CONCURRENCY)
.call();
}
return new FileUtils.FileCopyResult(tmpFile);
},
Predicates.<Throwable>alwaysTrue(),
10

View File

@ -22,11 +22,13 @@ package io.druid.firehose.cloudfiles;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Predicate;
import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.storage.cloudfiles.CloudFilesByteSource;
import io.druid.storage.cloudfiles.CloudFilesObjectApiProxy;
import io.druid.storage.cloudfiles.CloudFilesUtils;
import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi;
import java.io.IOException;
@ -72,6 +74,17 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho
@Override
protected InputStream openObjectStream(CloudFilesBlob object) throws IOException
{
return openObjectStream(object, 0);
}
@Override
protected InputStream openObjectStream(CloudFilesBlob object, long start) throws IOException
{
return createCloudFilesByteSource(object).openStream(start);
}
private CloudFilesByteSource createCloudFilesByteSource(CloudFilesBlob object)
{
final String region = object.getRegion();
final String container = object.getContainer();
@ -82,9 +95,7 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho
);
CloudFilesObjectApiProxy objectApi = new CloudFilesObjectApiProxy(
cloudFilesApi, region, container);
final CloudFilesByteSource byteSource = new CloudFilesByteSource(objectApi, path);
return byteSource.openStream();
return new CloudFilesByteSource(objectApi, path);
}
@Override
@ -125,4 +136,10 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho
getMaxFetchRetry()
);
}
@Override
protected Predicate<Throwable> getRetryCondition()
{
return CloudFilesUtils.CLOUDFILESRETRY;
}
}

View File

@ -51,7 +51,12 @@ public class CloudFilesByteSource extends ByteSource
@Override
public InputStream openStream() throws IOException
{
payload = (payload == null) ? objectApi.get(path).getPayload() : payload;
return openStream(0);
}
public InputStream openStream(long start) throws IOException
{
payload = (payload == null) ? objectApi.get(path, start).getPayload() : payload;
try {
return payload.openStream();

View File

@ -35,7 +35,6 @@ import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.util.Map;
import java.util.concurrent.Callable;
public class CloudFilesDataSegmentPusher implements DataSegmentPusher
{
@ -90,41 +89,37 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
log.info("Copying segment[%s] to CloudFiles at location[%s]", inSegment.getIdentifier(), segmentPath);
return CloudFilesUtils.retryCloudFilesOperation(
new Callable<DataSegment>()
{
@Override
public DataSegment call() throws Exception
{
CloudFilesObject segmentData = new CloudFilesObject(
segmentPath, outFile, objectApi.getRegion(),
objectApi.getContainer()
() -> {
CloudFilesObject segmentData = new CloudFilesObject(
segmentPath, outFile, objectApi.getRegion(),
objectApi.getContainer()
);
if (!replaceExisting && objectApi.exists(segmentData.getPath())) {
log.info("Skipping push because object [%s] exists && replaceExisting == false", segmentData.getPath());
} else {
log.info("Pushing %s.", segmentData.getPath());
objectApi.put(segmentData);
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment));
CloudFilesObject descriptorData = new CloudFilesObject(
segmentPath, descFile,
objectApi.getRegion(), objectApi.getContainer()
);
if (!replaceExisting && objectApi.exists(segmentData.getPath())) {
log.info("Skipping push because object [%s] exists && replaceExisting == false", segmentData.getPath());
} else {
log.info("Pushing %s.", segmentData.getPath());
objectApi.put(segmentData);
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment));
CloudFilesObject descriptorData = new CloudFilesObject(
segmentPath, descFile,
objectApi.getRegion(), objectApi.getContainer()
);
log.info("Pushing %s.", descriptorData.getPath());
objectApi.put(descriptorData);
}
final DataSegment outSegment = inSegment
.withSize(indexSize)
.withLoadSpec(makeLoadSpec(new URI(segmentData.getPath())))
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
return outSegment;
log.info("Pushing %s.", descriptorData.getPath());
objectApi.put(descriptorData);
}
}, this.config.getOperationMaxRetries()
final DataSegment outSegment = inSegment
.withSize(indexSize)
.withLoadSpec(makeLoadSpec(new URI(segmentData.getPath())))
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
return outSegment;
},
this.config.getOperationMaxRetries()
);
}
catch (Exception e) {

View File

@ -19,6 +19,7 @@
package io.druid.storage.cloudfiles;
import org.jclouds.http.options.GetOptions;
import org.jclouds.io.Payload;
import org.jclouds.openstack.swift.v1.domain.SwiftObject;
import org.jclouds.openstack.swift.v1.features.ObjectApi;
@ -52,9 +53,14 @@ public class CloudFilesObjectApiProxy
return objectApi.put(cloudFilesObject.getPath(), cloudFilesObject.getPayload());
}
public CloudFilesObject get(String path)
public CloudFilesObject get(String path, long start)
{
SwiftObject swiftObject = objectApi.get(path);
final SwiftObject swiftObject;
if (start == 0) {
swiftObject = objectApi.get(path);
} else {
swiftObject = objectApi.get(path, new GetOptions().startAt(start));
}
Payload payload = swiftObject.getPayload();
return new CloudFilesObject(payload, this.region, this.container, path);
}

View File

@ -20,11 +20,10 @@
package io.druid.storage.cloudfiles;
import com.google.common.base.Predicate;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.RetryUtils.Task;
import java.io.IOException;
import java.util.concurrent.Callable;
/**
*
@ -50,7 +49,7 @@ public class CloudFilesUtils
/**
* Retries CloudFiles operations that fail due to io-related exceptions.
*/
public static <T> T retryCloudFilesOperation(Callable<T> f, final int maxTries) throws Exception
public static <T> T retryCloudFilesOperation(Task<T> f, final int maxTries) throws Exception
{
return RetryUtils.retry(f, CLOUDFILESRETRY, maxTries);
}

View File

@ -42,7 +42,7 @@ public class CloudFilesByteSourceTest extends EasyMockSupport
Payload payload = createMock(Payload.class);
InputStream stream = createMock(InputStream.class);
expect(objectApi.get(path)).andReturn(cloudFilesObject);
expect(objectApi.get(path, 0)).andReturn(cloudFilesObject);
expect(cloudFilesObject.getPayload()).andReturn(payload);
expect(payload.openStream()).andReturn(stream);
payload.close();
@ -66,7 +66,7 @@ public class CloudFilesByteSourceTest extends EasyMockSupport
Payload payload = createMock(Payload.class);
InputStream stream = createMock(InputStream.class);
expect(objectApi.get(path)).andReturn(cloudFilesObject);
expect(objectApi.get(path, 0)).andReturn(cloudFilesObject);
expect(cloudFilesObject.getPayload()).andReturn(payload);
expect(payload.openStream()).andThrow(new IOException()).andReturn(stream);
payload.close();

View File

@ -51,7 +51,7 @@ public class CloudFilesObjectApiProxyTest extends EasyMockSupport
replayAll();
CloudFilesObjectApiProxy cfoApiProxy = new CloudFilesObjectApiProxy(cloudFilesApi, region, container);
CloudFilesObject cloudFilesObject = cfoApiProxy.get(path);
CloudFilesObject cloudFilesObject = cfoApiProxy.get(path, 0);
assertEquals(cloudFilesObject.getPayload(), payload);
assertEquals(cloudFilesObject.getRegion(), region);

View File

@ -22,10 +22,12 @@ package io.druid.firehose.google;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Predicate;
import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import io.druid.java.util.common.CompressionUtils;
import io.druid.storage.google.GoogleByteSource;
import io.druid.storage.google.GoogleStorage;
import io.druid.storage.google.GoogleUtils;
import java.io.IOException;
import java.io.InputStream;
@ -68,13 +70,24 @@ public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesF
@Override
protected InputStream openObjectStream(GoogleBlob object) throws IOException
{
return openObjectStream(object, 0);
}
@Override
protected InputStream openObjectStream(GoogleBlob object, long start) throws IOException
{
return createGoogleByteSource(object).openStream(start);
}
private GoogleByteSource createGoogleByteSource(GoogleBlob object)
{
final String bucket = object.getBucket();
final String path = object.getPath().startsWith("/")
? object.getPath().substring(1)
: object.getPath();
return new GoogleByteSource(storage, bucket, path).openStream();
return new GoogleByteSource(storage, bucket, path);
}
@Override
@ -115,5 +128,11 @@ public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesF
getMaxFetchRetry()
);
}
@Override
protected Predicate<Throwable> getRetryCondition()
{
return GoogleUtils.GOOGLE_RETRY;
}
}

View File

@ -42,4 +42,9 @@ public class GoogleByteSource extends ByteSource
{
return storage.get(bucket, path);
}
public InputStream openStream(long start) throws IOException
{
return storage.get(bucket, path, start);
}
}

View File

@ -21,6 +21,7 @@ package io.druid.storage.google;
import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.Storage.Objects.Get;
import java.io.IOException;
import java.io.InputStream;
@ -44,9 +45,17 @@ public class GoogleStorage
public InputStream get(final String bucket, final String path) throws IOException
{
Storage.Objects.Get getObject = storage.objects().get(bucket, path);
getObject.getMediaHttpDownloader().setDirectDownloadEnabled(false);
return getObject.executeMediaAsInputStream();
return get(bucket, path, 0);
}
public InputStream get(final String bucket, final String path, long start) throws IOException
{
final Get get = storage.objects().get(bucket, path);
if (start > 0) {
get.getMediaHttpDownloader().setBytesDownloaded(start);
}
get.getMediaHttpDownloader().setDirectDownloadEnabled(false);
return get.executeMediaAsInputStream();
}
public void delete(final String bucket, final String path) throws IOException

View File

@ -48,7 +48,6 @@ import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.net.URI;
import java.util.concurrent.Callable;
/**
*/
@ -183,40 +182,34 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller, URIDataPuller
try {
return RetryUtils.retry(
new Callable<FileUtils.FileCopyResult>()
{
@Override
public FileUtils.FileCopyResult call() throws Exception
{
if (!fs.exists(path)) {
throw new SegmentLoadingException("No files found at [%s]", path.toString());
}
final RemoteIterator<LocatedFileStatus> children = fs.listFiles(path, false);
final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult();
while (children.hasNext()) {
final LocatedFileStatus child = children.next();
final Path childPath = child.getPath();
final String fname = childPath.getName();
if (fs.isDirectory(childPath)) {
log.warn("[%s] is a child directory, skipping", childPath.toString());
} else {
final File outFile = new File(outDir, fname);
// Actual copy
fs.copyToLocalFile(childPath, new Path(outFile.toURI()));
result.addFile(outFile);
}
}
log.info(
"Copied %d bytes from [%s] to [%s]",
result.size(),
path.toString(),
outDir.getAbsolutePath()
);
return result;
() -> {
if (!fs.exists(path)) {
throw new SegmentLoadingException("No files found at [%s]", path.toString());
}
final RemoteIterator<LocatedFileStatus> children = fs.listFiles(path, false);
final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult();
while (children.hasNext()) {
final LocatedFileStatus child = children.next();
final Path childPath = child.getPath();
final String fname = childPath.getName();
if (fs.isDirectory(childPath)) {
log.warn("[%s] is a child directory, skipping", childPath.toString());
} else {
final File outFile = new File(outDir, fname);
// Actual copy
fs.copyToLocalFile(childPath, new Path(outFile.toURI()));
result.addFile(outFile);
}
}
log.info(
"Copied %d bytes from [%s] to [%s]",
result.size(),
path.toString(),
outDir.getAbsolutePath()
);
return result;
},
shouldRetryPredicate(),
DEFAULT_RETRY_COUNT

View File

@ -21,10 +21,8 @@ package io.druid.storage.hdfs;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.druid.data.SearchableVersionedDataFinder;
import io.druid.java.util.common.RetryUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -34,7 +32,6 @@ import org.apache.hadoop.fs.PathFilter;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
/**
@ -89,17 +86,12 @@ public class HdfsFileTimestampVersionFinder extends HdfsDataSegmentPuller implem
final Path path = new Path(uri);
try {
return RetryUtils.retry(
new Callable<URI>()
{
@Override
public URI call() throws Exception
{
final FileSystem fs = path.getFileSystem(config);
if (!fs.exists(path)) {
return null;
}
return mostRecentInDir(fs.isDirectory(path) ? path : path.getParent(), pattern);
() -> {
final FileSystem fs = path.getFileSystem(config);
if (!fs.exists(path)) {
return null;
}
return mostRecentInDir(fs.isDirectory(path) ? path : path.getParent(), pattern);
},
shouldRetryPredicate(),
DEFAULT_RETRY_COUNT

View File

@ -23,10 +23,12 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.logger.Logger;
import io.druid.storage.s3.S3Utils;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
@ -188,6 +190,28 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
}
}
@Override
protected InputStream openObjectStream(S3Object object, long start) throws IOException
{
try {
final S3Object result = s3Client.getObject(
object.getBucketName(),
object.getKey(),
null,
null,
null,
null,
start,
null
);
return result.getDataInputStream();
}
catch (ServiceException e) {
throw new IOException(e);
}
}
@Override
protected InputStream wrapObjectStream(S3Object object, InputStream stream) throws IOException
{
@ -228,4 +252,10 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
getMaxFetchRetry()
);
}
@Override
protected Predicate<Throwable> getRetryCondition()
{
return S3Utils.S3RETRY;
}
}
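For readers unfamiliar with jets3t, the positional nulls in getObject() above line up as follows (hedged against jets3t's RestS3Service javadoc):

final S3Object result = s3Client.getObject(
    object.getBucketName(),
    object.getKey(),
    null,   // ifModifiedSince
    null,   // ifUnmodifiedSince
    null,   // ifMatchTags
    null,   // ifNoneMatchTags
    start,  // byteRangeStart: resume from this offset
    null    // byteRangeEnd: read to the end of the object
);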

View File

@ -41,7 +41,6 @@ import org.jets3t.service.model.S3Object;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
public class S3DataSegmentMover implements DataSegmentMover
{
@ -118,7 +117,7 @@ public class S3DataSegmentMover implements DataSegmentMover
{
try {
S3Utils.retryS3Operation(
(Callable<Void>) () -> {
() -> {
final String copyMsg = StringUtils.format(
"[s3://%s/%s] to [s3://%s/%s]",
s3Bucket,
@ -228,7 +227,7 @@ public class S3DataSegmentMover implements DataSegmentMover
private void deleteWithRetries(final String s3Bucket, final String s3Path) throws Exception
{
RetryUtils.retry(
(Callable<Void>) () -> {
() -> {
try {
s3Client.deleteObject(s3Bucket, s3Path);
return null;

View File

@ -52,7 +52,6 @@ import java.io.Reader;
import java.io.Writer;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* A data segment puller that also handles URI data pulls.
@ -310,14 +309,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller
{
try {
return S3Utils.retryS3Operation(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return S3Utils.isObjectInBucket(s3Client, coords.bucket, coords.path);
}
}
() -> S3Utils.isObjectInBucket(s3Client, coords.bucket, coords.path)
);
}
catch (S3ServiceException | IOException e) {

View File

@ -41,7 +41,6 @@ import java.net.URI;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
public class S3DataSegmentPusher implements DataSegmentPusher
{
@ -100,39 +99,34 @@ public class S3DataSegmentPusher implements DataSegmentPusher
try {
return S3Utils.retryS3Operation(
new Callable<DataSegment>()
{
@Override
public DataSegment call() throws Exception
{
S3Object toPush = new S3Object(zipOutFile);
putObject(config.getBucket(), s3Path, toPush, replaceExisting);
() -> {
S3Object toPush = new S3Object(zipOutFile);
putObject(config.getBucket(), s3Path, toPush, replaceExisting);
final DataSegment outSegment = inSegment.withSize(indexSize)
.withLoadSpec(makeLoadSpec(config.getBucket(), toPush.getKey()))
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
final DataSegment outSegment = inSegment.withSize(indexSize)
.withLoadSpec(makeLoadSpec(config.getBucket(), toPush.getKey()))
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
File descriptorFile = File.createTempFile("druid", "descriptor.json");
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(outSegment));
S3Object descriptorObject = new S3Object(descriptorFile);
File descriptorFile = File.createTempFile("druid", "descriptor.json");
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(outSegment));
S3Object descriptorObject = new S3Object(descriptorFile);
putObject(
config.getBucket(),
S3Utils.descriptorPathForSegmentPath(s3Path),
descriptorObject,
replaceExisting
);
putObject(
config.getBucket(),
S3Utils.descriptorPathForSegmentPath(s3Path),
descriptorObject,
replaceExisting
);
log.info("Deleting zipped index File[%s]", zipOutFile);
zipOutFile.delete();
log.info("Deleting zipped index File[%s]", zipOutFile);
zipOutFile.delete();
log.info("Deleting descriptor file[%s]", descriptorFile);
descriptorFile.delete();
log.info("Deleting descriptor file[%s]", descriptorFile);
descriptorFile.delete();
return outSegment;
}
return outSegment;
}
);
}

View File

@ -23,7 +23,6 @@ import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
@ -36,7 +35,6 @@ import org.jets3t.service.model.StorageObject;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Callable;
/**
* Provides task logs archived on S3.
@ -118,16 +116,11 @@ public class S3TaskLogs implements TaskLogs
try {
S3Utils.retryS3Operation(
new Callable<Void>()
{
@Override
public Void call() throws Exception
{
final StorageObject object = new StorageObject(logFile);
object.setKey(taskKey);
service.putObject(config.getS3Bucket(), object);
return null;
}
() -> {
final StorageObject object = new StorageObject(logFile);
object.setKey(taskKey);
service.putObject(config.getS3Bucket(), object);
return null;
}
);
}

View File

@ -21,17 +21,14 @@ package io.druid.storage.s3;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.druid.data.SearchableVersionedDataFinder;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import javax.annotation.Nullable;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
/**
@ -64,35 +61,30 @@ public class S3TimestampVersionedDataFinder extends S3DataSegmentPuller implemen
{
try {
return RetryUtils.retry(
new Callable<URI>()
{
@Override
public URI call() throws Exception
{
final S3Coords coords = new S3Coords(checkURI(uri));
long mostRecent = Long.MIN_VALUE;
URI latest = null;
S3Object[] objects = s3Client.listObjects(coords.bucket, coords.path, null);
if (objects == null) {
return null;
}
for (S3Object storageObject : objects) {
storageObject.closeDataInputStream();
String keyString = storageObject.getKey().substring(coords.path.length());
if (keyString.startsWith("/")) {
keyString = keyString.substring(1);
}
if (pattern != null && !pattern.matcher(keyString).matches()) {
continue;
}
final long latestModified = storageObject.getLastModifiedDate().getTime();
if (latestModified >= mostRecent) {
mostRecent = latestModified;
latest = new URI(StringUtils.format("s3://%s/%s", storageObject.getBucketName(), storageObject.getKey()));
}
}
return latest;
() -> {
final S3Coords coords = new S3Coords(checkURI(uri));
long mostRecent = Long.MIN_VALUE;
URI latest = null;
S3Object[] objects = s3Client.listObjects(coords.bucket, coords.path, null);
if (objects == null) {
return null;
}
for (S3Object storageObject : objects) {
storageObject.closeDataInputStream();
String keyString = storageObject.getKey().substring(coords.path.length());
if (keyString.startsWith("/")) {
keyString = keyString.substring(1);
}
if (pattern != null && !pattern.matcher(keyString).matches()) {
continue;
}
final long latestModified = storageObject.getLastModifiedDate().getTime();
if (latestModified >= mostRecent) {
mostRecent = latestModified;
latest = new URI(StringUtils.format("s3://%s/%s", storageObject.getBucketName(), storageObject.getKey()));
}
}
return latest;
},
shouldRetryPredicate(),
DEFAULT_RETRY_COUNT

View File

@ -22,8 +22,8 @@ package io.druid.storage.s3;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.RetryUtils.Task;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@ -32,7 +32,6 @@ import org.jets3t.service.model.StorageObject;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.Callable;
/**
*
@ -83,7 +82,7 @@ public class S3Utils
* Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
* found, etc) are not retried.
*/
public static <T> T retryS3Operation(Callable<T> f) throws Exception
public static <T> T retryS3Operation(Task<T> f) throws Exception
{
final int maxTries = 10;
return RetryUtils.retry(f, S3RETRY, maxTries);
@ -147,15 +146,7 @@ public class S3Utils
{
try {
return retryS3Operation(
new Callable<StorageObjectsChunk>()
{
@Override
public StorageObjectsChunk call() throws Exception
{
return s3Client.listObjectsChunked(
bucket, prefix, null, maxListingLength, priorLastKey);
}
}
() -> s3Client.listObjectsChunked(bucket, prefix, null, maxListingLength, priorLastKey)
);
}
catch (Exception e) {

View File

@ -64,7 +64,6 @@ import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
@ -160,18 +159,13 @@ public class JobHelper
if (jarFile.getName().endsWith(".jar")) {
try {
RetryUtils.retry(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
if (isSnapshot(jarFile)) {
addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job);
} else {
addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job);
}
return true;
() -> {
if (isSnapshot(jarFile)) {
addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job);
} else {
addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job);
}
return true;
},
shouldRetryPredicate(),
NUM_RETRIES
@ -607,50 +601,45 @@ public class JobHelper
{
try {
return RetryUtils.retry(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
final boolean needRename;
() -> {
final boolean needRename;
if (outputFS.exists(finalIndexZipFilePath)) {
// NativeS3FileSystem.rename won't overwrite, so we might need to delete the old index first
final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath);
final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath);
if (outputFS.exists(finalIndexZipFilePath)) {
// NativeS3FileSystem.rename won't overwrite, so we might need to delete the old index first
final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath);
final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath);
if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime()
|| zipFile.getLen() != finalIndexZipFile.getLen()) {
log.info(
"File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]",
finalIndexZipFile.getPath(),
DateTimes.utc(finalIndexZipFile.getModificationTime()),
finalIndexZipFile.getLen(),
zipFile.getPath(),
DateTimes.utc(zipFile.getModificationTime()),
zipFile.getLen()
);
outputFS.delete(finalIndexZipFilePath, false);
needRename = true;
} else {
log.info(
"File[%s / %s / %sB] existed and will be kept",
finalIndexZipFile.getPath(),
DateTimes.utc(finalIndexZipFile.getModificationTime()),
finalIndexZipFile.getLen()
);
needRename = false;
}
} else {
if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime()
|| zipFile.getLen() != finalIndexZipFile.getLen()) {
log.info(
"File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]",
finalIndexZipFile.getPath(),
DateTimes.utc(finalIndexZipFile.getModificationTime()),
finalIndexZipFile.getLen(),
zipFile.getPath(),
DateTimes.utc(zipFile.getModificationTime()),
zipFile.getLen()
);
outputFS.delete(finalIndexZipFilePath, false);
needRename = true;
}
if (needRename) {
log.info("Attempting rename from [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
return outputFS.rename(indexZipFilePath, finalIndexZipFilePath);
} else {
return true;
log.info(
"File[%s / %s / %sB] existed and will be kept",
finalIndexZipFile.getPath(),
DateTimes.utc(finalIndexZipFile.getModificationTime()),
finalIndexZipFile.getLen()
);
needRename = false;
}
} else {
needRename = true;
}
if (needRename) {
log.info("Attempting rename from [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
return outputFS.rename(indexZipFilePath, finalIndexZipFilePath);
} else {
return true;
}
},
FileUtils.IS_EXCEPTION,
@ -821,14 +810,7 @@ public class JobHelper
{
try {
return RetryUtils.retry(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return fs.delete(path, recursive);
}
},
() -> fs.delete(path, recursive),
shouldRetryPredicate(),
NUM_RETRIES
);

View File

@ -80,33 +80,28 @@ public class OverlordResourceTestClient
{
try {
return RetryUtils.retry(
new Callable<String>()
{
@Override
public String call() throws Exception
{
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(getIndexerURL() + "task"))
.setContent(
"application/json",
StringUtils.toUtf8(task)
),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while submitting task to indexer response [%s %s]",
response.getStatus(),
response.getContent()
);
}
Map<String, String> responseData = jsonMapper.readValue(
response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING
() -> {
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(getIndexerURL() + "task"))
.setContent(
"application/json",
StringUtils.toUtf8(task)
),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while submitting task to indexer response [%s %s]",
response.getStatus(),
response.getContent()
);
String taskID = responseData.get("task");
LOG.info("Submitted task with TaskID[%s]", taskID);
return taskID;
}
Map<String, String> responseData = jsonMapper.readValue(
response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING
);
String taskID = responseData.get("task");
LOG.info("Submitted task with TaskID[%s]", taskID);
return taskID;
},
Predicates.<Throwable>alwaysTrue(),
5

View File

@ -36,7 +36,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Enumeration;
import java.util.concurrent.Callable;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
@ -156,14 +155,7 @@ public class CompressionUtils
if (!cacheLocally) {
try {
return RetryUtils.retry(
new Callable<FileUtils.FileCopyResult>()
{
@Override
public FileUtils.FileCopyResult call() throws Exception
{
return unzip(byteSource.openStream(), outDir);
}
},
() -> unzip(byteSource.openStream(), outDir),
shouldRetry,
DEFAULT_RETRY_COUNT
);

View File

@ -24,12 +24,31 @@ import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import io.druid.java.util.common.logger.Logger;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import java.util.concurrent.ThreadLocalRandom;
public class RetryUtils
{
public static final Logger log = new Logger(RetryUtils.class);
public static final long MAX_SLEEP_MILLIS = 60000;
public static final long BASE_SLEEP_MILLIS = 1000;
public interface Task<T>
{
/**
* This method is retried until it succeeds, up to maxTries attempts.
*/
T perform() throws Exception;
}
public interface CleanupAfterFailure
{
/**
* This is called whenever {@link Task#perform()} fails. Retrying stops once this method itself throws an exception,
* so swallow any errors inside this method if you don't want them to end the retry loop.
*/
void cleanup() throws Exception;
}
/**
* Retry an operation using fuzzy exponentially increasing backoff. The wait time after the nth failed attempt is
@ -49,22 +68,29 @@ public class RetryUtils
* @throws Exception if maxTries is exhausted, or shouldRetry returns false
*/
public static <T> T retry(
final Callable<T> f,
Predicate<Throwable> shouldRetry,
final Task<T> f,
final Predicate<Throwable> shouldRetry,
final int quietTries,
final int maxTries
final int maxTries,
@Nullable final CleanupAfterFailure cleanupAfterFailure,
@Nullable final String messageOnRetry
) throws Exception
{
Preconditions.checkArgument(maxTries > 0, "maxTries > 0");
Preconditions.checkArgument(quietTries >= 0, "quietTries >= 0");
int nTry = 0;
final int maxRetries = maxTries - 1;
while (true) {
try {
nTry++;
return f.call();
return f.perform();
}
catch (Throwable e) {
if (cleanupAfterFailure != null) {
cleanupAfterFailure.cleanup();
}
if (nTry < maxTries && shouldRetry.apply(e)) {
awaitNextRetry(e, nTry, nTry <= quietTries);
awaitNextRetry(e, messageOnRetry, nTry, maxRetries, nTry <= quietTries);
} else {
Throwables.propagateIfInstanceOf(e, Exception.class);
throw Throwables.propagate(e);
@ -73,23 +99,69 @@ public class RetryUtils
}
}
/**
* Same as {@link #retry(Callable, Predicate, int, int)} with quietTries = 0.
*/
public static <T> T retry(final Callable<T> f, Predicate<Throwable> shouldRetry, final int maxTries) throws Exception
public static <T> T retry(final Task<T> f, Predicate<Throwable> shouldRetry, final int maxTries) throws Exception
{
return retry(f, shouldRetry, 0, maxTries);
}
private static void awaitNextRetry(final Throwable e, final int nTry, final boolean quiet) throws InterruptedException
public static <T> T retry(
final Task<T> f,
final Predicate<Throwable> shouldRetry,
final int quietTries,
final int maxTries
) throws Exception
{
return retry(f, shouldRetry, quietTries, maxTries, null, null);
}
public static <T> T retry(
final Task<T> f,
final Predicate<Throwable> shouldRetry,
final int maxTries,
final String messageOnRetry
) throws Exception
{
return retry(f, shouldRetry, 0, maxTries, null, messageOnRetry);
}
public static <T> T retry(
final Task<T> f,
final Predicate<Throwable> shouldRetry,
final CleanupAfterFailure onEachFailure,
final int maxTries,
final String messageOnRetry
) throws Exception
{
return retry(f, shouldRetry, 0, maxTries, onEachFailure, messageOnRetry);
}
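For orientation, a minimal usage sketch of the new cleanup-aware overload above; the download helper and temp file are illustrative stand-ins, not part of this commit:
import io.druid.java.util.common.RetryUtils;
import java.io.File;
import java.io.IOException;
public class RetryUtilsUsageSketch
{
  public static void main(String[] args) throws Exception
  {
    final File partialFile = File.createTempFile("download", ".part");
    // Retry a flaky operation up to 3 times, deleting the partial file after each failed try.
    final String result = RetryUtils.retry(
        () -> downloadTo(partialFile),           // Task<String>: the operation to retry
        e -> e instanceof IOException,           // shouldRetry: only retry on I/O failures
        () -> partialFile.delete(),              // CleanupAfterFailure: runs after every failed try
        3,                                       // maxTries
        "Failed to download to " + partialFile   // messageOnRetry: prefixed to the retry log line
    );
    System.out.println(result);
  }
  // Illustrative stand-in for a real download; returns something for the Task to yield.
  private static String downloadTo(File file) throws IOException
  {
    return file.getAbsolutePath();
  }
}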
public static void awaitNextRetry(
final Throwable e,
@Nullable final String messageOnRetry,
final int nTry,
final int maxRetries,
final boolean quiet
) throws InterruptedException
{
final long sleepMillis = nextRetrySleepMillis(nTry);
final String fullMessage;
if (messageOnRetry == null) {
fullMessage = StringUtils.format("Retrying (%d of %d) in %,dms.", nTry, maxRetries, sleepMillis);
} else {
fullMessage = StringUtils.format(
"%s, retrying (%d of %d) in %,dms.",
messageOnRetry,
nTry,
maxRetries,
sleepMillis
);
}
if (quiet) {
log.debug(e, "Failed on try %d, retrying in %,dms.", nTry, sleepMillis);
log.debug(e, fullMessage);
} else {
log.warn(e, "Failed on try %d, retrying in %,dms.", nTry, sleepMillis);
log.warn(e, fullMessage);
}
Thread.sleep(sleepMillis);
@ -97,10 +169,8 @@ public class RetryUtils
public static long nextRetrySleepMillis(final int nTry)
{
final long baseSleepMillis = 1000;
final long maxSleepMillis = 60000;
final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * ThreadLocalRandom.current().nextGaussian(), 0), 2);
final long sleepMillis = (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry - 1))
final long sleepMillis = (long) (Math.min(MAX_SLEEP_MILLIS, BASE_SLEEP_MILLIS * Math.pow(2, nTry - 1))
* fuzzyMultiplier);
return sleepMillis;
}
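To make the backoff schedule concrete, the expected (pre-fuzz) sleep per attempt is min(60,000, 1,000 * 2^(nTry - 1)) milliseconds; a quick sketch of the values this formula produces:
// Expected sleep before the fuzzy multiplier is applied.
for (int nTry = 1; nTry <= 8; nTry++) {
  final long sleepMillis = (long) Math.min(60_000L, 1_000L * Math.pow(2, nTry - 1));
  System.out.printf("try %d -> ~%,d ms%n", nTry, sleepMillis);
}
// Prints ~1,000 / 2,000 / 4,000 / ... doubling until the 60,000 ms cap kicks in at try 7.
// The actual sleep multiplies each value by a Gaussian fuzz factor clamped to [0, 2] with mean 1.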

View File

@ -28,7 +28,6 @@ import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Callable;
/**
*/
@ -76,18 +75,13 @@ public class StreamUtils
{
try {
return RetryUtils.retry(
new Callable<Long>()
{
@Override
public Long call() throws Exception
{
try (InputStream inputStream = byteSource.openStream()) {
try (OutputStream outputStream = byteSink.openStream()) {
final long retval = ByteStreams.copy(inputStream, outputStream);
// Workaround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf
outputStream.flush();
return retval;
}
() -> {
try (InputStream inputStream = byteSource.openStream()) {
try (OutputStream outputStream = byteSink.openStream()) {
final long retval = ByteStreams.copy(inputStream, outputStream);
// Workaround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf
outputStream.flush();
return retval;
}
}
},

View File

@ -24,7 +24,6 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
public class RetryUtilsTest
@ -43,14 +42,9 @@ public class RetryUtilsTest
{
final AtomicInteger count = new AtomicInteger();
final String result = RetryUtils.retry(
new Callable<String>()
{
@Override
public String call() throws Exception
{
count.incrementAndGet();
return "hey";
}
() -> {
count.incrementAndGet();
return "hey";
},
isTransient,
2
@ -66,14 +60,9 @@ public class RetryUtilsTest
boolean threwExpectedException = false;
try {
RetryUtils.retry(
new Callable<String>()
{
@Override
public String call() throws Exception
{
count.incrementAndGet();
throw new IOException("what");
}
() -> {
count.incrementAndGet();
throw new IOException("what");
},
isTransient,
2
@ -91,16 +80,11 @@ public class RetryUtilsTest
{
final AtomicInteger count = new AtomicInteger();
final String result = RetryUtils.retry(
new Callable<String>()
{
@Override
public String call() throws Exception
{
if (count.incrementAndGet() >= 2) {
return "hey";
} else {
throw new IOException("what");
}
() -> {
if (count.incrementAndGet() >= 2) {
return "hey";
} else {
throw new IOException("what");
}
},
isTransient,
@ -117,16 +101,11 @@ public class RetryUtilsTest
boolean threwExpectedException = false;
try {
RetryUtils.retry(
new Callable<String>()
{
@Override
public String call() throws Exception
{
if (count.incrementAndGet() >= 2) {
return "hey";
} else {
throw new IOException("uhh");
}
() -> {
if (count.incrementAndGet() >= 2) {
return "hey";
} else {
throw new IOException("uhh");
}
},
isTransient,
@ -139,5 +118,4 @@ public class RetryUtilsTest
Assert.assertTrue("threw expected exception", threwExpectedException);
Assert.assertEquals("count", 1, count.get());
}
}

View File

@ -47,7 +47,6 @@ import java.sql.SQLTransientException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
public abstract class SQLMetadataConnector implements MetadataStorageConnector
{
@ -127,16 +126,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
final Predicate<Throwable> myShouldRetry
)
{
final Callable<T> call = new Callable<T>()
{
@Override
public T call() throws Exception
{
return getDBI().withHandle(callback);
}
};
try {
return RetryUtils.retry(call, myShouldRetry, DEFAULT_MAX_TRIES);
return RetryUtils.retry(() -> getDBI().withHandle(callback), myShouldRetry, DEFAULT_MAX_TRIES);
}
catch (Exception e) {
throw Throwables.propagate(e);
@ -150,16 +141,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
public <T> T retryTransaction(final TransactionCallback<T> callback, final int quietTries, final int maxTries)
{
final Callable<T> call = new Callable<T>()
{
@Override
public T call() throws Exception
{
return getDBI().inTransaction(callback);
}
};
try {
return RetryUtils.retry(call, shouldRetry, quietTries, maxTries);
return RetryUtils.retry(() -> getDBI().inTransaction(callback), shouldRetry, quietTries, maxTries);
}
catch (Exception e) {
throw Throwables.propagate(e);

View File

@ -20,7 +20,6 @@
package io.druid.segment.loading;
import com.google.common.base.Throwables;
import io.druid.data.SearchableVersionedDataFinder;
import io.druid.java.util.common.RetryUtils;
@ -31,7 +30,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
/**
@ -81,17 +79,10 @@ public class LocalFileTimestampVersionFinder extends LocalDataSegmentPuller
final File file = new File(uri);
try {
return RetryUtils.retry(
new Callable<URI>()
{
@Override
public URI call() throws Exception
{
return mostRecentInDir(
file.isDirectory() ? file.toPath() : file.getParentFile().toPath(),
pattern
);
}
},
() -> mostRecentInDir(
file.isDirectory() ? file.toPath() : file.getParentFile().toPath(),
pattern
),
shouldRetryPredicate(),
DEFAULT_RETRY_COUNT
);

View File

@ -21,19 +21,27 @@ package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import org.apache.http.HttpHeaders;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URLConnection;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI>
{
private static final Logger log = new Logger(HttpFirehoseFactory.class);
private final List<URI> uris;
private final boolean supportContentRange;
@JsonCreator
public HttpFirehoseFactory(
@ -43,10 +51,15 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
@JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
@JsonProperty("fetchTimeout") Long fetchTimeout,
@JsonProperty("maxFetchRetry") Integer maxFetchRetry
)
) throws IOException
{
super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
this.uris = uris;
Preconditions.checkArgument(uris.size() > 0, "Empty URIs");
final URLConnection connection = uris.get(0).toURL().openConnection();
final String acceptRanges = connection.getHeaderField(HttpHeaders.ACCEPT_RANGES);
this.supportContentRange = acceptRanges != null && acceptRanges.equalsIgnoreCase("bytes");
}
@JsonProperty
@ -67,6 +80,28 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
return object.toURL().openConnection().getInputStream();
}
@Override
protected InputStream openObjectStream(URI object, long start) throws IOException
{
if (supportContentRange) {
final URLConnection connection = object.toURL().openConnection();
// Set header for range request.
// Since we need to set only the start offset, the header is "bytes=<range-start>-".
// See https://tools.ietf.org/html/rfc7233#section-2.1
connection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start));
return connection.getInputStream();
} else {
log.warn(
"Since the input source doesn't support range requests, the object input stream is opened from the start and "
+ "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message"
+ " a lot."
);
final InputStream in = openObjectStream(object);
// InputStream.skip() may skip fewer bytes than requested, so loop until the offset is fully consumed.
for (long remaining = start; remaining > 0; ) {
final long skipped = in.skip(remaining);
if (skipped <= 0) {
throw new IOException(StringUtils.format("Failed to skip [%d] bytes in object[%s]", start, object));
}
remaining -= skipped;
}
return in;
}
}
@Override
protected InputStream wrapObjectStream(URI object, InputStream stream) throws IOException
{
@ -105,4 +140,10 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
getMaxFetchRetry()
);
}
@Override
protected Predicate<Throwable> getRetryCondition()
{
return e -> e instanceof IOException;
}
}
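Finally, a self-contained sketch of the resume-on-failure pattern that the range-aware openObjectStream(object, start) enables. It mirrors the shape of the RetryingInputStream added by this commit; the ObjectOpener interface and all names below are illustrative only, not APIs from this change:
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class ResumeSketch
{
  // Stand-in for openObjectStream(object, start).
  interface ObjectOpener
  {
    InputStream open(long start) throws IOException;
  }

  static long copyWithResume(ObjectOpener opener, OutputStream out, int maxTries) throws IOException
  {
    long offset = 0;
    for (int nTry = 1; ; nTry++) {
      try (InputStream in = opener.open(offset)) {
        final byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
          out.write(buf, 0, n);
          offset += n; // remember progress so a retry can resume here instead of at byte 0
        }
        return offset;
      }
      catch (IOException e) {
        if (nTry >= maxTries) {
          throw e; // out of retries; surface the failure
        }
        // otherwise loop and reopen from 'offset'
      }
    }
  }
}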