Move retries into DataSegmentPusher implementations. (#15938)

* Move retries into DataSegmentPusher implementations.

The individual implementations know better when they should and should
not retry. They can also generate better error messages.

The inspiration for this patch was a situation where an EntityTooLarge error
from S3 surfaced through the S3DataSegmentPusher and was then retried,
uselessly, by the generic retry harness in PartialSegmentMergeTask. (A sketch
of the resulting pattern appears just before the file diffs below.)

* Fix missing var.

* Adjust imports.

* Tests, comments, style.

* Remove unused import.
Gian Merlino 2024-03-04 10:36:21 -08:00 committed by GitHub
parent ced8be3044
commit 930655ff18
14 changed files with 342 additions and 135 deletions
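
To make the intent concrete, here is a minimal structural sketch of the pattern the patch adopts
(illustrative only; this is not code from the commit, and the names below are made up): each pusher
implementation supplies its own judgment about which failures are retryable, instead of relying on a
caller-side harness that retries every Exception.

import org.apache.druid.java.util.common.RetryUtils;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Illustrative sketch only; not part of this commit.
final class PusherRetrySketch
{
  private static final int MAX_ATTEMPTS = 5;

  // One push attempt under an implementation-chosen retry policy: failures the classifier
  // deems transient are retried, everything else propagates immediately.
  static <T> T pushWithRetry(Callable<T> attempt, Predicate<Throwable> retryable) throws IOException
  {
    try {
      return RetryUtils.retry(attempt::call, retryable::test, MAX_ATTEMPTS);
    }
    catch (Exception e) {
      // Preserve push()'s contract: rethrow IOException and RuntimeException unchanged.
      if (e instanceof IOException) {
        throw (IOException) e;
      }
      if (e instanceof RuntimeException) {
        throw (RuntimeException) e;
      }
      throw new RuntimeException(e);
    }
  }
}

An S3-flavored classifier would, for example, return false for EntityTooLarge (the permanent failure
that motivated this patch) and true for transient service errors.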

Changed file: segment size optimization docs

@ -49,7 +49,9 @@ You may need to consider the followings to optimize your segments.
which in turn means how well the query execution is parallelized.
- Segment byte size: it's recommended to set 300 ~ 700MB. If this value
doesn't match with the "number of rows per segment", please consider optimizing
number of rows per segment rather than this value.
number of rows per segment rather than this value. Note that certain deep storage
implementations also impose an upper limit on segment size. For example, S3 deep
storage imposes an upper limit of 5GB.
:::info
The above recommendation works in general, but the optimal setting can
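
An illustrative aside, not part of the diff above: a back-of-the-envelope calculation relating segment
byte size to rows per segment. The average row size below is a made-up placeholder; real row sizes
depend entirely on the schema and data.

// Illustrative arithmetic only.
public class SegmentSizeEstimate
{
  public static void main(String[] args)
  {
    final long avgRowBytes = 1_000L;                    // hypothetical: ~1 KB per serialized row
    final long targetSegmentBytes = 500_000_000L;       // within the 300-700 MB recommendation
    final long s3UploadLimitBytes = 5_000_000_000L;     // the 5 GB S3 ceiling noted above

    System.out.printf("~%,d rows per segment at the target size%n", targetSegmentBytes / avgRowBytes);
    System.out.printf("~%,d rows before hitting the S3 limit%n", s3UploadLimitBytes / avgRowBytes);
  }
}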

Changed file: HdfsDataSegmentPusher.java

@ -23,11 +23,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.common.utils.UUIDUtils;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.SegmentUtils;
@ -113,7 +115,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
uniquePrefix
);
return pushToFilePath(inDir, segment, outIndexFilePathSuffix);
return pushToFilePathWithRetry(inDir, segment, outIndexFilePathSuffix);
}
@Override
@ -125,10 +127,28 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
storageDirSuffix.replace(':', '_'),
segment.getShardSpec().getPartitionNum()
);
return pushToFilePath(inDir, segment, outIndexFilePath);
return pushToFilePathWithRetry(inDir, segment, outIndexFilePath);
}
private DataSegment pushToFilePathWithRetry(File inDir, DataSegment segment, String outIndexFilePath)
throws IOException
{
// Retry any HDFS errors that occur, up to 5 times.
try {
return RetryUtils.retry(
() -> pushToFilePath(inDir, segment, outIndexFilePath),
exception -> exception instanceof Exception,
5
);
}
catch (Exception e) {
Throwables.throwIfInstanceOf(e, IOException.class);
Throwables.throwIfInstanceOf(e, RuntimeException.class);
throw new RuntimeException(e);
}
}
private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIndexFilePath) throws IOException
{
log.debug(

Changed file: S3DataSegmentPusher.java

@ -23,6 +23,7 @@ import com.amazonaws.AmazonServiceException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.SegmentUtils;
@ -105,6 +106,19 @@ public class S3DataSegmentPusher implements DataSegmentPusher
);
}
catch (AmazonServiceException e) {
if (S3Utils.ERROR_ENTITY_TOO_LARGE.equals(S3Utils.getS3ErrorCode(e))) {
throw DruidException
.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(
e,
"Got error[%s] from S3 when uploading segment of size[%,d] bytes. This typically happens when segment "
+ "size is above 5GB. Try reducing your segment size by lowering the target number of rows per "
+ "segment.",
S3Utils.ERROR_ENTITY_TOO_LARGE,
indexSize
);
}
throw new IOException(e);
}
catch (Exception e) {

Changed file: S3Utils.java

@ -65,6 +65,11 @@ public class S3Utils
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private static final Logger log = new Logger(S3Utils.class);
/**
* Error for calling putObject with an entity over 5GB in size.
*/
public static final String ERROR_ENTITY_TOO_LARGE = "EntityTooLarge";
public static final Predicate<Throwable> S3RETRY = new Predicate<Throwable>()
{
@Override
@ -113,6 +118,18 @@ public class S3Utils
return RetryUtils.retry(f, S3RETRY, maxRetries);
}
@Nullable
public static String getS3ErrorCode(final Throwable e)
{
if (e == null) {
return null;
} else if (e instanceof AmazonS3Exception) {
return ((AmazonS3Exception) e).getErrorCode();
} else {
return getS3ErrorCode(e.getCause());
}
}
static boolean isObjectInBucketIgnoringPermission(
ServerSideEncryptingAmazonS3 s3Client,
String bucketName,
@ -234,11 +251,11 @@ public class S3Utils
/**
* Delete the files from S3 in a specified bucket, matching a specified prefix and filter
*
* @param s3Client s3 client
* @param maxListingLength maximum number of keys to fetch and delete at a time
* @param bucket s3 bucket
* @param prefix the file prefix
* @param filter function which returns true if the prefix file found should be deleted and false otherwise.
* @param s3Client s3 client
* @param maxListingLength maximum number of keys to fetch and delete at a time
* @param bucket s3 bucket
* @param prefix the file prefix
* @param filter function which returns true if the prefix file found should be deleted and false otherwise.
*
* @throws Exception in case of errors
*/
@ -299,7 +316,9 @@ public class S3Utils
throws Exception
{
if (keysToDelete != null && log.isDebugEnabled()) {
List<String> keys = keysToDelete.stream().map(DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
List<String> keys = keysToDelete.stream()
.map(DeleteObjectsRequest.KeyVersion::getKey)
.collect(Collectors.toList());
log.debug("Deleting keys from bucket: [%s], keys: [%s]", bucket, keys);
}
DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
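
As an aside (not part of the diff), a minimal usage sketch of the new getS3ErrorCode helper: it walks
the getCause() chain until it finds an AmazonS3Exception, so the error code is still recoverable after
a caller wraps the original exception. The class below is made up for illustration.

import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.druid.storage.s3.S3Utils;

import java.io.IOException;

public class GetS3ErrorCodeSketch
{
  public static void main(String[] args)
  {
    final AmazonS3Exception s3Error = new AmazonS3Exception("whoa too many bytes");
    s3Error.setErrorCode(S3Utils.ERROR_ENTITY_TOO_LARGE);

    // Wrap the S3 exception the way a pusher might before rethrowing it.
    final IOException wrapped = new IOException(s3Error);

    System.out.println(S3Utils.getS3ErrorCode(wrapped));           // prints: EntityTooLarge
    System.out.println(S3Utils.getS3ErrorCode(new IOException())); // prints: null
  }
}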

Changed file: S3DataSegmentPusherTest.java

@ -20,61 +20,96 @@
package org.apache.druid.storage.s3;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.google.common.io.Files;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.function.Consumer;
import java.util.regex.Pattern;
/**
*
*/
public class S3DataSegmentPusherTest
{
private static class ValueContainer<T>
{
private T value;
public T getValue()
{
return value;
}
public void setValue(T value)
{
this.value = value;
}
}
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@Test
public void testPush() throws Exception
{
testPushInternal(false, "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip");
testPushInternal(
false,
"key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip",
client ->
EasyMock.expect(client.putObject(EasyMock.anyObject()))
.andReturn(new PutObjectResult())
.once()
);
}
@Test
public void testPushUseUniquePath() throws Exception
{
testPushInternal(true, "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip");
testPushInternal(
true,
"key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip",
client ->
EasyMock.expect(client.putObject(EasyMock.anyObject()))
.andReturn(new PutObjectResult())
.once()
);
}
private void testPushInternal(boolean useUniquePath, String matcher) throws Exception
@Test
public void testEntityTooLarge()
{
final DruidException exception = Assert.assertThrows(
DruidException.class,
() ->
testPushInternal(
false,
"key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip",
client -> {
final AmazonS3Exception e = new AmazonS3Exception("whoa too many bytes");
e.setErrorCode(S3Utils.ERROR_ENTITY_TOO_LARGE);
EasyMock.expect(client.putObject(EasyMock.anyObject()))
.andThrow(e)
.once();
}
)
);
MatcherAssert.assertThat(
exception,
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Got error[EntityTooLarge] from S3"))
);
}
private void testPushInternal(
boolean useUniquePath,
String matcher,
Consumer<ServerSideEncryptingAmazonS3> clientDecorator
) throws Exception
{
ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
@ -83,9 +118,7 @@ public class S3DataSegmentPusherTest
acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl));
EasyMock.expect(s3Client.getBucketAcl(EasyMock.eq("bucket"))).andReturn(acl).once();
EasyMock.expect(s3Client.putObject(EasyMock.anyObject()))
.andReturn(new PutObjectResult())
.once();
clientDecorator.accept(s3Client);
EasyMock.replay(s3Client);

Changed file: PartialSegmentMergeTask.java

@ -37,7 +37,6 @@ import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
@ -270,29 +269,24 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
.map(AggregatorFactory::getName)
.collect(Collectors.toList());
// Retry pushing segments because uploading to deep storage might fail especially for cloud storage types
final DataSegment segment = RetryUtils.retry(
() -> segmentPusher.push(
mergedFileAndDimensionNames.lhs,
new DataSegment(
getDataSource(),
interval,
Preconditions.checkNotNull(
AbstractBatchIndexTask.findVersion(intervalToVersion, interval),
"version for interval[%s]",
interval
),
null, // will be filled in the segmentPusher
mergedFileAndDimensionNames.rhs,
metricNames,
createShardSpec(toolbox, interval, bucketId),
null, // will be filled in the segmentPusher
0 // will be filled in the segmentPusher
final DataSegment segment = segmentPusher.push(
mergedFileAndDimensionNames.lhs,
new DataSegment(
getDataSource(),
interval,
Preconditions.checkNotNull(
AbstractBatchIndexTask.findVersion(intervalToVersion, interval),
"version for interval[%s]",
interval
),
false
null, // will be filled in the segmentPusher
mergedFileAndDimensionNames.rhs,
metricNames,
createShardSpec(toolbox, interval, bucketId),
null, // will be filled in the segmentPusher
0 // will be filled in the segmentPusher
),
exception -> !(exception instanceof NullPointerException) && exception instanceof Exception,
5
false
);
long pushFinishTime = System.nanoTime();
pushedSegments.add(segment);

Changed file: DataSegmentPusher.java

@ -45,7 +45,7 @@ public interface DataSegmentPusher
String getPathForHadoop();
/**
* Pushes index files and segment descriptor to deep storage.
* Pushes index files and segment descriptor to deep storage. Expected to perform its own retries, if appropriate.
* @param file directory containing index files
* @param segment segment descriptor
* @param useUniquePath if true, pushes to a unique file path. This prevents situations where task failures or replica
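
Given the contract spelled out above, the caller side becomes a plain call (simplified sketch, not
actual Appenderator code; it assumes the interface lives at org.apache.druid.segment.loading): a
failure surfacing from push() means the implementation already exhausted whatever retries it
considered safe, so the caller adds no loop of its own.

import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.timeline.DataSegment;

import java.io.File;
import java.io.IOException;

final class PushCallerSketch
{
  // No RetryUtils wrapper here: the pusher retries internally when appropriate.
  static DataSegment pushMerged(DataSegmentPusher pusher, File mergedDir, DataSegment segment)
      throws IOException
  {
    return pusher.push(mergedDir, segment, false /* useUniquePath */);
  }
}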

Changed file: AppenderatorImpl.java

@ -46,7 +46,6 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
@ -950,19 +949,12 @@ public class AppenderatorImpl implements Appenderator
IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes, schema.getDimensionsSpec())
);
// Retry pushing segments because uploading to deep storage might fail especially for cloud storage types
final DataSegment segment = RetryUtils.retry(
// The appenderator is currently being used for the local indexing task and the Kafka indexing task. For the
// Kafka indexing task, pushers must use unique file paths in deep storage in order to maintain exactly-once
// semantics.
() -> dataSegmentPusher.push(
mergedFile,
segmentToPush,
useUniquePath
),
exception -> exception instanceof Exception,
5
);
// The appenderator is currently being used for the local indexing task and the Kafka indexing task. For the
// Kafka indexing task, pushers must use unique file paths in deep storage in order to maintain exactly-once
// semantics.
//
// dataSegmentPusher retries internally when appropriate; no need for retries here.
final DataSegment segment = dataSegmentPusher.push(mergedFile, segmentToPush, useUniquePath);
if (!isOpenSegments()) {
// Drop the queryable indexes behind the hydrants... they are not needed anymore and their

Changed file: BatchAppenderator.java

@ -42,7 +42,6 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
@ -820,20 +819,17 @@ public class BatchAppenderator implements Appenderator
closer.close();
}
// Retry pushing segments because uploading to deep storage might fail especially for cloud storage types
final DataSegment segment = RetryUtils.retry(
// This appenderator is used only for the local indexing task so unique paths are not required
() -> dataSegmentPusher.push(
mergedFile,
sink.getSegment()
.withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(
// dataSegmentPusher retries internally when appropriate; no need for retries here.
final DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment()
.withDimensions(
IndexMerger.getMergedDimensionsFromQueryableIndexes(
indexes,
schema.getDimensionsSpec()
)),
false
),
exception -> exception instanceof Exception,
5
)
),
false
);
// Drop the queryable indexes behind the hydrants... they are not needed anymore and their

Changed file: StreamAppenderator.java

@ -46,7 +46,6 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
@ -924,19 +923,8 @@ public class StreamAppenderator implements Appenderator
IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes, schema.getDimensionsSpec())
);
// Retry pushing segments because uploading to deep storage might fail especially for cloud storage types
final DataSegment segment = RetryUtils.retry(
// The appenderator is currently being used for the local indexing task and the Kafka indexing task. For the
// Kafka indexing task, pushers must use unique file paths in deep storage in order to maintain exactly-once
// semantics.
() -> dataSegmentPusher.push(
mergedFile,
segmentToPush,
useUniquePath
),
exception -> exception instanceof Exception,
5
);
// dataSegmentPusher retries internally when appropriate; no need for retries here.
final DataSegment segment = dataSegmentPusher.push(mergedFile, segmentToPush, useUniquePath);
final long pushFinishTime = System.nanoTime();

Changed file: ClosedSegmentsSinksBatchAppenderatorTest.java

@ -32,15 +32,21 @@ import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableCauseMatcher;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class ClosedSegmentsSinksBatchAppenderatorTest extends InitializedNullHandlingTest
@ -54,7 +60,7 @@ public class ClosedSegmentsSinksBatchAppenderatorTest extends InitializedNullHan
@Test
public void testSimpleIngestion() throws Exception
{
try (final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(3, true)) {
try (final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(3, false)) {
final Appenderator appenderator = tester.getAppenderator();
// startJob
@ -127,6 +133,84 @@ public class ClosedSegmentsSinksBatchAppenderatorTest extends InitializedNullHan
}
}
@Test
public void testPushFailure() throws Exception
{
try (final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(3, true)) {
final Appenderator appenderator = tester.getAppenderator();
// startJob
Assert.assertNull(appenderator.startJob());
// getDataSource
Assert.assertEquals(ClosedSegmensSinksBatchAppenderatorTester.DATASOURCE, appenderator.getDataSource());
// add #1
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
.getNumRowsInSegment()
);
// add #2
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 2), null)
.getNumRowsInSegment()
);
// getSegments
Assert.assertEquals(
IDENTIFIERS.subList(0, 2),
appenderator.getSegments().stream().sorted().collect(Collectors.toList())
);
// add #3, this hits max rows in memory:
Assert.assertEquals(
2,
appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "sux", 1), null)
.getNumRowsInSegment()
);
// since we just added three rows and the max rows in memory is three, all the segments (sinks etc.)
// above should be cleared now
Assert.assertEquals(
Collections.emptyList(),
((BatchAppenderator) appenderator).getInMemorySegments().stream().sorted().collect(Collectors.toList())
);
// add #4, this will add one more temporary segment:
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "qux", 4), null)
.getNumRowsInSegment()
);
// push all
final ListenableFuture<SegmentsAndCommitMetadata> segmentsAndCommitMetadata = appenderator.push(
appenderator.getSegments(),
null,
false
);
final ExecutionException e = Assert.assertThrows(
ExecutionException.class,
segmentsAndCommitMetadata::get
);
MatcherAssert.assertThat(
e,
ThrowableCauseMatcher.hasCause(ThrowableCauseMatcher.hasCause(CoreMatchers.instanceOf(IOException.class)))
);
MatcherAssert.assertThat(
e,
ThrowableCauseMatcher.hasCause(ThrowableCauseMatcher.hasCause(ThrowableMessageMatcher.hasMessage(
CoreMatchers.startsWith("Push failure test"))))
);
}
}
/**
* Test the case when a segment identifier contains non UTC timestamps in its interval. This can happen
* when a custom segment granularity for an interval with a non UTC Chronlogy is created by
@ -135,7 +219,7 @@ public class ClosedSegmentsSinksBatchAppenderatorTest extends InitializedNullHan
@Test
public void testPeriodGranularityNonUTCIngestion() throws Exception
{
try (final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(1, true)) {
try (final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(1, false)) {
final Appenderator appenderator = tester.getAppenderator();
// startJob
@ -565,7 +649,7 @@ public class ClosedSegmentsSinksBatchAppenderatorTest extends InitializedNullHan
@Test
public void testMaxRowsInMemory() throws Exception
{
try (final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(3, true)) {
try (final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(3, false)) {
final Appenderator appenderator = tester.getAppenderator();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
@ -631,7 +715,7 @@ public class ClosedSegmentsSinksBatchAppenderatorTest extends InitializedNullHan
@Test
public void testTotalRowsPerSegment() throws Exception
{
try (final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(3, true)) {
try (final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(3, false)) {
final Appenderator appenderator = tester.getAppenderator();
Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
@ -691,7 +775,7 @@ public class ClosedSegmentsSinksBatchAppenderatorTest extends InitializedNullHan
@Test
public void testRestoreFromDisk() throws Exception
{
final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(2, true);
final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(2, false);
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
@ -730,7 +814,7 @@ public class ClosedSegmentsSinksBatchAppenderatorTest extends InitializedNullHan
@Test
public void testCleanupFromDiskAfterClose() throws Exception
{
final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(2, true);
final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(2, false);
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
@ -770,7 +854,7 @@ public class ClosedSegmentsSinksBatchAppenderatorTest extends InitializedNullHan
@Test(timeout = 5000L)
public void testTotalRowCount() throws Exception
{
try (final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(3, true)) {
try (final ClosedSegmensSinksBatchAppenderatorTester tester = new ClosedSegmensSinksBatchAppenderatorTester(3, false)) {
final Appenderator appenderator = tester.getAppenderator();
Assert.assertEquals(0, appenderator.getTotalRowCount());

Changed file: OpenAndClosedSegmentsAppenderatorTester.java

@ -173,8 +173,6 @@ public class OpenAndClosedSegmentsAppenderatorTester implements AutoCloseable
EmittingLogger.registerEmitter(emitter);
dataSegmentPusher = new DataSegmentPusher()
{
private boolean mustFail = true;
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
@ -191,11 +189,8 @@ public class OpenAndClosedSegmentsAppenderatorTester implements AutoCloseable
@Override
public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
{
if (enablePushFailure && mustFail) {
mustFail = false;
if (enablePushFailure) {
throw new IOException("Push failure test");
} else if (enablePushFailure) {
mustFail = true;
}
pushedSegments.add(segment);
return segment;

Changed file: StreamAppenderatorTest.java

@ -25,6 +25,7 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
@ -54,12 +55,17 @@ import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableCauseMatcher;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -67,6 +73,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -88,7 +95,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
{
try (final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.enablePushFailure(true)
.basePersistDirectory(temporaryFolder.newFolder())
.build()) {
final Appenderator appenderator = tester.getAppenderator();
@ -174,6 +180,88 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
}
}
@Test
public void testPushFailure() throws Exception
{
try (final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
boolean thrown;
final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
final Supplier<Committer> committerSupplier = committerSupplierFromConcurrentMap(commitMetadata);
// startJob
Assert.assertEquals(null, appenderator.startJob());
// getDataSource
Assert.assertEquals(StreamAppenderatorTester.DATASOURCE, appenderator.getDataSource());
// add
commitMetadata.put("x", "1");
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier)
.getNumRowsInSegment()
);
commitMetadata.put("x", "2");
Assert.assertEquals(
2,
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar", 2), committerSupplier)
.getNumRowsInSegment()
);
commitMetadata.put("x", "3");
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 4), committerSupplier)
.getNumRowsInSegment()
);
// getSegments
Assert.assertEquals(IDENTIFIERS.subList(0, 2), sorted(appenderator.getSegments()));
// getRowCount
Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
thrown = false;
try {
appenderator.getRowCount(IDENTIFIERS.get(2));
}
catch (IllegalStateException e) {
thrown = true;
}
Assert.assertTrue(thrown);
// push all
final ListenableFuture<SegmentsAndCommitMetadata> segmentsAndCommitMetadata = appenderator.push(
appenderator.getSegments(),
committerSupplier.get(),
false
);
final ExecutionException e = Assert.assertThrows(
ExecutionException.class,
segmentsAndCommitMetadata::get
);
MatcherAssert.assertThat(
e,
ThrowableCauseMatcher.hasCause(ThrowableCauseMatcher.hasCause(CoreMatchers.instanceOf(IOException.class)))
);
MatcherAssert.assertThat(
e,
ThrowableCauseMatcher.hasCause(ThrowableCauseMatcher.hasCause(ThrowableMessageMatcher.hasMessage(
CoreMatchers.startsWith("Push failure test"))))
);
}
}
@Test
public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
{
@ -182,7 +270,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
.maxSizeInBytes(1024)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.skipBytesInMemoryOverheadCheck(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
@ -232,7 +319,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
.maxSizeInBytes(1024)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.skipBytesInMemoryOverheadCheck(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
@ -279,7 +365,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
.maxSizeInBytes(15000)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
@ -383,7 +468,7 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
}
}
@Test(expected = RuntimeException.class)
@Test
public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
{
try (
@ -391,7 +476,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
.maxSizeInBytes(5180)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
@ -415,7 +499,10 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
};
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
Assert.assertThrows(
RuntimeException.class,
() -> appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier)
);
}
}
@ -427,7 +514,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
.maxSizeInBytes(10)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.skipBytesInMemoryOverheadCheck(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
@ -475,7 +561,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
.maxSizeInBytes(10000)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
@ -526,7 +611,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
.maxSizeInBytes(31100)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
@ -672,7 +756,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
.maxSizeInBytes(-1)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
@ -725,7 +808,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
@ -832,7 +914,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
tuningConfig = tester.getTuningConfig();
@ -879,8 +960,7 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
final StreamAppenderatorTester tester2 =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.basePersistDirectory(tuningConfig.getBasePersistDirectory())
.enablePushFailure(true)
.build()) {
.build()) {
final Appenderator appenderator2 = tester2.getAppenderator();
Assert.assertEquals(ImmutableMap.of("eventCount", 4), appenderator2.startJob());
Assert.assertEquals(ImmutableList.of(IDENTIFIERS.get(0)), appenderator2.getSegments());
@ -896,7 +976,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
@ -992,7 +1071,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.withSegmentDropDelayInMilli(1000)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
@ -1061,7 +1139,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
@ -1202,7 +1279,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
@ -1373,7 +1449,6 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
try (final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.enablePushFailure(true)
.basePersistDirectory(temporaryFolder.newFolder())
.build(dataSegmentAnnouncer, CentralizedDatasourceSchemaConfig.create())) {
final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();

Changed file: StreamAppenderatorTester.java

@ -181,8 +181,6 @@ public class StreamAppenderatorTester implements AutoCloseable
EmittingLogger.registerEmitter(emitter);
dataSegmentPusher = new DataSegmentPusher()
{
private boolean mustFail = true;
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
@ -199,11 +197,8 @@ public class StreamAppenderatorTester implements AutoCloseable
@Override
public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
{
if (enablePushFailure && mustFail) {
mustFail = false;
if (enablePushFailure) {
throw new IOException("Push failure test");
} else if (enablePushFailure) {
mustFail = true;
}
pushedSegments.add(segment);
return segment;