RetryingInputEntity to retry on transient errors (#8923)

* RetryingInputEntity to retry on transient errors

* Fix some Javadoc and HttpEntity

* Make it an interface

* Javadoc for offset
Jihoon Son 2019-11-21 21:32:18 -08:00 committed by GitHub
parent dc6178d1f2
commit 934547a215
31 changed files with 192 additions and 114 deletions

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
@ -63,7 +64,7 @@ public interface InputEntity
* Opens an {@link InputStream} on the input entity directly.
* This is the basic way to read the given entity.
*
* @see #fetch as an alternative way to read data.
* @see #fetch
*/
InputStream open() throws IOException;
@ -89,7 +90,7 @@ public interface InputEntity
is,
tempFile,
fetchBuffer,
getFetchRetryCondition(),
getRetryCondition(),
DEFAULT_MAX_NUM_FETCH_TRIES,
StringUtils.format("Failed to fetch into [%s]", tempFile.getAbsolutePath())
);
@ -114,7 +115,12 @@ public interface InputEntity
}
/**
* {@link #fetch} will retry during the fetch if it sees an exception matching the returned predicate.
* Returns a retry condition that the caller should retry on.
* The returned condition should be used when reading data from this InputEntity, such as in {@link #fetch}
* or {@link RetryingInputEntity}.
*/
Predicate<Throwable> getFetchRetryCondition();
default Predicate<Throwable> getRetryCondition()
{
return Predicates.alwaysFalse();
}
}
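
For illustration, a hypothetical minimal entity under the new API (getUri() is assumed from the rest of the interface, which this hunk does not show): implementations no longer have to spell out a no-retry condition, as the ByteEntity and FileEntity diffs below confirm.

import org.apache.druid.data.input.InputEntity;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

// Hypothetical example, not part of this commit: with the new default,
// an entity with no transient failure modes needs no retry-related code.
public class ExampleUriEntity implements InputEntity
{
  private final URI uri;

  public ExampleUriEntity(URI uri)
  {
    this.uri = uri;
  }

  @Nullable
  @Override
  public URI getUri()
  {
    return uri;
  }

  @Override
  public InputStream open() throws IOException
  {
    // getRetryCondition() is inherited as Predicates.alwaysFalse(),
    // so fetch() will not retry for this entity.
    return uri.toURL().openStream();
  }
}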

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.java.util.common.RetryUtils;
import java.io.IOException;
import java.io.InputStream;
public interface RetryingInputEntity extends InputEntity
{
@Override
default InputStream open() throws IOException
{
return new RetryingInputStream<>(
this,
new RetryingInputEntityOpenFunction(),
getRetryCondition(),
RetryUtils.DEFAULT_MAX_TRIES
);
}
/**
* Directly opens an {@link InputStream} on the input entity.
*/
default InputStream readFromStart() throws IOException
{
return readFrom(0);
}
/**
* Directly opens an {@link InputStream} starting at the given offset on the input entity.
*
* @param offset an offset to start reading from. A non-negative integer counting
* the number of bytes from the beginning of the entity
*/
InputStream readFrom(long offset) throws IOException;
@Override
Predicate<Throwable> getRetryCondition();
class RetryingInputEntityOpenFunction implements ObjectOpenFunction<RetryingInputEntity>
{
@Override
public InputStream open(RetryingInputEntity object) throws IOException
{
return object.readFromStart();
}
@Override
public InputStream open(RetryingInputEntity object, long start) throws IOException
{
return object.readFrom(start);
}
}
}
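
To illustrate the contract, here is a hypothetical minimal implementation (getUri() is assumed from the rest of InputEntity, which this diff does not show): only readFrom() and getRetryCondition() must be supplied, and the default open() adds the retry loop.

import com.google.common.base.Predicate;
import org.apache.druid.data.input.RetryingInputEntity;

import javax.annotation.Nullable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

// Hypothetical example, not part of this commit.
public class ExampleRetryingFileEntity implements RetryingInputEntity
{
  private final File file;

  public ExampleRetryingFileEntity(File file)
  {
    this.file = file;
  }

  @Nullable
  @Override
  public URI getUri()
  {
    return file.toURI();
  }

  @Override
  public InputStream readFrom(long offset) throws IOException
  {
    // Naive resume for the sketch: reopen and skip to the offset.
    final FileInputStream in = new FileInputStream(file);
    if (in.skip(offset) != offset) {
      in.close();
      throw new IOException("Could not skip to offset " + offset);
    }
    return in;
  }

  @Override
  public Predicate<Throwable> getRetryCondition()
  {
    return t -> t instanceof IOException;
  }
}

HttpEntity, GoogleCloudStorageEntity, and HdfsInputEntity below follow the same pattern, each with a source-appropriate way to resume at an offset (Range header, ranged openStream, seek).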

View File

@ -19,8 +19,6 @@
package org.apache.druid.data.input.impl;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.io.ByteBufferInputStream;
@ -60,10 +58,4 @@ public class ByteEntity implements InputEntity
{
return new ByteBufferInputStream(buffer);
}
@Override
public Predicate<Throwable> getFetchRetryCondition()
{
return Predicates.alwaysFalse();
}
}

View File

@ -19,8 +19,6 @@
package org.apache.druid.data.input.impl;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.utils.CompressionUtils;
@ -69,10 +67,4 @@ public class FileEntity implements InputEntity
{
return CompressionUtils.decompress(new FileInputStream(file), file.getName());
}
@Override
public Predicate<Throwable> getFetchRetryCondition()
{
return Predicates.alwaysFalse();
}
}

View File

@ -21,8 +21,11 @@ package org.apache.druid.data.input.impl;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import org.apache.druid.data.input.InputEntity;
import com.google.common.net.HttpHeaders;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.CompressionUtils;
@ -33,8 +36,10 @@ import java.net.URI;
import java.net.URLConnection;
import java.util.Base64;
public class HttpEntity implements InputEntity
public class HttpEntity implements RetryingInputEntity
{
private static final Logger LOG = new Logger(HttpEntity.class);
private final URI uri;
@Nullable
private final String httpAuthenticationUsername;
@ -59,29 +64,52 @@ public class HttpEntity implements InputEntity
}
@Override
public InputStream open() throws IOException
public InputStream readFrom(long offset) throws IOException
{
return CompressionUtils.decompress(
openURLConnection(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider).getInputStream(),
openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset),
uri.toString()
);
}
@Override
public Predicate<Throwable> getFetchRetryCondition()
public Predicate<Throwable> getRetryCondition()
{
return t -> t instanceof IOException;
}
public static URLConnection openURLConnection(URI object, String userName, PasswordProvider passwordProvider)
public static InputStream openInputStream(URI object, String userName, PasswordProvider passwordProvider, long offset)
throws IOException
{
URLConnection urlConnection = object.toURL().openConnection();
final URLConnection urlConnection = object.toURL().openConnection();
if (!Strings.isNullOrEmpty(userName) && passwordProvider != null) {
String userPass = userName + ":" + passwordProvider.getPassword();
String basicAuthString = "Basic " + Base64.getEncoder().encodeToString(StringUtils.toUtf8(userPass));
urlConnection.setRequestProperty("Authorization", basicAuthString);
}
return urlConnection;
final String acceptRanges = urlConnection.getHeaderField(HttpHeaders.ACCEPT_RANGES);
final boolean withRanges = "bytes".equalsIgnoreCase(acceptRanges);
if (withRanges && offset > 0) {
// Set header for range request.
// Since we need to set only the start offset, the header is "bytes=<range-start>-".
// See https://tools.ietf.org/html/rfc7233#section-2.1
urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", offset));
return urlConnection.getInputStream();
} else {
if (!withRanges && offset > 0) {
LOG.warn(
"Since the input source doesn't support range requests, the object input stream is opened from the start and "
+ "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message"
+ " a lot."
);
}
final InputStream in = urlConnection.getInputStream();
final long skipped = in.skip(offset);
if (skipped != offset) {
throw new ISE("Requested to skip [%s] bytes, but actual number of bytes skipped is [%s]", offset, skipped);
}
return in;
}
}
}
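
A hedged usage sketch of the reworked entity follows (the three-argument constructor shape is assumed here; this diff does not show it). Because HttpEntity is now a RetryingInputEntity, a connection dropped mid-read is reopened at the current byte offset, which becomes a Range request whenever the server advertises Accept-Ranges: bytes.

import org.apache.druid.data.input.impl.HttpEntity;

import java.io.InputStream;
import java.net.URI;

// Hypothetical usage, not part of this commit.
public class HttpEntityReadExample
{
  public static void main(String[] args) throws Exception
  {
    final HttpEntity entity = new HttpEntity(
        URI.create("https://example.com/data.json.gz"),
        null,  // no HTTP basic-auth username
        null   // no PasswordProvider
    );

    // open() is RetryingInputEntity's default: a RetryingInputStream over readFrom().
    try (InputStream in = entity.open()) {
      final byte[] buffer = new byte[8192];
      int read;
      while ((read = in.read(buffer)) != -1) {
        // Consume decompressed bytes. A transient IOException here causes
        // RetryingInputStream to reopen via readFrom(bytesReadSoFar).
      }
    }
  }
}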

View File

@ -17,11 +17,13 @@
* under the License.
*/
package org.apache.druid.data.input.impl.prefetch;
package org.apache.druid.data.input.impl;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.io.CountingInputStream;
import org.apache.druid.data.input.impl.prefetch.Fetcher;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.logger.Logger;
@ -35,7 +37,7 @@ import java.net.SocketException;
*
* @param <T> object type
*/
class RetryingInputStream<T> extends InputStream
public class RetryingInputStream<T> extends InputStream
{
private static final Logger log = new Logger(RetryingInputStream.class);
@ -47,7 +49,7 @@ class RetryingInputStream<T> extends InputStream
private CountingInputStream delegate;
private long startOffset;
RetryingInputStream(
public RetryingInputStream(
T object,
ObjectOpenFunction<T> objectOpenFunction,
Predicate<Throwable> retryCondition,

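RetryingInputStream and its constructor are made public above, so code outside the prefetch machinery can reuse them directly. A hedged sketch (names hypothetical) of wiring the stream around an arbitrary source:

import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.java.util.common.RetryUtils;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

// Hypothetical example, not part of this commit.
public class RetryingReadExample
{
  public static InputStream openWithRetries(URI uri) throws IOException
  {
    final ObjectOpenFunction<URI> openFunction = new ObjectOpenFunction<URI>()
    {
      @Override
      public InputStream open(URI object) throws IOException
      {
        return object.toURL().openStream();
      }

      @Override
      public InputStream open(URI object, long start) throws IOException
      {
        // Naive resume for the sketch: reopen and skip. Real sources should
        // seek (HDFS) or issue a Range request (HTTP) instead.
        final InputStream in = object.toURL().openStream();
        if (in.skip(start) != start) {
          in.close();
          throw new IOException("Could not skip to offset " + start);
        }
        return in;
      }
    };

    return new RetryingInputStream<>(
        uri,
        openFunction,
        t -> t instanceof IOException,  // retry condition for transient I/O errors
        RetryUtils.DEFAULT_MAX_TRIES
    );
  }
}
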
View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.impl.prefetch;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;

View File

@ -28,6 +28,7 @@ import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
import org.apache.druid.data.input.impl.FileIteratingFirehose;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;

View File

@ -32,6 +32,7 @@ public class RetryUtils
public static final Logger log = new Logger(RetryUtils.class);
public static final long MAX_SLEEP_MILLIS = 60000;
public static final long BASE_SLEEP_MILLIS = 1000;
public static final int DEFAULT_MAX_TRIES = 10;
public interface Task<T>
{

View File

@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl.prefetch;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

View File

@ -20,7 +20,7 @@
package org.apache.druid.data.input.google;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.storage.google.GoogleByteSource;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleUtils;
@ -31,7 +31,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
public class GoogleCloudStorageEntity implements InputEntity
public class GoogleCloudStorageEntity implements RetryingInputEntity
{
private final GoogleStorage storage;
private final URI uri;
@ -50,17 +50,17 @@ public class GoogleCloudStorageEntity implements InputEntity
}
@Override
public InputStream open() throws IOException
public InputStream readFrom(long offset) throws IOException
{
// Get data of the given object and open an input stream
final String bucket = uri.getAuthority();
final String key = GoogleUtils.extractGoogleCloudStorageObjectKey(uri);
final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key);
return CompressionUtils.decompress(byteSource.openStream(), uri.getPath());
return CompressionUtils.decompress(byteSource.openStream(offset), uri.getPath());
}
@Override
public Predicate<Throwable> getFetchRetryCondition()
public Predicate<Throwable> getRetryCondition()
{
return GoogleUtils.GOOGLE_RETRY;
}

View File

@ -96,7 +96,10 @@ public class GoogleCloudStorageInputSource extends AbstractInputSource implements SplittableInputSource<URI>
return new InputEntityIteratingReader(
inputRowSchema,
inputFormat,
createSplits(inputFormat, null).map(split -> new GoogleCloudStorageEntity(storage, split.get())),
createSplits(inputFormat, null).map(split -> new GoogleCloudStorageEntity(
storage,
split.get()
)),
temporaryDirectory
);
}

View File

@ -20,10 +20,11 @@
package org.apache.druid.inputsource.hdfs;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
import org.apache.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -31,7 +32,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
public class HdfsInputEntity implements InputEntity
public class HdfsInputEntity implements RetryingInputEntity
{
private final Configuration conf;
private final Path path;
@ -49,14 +50,16 @@ public class HdfsInputEntity implements InputEntity
}
@Override
public InputStream open() throws IOException
public InputStream readFrom(long offset) throws IOException
{
FileSystem fs = path.getFileSystem(conf);
return CompressionUtils.decompress(fs.open(path), path.getName());
final FileSystem fs = path.getFileSystem(conf);
final FSDataInputStream inputStream = fs.open(path);
inputStream.seek(offset);
return CompressionUtils.decompress(inputStream, path.getName());
}
@Override
public Predicate<Throwable> getFetchRetryCondition()
public Predicate<Throwable> getRetryCondition()
{
return HdfsDataSegmentPuller.RETRY_PREDICATE;
}

View File

@ -82,8 +82,7 @@ public class S3Utils
*/
public static <T> T retryS3Operation(Task<T> f) throws Exception
{
final int maxTries = 10;
return RetryUtils.retry(f, S3RETRY, maxTries);
return RetryUtils.retry(f, S3RETRY, RetryUtils.DEFAULT_MAX_TRIES);
}
static boolean isObjectInBucketIgnoringPermission(

View File

@ -36,7 +36,7 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@ -180,7 +180,7 @@ public class OverlordResourceTestClient
public void waitUntilTaskCompletes(final String taskID, final int millisEach, final int numTimes)
{
RetryUtil.retryUntil(
ITRetryUtil.retryUntil(
new Callable<Boolean>()
{
@Override

View File

@ -25,10 +25,10 @@ import org.apache.druid.java.util.common.logger.Logger;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
public class RetryUtil
public class ITRetryUtil
{
private static final Logger LOG = new Logger(RetryUtil.class);
private static final Logger LOG = new Logger(ITRetryUtil.class);
public static final int DEFAULT_RETRY_COUNT = 30;

View File

@ -49,7 +49,7 @@ public class ServerDiscoveryUtil
public static void waitUntilInstanceReady(final ServerDiscoverySelector serviceProvider, String instanceType)
{
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
new Callable<Boolean>()
{
@Override

View File

@ -31,7 +31,7 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.internal.IConfiguration;
@ -118,7 +118,7 @@ public class DruidTestRunnerFactory implements ITestRunnerFactory
public void waitUntilInstanceReady(final HttpClient client, final String host)
{
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> {
try {
StatusResponseHolder response = client.go(

View File

@ -24,7 +24,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.testng.annotations.AfterClass;
@ -76,7 +76,7 @@ public class ITHadoopIndexTest extends AbstractIndexerTest
final String taskID = indexer.submitTask(indexerSpec);
LOG.info("TaskID for loading index task %s", taskID);
indexer.waitUntilTaskCompletes(taskID, 10000, 120);
RetryUtil.retryUntil(
ITRetryUtil.retryUntil(
() -> coordinator.areSegmentsLoaded(BATCH_DATASOURCE),
true,
20000,

View File

@ -30,7 +30,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.ClientInfoResourceTestClient;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
@ -229,7 +229,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
// this method could return too early because the coordinator is merely reporting that all the
// original segments have loaded.
if (waitForNewVersion) {
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> {
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(
coordinator.getAvailableSegments(dataSourceName)
@ -246,7 +246,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
);
}
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load"
);
}

View File

@ -29,7 +29,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@ -143,7 +143,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTest
indexer.waitUntilTaskCompletes(taskID);
// task should complete only after the segments are loaded by historical node
RetryUtil.retryUntil(
ITRetryUtil.retryUntil(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
true,
10000,

View File

@ -28,7 +28,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.joda.time.Interval;
@ -85,7 +85,7 @@ public abstract class AbstractIndexerTest
waitForAllTasksToComplete();
Interval interval = Intervals.of(start + "/" + end);
coordinator.unloadSegmentsForDataSource(dataSource);
RetryUtil.retryUntilFalse(
ITRetryUtil.retryUntilFalse(
new Callable<Boolean>()
{
@Override
@ -101,7 +101,7 @@ public abstract class AbstractIndexerTest
protected void waitForAllTasksToComplete()
{
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> {
int numTasks = indexer.getPendingTasks().size() +
indexer.getRunningTasks().size() +

View File

@ -35,7 +35,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -257,7 +257,7 @@ abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest
// wait for all kafka indexing tasks to finish
LOG.info("Waiting for all kafka indexing tasks to finish");
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> (indexer.getPendingTasks().size()
+ indexer.getRunningTasks().size()
+ indexer.getWaitingTasks().size()) == 0,
@ -266,7 +266,7 @@ abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest
// wait for segments to be handed off
try {
RetryUtil.retryUntil(
ITRetryUtil.retryUntil(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
true,
10000,

View File

@ -26,7 +26,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Guice;
@ -101,7 +101,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
LOG.info("TaskID for loading index task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Load"
);
@ -116,7 +116,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
LOG.info("TaskID for compaction task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Compaction"
);
@ -124,7 +124,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private void checkCompactionFinished(int numExpectedSegments)
{
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> {
int metadataSegmentCount = coordinator.getMetadataSegments(fullDatasourceName).size();
LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments);
@ -136,7 +136,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private void checkCompactionIntervals(List<String> expectedIntervals)
{
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> {
final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsAfterCompaction.sort(null);

View File

@ -28,7 +28,7 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.ClientInfoResourceTestClient;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.BeforeSuite;
@ -104,7 +104,7 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for loading index task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Load"
);
}

View File

@ -36,7 +36,7 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.EventReceiverFirehoseTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.ServerDiscoveryUtil;
import org.apache.druid.tests.TestNGGroup;
import org.jboss.netty.handler.codec.http.HttpMethod;
@ -113,7 +113,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
}
// wait until all events are ingested
RetryUtil.retryUntil(
ITRetryUtil.retryUntil(
() -> {
for (int i = 0; i < numTasks; i++) {
final int countRows = queryHelper.countRows(fullDatasourceName + i, "2013-08-31/2013-09-01");
@ -157,7 +157,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
// task should complete only after the segments are loaded by historical node
for (int i = 0; i < numTasks; i++) {
final int taskNum = i;
RetryUtil.retryUntil(
ITRetryUtil.retryUntil(
() -> coordinator.areSegmentsLoaded(fullDatasourceName + taskNum),
true,
10000,
@ -204,7 +204,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
LOG.info("Event Receiver Found at host [%s]", host);
LOG.info("Checking worker /status/health for [%s]", host);
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> {
try {
StatusResponseHolder response = httpClient.go(

View File

@ -23,7 +23,7 @@ import com.google.inject.Inject;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.BeforeMethod;
@ -49,12 +49,12 @@ public class ITSystemTableQueryTest
public void before()
{
// ensure that wikipedia segments are loaded completely
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
);
// ensure that the twitter segments are loaded completely
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(TWITTER_DATA_SOURCE), "twitter segment load"
);
}

View File

@ -22,7 +22,7 @@ package org.apache.druid.tests.query;
import com.google.inject.Inject;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.BeforeMethod;
@ -44,7 +44,7 @@ public class ITTwitterQueryTest
public void before()
{
// ensure that the twitter segments are loaded completely
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(TWITTER_DATA_SOURCE), "twitter segment load"
);
}

View File

@ -22,7 +22,7 @@ package org.apache.druid.tests.query;
import com.google.inject.Inject;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.BeforeMethod;
@ -48,11 +48,11 @@ public class ITWikipediaQueryTest
{
// ensure that wikipedia segments are loaded completely
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
);
coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE);
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia lookup load"
);
}

View File

@ -45,7 +45,7 @@ import org.apache.druid.sql.avatica.DruidAvaticaHandler;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.jboss.netty.handler.codec.http.HttpMethod;
@ -123,7 +123,7 @@ public class ITBasicAuthConfigurationTest
public void before()
{
// ensure that auth_test segments are loaded completely, we use them for testing system schema tables
RetryUtil.retryUntilTrue(
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded("auth_test"), "auth_test segment load"
);
}

View File

@ -23,13 +23,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.net.HttpHeaders;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.HttpEntity;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.CompressionUtils;
@ -38,7 +36,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URLConnection;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -108,31 +105,7 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI>
@Override
protected InputStream openObjectStream(URI object, long start) throws IOException
{
URLConnection urlConnection = HttpEntity.openURLConnection(
object,
httpAuthenticationUsername,
httpAuthenticationPasswordProvider
);
final String acceptRanges = urlConnection.getHeaderField(HttpHeaders.ACCEPT_RANGES);
final boolean withRanges = "bytes".equalsIgnoreCase(acceptRanges);
if (withRanges && start > 0) {
// Set header for range request.
// Since we need to set only the start offset, the header is "bytes=<range-start>-".
// See https://tools.ietf.org/html/rfc7233#section-2.1
urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start));
return urlConnection.getInputStream();
} else {
if (!withRanges && start > 0) {
log.warn(
"Since the input source doesn't support range requests, the object input stream is opened from the start and "
+ "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message"
+ " a lot."
);
}
final InputStream in = urlConnection.getInputStream();
in.skip(start);
return in;
}
return HttpEntity.openInputStream(object, httpAuthenticationUsername, httpAuthenticationPasswordProvider, start);
}
@Override