diff --git a/api/src/main/java/io/druid/data/input/FirehoseFactory.java b/api/src/main/java/io/druid/data/input/FirehoseFactory.java index c68ad6be9a2..f99369bee4b 100644 --- a/api/src/main/java/io/druid/data/input/FirehoseFactory.java +++ b/api/src/main/java/io/druid/data/input/FirehoseFactory.java @@ -25,6 +25,7 @@ import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; import io.druid.guice.annotations.ExtensionPoint; import io.druid.java.util.common.parsers.ParseException; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; @@ -67,7 +68,7 @@ public interface FirehoseFactory * @param parser an input row parser * @param temporaryDirectory a directory where temporary files are stored */ - default Firehose connect(T parser, File temporaryDirectory) throws IOException, ParseException + default Firehose connect(T parser, @Nullable File temporaryDirectory) throws IOException, ParseException { return connect(parser); } diff --git a/api/src/main/java/io/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java b/api/src/main/java/io/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java index aa95f2204af..4059715a22a 100644 --- a/api/src/main/java/io/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java +++ b/api/src/main/java/io/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java @@ -106,8 +106,6 @@ public abstract class AbstractTextFilesFirehoseFactory * @param object an object to be read * * @return an input stream for the object - * - * @throws IOException */ protected abstract InputStream openObjectStream(T object) throws IOException; @@ -117,8 +115,7 @@ public abstract class AbstractTextFilesFirehoseFactory * * @param object an input object * @param stream a stream for the object - * @return - * @throws IOException + * @return a wrapped input stream */ protected abstract InputStream wrapObjectStream(T object, InputStream stream) throws IOException; } diff --git a/api/src/main/java/io/druid/data/input/impl/prefetch/Fetcher.java b/api/src/main/java/io/druid/data/input/impl/prefetch/Fetcher.java index d418bedaa17..c1833c0a9dc 100644 --- a/api/src/main/java/io/druid/data/input/impl/prefetch/Fetcher.java +++ b/api/src/main/java/io/druid/data/input/impl/prefetch/Fetcher.java @@ -19,11 +19,16 @@ package io.druid.data.input.impl.prefetch; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.base.Throwables; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.RetryUtils; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import org.apache.commons.io.IOUtils; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.File; import java.io.FileOutputStream; @@ -54,6 +59,8 @@ public class Fetcher implements Iterator> private final CacheManager cacheManager; private final List objects; private final ExecutorService fetchExecutor; + + @Nullable private final File temporaryDirectory; // A roughly max size of total fetched objects, but the actual fetched size can be bigger.
The reason is our current @@ -80,6 +87,7 @@ public class Fetcher implements Iterator> private final AtomicLong fetchedBytes = new AtomicLong(0); private final ObjectOpenFunction openObjectFunction; + private final Predicate retryCondition; private final byte[] buffer; private Future fetchFuture; @@ -94,12 +102,13 @@ public class Fetcher implements Iterator> CacheManager cacheManager, List objects, ExecutorService fetchExecutor, - File temporaryDirectory, + @Nullable File temporaryDirectory, long maxFetchCapacityBytes, long prefetchTriggerBytes, long fetchTimeout, int maxFetchRetry, - ObjectOpenFunction openObjectFunction + ObjectOpenFunction openObjectFunction, + Predicate retryCondition ) { this.cacheManager = cacheManager; @@ -111,6 +120,7 @@ public class Fetcher implements Iterator> this.fetchTimeout = fetchTimeout; this.maxFetchRetry = maxFetchRetry; this.openObjectFunction = openObjectFunction; + this.retryCondition = retryCondition; this.buffer = new byte[BUFFER_SIZE]; this.prefetchEnabled = maxFetchCapacityBytes > 0; @@ -120,6 +130,10 @@ public class Fetcher implements Iterator> this.fetchedFiles.addAll(cacheManager.getFiles()); this.nextFetchIndex = fetchedFiles.size(); + if (cacheManager.isEnabled() || prefetchEnabled) { + Preconditions.checkNotNull(temporaryDirectory, "temporaryDirectory"); + } + if (prefetchEnabled) { fetchIfNeeded(0L); } @@ -155,7 +169,7 @@ public class Fetcher implements Iterator> final T object = objects.get(nextFetchIndex); LOG.info("Fetching [%d]th object[%s], fetchedBytes[%d]", nextFetchIndex, object, fetchedBytes.get()); final File outFile = File.createTempFile(FETCH_FILE_PREFIX, null, temporaryDirectory); - fetchedBytes.addAndGet(download(object, outFile, 0)); + fetchedBytes.addAndGet(download(object, outFile)); fetchedFiles.put(new FetchedFile<>(object, outFile, getFileCloser(outFile, fetchedBytes))); } } @@ -166,26 +180,27 @@ public class Fetcher implements Iterator> * * @param object an object to be downloaded * @param outFile a file which the object data is stored - * @param tryCount current retry count * * @return number of downloaded bytes */ - private long download(T object, File outFile, int tryCount) throws IOException + private long download(T object, File outFile) throws IOException { - try (final InputStream is = openObjectFunction.open(object); - final OutputStream os = new FileOutputStream(outFile)) { - return IOUtils.copyLarge(is, os, buffer); + try { + return RetryUtils.retry( + () -> { + try (final InputStream is = openObjectFunction.open(object); + final OutputStream os = new FileOutputStream(outFile)) { + return IOUtils.copyLarge(is, os, buffer); + } + }, + retryCondition, + outFile::delete, + maxFetchRetry + 1, + StringUtils.format("Failed to download object[%s]", object) + ); } - catch (IOException e) { - final int nextTry = tryCount + 1; - if (!Thread.currentThread().isInterrupted() && nextTry < maxFetchRetry) { - LOG.error(e, "Failed to download object[%s], retrying (%d of %d)", object, nextTry, maxFetchRetry); - outFile.delete(); - return download(object, outFile, nextTry); - } else { - LOG.error(e, "Failed to download object[%s], retries exhausted, aborting", object); - throw e; - } + catch (Exception e) { + throw new IOException(e); } } @@ -289,7 +304,11 @@ public class Fetcher implements Iterator> final T object = objects.get(nextFetchIndex); LOG.info("Reading [%d]th object[%s]", nextFetchIndex, object); nextFetchIndex++; - return new OpenedObject<>(object, openObjectFunction.open(object), getNoopCloser()); + return new 
OpenedObject<>( + object, + new RetryingInputStream<>(object, openObjectFunction, retryCondition, maxFetchRetry), + getNoopCloser() + ); } } diff --git a/api/src/main/java/io/druid/data/input/impl/prefetch/ObjectOpenFunction.java b/api/src/main/java/io/druid/data/input/impl/prefetch/ObjectOpenFunction.java index 52cfbce8afb..45c06653300 100644 --- a/api/src/main/java/io/druid/data/input/impl/prefetch/ObjectOpenFunction.java +++ b/api/src/main/java/io/druid/data/input/impl/prefetch/ObjectOpenFunction.java @@ -25,4 +25,6 @@ import java.io.InputStream; interface ObjectOpenFunction { InputStream open(T object) throws IOException; + + InputStream open(T object, long start) throws IOException; } diff --git a/api/src/main/java/io/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.java b/api/src/main/java/io/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.java index eb694cf85b7..78f91423280 100644 --- a/api/src/main/java/io/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.java +++ b/api/src/main/java/io/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.java @@ -22,6 +22,7 @@ package io.druid.data.input.impl.prefetch; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import io.druid.data.input.Firehose; import io.druid.data.input.impl.AbstractTextFilesFirehoseFactory; @@ -32,6 +33,7 @@ import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.logger.Logger; import org.apache.commons.io.LineIterator; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -159,22 +161,25 @@ public abstract class PrefetchableTextFilesFirehoseFactory } @Override - public Firehose connect(StringInputRowParser firehoseParser, File temporaryDirectory) throws IOException + public Firehose connect(StringInputRowParser firehoseParser, @Nullable File temporaryDirectory) throws IOException { - if (!cacheManager.isEnabled() && maxFetchCapacityBytes == 0) { - return super.connect(firehoseParser, temporaryDirectory); - } - if (objects == null) { objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "objects")); } - Preconditions.checkState(temporaryDirectory.exists(), "temporaryDirectory[%s] does not exist", temporaryDirectory); - Preconditions.checkState( - temporaryDirectory.isDirectory(), - "temporaryDirectory[%s] is not a directory", - temporaryDirectory - ); + if (cacheManager.isEnabled() || maxFetchCapacityBytes > 0) { + Preconditions.checkNotNull(temporaryDirectory, "temporaryDirectory"); + Preconditions.checkArgument( + temporaryDirectory.exists(), + "temporaryDirectory[%s] does not exist", + temporaryDirectory + ); + Preconditions.checkArgument( + temporaryDirectory.isDirectory(), + "temporaryDirectory[%s] is not a directory", + temporaryDirectory + ); + } LOG.info("Create a new firehose for [%d] objects", objects.size()); @@ -189,7 +194,21 @@ public abstract class PrefetchableTextFilesFirehoseFactory prefetchTriggerBytes, fetchTimeout, maxFetchRetry, - this::openObjectStream + new ObjectOpenFunction() + { + @Override + public InputStream open(T object) throws IOException + { + return openObjectStream(object); + } + + @Override + public InputStream open(T object, long start) throws IOException + { + return openObjectStream(object, start); + } + }, + 
getRetryCondition() ); return new FileIteratingFirehose( @@ -240,6 +259,23 @@ public abstract class PrefetchableTextFilesFirehoseFactory ); } + /** + * Returns a predicate describing retry conditions. {@link Fetcher} and {@link RetryingInputStream} will retry on + * errors satisfying this condition. + */ + protected abstract Predicate getRetryCondition(); + + /** + * Open an input stream from the given object. If the object is compressed, this method should return the byte stream + * as is, without decompressing it. Decompression should be handled in {@link #wrapObjectStream(Object, InputStream)}. + * + * @param object an object to be read + * @param start start offset + * + * @return an input stream for the object + */ + protected abstract InputStream openObjectStream(T object, long start) throws IOException; + /** * This class calls the {@link Closeable#close()} method of the resourceCloser when it is closed. */ diff --git a/api/src/main/java/io/druid/data/input/impl/prefetch/RetryingInputStream.java b/api/src/main/java/io/druid/data/input/impl/prefetch/RetryingInputStream.java new file mode 100644 index 00000000000..20bb6158ae6 --- /dev/null +++ b/api/src/main/java/io/druid/data/input/impl/prefetch/RetryingInputStream.java @@ -0,0 +1,184 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.data.input.impl.prefetch; + +import com.google.common.base.Predicate; +import com.google.common.base.Throwables; +import com.google.common.io.CountingInputStream; +import io.druid.java.util.common.RetryUtils; +import io.druid.java.util.common.logger.Logger; + +import java.io.IOException; +import java.io.InputStream; +import java.net.SocketException; + +/** + * This class is used by {@link Fetcher} when prefetch is disabled. It is responsible for re-opening the underlying input + * stream for the input object when the socket connection is reset, as well as on errors satisfying the given {@link #retryCondition}.
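For context on how these pieces interact: when prefetch and caching are disabled, Fetcher wraps each object in a RetryingInputStream, which counts the bytes read and, after a connection reset, re-opens the object at that offset via the two-argument ObjectOpenFunction.open(object, start). Below is a minimal sketch of a conforming open function, assuming a local-file object type; the class name ResumableFileOpenFunction is hypothetical and not part of this patch (and, since ObjectOpenFunction is package-private, such a class would live in the same package).

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch: an ObjectOpenFunction whose two-argument open() resumes
// from a byte offset, which is what RetryingInputStream calls after a connection reset.
class ResumableFileOpenFunction implements ObjectOpenFunction<File>
{
  @Override
  public InputStream open(File object) throws IOException
  {
    return new FileInputStream(object);
  }

  @Override
  public InputStream open(File object, long start) throws IOException
  {
    final FileInputStream in = new FileInputStream(object);
    // Resume mid-object: skip the bytes consumed before the failure so the
    // retried stream continues where the broken one stopped.
    if (in.skip(start) != start) {
      throw new IOException("Could not skip to offset " + start);
    }
    return in;
  }
}

A caller would then read through new RetryingInputStream<>(file, new ResumableFileOpenFunction(), e -> e instanceof IOException, 3), which retries IOExceptions up to three times and, after a reset, resumes at the current offset rather than restarting from byte 0.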
+ * + * @param object type + */ +class RetryingInputStream extends InputStream +{ + private static final Logger log = new Logger(RetryingInputStream.class); + + private final T object; + private final ObjectOpenFunction objectOpenFunction; + private final Predicate retryCondition; + private final int maxRetry; + + private CountingInputStream delegate; + private long startOffset; + + RetryingInputStream( + T object, + ObjectOpenFunction objectOpenFunction, + Predicate retryCondition, + int maxRetry + ) throws IOException + { + this.object = object; + this.objectOpenFunction = objectOpenFunction; + this.retryCondition = retryCondition; + this.maxRetry = maxRetry; + this.delegate = new CountingInputStream(objectOpenFunction.open(object)); + } + + private boolean isConnectionReset(Throwable t) + { + return (t instanceof SocketException && (t.getMessage() != null && t.getMessage().contains("Connection reset"))) || + (t.getCause() != null && isConnectionReset(t.getCause())); + } + + private void waitOrThrow(Throwable t, int nTry) throws IOException + { + final boolean isConnectionReset = isConnectionReset(t); + if (isConnectionReset || retryCondition.apply(t)) { + if (isConnectionReset) { + // Re-open the input stream on connection reset + startOffset += delegate.getCount(); + try { + delegate.close(); + } + catch (IOException e) { + // ignore this exception + log.warn(e, "Error while closing the delegate input stream"); + } + } + try { + // Wait for the next try + RetryUtils.awaitNextRetry(t, null, nTry + 1, maxRetry, false); + + if (isConnectionReset) { + log.info("retrying from offset[%d]", startOffset); + delegate = new CountingInputStream(objectOpenFunction.open(object, startOffset)); + } + } + catch (InterruptedException | IOException e) { + t.addSuppressed(e); + throwAsIOException(t); + } + } else { + throwAsIOException(t); + } + } + + private static void throwAsIOException(Throwable t) throws IOException + { + Throwables.propagateIfInstanceOf(t, IOException.class); + throw new IOException(t); + } + + @Override + public int read() throws IOException + { + for (int nTry = 0; nTry < maxRetry; nTry++) { + try { + return delegate.read(); + } + catch (Throwable t) { + waitOrThrow(t, nTry); + } + } + return delegate.read(); + } + + @Override + public int read(byte b[]) throws IOException + { + for (int nTry = 0; nTry < maxRetry; nTry++) { + try { + return delegate.read(b); + } + catch (Throwable t) { + waitOrThrow(t, nTry); + } + } + return delegate.read(b); + } + + @Override + public int read(byte b[], int off, int len) throws IOException + { + for (int nTry = 0; nTry < maxRetry; nTry++) { + try { + return delegate.read(b, off, len); + } + catch (Throwable t) { + waitOrThrow(t, nTry); + } + } + return delegate.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException + { + for (int nTry = 0; nTry < maxRetry; nTry++) { + try { + return delegate.skip(n); + } + catch (Throwable t) { + waitOrThrow(t, nTry); + } + } + return delegate.skip(n); + } + + @Override + public int available() throws IOException + { + for (int nTry = 0; nTry < maxRetry; nTry++) { + try { + return delegate.available(); + } + catch (Throwable t) { + waitOrThrow(t, nTry); + } + } + return delegate.available(); + } + + @Override + public void close() throws IOException + { + delegate.close(); + } +} diff --git a/api/src/test/java/io/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactoryTest.java 
b/api/src/test/java/io/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactoryTest.java index ddec6a27c4e..fc8c5a7b519 100644 --- a/api/src/test/java/io/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactoryTest.java +++ b/api/src/test/java/io/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactoryTest.java @@ -21,6 +21,7 @@ package io.druid.data.input.impl.prefetch; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.collect.Lists; import com.google.common.io.CountingOutputStream; import io.druid.data.input.Firehose; @@ -30,6 +31,7 @@ import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.java.util.common.DateTimes; +import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.StringUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; @@ -47,6 +49,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStreamWriter; import java.io.Writer; +import java.net.SocketException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.ArrayList; @@ -187,6 +190,25 @@ public class PrefetchableTextFilesFirehoseFactoryTest assertNumRemainingCacheFiles(firehoseTmpDir, 0); } + @Test + public void testWithoutCacheAndFetchAgainstConnectionReset() throws IOException + { + final TestPrefetchableTextFilesFirehoseFactory factory = + TestPrefetchableTextFilesFirehoseFactory.withConnectionResets(TEST_DIR, 0, 0, 2); + + final List rows = new ArrayList<>(); + final File firehoseTmpDir = createFirehoseTmpDir("testWithoutCacheAndFetch"); + try (Firehose firehose = factory.connect(parser, firehoseTmpDir)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + + Assert.assertEquals(0, factory.getCacheManager().getTotalCachedBytes()); + assertResult(rows); + assertNumRemainingCacheFiles(firehoseTmpDir, 0); + } + @Test public void testWithoutCache() throws IOException { @@ -377,10 +399,10 @@ public class PrefetchableTextFilesFirehoseFactoryTest static class TestPrefetchableTextFilesFirehoseFactory extends PrefetchableTextFilesFirehoseFactory { - private static final long defaultTimeout = 1000; private final long sleepMillis; private final File baseDir; - private int openExceptionCount; + private int numOpenExceptions; + private int maxConnectionResets; static TestPrefetchableTextFilesFirehoseFactory with(File baseDir, long cacheCapacity, long fetchCapacity) { @@ -389,9 +411,9 @@ public class PrefetchableTextFilesFirehoseFactoryTest 1024, cacheCapacity, fetchCapacity, - defaultTimeout, 3, 0, + 0, 0 ); } @@ -403,9 +425,9 @@ public class PrefetchableTextFilesFirehoseFactoryTest 1024, 2048, 2048, - defaultTimeout, 3, 0, + 0, 0 ); } @@ -417,9 +439,28 @@ public class PrefetchableTextFilesFirehoseFactoryTest 1024, 2048, 2048, - defaultTimeout, 3, count, + 0, + 0 + ); + } + + static TestPrefetchableTextFilesFirehoseFactory withConnectionResets( + File baseDir, + long cacheCapacity, + long fetchCapacity, + int numConnectionResets + ) + { + return new TestPrefetchableTextFilesFirehoseFactory( + baseDir, + fetchCapacity / 2, + cacheCapacity, + fetchCapacity, + 3, + 0, + numConnectionResets, 0 ); } @@ -434,18 +475,54 @@ public class PrefetchableTextFilesFirehoseFactoryTest 100, 3, 0, + 0, ms ); } - public TestPrefetchableTextFilesFirehoseFactory( 
+ private static long computeTimeout(int maxRetry) + { + // See RetryUtils.nextRetrySleepMillis() + final double maxFuzzyMultiplier = 2.; + return (long) Math.min( + RetryUtils.MAX_SLEEP_MILLIS, + RetryUtils.BASE_SLEEP_MILLIS * Math.pow(2, maxRetry - 1) * maxFuzzyMultiplier + ); + } + + TestPrefetchableTextFilesFirehoseFactory( + File baseDir, + long prefetchTriggerThreshold, + long maxCacheCapacityBytes, + long maxFetchCapacityBytes, + int maxRetry, + int numOpenExceptions, + int numConnectionResets, + long sleepMillis + ) + { + this( + baseDir, + prefetchTriggerThreshold, + maxCacheCapacityBytes, + maxFetchCapacityBytes, + computeTimeout(maxRetry), + maxRetry, + numOpenExceptions, + numConnectionResets, + sleepMillis + ); + } + + TestPrefetchableTextFilesFirehoseFactory( File baseDir, long prefetchTriggerThreshold, long maxCacheCapacityBytes, long maxFetchCapacityBytes, long timeout, int maxRetry, - int openExceptionCount, + int numOpenExceptions, + int maxConnectionResets, long sleepMillis ) { @@ -456,7 +533,8 @@ public class PrefetchableTextFilesFirehoseFactoryTest timeout, maxRetry ); - this.openExceptionCount = openExceptionCount; + this.numOpenExceptions = numOpenExceptions; + this.maxConnectionResets = maxConnectionResets; this.sleepMillis = sleepMillis; this.baseDir = baseDir; } @@ -474,8 +552,8 @@ public class PrefetchableTextFilesFirehoseFactoryTest @Override protected InputStream openObjectStream(File object) throws IOException { - if (openExceptionCount > 0) { - openExceptionCount--; + if (numOpenExceptions > 0) { + numOpenExceptions--; throw new IOException("Exception for retry test"); } if (sleepMillis > 0) { @@ -486,7 +564,9 @@ public class PrefetchableTextFilesFirehoseFactoryTest throw new RuntimeException(e); } } - return FileUtils.openInputStream(object); + return maxConnectionResets > 0 ? + new TestInputStream(FileUtils.openInputStream(object), maxConnectionResets) : + FileUtils.openInputStream(object); } @Override @@ -494,5 +574,76 @@ public class PrefetchableTextFilesFirehoseFactoryTest { return stream; } + + @Override + protected Predicate getRetryCondition() + { + return e -> e instanceof IOException; + } + + @Override + protected InputStream openObjectStream(File object, long start) throws IOException + { + if (numOpenExceptions > 0) { + numOpenExceptions--; + throw new IOException("Exception for retry test"); + } + if (sleepMillis > 0) { + try { + Thread.sleep(sleepMillis); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + final InputStream in = FileUtils.openInputStream(object); + in.skip(start); + + return maxConnectionResets > 0 ? 
new TestInputStream(in, maxConnectionResets) : in; + } + + private int readCount; + private int numConnectionResets; + + private class TestInputStream extends InputStream + { + private static final int NUM_READ_COUNTS_BEFORE_ERROR = 10; + private final InputStream delegate; + private final int maxConnectionResets; + + TestInputStream( + InputStream delegate, + int maxConnectionResets + ) + { + this.delegate = delegate; + this.maxConnectionResets = maxConnectionResets; + } + + @Override + public int read() throws IOException + { + if (readCount++ % NUM_READ_COUNTS_BEFORE_ERROR == 0) { + if (numConnectionResets++ < maxConnectionResets) { + // Simulate connection reset + throw new SocketException("Test Connection reset"); + } + } + return delegate.read(); + } + + @Override + public int read(byte b[], int off, int len) throws IOException + { + if (readCount++ % NUM_READ_COUNTS_BEFORE_ERROR == 0) { + if (numConnectionResets++ < maxConnectionResets) { + // Simulate connection reset + throw new SocketException("Test Connection reset"); + } + } + return delegate.read(b, off, len); + } + } } } diff --git a/api/src/test/java/io/druid/data/input/impl/prefetch/RetryingInputStreamTest.java b/api/src/test/java/io/druid/data/input/impl/prefetch/RetryingInputStreamTest.java new file mode 100644 index 00000000000..5f54b19e781 --- /dev/null +++ b/api/src/test/java/io/druid/data/input/impl/prefetch/RetryingInputStreamTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.data.input.impl.prefetch; + +import com.google.common.base.Preconditions; +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.SocketException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class RetryingInputStreamTest +{ + private static final int MAX_RETRY = 5; + private static final int MAX_ERROR = 4; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private File testFile; + private DataInputStream inputStream; + + @Before + public void setup() throws IOException + { + testFile = temporaryFolder.newFile(); + + try (FileOutputStream fis = new FileOutputStream(testFile); + GZIPOutputStream gis = new GZIPOutputStream(fis); + DataOutputStream dis = new DataOutputStream(gis)) { + for (int i = 0; i < 10000; i++) { + dis.writeInt(i); + } + } + + throwError = false; + + final InputStream retryingInputStream = new RetryingInputStream<>( + testFile, + new ObjectOpenFunction() + { + @Override + public InputStream open(File object) throws IOException + { + return new TestInputStream(new FileInputStream(object)); + } + + @Override + public InputStream open(File object, long start) throws IOException + { + final FileInputStream fis = new FileInputStream(object); + Preconditions.checkState(fis.skip(start) == start); + return new TestInputStream(fis); + } + }, + e -> e instanceof IOException, + MAX_RETRY + ); + + inputStream = new DataInputStream(new GZIPInputStream(retryingInputStream)); + + throwError = true; + } + + @After + public void teardown() throws IOException + { + inputStream.close(); + FileUtils.forceDelete(testFile); + } + + @Test + public void testReadRetry() throws IOException + { + for (int i = 0; i < 10000; i++) { + Assert.assertEquals(i, inputStream.readInt()); + } + } + + private boolean throwError = true; + private int errorCount = 0; + + private class TestInputStream extends InputStream + { + private final InputStream delegate; + + TestInputStream(InputStream delegate) + { + this.delegate = delegate; + } + + @Override + public int read() throws IOException + { + return delegate.read(); + } + + @Override + public int read(byte b[], int off, int len) throws IOException + { + if (throwError) { + throwError = false; + errorCount++; + if (errorCount % 2 == 0) { + throw new IOException("test retry"); + } else { + delegate.close(); + throw new SocketException("Test Connection reset"); + } + } else { + throwError = errorCount < MAX_ERROR; + return delegate.read(b, off, len); + } + } + } +} diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java index ff5fdc444ca..e3ad8b432aa 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java @@ -22,10 +22,13 @@ package io.druid.firehose.azure; import com.fasterxml.jackson.annotation.JacksonInject; 
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; import io.druid.java.util.common.CompressionUtils; import io.druid.storage.azure.AzureByteSource; import io.druid.storage.azure.AzureStorage; +import io.druid.storage.azure.AzureUtils; import java.io.IOException; import java.io.InputStream; @@ -75,6 +78,16 @@ public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFi return makeByteSource(azureStorage, object).openStream(); } + @Override + protected InputStream openObjectStream(AzureBlob object, long start) throws IOException + { + // BlobInputStream.skip() moves the next read offset instead of skipping first 'start' bytes. + final InputStream in = openObjectStream(object); + final long skip = in.skip(start); + Preconditions.checkState(skip == start, "start offset was [%s] but [%s] bytes were skipped", start, skip); + return in; + } + @Override protected InputStream wrapObjectStream(AzureBlob object, InputStream stream) throws IOException { @@ -124,4 +137,10 @@ public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFi getMaxFetchRetry() ); } + + @Override + protected Predicate getRetryCondition() + { + return AzureUtils.AZURE_RETRY; + } } diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java index 39ab34258e0..dc14667607d 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java @@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.microsoft.azure.storage.StorageException; - import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; @@ -40,7 +39,6 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; public class AzureDataSegmentPusher implements DataSegmentPusher { @@ -149,14 +147,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher final Map azurePaths = getAzurePaths(segment); return AzureUtils.retryAzureOperation( - new Callable() - { - @Override - public DataSegment call() throws Exception - { - return uploadDataSegment(segment, version, size, outFile, descFile, azurePaths, replaceExisting); - } - }, + () -> uploadDataSegment(segment, version, size, outFile, descFile, azurePaths, replaceExisting), config.getMaxTries() ); } diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java index 50092ceff62..9b9ceff44c2 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java @@ -33,7 +33,6 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; -import 
java.util.concurrent.Callable; public class AzureTaskLogs implements TaskLogs { @@ -58,7 +57,7 @@ public class AzureTaskLogs implements TaskLogs try { AzureUtils.retryAzureOperation( - (Callable) () -> { + () -> { azureStorage.uploadBlob(logFile, config.getContainer(), taskKey, true); return null; }, diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureUtils.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureUtils.java index 0bbabd6594e..d1893e69e98 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureUtils.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureUtils.java @@ -21,12 +21,11 @@ package io.druid.storage.azure; import com.google.common.base.Predicate; import com.microsoft.azure.storage.StorageException; - import io.druid.java.util.common.RetryUtils; +import io.druid.java.util.common.RetryUtils.Task; import java.io.IOException; import java.net.URISyntaxException; -import java.util.concurrent.Callable; public class AzureUtils { @@ -53,7 +52,7 @@ public class AzureUtils } }; - public static T retryAzureOperation(Callable f, int maxTries) throws Exception + public static T retryAzureOperation(Task f, int maxTries) throws Exception { return RetryUtils.retry(f, AZURE_RETRY, maxTries); } diff --git a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java index 64f8920a47a..80ad7dab7ab 100644 --- a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java +++ b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java @@ -35,7 +35,6 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.util.concurrent.Callable; /** * Cassandra Segment Puller @@ -77,20 +76,15 @@ public class CassandraDataSegmentPuller extends CassandraStorage implements Data final FileUtils.FileCopyResult localResult; try { localResult = RetryUtils.retry( - new Callable() - { - @Override - public FileUtils.FileCopyResult call() throws Exception - { - try (OutputStream os = new FileOutputStream(tmpFile)) { - ChunkedStorage - .newReader(indexStorage, key, os) - .withBatchSize(BATCH_SIZE) - .withConcurrencyLevel(CONCURRENCY) - .call(); - } - return new FileUtils.FileCopyResult(tmpFile); + () -> { + try (OutputStream os = new FileOutputStream(tmpFile)) { + ChunkedStorage + .newReader(indexStorage, key, os) + .withBatchSize(BATCH_SIZE) + .withConcurrencyLevel(CONCURRENCY) + .call(); } + return new FileUtils.FileCopyResult(tmpFile); }, Predicates.alwaysTrue(), 10 diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java index b7b80044415..5f39e7e5a44 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java @@ -22,11 +22,13 @@ package io.druid.firehose.cloudfiles; import com.fasterxml.jackson.annotation.JacksonInject; import 
com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Predicate; import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.logger.Logger; import io.druid.storage.cloudfiles.CloudFilesByteSource; import io.druid.storage.cloudfiles.CloudFilesObjectApiProxy; +import io.druid.storage.cloudfiles.CloudFilesUtils; import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi; import java.io.IOException; @@ -72,6 +74,17 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho @Override protected InputStream openObjectStream(CloudFilesBlob object) throws IOException + { + return openObjectStream(object, 0); + } + + @Override + protected InputStream openObjectStream(CloudFilesBlob object, long start) throws IOException + { + return createCloudFilesByteSource(object).openStream(start); + } + + private CloudFilesByteSource createCloudFilesByteSource(CloudFilesBlob object) { final String region = object.getRegion(); final String container = object.getContainer(); @@ -82,9 +95,7 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho ); CloudFilesObjectApiProxy objectApi = new CloudFilesObjectApiProxy( cloudFilesApi, region, container); - final CloudFilesByteSource byteSource = new CloudFilesByteSource(objectApi, path); - - return byteSource.openStream(); + return new CloudFilesByteSource(objectApi, path); } @Override @@ -125,4 +136,10 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho getMaxFetchRetry() ); } + + @Override + protected Predicate getRetryCondition() + { + return CloudFilesUtils.CLOUDFILESRETRY; + } } diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesByteSource.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesByteSource.java index 882b315a28a..7a36dad3b34 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesByteSource.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesByteSource.java @@ -51,7 +51,12 @@ public class CloudFilesByteSource extends ByteSource @Override public InputStream openStream() throws IOException { - payload = (payload == null) ? objectApi.get(path).getPayload() : payload; + return openStream(0); + } + + public InputStream openStream(long start) throws IOException + { + payload = (payload == null) ? 
objectApi.get(path, start).getPayload() : payload; try { return payload.openStream(); diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java index e224a5b83b7..8edabff1759 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java @@ -35,7 +35,6 @@ import java.io.IOException; import java.net.URI; import java.nio.file.Files; import java.util.Map; -import java.util.concurrent.Callable; public class CloudFilesDataSegmentPusher implements DataSegmentPusher { @@ -90,41 +89,37 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher log.info("Copying segment[%s] to CloudFiles at location[%s]", inSegment.getIdentifier(), segmentPath); return CloudFilesUtils.retryCloudFilesOperation( - new Callable() - { - @Override - public DataSegment call() throws Exception - { - CloudFilesObject segmentData = new CloudFilesObject( - segmentPath, outFile, objectApi.getRegion(), - objectApi.getContainer() + () -> { + CloudFilesObject segmentData = new CloudFilesObject( + segmentPath, outFile, objectApi.getRegion(), + objectApi.getContainer() + ); + + if (!replaceExisting && objectApi.exists(segmentData.getPath())) { + log.info("Skipping push because object [%s] exists && replaceExisting == false", segmentData.getPath()); + } else { + log.info("Pushing %s.", segmentData.getPath()); + objectApi.put(segmentData); + + // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in + // runtime, and because Guava deletes methods over time, that causes incompatibilities. + Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment)); + CloudFilesObject descriptorData = new CloudFilesObject( + segmentPath, descFile, + objectApi.getRegion(), objectApi.getContainer() ); - - if (!replaceExisting && objectApi.exists(segmentData.getPath())) { - log.info("Skipping push because object [%s] exists && replaceExisting == false", segmentData.getPath()); - } else { - log.info("Pushing %s.", segmentData.getPath()); - objectApi.put(segmentData); - - // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in - // runtime, and because Guava deletes methods over time, that causes incompatibilities. 
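The refactor above, collapsing an anonymous Callable into a lambda, recurs throughout this patch: RetryUtils now accepts its own Task functional interface, so retried operations can be written inline and still throw checked exceptions. A minimal sketch of the call shape, using a hypothetical helper (readWithRetries is not part of this patch):

import com.google.common.base.Predicate;
import io.druid.java.util.common.RetryUtils;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class RetryTaskSketch
{
  // Hypothetical sketch: RetryUtils.retry takes a Task (the lambda), a retry
  // condition, and a maximum number of tries, as in the surrounding diff.
  static byte[] readWithRetries(Path path, int maxTries) throws Exception
  {
    final Predicate<Throwable> retryOnIO = e -> e instanceof IOException;
    return RetryUtils.retry(
        () -> Files.readAllBytes(path), // may throw IOException; retried per the predicate
        retryOnIO,
        maxTries
    );
  }
}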
- Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment)); - CloudFilesObject descriptorData = new CloudFilesObject( - segmentPath, descFile, - objectApi.getRegion(), objectApi.getContainer() - ); - log.info("Pushing %s.", descriptorData.getPath()); - objectApi.put(descriptorData); - } - - final DataSegment outSegment = inSegment - .withSize(indexSize) - .withLoadSpec(makeLoadSpec(new URI(segmentData.getPath()))) - .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); - - return outSegment; + log.info("Pushing %s.", descriptorData.getPath()); + objectApi.put(descriptorData); } - }, this.config.getOperationMaxRetries() + + final DataSegment outSegment = inSegment + .withSize(indexSize) + .withLoadSpec(makeLoadSpec(new URI(segmentData.getPath()))) + .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); + + return outSegment; + }, + this.config.getOperationMaxRetries() ); } catch (Exception e) { diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxy.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxy.java index d495fcc1c77..8b9f8ede968 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxy.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxy.java @@ -19,6 +19,7 @@ package io.druid.storage.cloudfiles; +import org.jclouds.http.options.GetOptions; import org.jclouds.io.Payload; import org.jclouds.openstack.swift.v1.domain.SwiftObject; import org.jclouds.openstack.swift.v1.features.ObjectApi; @@ -52,9 +53,14 @@ public class CloudFilesObjectApiProxy return objectApi.put(cloudFilesObject.getPath(), cloudFilesObject.getPayload()); } - public CloudFilesObject get(String path) + public CloudFilesObject get(String path, long start) { - SwiftObject swiftObject = objectApi.get(path); + final SwiftObject swiftObject; + if (start == 0) { + swiftObject = objectApi.get(path); + } else { + swiftObject = objectApi.get(path, new GetOptions().startAt(start)); + } Payload payload = swiftObject.getPayload(); return new CloudFilesObject(payload, this.region, this.container, path); } diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java index e409964de39..9d6e26d7103 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java @@ -20,11 +20,10 @@ package io.druid.storage.cloudfiles; import com.google.common.base.Predicate; - import io.druid.java.util.common.RetryUtils; +import io.druid.java.util.common.RetryUtils.Task; import java.io.IOException; -import java.util.concurrent.Callable; /** * @@ -50,7 +49,7 @@ public class CloudFilesUtils /** * Retries CloudFiles operations that fail due to io-related exceptions. 
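With the proxy change above, get(path, start) issues a ranged GET through jclouds' GetOptions().startAt(start) whenever start > 0, so a retry transfers only the remaining bytes instead of re-downloading the whole object. A minimal usage sketch follows; the resume method is hypothetical and not part of this patch. Note that CloudFilesByteSource caches the payload, so the offset only takes effect on the first open of a given byte source.

import io.druid.storage.cloudfiles.CloudFilesByteSource;
import io.druid.storage.cloudfiles.CloudFilesObjectApiProxy;

import java.io.IOException;
import java.io.InputStream;

class RangedReadSketch
{
  // Hypothetical sketch: re-open a Cloud Files object at the byte offset that was
  // already consumed before a dropped connection, via the new openStream(start).
  static InputStream resume(CloudFilesObjectApiProxy objectApi, String path, long alreadyRead) throws IOException
  {
    return new CloudFilesByteSource(objectApi, path).openStream(alreadyRead);
  }
}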
*/ - public static T retryCloudFilesOperation(Callable f, final int maxTries) throws Exception + public static T retryCloudFilesOperation(Task f, final int maxTries) throws Exception { return RetryUtils.retry(f, CLOUDFILESRETRY, maxTries); } diff --git a/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesByteSourceTest.java b/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesByteSourceTest.java index dbf69111113..217d312479b 100644 --- a/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesByteSourceTest.java +++ b/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesByteSourceTest.java @@ -42,7 +42,7 @@ public class CloudFilesByteSourceTest extends EasyMockSupport Payload payload = createMock(Payload.class); InputStream stream = createMock(InputStream.class); - expect(objectApi.get(path)).andReturn(cloudFilesObject); + expect(objectApi.get(path, 0)).andReturn(cloudFilesObject); expect(cloudFilesObject.getPayload()).andReturn(payload); expect(payload.openStream()).andReturn(stream); payload.close(); @@ -66,7 +66,7 @@ public class CloudFilesByteSourceTest extends EasyMockSupport Payload payload = createMock(Payload.class); InputStream stream = createMock(InputStream.class); - expect(objectApi.get(path)).andReturn(cloudFilesObject); + expect(objectApi.get(path, 0)).andReturn(cloudFilesObject); expect(cloudFilesObject.getPayload()).andReturn(payload); expect(payload.openStream()).andThrow(new IOException()).andReturn(stream); payload.close(); diff --git a/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxyTest.java b/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxyTest.java index 8273827fcc5..759df1b5bd9 100644 --- a/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxyTest.java +++ b/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxyTest.java @@ -51,7 +51,7 @@ public class CloudFilesObjectApiProxyTest extends EasyMockSupport replayAll(); CloudFilesObjectApiProxy cfoApiProxy = new CloudFilesObjectApiProxy(cloudFilesApi, region, container); - CloudFilesObject cloudFilesObject = cfoApiProxy.get(path); + CloudFilesObject cloudFilesObject = cfoApiProxy.get(path, 0); assertEquals(cloudFilesObject.getPayload(), payload); assertEquals(cloudFilesObject.getRegion(), region); diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java index 7ee1b72b4f4..0d5d9995960 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java @@ -22,10 +22,12 @@ package io.druid.firehose.google; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Predicate; import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; import io.druid.java.util.common.CompressionUtils; import 
io.druid.storage.google.GoogleByteSource; import io.druid.storage.google.GoogleStorage; +import io.druid.storage.google.GoogleUtils; import java.io.IOException; import java.io.InputStream; @@ -68,13 +70,24 @@ public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesF @Override protected InputStream openObjectStream(GoogleBlob object) throws IOException + { + return openObjectStream(object, 0); + } + + @Override + protected InputStream openObjectStream(GoogleBlob object, long start) throws IOException + { + return createGoogleByteSource(object).openStream(start); + } + + private GoogleByteSource createGoogleByteSource(GoogleBlob object) { final String bucket = object.getBucket(); final String path = object.getPath().startsWith("/") ? object.getPath().substring(1) : object.getPath(); - return new GoogleByteSource(storage, bucket, path).openStream(); + return new GoogleByteSource(storage, bucket, path); } @Override @@ -115,5 +128,11 @@ public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesF getMaxFetchRetry() ); } + + @Override + protected Predicate getRetryCondition() + { + return GoogleUtils.GOOGLE_RETRY; + } } diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleByteSource.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleByteSource.java index 047698f2e63..95a2b4f4056 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleByteSource.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleByteSource.java @@ -42,4 +42,9 @@ public class GoogleByteSource extends ByteSource { return storage.get(bucket, path); } + + public InputStream openStream(long start) throws IOException + { + return storage.get(bucket, path, start); + } } diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleStorage.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleStorage.java index e42940393ec..7423ee6274a 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleStorage.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleStorage.java @@ -21,6 +21,7 @@ package io.druid.storage.google; import com.google.api.client.http.AbstractInputStreamContent; import com.google.api.services.storage.Storage; +import com.google.api.services.storage.Storage.Objects.Get; import java.io.IOException; import java.io.InputStream; @@ -44,9 +45,17 @@ public class GoogleStorage public InputStream get(final String bucket, final String path) throws IOException { - Storage.Objects.Get getObject = storage.objects().get(bucket, path); - getObject.getMediaHttpDownloader().setDirectDownloadEnabled(false); - return getObject.executeMediaAsInputStream(); + return get(bucket, path, 0); + } + + public InputStream get(final String bucket, final String path, long start) throws IOException + { + final Get get = storage.objects().get(bucket, path); + if (start > 0) { + get.getMediaHttpDownloader().setBytesDownloaded(start); + } + get.getMediaHttpDownloader().setDirectDownloadEnabled(false); + return get.executeMediaAsInputStream(); } public void delete(final String bucket, final String path) throws IOException diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java index 
47fe6bb830f..1ec9e16b451 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java @@ -48,7 +48,6 @@ import java.io.OutputStream; import java.io.Reader; import java.io.Writer; import java.net.URI; -import java.util.concurrent.Callable; /** */ @@ -183,40 +182,34 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller, URIDataPuller try { return RetryUtils.retry( - new Callable() - { - @Override - public FileUtils.FileCopyResult call() throws Exception - { - if (!fs.exists(path)) { - throw new SegmentLoadingException("No files found at [%s]", path.toString()); - } - - final RemoteIterator children = fs.listFiles(path, false); - final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(); - while (children.hasNext()) { - final LocatedFileStatus child = children.next(); - final Path childPath = child.getPath(); - final String fname = childPath.getName(); - if (fs.isDirectory(childPath)) { - log.warn("[%s] is a child directory, skipping", childPath.toString()); - } else { - final File outFile = new File(outDir, fname); - - // Actual copy - fs.copyToLocalFile(childPath, new Path(outFile.toURI())); - result.addFile(outFile); - } - } - log.info( - "Copied %d bytes from [%s] to [%s]", - result.size(), - path.toString(), - outDir.getAbsolutePath() - ); - return result; + () -> { + if (!fs.exists(path)) { + throw new SegmentLoadingException("No files found at [%s]", path.toString()); } + final RemoteIterator children = fs.listFiles(path, false); + final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(); + while (children.hasNext()) { + final LocatedFileStatus child = children.next(); + final Path childPath = child.getPath(); + final String fname = childPath.getName(); + if (fs.isDirectory(childPath)) { + log.warn("[%s] is a child directory, skipping", childPath.toString()); + } else { + final File outFile = new File(outDir, fname); + + // Actual copy + fs.copyToLocalFile(childPath, new Path(outFile.toURI())); + result.addFile(outFile); + } + } + log.info( + "Copied %d bytes from [%s] to [%s]", + result.size(), + path.toString(), + outDir.getAbsolutePath() + ); + return result; }, shouldRetryPredicate(), DEFAULT_RETRY_COUNT diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java index adb11b4dbc4..b232ee0c11a 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java @@ -21,10 +21,8 @@ package io.druid.storage.hdfs; import com.google.common.base.Throwables; import com.google.inject.Inject; - import io.druid.data.SearchableVersionedDataFinder; import io.druid.java.util.common.RetryUtils; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -34,7 +32,6 @@ import org.apache.hadoop.fs.PathFilter; import javax.annotation.Nullable; import java.io.IOException; import java.net.URI; -import java.util.concurrent.Callable; import java.util.regex.Pattern; /** @@ -89,17 +86,12 @@ public class HdfsFileTimestampVersionFinder extends HdfsDataSegmentPuller implem final Path path = new Path(uri); try { return RetryUtils.retry( - new Callable() - { 
- @Override - public URI call() throws Exception - { - final FileSystem fs = path.getFileSystem(config); - if (!fs.exists(path)) { - return null; - } - return mostRecentInDir(fs.isDirectory(path) ? path : path.getParent(), pattern); + () -> { + final FileSystem fs = path.getFileSystem(config); + if (!fs.exists(path)) { + return null; } + return mostRecentInDir(fs.isDirectory(path) ? path : path.getParent(), pattern); }, shouldRetryPredicate(), DEFAULT_RETRY_COUNT diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java index 35229ab604d..fa649dc495c 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java @@ -23,10 +23,12 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.IAE; import io.druid.java.util.common.logger.Logger; +import io.druid.storage.s3.S3Utils; import org.jets3t.service.S3ServiceException; import org.jets3t.service.ServiceException; import org.jets3t.service.StorageObjectsChunk; @@ -188,6 +190,28 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor } } + @Override + protected InputStream openObjectStream(S3Object object, long start) throws IOException + { + try { + final S3Object result = s3Client.getObject( + object.getBucketName(), + object.getKey(), + null, + null, + null, + null, + start, + null + ); + + return result.getDataInputStream(); + } + catch (ServiceException e) { + throw new IOException(e); + } + } + @Override protected InputStream wrapObjectStream(S3Object object, InputStream stream) throws IOException { @@ -228,4 +252,10 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor getMaxFetchRetry() ); } + + @Override + protected Predicate getRetryCondition() + { + return S3Utils.S3RETRY; + } } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java index 983837865e8..1c22418e2b4 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java @@ -41,7 +41,6 @@ import org.jets3t.service.model.S3Object; import java.io.IOException; import java.util.Map; -import java.util.concurrent.Callable; public class S3DataSegmentMover implements DataSegmentMover { @@ -118,7 +117,7 @@ public class S3DataSegmentMover implements DataSegmentMover { try { S3Utils.retryS3Operation( - (Callable) () -> { + () -> { final String copyMsg = StringUtils.format( "[s3://%s/%s] to [s3://%s/%s]", s3Bucket, @@ -228,7 +227,7 @@ public class S3DataSegmentMover implements DataSegmentMover private void deleteWithRetries(final String s3Bucket, final String s3Path) throws Exception { RetryUtils.retry( - (Callable) () -> { + () -> { try { s3Client.deleteObject(s3Bucket, s3Path); return null; diff --git 
diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java index 190dd313e37..4e2ae23e16b 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java @@ -52,7 +52,6 @@ import java.io.Reader; import java.io.Writer; import java.net.URI; import java.util.Map; -import java.util.concurrent.Callable; /** * A data segment puller that also handles URI data pulls. @@ -310,14 +309,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller { try { return S3Utils.retryS3Operation( - new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - return S3Utils.isObjectInBucket(s3Client, coords.bucket, coords.path); - } - } + () -> S3Utils.isObjectInBucket(s3Client, coords.bucket, coords.path) ); } catch (S3ServiceException | IOException e) { diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index 333e7fae5dd..36731bdd813 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -41,7 +41,6 @@ import java.net.URI; import java.nio.file.Files; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; public class S3DataSegmentPusher implements DataSegmentPusher { @@ -100,39 +99,34 @@ public class S3DataSegmentPusher implements DataSegmentPusher try { return S3Utils.retryS3Operation( - new Callable<DataSegment>() - { - @Override - public DataSegment call() throws Exception - { - S3Object toPush = new S3Object(zipOutFile); - putObject(config.getBucket(), s3Path, toPush, replaceExisting); + () -> { + S3Object toPush = new S3Object(zipOutFile); + putObject(config.getBucket(), s3Path, toPush, replaceExisting); - final DataSegment outSegment = inSegment.withSize(indexSize) - .withLoadSpec(makeLoadSpec(config.getBucket(), toPush.getKey())) - .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); + final DataSegment outSegment = inSegment.withSize(indexSize) + .withLoadSpec(makeLoadSpec(config.getBucket(), toPush.getKey())) + .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); - File descriptorFile = File.createTempFile("druid", "descriptor.json"); - // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in - // runtime, and because Guava deletes methods over time, that causes incompatibilities. + File descriptorFile = File.createTempFile("druid", "descriptor.json"); + // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in + // runtime, and because Guava deletes methods over time, that causes incompatibilities. + Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(outSegment)); + S3Object descriptorObject = new S3Object(descriptorFile); - putObject( - config.getBucket(), - S3Utils.descriptorPathForSegmentPath(s3Path), - descriptorObject, - replaceExisting - ); + putObject( + config.getBucket(), + S3Utils.descriptorPathForSegmentPath(s3Path), + descriptorObject, + replaceExisting + ); - log.info("Deleting zipped index File[%s]", zipOutFile); - zipOutFile.delete(); + log.info("Deleting zipped index File[%s]", zipOutFile); + zipOutFile.delete(); - log.info("Deleting descriptor file[%s]", descriptorFile); - descriptorFile.delete(); + log.info("Deleting descriptor file[%s]", descriptorFile); + descriptorFile.delete(); - return outSegment; - } + return outSegment; } ); }
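Every statement inside the retried lambda above runs again on each attempt: the zip upload, the descriptor write, and the descriptor upload are repeated as a unit. A sketch (hypothetical names) showing the whole-block re-execution, with an operation that succeeds on the third try:

    import java.io.IOException;
    import java.util.concurrent.Callable;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ReexecutionSketch
    {
      public static void main(String[] args) throws Exception
      {
        AtomicInteger attempts = new AtomicInteger();
        Callable<String> push = () -> {
          int n = attempts.incrementAndGet();
          System.out.println("upload zip, attempt " + n);
          System.out.println("upload descriptor, attempt " + n); // re-runs too
          if (n < 3) {
            throw new IOException("flaky network");
          }
          return "pushed";
        };

        // Simple retry loop standing in for S3Utils.retryS3Operation:
        for (int i = 0; ; i++) {
          try {
            System.out.println(push.call());
            break;
          }
          catch (IOException e) {
            if (i >= 4) {
              throw e;
            }
          }
        }
      }
    }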
diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java index ce2643cb1ef..3676d0942f4 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java @@ -23,7 +23,6 @@ import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.io.ByteSource; import com.google.inject.Inject; - import io.druid.java.util.common.IOE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; @@ -36,7 +35,6 @@ import org.jets3t.service.model.StorageObject; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.Callable; /** * Provides task logs archived on S3. @@ -118,16 +116,11 @@ public class S3TaskLogs implements TaskLogs try { S3Utils.retryS3Operation( - new Callable<Void>() - { - @Override - public Void call() throws Exception - { - final StorageObject object = new StorageObject(logFile); - object.setKey(taskKey); - service.putObject(config.getS3Bucket(), object); - return null; - } + () -> { + final StorageObject object = new StorageObject(logFile); + object.setKey(taskKey); + service.putObject(config.getS3Bucket(), object); + return null; } ); } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TimestampVersionedDataFinder.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TimestampVersionedDataFinder.java index 9a0b2b92afe..8014ec8ac88 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TimestampVersionedDataFinder.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TimestampVersionedDataFinder.java @@ -21,17 +21,14 @@ package io.druid.storage.s3; import com.google.common.base.Throwables; import com.google.inject.Inject; - import io.druid.data.SearchableVersionedDataFinder; import io.druid.java.util.common.RetryUtils; - import io.druid.java.util.common.StringUtils; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.model.S3Object; import javax.annotation.Nullable; import java.net.URI; -import java.util.concurrent.Callable; import java.util.regex.Pattern; /** @@ -64,35 +61,30 @@ public class S3TimestampVersionedDataFinder extends S3DataSegmentPuller implemen { try { return RetryUtils.retry( - new Callable<URI>() - { - @Override - public URI call() throws Exception - { - final S3Coords coords = new S3Coords(checkURI(uri)); - long mostRecent = Long.MIN_VALUE; - URI latest = null; - S3Object[] objects = s3Client.listObjects(coords.bucket, coords.path,
null); - if (objects == null) { - return null; - } - for (S3Object storageObject : objects) { - storageObject.closeDataInputStream(); - String keyString = storageObject.getKey().substring(coords.path.length()); - if (keyString.startsWith("/")) { - keyString = keyString.substring(1); - } - if (pattern != null && !pattern.matcher(keyString).matches()) { - continue; - } - final long latestModified = storageObject.getLastModifiedDate().getTime(); - if (latestModified >= mostRecent) { - mostRecent = latestModified; - latest = new URI(StringUtils.format("s3://%s/%s", storageObject.getBucketName(), storageObject.getKey())); - } - } - return latest; + () -> { + final S3Coords coords = new S3Coords(checkURI(uri)); + long mostRecent = Long.MIN_VALUE; + URI latest = null; + S3Object[] objects = s3Client.listObjects(coords.bucket, coords.path, null); + if (objects == null) { + return null; } + for (S3Object storageObject : objects) { + storageObject.closeDataInputStream(); + String keyString = storageObject.getKey().substring(coords.path.length()); + if (keyString.startsWith("/")) { + keyString = keyString.substring(1); + } + if (pattern != null && !pattern.matcher(keyString).matches()) { + continue; + } + final long latestModified = storageObject.getLastModifiedDate().getTime(); + if (latestModified >= mostRecent) { + mostRecent = latestModified; + latest = new URI(StringUtils.format("s3://%s/%s", storageObject.getBucketName(), storageObject.getKey())); + } + } + return latest; }, shouldRetryPredicate(), DEFAULT_RETRY_COUNT diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index 5d97e79ab25..2a9372e96d0 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -22,8 +22,8 @@ package io.druid.storage.s3; import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.base.Throwables; - import io.druid.java.util.common.RetryUtils; +import io.druid.java.util.common.RetryUtils.Task; import org.jets3t.service.ServiceException; import org.jets3t.service.StorageObjectsChunk; import org.jets3t.service.impl.rest.httpclient.RestS3Service; @@ -32,7 +32,6 @@ import org.jets3t.service.model.StorageObject; import java.io.IOException; import java.util.Iterator; -import java.util.concurrent.Callable; /** * @@ -83,7 +82,7 @@ public class S3Utils * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not * found, etc) are not retried. 
*/ - public static <T> T retryS3Operation(Callable<T> f) throws Exception + public static <T> T retryS3Operation(Task<T> f) throws Exception { final int maxTries = 10; return RetryUtils.retry(f, S3RETRY, maxTries); @@ -147,15 +146,7 @@ { try { return retryS3Operation( - new Callable<StorageObjectsChunk>() - { - @Override - public StorageObjectsChunk call() throws Exception - { - return s3Client.listObjectsChunked( - bucket, prefix, null, maxListingLength, priorLastKey); - } - } + () -> s3Client.listObjectsChunked(bucket, prefix, null, maxListingLength, priorLastKey) ); } catch (Exception e) {
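retryS3Operation above pins the retry policy (the S3RETRY predicate plus maxTries = 10) so call sites stay one-liners. A sketch of that classification idea, using java.util.function.Predicate rather than the Guava Predicate the patch uses:

    import java.io.IOException;
    import java.util.function.Predicate;

    public class TransientErrorSketch
    {
      // Loosely in the spirit of S3RETRY: I/O failures are worth retrying,
      // request-level mistakes are not.
      static final Predicate<Throwable> SHOULD_RETRY = e ->
          e instanceof IOException
          || (e.getCause() != null && e.getCause() instanceof IOException);

      public static void main(String[] args)
      {
        System.out.println(SHOULD_RETRY.test(new IOException("connection reset")));     // true
        System.out.println(SHOULD_RETRY.test(new IllegalArgumentException("bad key"))); // false
      }
    }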
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 7ce678045c7..16809561cbb 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -64,7 +64,6 @@ import java.net.URISyntaxException; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; @@ -160,18 +159,13 @@ public class JobHelper if (jarFile.getName().endsWith(".jar")) { try { RetryUtils.retry( - new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - if (isSnapshot(jarFile)) { - addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job); - } else { - addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job); - } - return true; + () -> { + if (isSnapshot(jarFile)) { + addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job); + } else { + addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job); } + return true; }, shouldRetryPredicate(), NUM_RETRIES @@ -607,50 +601,45 @@ { try { return RetryUtils.retry( - new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - final boolean needRename; + () -> { + final boolean needRename; - if (outputFS.exists(finalIndexZipFilePath)) { - // NativeS3FileSystem.rename won't overwrite, so we might need to delete the old index first - final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath); - final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath); + if (outputFS.exists(finalIndexZipFilePath)) { + // NativeS3FileSystem.rename won't overwrite, so we might need to delete the old index first + final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath); + final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath); - if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime() - || zipFile.getLen() != finalIndexZipFile.getLen()) { - log.info( - "File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]", - finalIndexZipFile.getPath(), - DateTimes.utc(finalIndexZipFile.getModificationTime()), - finalIndexZipFile.getLen(), - zipFile.getPath(), - DateTimes.utc(zipFile.getModificationTime()), - zipFile.getLen() - ); - outputFS.delete(finalIndexZipFilePath, false); - needRename = true; - } else { - log.info( - "File[%s / %s / %sB] existed and will be kept", - finalIndexZipFile.getPath(), - DateTimes.utc(finalIndexZipFile.getModificationTime()), - finalIndexZipFile.getLen() - ); - needRename = false; - } - } else { + if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime() + || zipFile.getLen() != finalIndexZipFile.getLen()) { + log.info( + "File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]", + finalIndexZipFile.getPath(), + DateTimes.utc(finalIndexZipFile.getModificationTime()), + finalIndexZipFile.getLen(), + zipFile.getPath(), + DateTimes.utc(zipFile.getModificationTime()), + zipFile.getLen() + ); + outputFS.delete(finalIndexZipFilePath, false); needRename = true; - } - - if (needRename) { - log.info("Attempting rename from [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath); - return outputFS.rename(indexZipFilePath, finalIndexZipFilePath); } else { - return true; + log.info( + "File[%s / %s / %sB] existed and will be kept", + finalIndexZipFile.getPath(), + DateTimes.utc(finalIndexZipFile.getModificationTime()), + finalIndexZipFile.getLen() + ); + needRename = false; } + } else { + needRename = true; + } + + if (needRename) { + log.info("Attempting rename from [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath); + return outputFS.rename(indexZipFilePath, finalIndexZipFilePath); + } else { + return true; } }, FileUtils.IS_EXCEPTION, @@ -821,14 +810,7 @@ { try { return RetryUtils.retry( - new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - return fs.delete(path, recursive); - } - }, + () -> fs.delete(path, recursive), shouldRetryPredicate(), NUM_RETRIES ); diff --git a/integration-tests/src/main/java/io/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/io/druid/testing/clients/OverlordResourceTestClient.java index f47d97e4f53..82e5456abcc 100644 --- a/integration-tests/src/main/java/io/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/io/druid/testing/clients/OverlordResourceTestClient.java @@ -80,33 +80,28 @@ { try { return RetryUtils.retry( - new Callable<String>() - { - @Override - public String call() throws Exception - { - StatusResponseHolder response = httpClient.go( - new Request(HttpMethod.POST, new URL(getIndexerURL() + "task")) - .setContent( - "application/json", - StringUtils.toUtf8(task) - ), - responseHandler - ).get(); - if (!response.getStatus().equals(HttpResponseStatus.OK)) { - throw new ISE( - "Error while submitting task to indexer response [%s %s]", - response.getStatus(), - response.getContent() - ); - } - Map<String, String> responseData = jsonMapper.readValue( - response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING + () -> { + StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.POST, new URL(getIndexerURL() + "task")) + .setContent( + "application/json", + StringUtils.toUtf8(task) + ), + responseHandler + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while submitting task to indexer response [%s %s]", + response.getStatus(), + response.getContent() ); - String taskID = responseData.get("task"); - LOG.info("Submitted task with TaskID[%s]", taskID); - return taskID; } + Map<String, String> responseData = jsonMapper.readValue( + response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING + ); + String taskID = responseData.get("task"); + LOG.info("Submitted task with TaskID[%s]", taskID); + return taskID; }, Predicates.alwaysTrue(), 5
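The test client above deserializes the Overlord's JSON response into a Map via a shared type-reference constant. A standalone sketch of the same Jackson call, with an inline TypeReference standing in for JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING:

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;

    public class ResponseParseSketch
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        Map<String, String> responseData = mapper.readValue(
            "{\"task\":\"index_example_task\"}",
            new TypeReference<Map<String, String>>() {}
        );
        System.out.println(responseData.get("task")); // index_example_task
      }
    }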
diff --git a/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java b/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java index fd7bc1e6ec5..40bbef91b82 100644 --- a/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java +++ b/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Enumeration; -import java.util.concurrent.Callable; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; @@ -156,14 +155,7 @@ public class CompressionUtils if (!cacheLocally) { try { return RetryUtils.retry( - new Callable<FileUtils.FileCopyResult>() - { - @Override - public FileUtils.FileCopyResult call() throws Exception - { - return unzip(byteSource.openStream(), outDir); - } - }, + () -> unzip(byteSource.openStream(), outDir), shouldRetry, DEFAULT_RETRY_COUNT ); diff --git a/java-util/src/main/java/io/druid/java/util/common/RetryUtils.java b/java-util/src/main/java/io/druid/java/util/common/RetryUtils.java index f282ead3a5b..992ab85eb92 100644 --- a/java-util/src/main/java/io/druid/java/util/common/RetryUtils.java +++ b/java-util/src/main/java/io/druid/java/util/common/RetryUtils.java @@ -24,12 +24,31 @@ import com.google.common.base.Predicate; import com.google.common.base.Throwables; import io.druid.java.util.common.logger.Logger; -import java.util.concurrent.Callable; +import javax.annotation.Nullable; import java.util.concurrent.ThreadLocalRandom; public class RetryUtils { public static final Logger log = new Logger(RetryUtils.class); + public static final long MAX_SLEEP_MILLIS = 60000; + public static final long BASE_SLEEP_MILLIS = 1000; + + public interface Task<T> + { + /** + * This method is tried up to maxTries times unless it succeeds. + */ + T perform() throws Exception; + } + + public interface CleanupAfterFailure + { + /** + * This is called once {@link Task#perform()} fails. Retrying is stopped once this method throws an exception, + * so errors inside this method should be ignored if you don't want to stop retrying. + */ + void cleanup() throws Exception; + } /** * Retry an operation using fuzzy exponentially increasing backoff. The wait time after the nth failed attempt is * min(MAX_SLEEP_MILLIS, BASE_SLEEP_MILLIS * 2^(n - 1)), scaled by a random fuzz factor drawn from a Gaussian * distribution with mean 1 and standard deviation 0.2, clamped to [0, 2]. * * @throws Exception if maxTries is exhausted, or shouldRetry returns false */ public static <T> T retry( - final Callable<T> f, - Predicate<Throwable> shouldRetry, + final Task<T> f, + final Predicate<Throwable> shouldRetry, final int quietTries, - final int maxTries + final int maxTries, + @Nullable final CleanupAfterFailure cleanupAfterFailure, + @Nullable final String messageOnRetry ) throws Exception { Preconditions.checkArgument(maxTries > 0, "maxTries > 0"); + Preconditions.checkArgument(quietTries >= 0, "quietTries >= 0"); int nTry = 0; + final int maxRetries = maxTries - 1; while (true) { try { nTry++; - return f.call(); + return f.perform(); } catch (Throwable e) { + if (cleanupAfterFailure != null) { + cleanupAfterFailure.cleanup(); + } if (nTry < maxTries && shouldRetry.apply(e)) { - awaitNextRetry(e, nTry, nTry <= quietTries); + awaitNextRetry(e, messageOnRetry, nTry, maxRetries, nTry <= quietTries); } else { Throwables.propagateIfInstanceOf(e, Exception.class); throw Throwables.propagate(e); @@ -73,23 +99,69 @@ } }
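Putting the new signature together: the cleanup hook runs after every failed attempt (before the backoff sleep), and messageOnRetry is prepended to the retry log line. A hedged usage sketch, assuming the patched RetryUtils and Guava are on the classpath; the file and failure here are fabricated:

    import io.druid.java.util.common.RetryUtils;
    import java.io.File;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;

    public class RetryWithCleanupSketch
    {
      public static void main(String[] args) throws Exception
      {
        final File outFile = File.createTempFile("fetch", null);
        final AtomicInteger attempts = new AtomicInteger();

        long size = RetryUtils.retry(
            () -> {
              if (attempts.incrementAndGet() < 3) {
                throw new IOException("simulated partial download");
              }
              return 42L;
            },
            e -> e instanceof IOException,  // shouldRetry
            outFile::delete,                // cleanup: drop the partial file
            5,                              // maxTries
            "Failed to download object"     // messageOnRetry
        );
        System.out.println("downloaded " + size + " bytes");
      }
    }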
- /** - * Same as {@link #retry(Callable, Predicate, int, int)} with quietTries = 0. - */ - public static <T> T retry(final Callable<T> f, Predicate<Throwable> shouldRetry, final int maxTries) throws Exception + public static <T> T retry(final Task<T> f, Predicate<Throwable> shouldRetry, final int maxTries) throws Exception { return retry(f, shouldRetry, 0, maxTries); } - private static void awaitNextRetry(final Throwable e, final int nTry, final boolean quiet) throws InterruptedException + public static <T> T retry( + final Task<T> f, + final Predicate<Throwable> shouldRetry, + final int quietTries, + final int maxTries + ) throws Exception { + return retry(f, shouldRetry, quietTries, maxTries, null, null); + } + public static <T> T retry( + final Task<T> f, + final Predicate<Throwable> shouldRetry, + final int maxTries, + final String messageOnRetry + ) throws Exception + { + return retry(f, shouldRetry, 0, maxTries, null, messageOnRetry); + } + + public static <T> T retry( + final Task<T> f, + final Predicate<Throwable> shouldRetry, + final CleanupAfterFailure onEachFailure, + final int maxTries, + final String messageOnRetry + ) throws Exception + { + return retry(f, shouldRetry, 0, maxTries, onEachFailure, messageOnRetry); + } + + public static void awaitNextRetry( + final Throwable e, + @Nullable final String messageOnRetry, + final int nTry, + final int maxRetries, + final boolean quiet + ) throws InterruptedException + { final long sleepMillis = nextRetrySleepMillis(nTry); + final String fullMessage; + + if (messageOnRetry == null) { + fullMessage = StringUtils.format("Retrying (%d of %d) in %,dms.", nTry, maxRetries, sleepMillis); + } else { + fullMessage = StringUtils.format( + "%s, retrying (%d of %d) in %,dms.", + messageOnRetry, + nTry, + maxRetries, + sleepMillis + ); + } if (quiet) { - log.debug(e, "Failed on try %d, retrying in %,dms.", nTry, sleepMillis); + log.debug(e, fullMessage); } else { - log.warn(e, "Failed on try %d, retrying in %,dms.", nTry, sleepMillis); + log.warn(e, fullMessage); } Thread.sleep(sleepMillis); @@ -97,10 +169,8 @@ public static long nextRetrySleepMillis(final int nTry) { - final long baseSleepMillis = 1000; - final long maxSleepMillis = 60000; final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * ThreadLocalRandom.current().nextGaussian(), 0), 2); - final long sleepMillis = (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry - 1)) + final long sleepMillis = (long) (Math.min(MAX_SLEEP_MILLIS, BASE_SLEEP_MILLIS * Math.pow(2, nTry - 1)) * fuzzyMultiplier); return sleepMillis; }
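Restating nextRetrySleepMillis above: the sleep is min(60s, 1s * 2^(nTry - 1)) scaled by a Gaussian fuzz factor clamped to [0, 2]. This standalone sketch prints the unfuzzed schedule, which caps at the seventh failure:

    public class BackoffScheduleSketch
    {
      public static void main(String[] args)
      {
        final long base = 1000;   // BASE_SLEEP_MILLIS
        final long max = 60000;   // MAX_SLEEP_MILLIS
        for (int nTry = 1; nTry <= 8; nTry++) {
          // fuzzyMultiplier is 1 on average; omitted here for determinism
          long sleepMillis = (long) Math.min(max, base * Math.pow(2, nTry - 1));
          System.out.printf("failure %d -> wait %,d ms%n", nTry, sleepMillis);
        }
        // 1,000 / 2,000 / 4,000 / 8,000 / 16,000 / 32,000 / 60,000 / 60,000
      }
    }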
diff --git a/java-util/src/main/java/io/druid/java/util/common/StreamUtils.java b/java-util/src/main/java/io/druid/java/util/common/StreamUtils.java index 079d6c9c810..0fb9a68388a 100644 --- a/java-util/src/main/java/io/druid/java/util/common/StreamUtils.java +++ b/java-util/src/main/java/io/druid/java/util/common/StreamUtils.java @@ -28,7 +28,6 @@ import com.google.common.io.ByteStreams; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.concurrent.Callable; /** */ @@ -76,18 +75,13 @@ { try { return RetryUtils.retry( - new Callable<Long>() - { - @Override - public Long call() throws Exception - { - try (InputStream inputStream = byteSource.openStream()) { - try (OutputStream outputStream = byteSink.openStream()) { - final long retval = ByteStreams.copy(inputStream, outputStream); - // Workaround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf - outputStream.flush(); - return retval; - } + () -> { + try (InputStream inputStream = byteSource.openStream()) { + try (OutputStream outputStream = byteSink.openStream()) { + final long retval = ByteStreams.copy(inputStream, outputStream); + // Workaround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf + outputStream.flush(); + return retval; } } }, diff --git a/java-util/src/test/java/io/druid/java/util/common/RetryUtilsTest.java b/java-util/src/test/java/io/druid/java/util/common/RetryUtilsTest.java index 3087a7ec03d..9118dab919c 100644 --- a/java-util/src/test/java/io/druid/java/util/common/RetryUtilsTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/RetryUtilsTest.java @@ -24,7 +24,6 @@ import org.junit.Assert; import org.junit.Test; import java.io.IOException; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; public class RetryUtilsTest @@ -43,14 +42,9 @@ { final AtomicInteger count = new AtomicInteger(); final String result = RetryUtils.retry( - new Callable<String>() - { - @Override - public String call() throws Exception - { - count.incrementAndGet(); - return "hey"; - } + () -> { + count.incrementAndGet(); + return "hey"; }, isTransient, 2 @@ -66,14 +60,9 @@ { boolean threwExpectedException = false; try { RetryUtils.retry( - new Callable<String>() - { - @Override - public String call() throws Exception - { - count.incrementAndGet(); - throw new IOException("what"); - } + () -> { + count.incrementAndGet(); + throw new IOException("what"); }, isTransient, 2 @@ -91,16 +80,11 @@ { final AtomicInteger count = new AtomicInteger(); final String result = RetryUtils.retry( - new Callable<String>() - { - @Override - public String call() throws Exception - { - if (count.incrementAndGet() >= 2) { - return "hey"; - } else { - throw new IOException("what"); - } + () -> { + if (count.incrementAndGet() >= 2) { + return "hey"; + } else { + throw new IOException("what"); } }, isTransient, @@ -117,16 +101,11 @@ { boolean threwExpectedException = false; try { RetryUtils.retry( - new Callable<String>() - { - @Override - public String call() throws Exception - { - if (count.incrementAndGet() >= 2) { - return "hey"; - } else { - throw new IOException("uhh"); - } + () -> { + if (count.incrementAndGet() >= 2) { + return "hey"; + } else { + throw new IOException("uhh"); } }, isTransient, @@ -139,5 +118,4 @@ Assert.assertTrue("threw expected exception", threwExpectedException); Assert.assertEquals("count", 1, count.get()); } - } diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java index e648e928f6e..517f40cc2d8 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java @@ -47,7 +47,6 @@ import java.sql.SQLTransientException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Callable; public abstract class SQLMetadataConnector implements MetadataStorageConnector { @@ -127,16 +126,8 @@ final Predicate<Throwable> myShouldRetry ) { - final Callable<T> call = new Callable<T>() - { - @Override - public T call() throws Exception - { - return getDBI().withHandle(callback); - } - }; try { - return RetryUtils.retry(call, myShouldRetry, DEFAULT_MAX_TRIES); + return RetryUtils.retry(() -> getDBI().withHandle(callback), myShouldRetry, DEFAULT_MAX_TRIES); } catch (Exception e) { throw Throwables.propagate(e);
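The connector's predicates hinge on distinguishing recoverable JDBC failures (deadlocks, lock timeouts) from real bugs. A simplified sketch keyed on java.sql.SQLTransientException; the connector's actual shouldRetry is broader, so treat this as illustrative only:

    import java.sql.SQLTransientException;
    import java.util.function.Predicate;

    public class SqlRetrySketch
    {
      static final Predicate<Throwable> TRANSIENT = e -> {
        Throwable t = e;
        for (int depth = 0; t != null && depth < 10; depth++) {
          if (t instanceof SQLTransientException) {
            return true;  // e.g. lock wait timeout: worth retrying
          }
          t = t.getCause();
        }
        return false;     // e.g. syntax error: retrying cannot help
      };

      public static void main(String[] args)
      {
        System.out.println(TRANSIENT.test(new SQLTransientException("lock wait timeout"))); // true
        System.out.println(TRANSIENT.test(new IllegalStateException("mapper bug")));        // false
      }
    }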
@@ -150,16 +141,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector public <T> T retryTransaction(final TransactionCallback<T> callback, final int quietTries, final int maxTries) { - final Callable<T> call = new Callable<T>() - { - @Override - public T call() throws Exception - { - return getDBI().inTransaction(callback); - } - }; try { - return RetryUtils.retry(call, shouldRetry, quietTries, maxTries); + return RetryUtils.retry(() -> getDBI().inTransaction(callback), shouldRetry, quietTries, maxTries); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/server/src/main/java/io/druid/segment/loading/LocalFileTimestampVersionFinder.java b/server/src/main/java/io/druid/segment/loading/LocalFileTimestampVersionFinder.java index 3ba917a21eb..5b94017c5ac 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalFileTimestampVersionFinder.java +++ b/server/src/main/java/io/druid/segment/loading/LocalFileTimestampVersionFinder.java @@ -20,7 +20,6 @@ package io.druid.segment.loading; import com.google.common.base.Throwables; - import io.druid.data.SearchableVersionedDataFinder; import io.druid.java.util.common.RetryUtils; @@ -31,7 +30,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.nio.file.Path; -import java.util.concurrent.Callable; import java.util.regex.Pattern; /** @@ -81,17 +79,10 @@ public class LocalFileTimestampVersionFinder extends LocalDataSegmentPuller final File file = new File(uri); try { return RetryUtils.retry( - new Callable<URI>() - { - @Override - public URI call() throws Exception - { - return mostRecentInDir( - file.isDirectory() ? file.toPath() : file.getParentFile().toPath(), - pattern - ); - } - }, + () -> mostRecentInDir( + file.isDirectory() ? file.toPath() : file.getParentFile().toPath(), + pattern + ), shouldRetryPredicate(), DEFAULT_RETRY_COUNT ); diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java index 204598ac14b..cf3ecea6fba 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java @@ -21,19 +21,27 @@ package io.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; import io.druid.java.util.common.CompressionUtils; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import org.apache.http.HttpHeaders; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.net.URLConnection; import java.util.Collection; import java.util.List; import java.util.Objects; public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI> { + private static final Logger log = new Logger(HttpFirehoseFactory.class); private final List<URI> uris; + private final boolean supportContentRange; @JsonCreator public HttpFirehoseFactory( @@ -43,10 +51,15 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI> @JsonProperty("maxFetchRetry") Integer maxFetchRetry - ) + ) throws IOException { super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry); this.uris = uris; + + Preconditions.checkArgument(uris.size() > 0, "Empty URIs"); + final URLConnection connection = uris.get(0).toURL().openConnection(); + final String acceptRanges = connection.getHeaderField(HttpHeaders.ACCEPT_RANGES); + this.supportContentRange = acceptRanges != null && acceptRanges.equalsIgnoreCase("bytes"); }
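The constructor above decides once, from the first URI, whether the server honors byte-range requests. A standalone probe of the same header; the URL is a placeholder:

    import java.net.URL;
    import java.net.URLConnection;

    public class AcceptRangesProbe
    {
      public static void main(String[] args) throws Exception
      {
        URLConnection connection = new URL("https://example.com/data.json").openConnection();
        // Servers that support ranges answer with "Accept-Ranges: bytes".
        String acceptRanges = connection.getHeaderField("Accept-Ranges");
        System.out.println("range requests supported: " + "bytes".equalsIgnoreCase(acceptRanges));
      }
    }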
@JsonProperty @@ -67,6 +80,28 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI> + @Override + protected InputStream openObjectStream(URI object, long start) throws IOException + { + if (supportContentRange) { + final URLConnection connection = object.toURL().openConnection(); + // Set header for range request. + // Since we need to set only the start offset, the header is "bytes=<range-start>-". + // See https://tools.ietf.org/html/rfc7233#section-2.1 + connection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start)); + return connection.getInputStream(); + } else { + log.warn( + "Since the input source doesn't support range requests, the object input stream is opened from the start and " + + "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message" + + " a lot." + ); + final InputStream in = openObjectStream(object); + in.skip(start); + return in; + } + } + @Override protected InputStream wrapObjectStream(URI object, InputStream stream) throws IOException { @@ -105,4 +140,10 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI> } + + @Override + protected Predicate<Throwable> getRetryCondition() + { + return e -> e instanceof IOException; + } }
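One caveat on the fallback path above: InputStream.skip may legally skip fewer bytes than requested, so a defensive variant loops until the offset is reached. A hedged sketch, not part of the patch:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class SkipFullySketch
    {
      // Skip exactly n bytes or fail; a single skip() call offers no such guarantee.
      static void skipFully(InputStream in, long n) throws IOException
      {
        long remaining = n;
        while (remaining > 0) {
          long skipped = in.skip(remaining);
          if (skipped <= 0) {
            if (in.read() < 0) {
              throw new IOException("stream ended before offset was reached");
            }
            skipped = 1; // read() consumed exactly one byte
          }
          remaining -= skipped;
        }
      }

      public static void main(String[] args) throws Exception
      {
        InputStream in = new ByteArrayInputStream(new byte[]{1, 2, 3, 4, 5});
        skipFully(in, 3);
        System.out.println(in.read()); // 4
      }
    }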