From 078de4fcf9402c5cc13c0aabf2753b5712e84ddf Mon Sep 17 00:00:00 2001 From: Akash Dwivedi Date: Fri, 7 Oct 2016 13:59:14 -0700 Subject: [PATCH] Use explicit version from HadoopIngestionSpec. (#3554) --- .../input/orc/OrcIndexGeneratorJobTest.java | 1 + .../io/druid/indexer/HadoopTuningConfig.java | 21 +++++++++++++++---- .../indexer/BatchDeltaIngestionTest.java | 1 + .../DetermineHashedPartitionsJobTest.java | 1 + .../indexer/DeterminePartitionsJobTest.java | 1 + .../indexer/HadoopDruidIndexerConfigTest.java | 2 ++ .../druid/indexer/HadoopTuningConfigTest.java | 2 ++ .../druid/indexer/IndexGeneratorJobTest.java | 3 ++- .../java/io/druid/indexer/JobHelperTest.java | 1 + .../indexer/path/GranularityPathSpecTest.java | 1 + .../updater/HadoopConverterJobTest.java | 1 + .../indexing/common/task/HadoopIndexTask.java | 14 ++++++++++++- 12 files changed, 43 insertions(+), 6 deletions(-) diff --git a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java index c1c5fdf36a3..076bd248039 100644 --- a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java +++ b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java @@ -216,6 +216,7 @@ public class OrcIndexGeneratorJobTest null, true, null, + false, false ) ) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index 655546ec16c..f636f55ed78 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -65,6 +65,7 @@ public class HadoopTuningConfig implements TuningConfig null, DEFAULT_BUILD_V9_DIRECTLY, DEFAULT_NUM_BACKGROUND_PERSIST_THREADS, + false, false ); } @@ -85,6 +86,7 @@ public class HadoopTuningConfig implements TuningConfig private final Boolean buildV9Directly; private final int numBackgroundPersistThreads; private final boolean forceExtendableShardSpecs; + private final boolean useExplicitVersion; @JsonCreator public HadoopTuningConfig( @@ -105,7 +107,8 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT, final @JsonProperty("buildV9Directly") Boolean buildV9Directly, final @JsonProperty("numBackgroundPersistThreads") Integer numBackgroundPersistThreads, - final @JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs + final @JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs, + final @JsonProperty("useExplicitVersion") boolean useExplicitVersion ) { this.workingPath = workingPath; @@ -131,6 +134,7 @@ public class HadoopTuningConfig implements TuningConfig : numBackgroundPersistThreads; this.forceExtendableShardSpecs = forceExtendableShardSpecs; Preconditions.checkArgument(this.numBackgroundPersistThreads >= 0, "Not support persistBackgroundCount < 0"); + this.useExplicitVersion = useExplicitVersion; } @JsonProperty @@ -229,6 +233,12 @@ public class HadoopTuningConfig implements TuningConfig return forceExtendableShardSpecs; } + @JsonProperty + public boolean isUseExplicitVersion() + { + return useExplicitVersion; + } + public HadoopTuningConfig withWorkingPath(String path) { return new HadoopTuningConfig( @@ -248,7 +258,8 @@ public class HadoopTuningConfig implements TuningConfig null, buildV9Directly, numBackgroundPersistThreads, - forceExtendableShardSpecs + forceExtendableShardSpecs, + useExplicitVersion ); } @@ -271,7 +282,8 @@ public class HadoopTuningConfig implements TuningConfig null, buildV9Directly, numBackgroundPersistThreads, - forceExtendableShardSpecs + forceExtendableShardSpecs, + useExplicitVersion ); } @@ -294,7 +306,8 @@ public class HadoopTuningConfig implements TuningConfig null, buildV9Directly, numBackgroundPersistThreads, - forceExtendableShardSpecs + forceExtendableShardSpecs, + useExplicitVersion ); } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 9e01792ed92..afb79d4c193 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -385,6 +385,7 @@ public class BatchDeltaIngestionTest null, null, null, + false, false ) ) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java index 9e1d9c14138..6b34777a2ce 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -164,6 +164,7 @@ public class DetermineHashedPartitionsJobTest null, null, null, + false, false ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index 6bc5e737cf0..082f9e7b9a3 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -268,6 +268,7 @@ public class DeterminePartitionsJobTest null, null, null, + false, false ) ) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java index a1cc4374996..a6febf016e2 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -227,6 +227,7 @@ public class HadoopDruidIndexerConfigTest null, null, null, + false, false ) ); @@ -298,6 +299,7 @@ public class HadoopDruidIndexerConfigTest null, null, null, + false, false ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java index c5e5118ee24..c36af52fc95 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java @@ -56,6 +56,7 @@ public class HadoopTuningConfigTest null, null, null, + true, true ); @@ -76,6 +77,7 @@ public class HadoopTuningConfigTest Assert.assertEquals(true, actual.getUseCombiner()); Assert.assertEquals(0, actual.getNumBackgroundPersistThreads()); Assert.assertEquals(true, actual.isForceExtendableShardSpecs()); + Assert.assertEquals(true, actual.isUseExplicitVersion()); } public static T jsonReadWriteRead(String s, Class klass) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 3c1d60207f7..0e0e38ff591 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -517,7 +517,8 @@ public class IndexGeneratorJobTest null, buildV9Directly, null, - forceExtendableShardSpecs + forceExtendableShardSpecs, + false ) ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 6ef92465f13..d5ec35e7752 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -119,6 +119,7 @@ public class JobHelperTest null, null, null, + false, false ) ) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index 7be061d5528..e08a5a0d256 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -137,6 +137,7 @@ public class GranularityPathSpecTest null, null, null, + false, false ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index 473711c8eed..9884b3bcf09 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -207,6 +207,7 @@ public class HadoopConverterJobTest null, null, null, + false, false ) ) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 41886d9a227..b2f9f32eb6a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -188,7 +188,7 @@ public class HadoopIndexTask extends HadoopTask // We should have a lock from before we started running only if interval was specified - final String version; + String version; if (determineIntervals) { Interval interval = JodaUtils.umbrellaInterval( JodaUtils.condenseIntervals( @@ -203,6 +203,18 @@ public class HadoopIndexTask extends HadoopTask version = myLock.getVersion(); } + final String specVersion = indexerSchema.getTuningConfig().getVersion(); + if (indexerSchema.getTuningConfig().isUseExplicitVersion() && version.compareTo(specVersion) > 0) { + version = specVersion; + } else { + log.error( + "Spec version can not be greater than lock version, Spec version: [%s] Lock version: [%s].", + specVersion, + version + ); + return TaskStatus.failure(getId()); + } + log.info("Setting version to: %s", version); final String segments = invokeForeignLoader(