diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java index 6309ffad852..470e04060aa 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputEntity.java +++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java @@ -20,6 +20,7 @@ package org.apache.druid.data.input; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import org.apache.druid.guice.annotations.UnstableApi; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; @@ -63,7 +64,7 @@ public interface InputEntity * Opens an {@link InputStream} on the input entity directly. * This is the basic way to read the given entity. * - * @see #fetch as an alternative way to read data. + * @see #fetch */ InputStream open() throws IOException; @@ -89,7 +90,7 @@ public interface InputEntity is, tempFile, fetchBuffer, - getFetchRetryCondition(), + getRetryCondition(), DEFAULT_MAX_NUM_FETCH_TRIES, StringUtils.format("Failed to fetch into [%s]", tempFile.getAbsolutePath()) ); @@ -114,7 +115,12 @@ public interface InputEntity } /** - * {@link #fetch} will retry during the fetch if it sees an exception matching to the returned predicate. + * Returns a retry condition that the caller should retry on. + * The returned condition should be used when reading data from this InputEntity such as in {@link #fetch} + * or {@link RetryingInputEntity}. 
*/ - Predicate getFetchRetryCondition(); + default Predicate getRetryCondition() + { + return Predicates.alwaysFalse(); + } } diff --git a/core/src/main/java/org/apache/druid/data/input/RetryingInputEntity.java b/core/src/main/java/org/apache/druid/data/input/RetryingInputEntity.java new file mode 100644 index 00000000000..6a47f0b5593 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/RetryingInputEntity.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input; + +import com.google.common.base.Predicate; +import org.apache.druid.data.input.impl.RetryingInputStream; +import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; +import org.apache.druid.java.util.common.RetryUtils; + +import java.io.IOException; +import java.io.InputStream; + +public interface RetryingInputEntity extends InputEntity +{ + @Override + default InputStream open() throws IOException + { + return new RetryingInputStream<>( + this, + new RetryingInputEntityOpenFunction(), + getRetryCondition(), + RetryUtils.DEFAULT_MAX_TRIES + ); + } + + /** + * Directly opens an {@link InputStream} on the input entity. 
+ */ + default InputStream readFromStart() throws IOException + { + return readFrom(0); + } + + /** + * Directly opens an {@link InputStream} starting at the given offset on the input entity. + * + * @param offset an offset to start reading from. A non-negative integer counting + * the number of bytes from the beginning of the entity + */ + InputStream readFrom(long offset) throws IOException; + + @Override + Predicate getRetryCondition(); + + class RetryingInputEntityOpenFunction implements ObjectOpenFunction + { + @Override + public InputStream open(RetryingInputEntity object) throws IOException + { + return object.readFromStart(); + } + + @Override + public InputStream open(RetryingInputEntity object, long start) throws IOException + { + return object.readFrom(start); + } + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/ByteEntity.java b/core/src/main/java/org/apache/druid/data/input/impl/ByteEntity.java index 52c187ebdad..13376912b9a 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/ByteEntity.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/ByteEntity.java @@ -19,8 +19,6 @@ package org.apache.druid.data.input.impl; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; import org.apache.druid.data.input.InputEntity; import org.apache.druid.io.ByteBufferInputStream; @@ -60,10 +58,4 @@ public class ByteEntity implements InputEntity { return new ByteBufferInputStream(buffer); } - - @Override - public Predicate getFetchRetryCondition() - { - return Predicates.alwaysFalse(); - } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/FileEntity.java b/core/src/main/java/org/apache/druid/data/input/impl/FileEntity.java index d48345710a3..ec57dc6283e 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/FileEntity.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/FileEntity.java @@ -19,8 +19,6 @@ package org.apache.druid.data.input.impl; -import 
com.google.common.base.Predicate; -import com.google.common.base.Predicates; import org.apache.druid.data.input.InputEntity; import org.apache.druid.utils.CompressionUtils; @@ -69,10 +67,4 @@ public class FileEntity implements InputEntity { return CompressionUtils.decompress(new FileInputStream(file), file.getName()); } - - @Override - public Predicate getFetchRetryCondition() - { - return Predicates.alwaysFalse(); - } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java index 220b5c99dc4..22ce64afb8c 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java @@ -21,8 +21,11 @@ package org.apache.druid.data.input.impl; import com.google.common.base.Predicate; import com.google.common.base.Strings; -import org.apache.druid.data.input.InputEntity; +import com.google.common.net.HttpHeaders; +import org.apache.druid.data.input.RetryingInputEntity; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.PasswordProvider; import org.apache.druid.utils.CompressionUtils; @@ -33,8 +36,10 @@ import java.net.URI; import java.net.URLConnection; import java.util.Base64; -public class HttpEntity implements InputEntity +public class HttpEntity implements RetryingInputEntity { + private static final Logger LOG = new Logger(HttpEntity.class); + private final URI uri; @Nullable private final String httpAuthenticationUsername; @@ -59,29 +64,71 @@ public class HttpEntity implements InputEntity } @Override - public InputStream open() throws IOException + public InputStream readFrom(long offset) throws IOException { return CompressionUtils.decompress( - openURLConnection(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider).getInputStream(), +
openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset), uri.toString() ); } @Override - public Predicate getFetchRetryCondition() + public Predicate getRetryCondition() { return t -> t instanceof IOException; } - public static URLConnection openURLConnection(URI object, String userName, PasswordProvider passwordProvider) + public static InputStream openInputStream(URI object, String userName, PasswordProvider passwordProvider, long offset) throws IOException { - URLConnection urlConnection = object.toURL().openConnection(); + final URLConnection urlConnection = object.toURL().openConnection(); if (!Strings.isNullOrEmpty(userName) && passwordProvider != null) { String userPass = userName + ":" + passwordProvider.getPassword(); String basicAuthString = "Basic " + Base64.getEncoder().encodeToString(StringUtils.toUtf8(userPass)); urlConnection.setRequestProperty("Authorization", basicAuthString); } - return urlConnection; + if (offset > 0) { + // Set the header for a range request. This must happen before any response header is read, because reading + // a response header implicitly connects and request properties cannot be changed on a connected URLConnection. + // Since we need to set only the start offset, the header is "bytes=<offset>-". + // See https://tools.ietf.org/html/rfc7233#section-2.1 + urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", offset)); + } + // A server that honors the range request includes a Content-Range header in its response. + // See https://tools.ietf.org/html/rfc7233#section-4.2 + final String contentRange = urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE); + final boolean withContentRange = contentRange != null && contentRange.startsWith("bytes "); + if (withContentRange && offset > 0) { + return urlConnection.getInputStream(); + } else { + if (offset > 0) { + LOG.warn( + "Since the input source doesn't support range requests, the object input stream is opened from the start and " + + "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message" + + " a lot."
+ ); + } + final InputStream in = urlConnection.getInputStream(); + try { + // InputStream.skip() may skip fewer bytes than requested, so keep going until the offset is reached. + long remaining = offset; + while (remaining > 0) { + final long skipped = in.skip(remaining); + if (skipped > 0) { + remaining -= skipped; + } else if (in.read() >= 0) { + remaining--; + } else { + throw new ISE("Requested to skip [%s] bytes, but actual number of bytes skipped is [%s]", offset, offset - remaining); + } + } + return in; + } + catch (IOException | RuntimeException e) { + in.close(); + throw e; + } + } + } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStream.java b/core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java similarity index 95% rename from core/src/main/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStream.java rename to core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java index af401e9eb12..89532819047 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStream.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java @@ -17,11 +17,13 @@ * under the License. */ -package org.apache.druid.data.input.impl.prefetch; +package org.apache.druid.data.input.impl; import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.io.CountingInputStream; +import org.apache.druid.data.input.impl.prefetch.Fetcher; +import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -35,7 +37,7 @@ import java.net.SocketException; * * @param object type */ -class RetryingInputStream extends InputStream +public class RetryingInputStream extends InputStream { private static final Logger log = new Logger(RetryingInputStream.class); @@ -47,7 +49,7 @@ class RetryingInputStream extends InputStream private CountingInputStream delegate; private long startOffset; - RetryingInputStream( + public RetryingInputStream( T object, ObjectOpenFunction objectOpenFunction, Predicate retryCondition, diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/FileFetcher.java
b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/FileFetcher.java index be192bb901b..7d869dacf71 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/FileFetcher.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/FileFetcher.java @@ -20,6 +20,7 @@ package org.apache.druid.data.input.impl.prefetch; import com.google.common.base.Predicate; +import org.apache.druid.data.input.impl.RetryingInputStream; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.java index b467997718a..50eefae7366 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.java @@ -28,6 +28,7 @@ import org.apache.commons.io.LineIterator; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory; import org.apache.druid.data.input.impl.FileIteratingFirehose; +import org.apache.druid.data.input.impl.RetryingInputStream; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; diff --git a/core/src/main/java/org/apache/druid/java/util/common/RetryUtils.java b/core/src/main/java/org/apache/druid/java/util/common/RetryUtils.java index 9d16bc60749..37648bf4725 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/RetryUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/RetryUtils.java @@ -32,6 +32,7 @@ public class RetryUtils public static final Logger log = new Logger(RetryUtils.class); public static final long 
MAX_SLEEP_MILLIS = 60000; public static final long BASE_SLEEP_MILLIS = 1000; + public static final int DEFAULT_MAX_TRIES = 10; public interface Task { diff --git a/core/src/test/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStreamTest.java b/core/src/test/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStreamTest.java index a01b092915d..fb32132316a 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStreamTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/prefetch/RetryingInputStreamTest.java @@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl.prefetch; import com.google.common.base.Preconditions; import org.apache.commons.io.FileUtils; +import org.apache.druid.data.input.impl.RetryingInputStream; import org.junit.After; import org.junit.Assert; import org.junit.Before; diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java index 5a3256eb374..6144857d302 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java @@ -20,7 +20,7 @@ package org.apache.druid.data.input.google; import com.google.common.base.Predicate; -import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.RetryingInputEntity; import org.apache.druid.storage.google.GoogleByteSource; import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleUtils; @@ -31,7 +31,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; -public class GoogleCloudStorageEntity implements InputEntity +public class GoogleCloudStorageEntity implements RetryingInputEntity { private final 
GoogleStorage storage; private final URI uri; @@ -50,17 +50,17 @@ public class GoogleCloudStorageEntity implements InputEntity } @Override - public InputStream open() throws IOException + public InputStream readFrom(long offset) throws IOException { // Get data of the given object and open an input stream final String bucket = uri.getAuthority(); final String key = GoogleUtils.extractGoogleCloudStorageObjectKey(uri); final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key); - return CompressionUtils.decompress(byteSource.openStream(), uri.getPath()); + return CompressionUtils.decompress(byteSource.openStream(offset), uri.getPath()); } @Override - public Predicate getFetchRetryCondition() + public Predicate getRetryCondition() { return GoogleUtils.GOOGLE_RETRY; } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index 1bc99ad8b4b..24cc882a868 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -96,7 +96,10 @@ public class GoogleCloudStorageInputSource extends AbstractInputSource implement return new InputEntityIteratingReader( inputRowSchema, inputFormat, - createSplits(inputFormat, null).map(split -> new GoogleCloudStorageEntity(storage, split.get())), + createSplits(inputFormat, null).map(split -> new GoogleCloudStorageEntity( + storage, + split.get() + )), temporaryDirectory ); } diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java index 039ca23f8cf..9a3786ed3dc 100644 --- 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java @@ -20,10 +20,11 @@ package org.apache.druid.inputsource.hdfs; import com.google.common.base.Predicate; -import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.RetryingInputEntity; import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller; import org.apache.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -31,7 +32,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; -public class HdfsInputEntity implements InputEntity +public class HdfsInputEntity implements RetryingInputEntity { private final Configuration conf; private final Path path; @@ -49,14 +50,16 @@ public class HdfsInputEntity implements InputEntity } @Override - public InputStream open() throws IOException + public InputStream readFrom(long offset) throws IOException { - FileSystem fs = path.getFileSystem(conf); - return CompressionUtils.decompress(fs.open(path), path.getName()); + final FileSystem fs = path.getFileSystem(conf); + final FSDataInputStream inputStream = fs.open(path); + inputStream.seek(offset); + return CompressionUtils.decompress(inputStream, path.getName()); } @Override - public Predicate getFetchRetryCondition() + public Predicate getRetryCondition() { return HdfsDataSegmentPuller.RETRY_PREDICATE; } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index 71ceba67446..2f4db984f5f 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -82,8 +82,7 @@ public class S3Utils */ public static T retryS3Operation(Task f) throws Exception { - final int maxTries = 10; - return RetryUtils.retry(f, S3RETRY, maxTries); + return RetryUtils.retry(f, S3RETRY, RetryUtils.DEFAULT_MAX_TRIES); } static boolean isObjectInBucketIgnoringPermission( diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 1f24236577c..21433022fcd 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -36,7 +36,7 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.TestClient; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; @@ -180,7 +180,7 @@ public class OverlordResourceTestClient public void waitUntilTaskCompletes(final String taskID, final int millisEach, final int numTimes) { - RetryUtil.retryUntil( + ITRetryUtil.retryUntil( new Callable() { @Override diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/RetryUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java similarity index 96% rename from integration-tests/src/main/java/org/apache/druid/testing/utils/RetryUtil.java rename to integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java index 49336f94200..356cfdd4fe2 100644 --- 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/RetryUtil.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java @@ -25,10 +25,10 @@ import org.apache.druid.java.util.common.logger.Logger; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -public class RetryUtil +public class ITRetryUtil { - private static final Logger LOG = new Logger(RetryUtil.class); + private static final Logger LOG = new Logger(ITRetryUtil.class); public static final int DEFAULT_RETRY_COUNT = 30; diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ServerDiscoveryUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ServerDiscoveryUtil.java index ea43f8e3071..56b475f102f 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/ServerDiscoveryUtil.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ServerDiscoveryUtil.java @@ -49,7 +49,7 @@ public class ServerDiscoveryUtil public static void waitUntilInstanceReady(final ServerDiscoverySelector serviceProvider, String instanceType) { - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( new Callable() { @Override diff --git a/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java b/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java index bc2e4f5e987..d6b244813bd 100644 --- a/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java +++ b/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java @@ -31,7 +31,7 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.guice.TestClient; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.jboss.netty.handler.codec.http.HttpMethod; import 
org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.testng.internal.IConfiguration; @@ -118,7 +118,7 @@ public class DruidTestRunnerFactory implements ITestRunnerFactory public void waitUntilInstanceReady(final HttpClient client, final String host) { - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> { try { StatusResponseHolder response = client.go( diff --git a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java index f96b8fd7383..38fbaf19ad8 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java @@ -24,7 +24,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.tests.TestNGGroup; import org.apache.druid.tests.indexer.AbstractIndexerTest; import org.testng.annotations.AfterClass; @@ -76,7 +76,7 @@ public class ITHadoopIndexTest extends AbstractIndexerTest final String taskID = indexer.submitTask(indexerSpec); LOG.info("TaskID for loading index task %s", taskID); indexer.waitUntilTaskCompletes(taskID, 10000, 120); - RetryUtil.retryUntil( + ITRetryUtil.retryUntil( () -> coordinator.areSegmentsLoaded(BATCH_DATASOURCE), true, 20000, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index 2a7e0f5956f..aae59573402 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -30,7 +30,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.ClientInfoResourceTestClient; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.SqlTestQueryHelper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; @@ -229,7 +229,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest // this method could return too early because the coordinator is merely reporting that all the // original segments have loaded. if (waitForNewVersion) { - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> { final VersionedIntervalTimeline timeline = VersionedIntervalTimeline.forSegments( coordinator.getAvailableSegments(dataSourceName) @@ -246,7 +246,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest ); } - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load" ); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java index e3db0cff0d5..f4af31ceb86 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java @@ -29,7 +29,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.TestClient; 
-import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -143,7 +143,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes indexer.waitUntilTaskCompletes(taskID); // task should complete only after the segments are loaded by historical node - RetryUtil.retryUntil( + ITRetryUtil.retryUntil( () -> coordinator.areSegmentsLoaded(fullDatasourceName), true, 10000, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java index 30b7439bb7c..8723f75c6b7 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java @@ -28,7 +28,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.clients.OverlordResourceTestClient; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.TestQueryHelper; import org.joda.time.Interval; @@ -85,7 +85,7 @@ public abstract class AbstractIndexerTest waitForAllTasksToComplete(); Interval interval = Intervals.of(start + "/" + end); coordinator.unloadSegmentsForDataSource(dataSource); - RetryUtil.retryUntilFalse( + ITRetryUtil.retryUntilFalse( new Callable() { @Override @@ -101,7 +101,7 @@ public abstract class AbstractIndexerTest protected void waitForAllTasksToComplete() { - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> { int numTasks = indexer.getPendingTasks().size() + indexer.getRunningTasks().size() + diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java index 04b3a9f7499..87a2cceece2 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java @@ -35,7 +35,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.IntegrationTestingConfig; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.TestQueryHelper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -257,7 +257,7 @@ abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest // wait for all kafka indexing tasks to finish LOG.info("Waiting for all kafka indexing tasks to finish"); - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> (indexer.getPendingTasks().size() + indexer.getRunningTasks().size() + indexer.getWaitingTasks().size()) == 0, @@ -266,7 +266,7 @@ abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest // wait for segments to be handed off try { - RetryUtil.retryUntil( + ITRetryUtil.retryUntil( () -> coordinator.areSegmentsLoaded(fullDatasourceName), true, 10000, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java index 477dd78a90c..b1ca281fde4 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java @@ -26,7 +26,7 @@ import 
org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.tests.TestNGGroup; import org.testng.annotations.BeforeSuite; import org.testng.annotations.Guice; @@ -101,7 +101,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest LOG.info("TaskID for loading index task %s", taskID); indexer.waitUntilTaskCompletes(taskID); - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Load" ); @@ -116,7 +116,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest LOG.info("TaskID for compaction task %s", taskID); indexer.waitUntilTaskCompletes(taskID); - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Compaction" ); @@ -124,7 +124,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest private void checkCompactionFinished(int numExpectedSegments) { - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> { int metadataSegmentCount = coordinator.getMetadataSegments(fullDatasourceName).size(); LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments); @@ -136,7 +136,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest private void checkCompactionIntervals(List expectedIntervals) { - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> { final List intervalsAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName); intervalsAfterCompaction.sort(null); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java index 853b1f0dc21..19e0626bac1 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java @@ -28,7 +28,7 @@ import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.ClientInfoResourceTestClient; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.TestQueryHelper; import org.apache.druid.tests.TestNGGroup; import org.testng.annotations.BeforeSuite; @@ -104,7 +104,7 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for loading index task %s", taskID); indexer.waitUntilTaskCompletes(taskID); - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Load" ); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java index 4dd49b9ac6c..4bf68ad8834 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java @@ -36,7 +36,7 @@ import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.EventReceiverFirehoseTestClient; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.guice.TestClient; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import 
org.apache.druid.testing.utils.ServerDiscoveryUtil; import org.apache.druid.tests.TestNGGroup; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -113,7 +113,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest } // wait until all events are ingested - RetryUtil.retryUntil( + ITRetryUtil.retryUntil( () -> { for (int i = 0; i < numTasks; i++) { final int countRows = queryHelper.countRows(fullDatasourceName + i, "2013-08-31/2013-09-01"); @@ -157,7 +157,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest // task should complete only after the segments are loaded by historical node for (int i = 0; i < numTasks; i++) { final int taskNum = i; - RetryUtil.retryUntil( + ITRetryUtil.retryUntil( () -> coordinator.areSegmentsLoaded(fullDatasourceName + taskNum), true, 10000, @@ -204,7 +204,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest LOG.info("Event Receiver Found at host [%s]", host); LOG.info("Checking worker /status/health for [%s]", host); - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> { try { StatusResponseHolder response = httpClient.go( diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITSystemTableQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITSystemTableQueryTest.java index 83850b34939..6beb9566624 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITSystemTableQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITSystemTableQueryTest.java @@ -23,7 +23,7 @@ import com.google.inject.Inject; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.SqlTestQueryHelper; import org.apache.druid.tests.TestNGGroup; import 
org.testng.annotations.BeforeMethod; @@ -49,12 +49,12 @@ public class ITSystemTableQueryTest public void before() { // ensure that wikipedia segments are loaded completely - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load" ); // ensure that the twitter segments are loaded completely - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> coordinatorClient.areSegmentsLoaded(TWITTER_DATA_SOURCE), "twitter segment load" ); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITTwitterQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITTwitterQueryTest.java index ad47348ff48..402b1bbc731 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITTwitterQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITTwitterQueryTest.java @@ -22,7 +22,7 @@ package org.apache.druid.tests.query; import com.google.inject.Inject; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.TestQueryHelper; import org.apache.druid.tests.TestNGGroup; import org.testng.annotations.BeforeMethod; @@ -44,7 +44,7 @@ public class ITTwitterQueryTest public void before() { // ensure that the twitter segments are loaded completely - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> coordinatorClient.areSegmentsLoaded(TWITTER_DATA_SOURCE), "twitter segment load" ); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java index f0af551dd29..700a35e1a47 100644 --- 
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java @@ -22,7 +22,7 @@ package org.apache.druid.tests.query; import com.google.inject.Inject; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.TestQueryHelper; import org.apache.druid.tests.TestNGGroup; import org.testng.annotations.BeforeMethod; @@ -48,11 +48,11 @@ public class ITWikipediaQueryTest { // ensure that wikipedia segments are loaded completely - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load" ); coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE); - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia lookup load" ); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java index 7d9b864264d..e074b2f64c1 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java @@ -45,7 +45,7 @@ import org.apache.druid.sql.avatica.DruidAvaticaHandler; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.RetryUtil; +import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.TestQueryHelper; 
import org.apache.druid.tests.TestNGGroup; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -123,7 +123,7 @@ public class ITBasicAuthConfigurationTest public void before() { // ensure that auth_test segments are loaded completely, we use them for testing system schema tables - RetryUtil.retryUntilTrue( + ITRetryUtil.retryUntilTrue( () -> coordinatorClient.areSegmentsLoaded("auth_test"), "auth_test segment load" ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java index 3d536d3b00c..7ff5e957d66 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java @@ -23,13 +23,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; -import com.google.common.net.HttpHeaders; import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.HttpEntity; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.PasswordProvider; import org.apache.druid.utils.CompressionUtils; @@ -38,7 +36,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.URI; -import java.net.URLConnection; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -108,31 +105,7 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory 0) { - // Set header for range 
request. - // Since we need to set only the start offset, the header is "bytes=-". - // See https://tools.ietf.org/html/rfc7233#section-2.1 - urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start)); - return urlConnection.getInputStream(); - } else { - if (!withRanges && start > 0) { - log.warn( - "Since the input source doesn't support range requests, the object input stream is opened from the start and " - + "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message" - + " a lot." - ); - } - final InputStream in = urlConnection.getInputStream(); - in.skip(start); - return in; - } + return HttpEntity.openInputStream(object, httpAuthenticationUsername, httpAuthenticationPasswordProvider, start); } @Override