Use explicit version from HadoopIngestionSpec. (#3554)

This commit is contained in:
Akash Dwivedi 2016-10-07 13:59:14 -07:00 committed by cheddar
parent 7e6824501c
commit 078de4fcf9
12 changed files with 43 additions and 6 deletions

View File

@ -216,6 +216,7 @@ public class OrcIndexGeneratorJobTest
null,
true,
null,
false,
false
)
)

View File

@ -65,6 +65,7 @@ public class HadoopTuningConfig implements TuningConfig
null,
DEFAULT_BUILD_V9_DIRECTLY,
DEFAULT_NUM_BACKGROUND_PERSIST_THREADS,
false,
false
);
}
@ -85,6 +86,7 @@ public class HadoopTuningConfig implements TuningConfig
private final Boolean buildV9Directly;
private final int numBackgroundPersistThreads;
private final boolean forceExtendableShardSpecs;
private final boolean useExplicitVersion;
@JsonCreator
public HadoopTuningConfig(
@ -105,7 +107,8 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT,
final @JsonProperty("buildV9Directly") Boolean buildV9Directly,
final @JsonProperty("numBackgroundPersistThreads") Integer numBackgroundPersistThreads,
final @JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs
final @JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs,
final @JsonProperty("useExplicitVersion") boolean useExplicitVersion
)
{
this.workingPath = workingPath;
@ -131,6 +134,7 @@ public class HadoopTuningConfig implements TuningConfig
: numBackgroundPersistThreads;
this.forceExtendableShardSpecs = forceExtendableShardSpecs;
Preconditions.checkArgument(this.numBackgroundPersistThreads >= 0, "Not support persistBackgroundCount < 0");
this.useExplicitVersion = useExplicitVersion;
}
@JsonProperty
@ -229,6 +233,12 @@ public class HadoopTuningConfig implements TuningConfig
return forceExtendableShardSpecs;
}
@JsonProperty
public boolean isUseExplicitVersion()
{
return useExplicitVersion;
}
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@ -248,7 +258,8 @@ public class HadoopTuningConfig implements TuningConfig
null,
buildV9Directly,
numBackgroundPersistThreads,
forceExtendableShardSpecs
forceExtendableShardSpecs,
useExplicitVersion
);
}
@ -271,7 +282,8 @@ public class HadoopTuningConfig implements TuningConfig
null,
buildV9Directly,
numBackgroundPersistThreads,
forceExtendableShardSpecs
forceExtendableShardSpecs,
useExplicitVersion
);
}
@ -294,7 +306,8 @@ public class HadoopTuningConfig implements TuningConfig
null,
buildV9Directly,
numBackgroundPersistThreads,
forceExtendableShardSpecs
forceExtendableShardSpecs,
useExplicitVersion
);
}
}

View File

@ -385,6 +385,7 @@ public class BatchDeltaIngestionTest
null,
null,
null,
false,
false
)
)

View File

@ -164,6 +164,7 @@ public class DetermineHashedPartitionsJobTest
null,
null,
null,
false,
false
)
);

View File

@ -268,6 +268,7 @@ public class DeterminePartitionsJobTest
null,
null,
null,
false,
false
)
)

View File

@ -227,6 +227,7 @@ public class HadoopDruidIndexerConfigTest
null,
null,
null,
false,
false
)
);
@ -298,6 +299,7 @@ public class HadoopDruidIndexerConfigTest
null,
null,
null,
false,
false
)
);

View File

@ -56,6 +56,7 @@ public class HadoopTuningConfigTest
null,
null,
null,
true,
true
);
@ -76,6 +77,7 @@ public class HadoopTuningConfigTest
Assert.assertEquals(true, actual.getUseCombiner());
Assert.assertEquals(0, actual.getNumBackgroundPersistThreads());
Assert.assertEquals(true, actual.isForceExtendableShardSpecs());
Assert.assertEquals(true, actual.isUseExplicitVersion());
}
public static <T> T jsonReadWriteRead(String s, Class<T> klass)

View File

@ -517,7 +517,8 @@ public class IndexGeneratorJobTest
null,
buildV9Directly,
null,
forceExtendableShardSpecs
forceExtendableShardSpecs,
false
)
)
);

View File

@ -119,6 +119,7 @@ public class JobHelperTest
null,
null,
null,
false,
false
)
)

View File

@ -137,6 +137,7 @@ public class GranularityPathSpecTest
null,
null,
null,
false,
false
)
);

View File

@ -207,6 +207,7 @@ public class HadoopConverterJobTest
null,
null,
null,
false,
false
)
)

View File

@ -188,7 +188,7 @@ public class HadoopIndexTask extends HadoopTask
// We should have a lock from before we started running only if interval was specified
final String version;
String version;
if (determineIntervals) {
Interval interval = JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals(
@ -203,6 +203,18 @@ public class HadoopIndexTask extends HadoopTask
version = myLock.getVersion();
}
final String specVersion = indexerSchema.getTuningConfig().getVersion();
if (indexerSchema.getTuningConfig().isUseExplicitVersion() && version.compareTo(specVersion) > 0) {
version = specVersion;
} else {
log.error(
"Spec version can not be greater than lock version, Spec version: [%s] Lock version: [%s].",
specVersion,
version
);
return TaskStatus.failure(getId());
}
log.info("Setting version to: %s", version);
final String segments = invokeForeignLoader(