From e8729523902ffbebacd502bd4ea73099d6224f16 Mon Sep 17 00:00:00 2001
From: fjy
Date: Mon, 15 Dec 2014 14:51:58 -0800
Subject: [PATCH] fix working path default bug

---
 .../indexer/HadoopDruidIndexerConfig.java     | 48 ++++++++++++-------
 .../io/druid/indexer/HadoopTuningConfig.java  |  5 +-
 .../main/java/io/druid/indexer/JobHelper.java |  2 +-
 .../indexing/common/task/HadoopIndexTask.java |  4 +-
 4 files changed, 34 insertions(+), 25 deletions(-)

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
index f7b168d0f3f..8638e53b1d4 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -83,6 +83,8 @@ public class HadoopDruidIndexerConfig
   public static final Joiner tabJoiner = Joiner.on("\t");
   public static final ObjectMapper jsonMapper;
 
+  private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";
+
   static {
     injector = Initialization.makeInjectorWithModules(
         GuiceInjectors.makeStartupInjector(),
@@ -116,8 +118,8 @@ public class HadoopDruidIndexerConfig
   {
     // Eventually PathSpec needs to get rid of its Hadoop dependency, then maybe this can be ingested directly without
     // the Map<> intermediary
-
-    if(argSpec.containsKey("spec")){
+
+    if (argSpec.containsKey("spec")) {
       return HadoopDruidIndexerConfig.jsonMapper.convertValue(
           argSpec,
           HadoopDruidIndexerConfig.class
@@ -138,8 +140,8 @@ public class HadoopDruidIndexerConfig
       return fromMap(
           (Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
               file, new TypeReference<Map<String, Object>>()
-          {
-          }
+              {
+              }
           )
       );
     }
@@ -175,7 +177,7 @@ public class HadoopDruidIndexerConfig
 
   private volatile HadoopIngestionSpec schema;
   private volatile PathSpec pathSpec;
-  private volatile Map<DateTime,ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
+  private volatile Map<DateTime, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
   private volatile Map<ShardSpec, HadoopyShardSpec> hadoopShardSpecLookup = Maps.newHashMap();
   private final QueryGranularity rollupGran;
 
@@ -193,17 +195,17 @@ public class HadoopDruidIndexerConfig
       final ShardSpec actualSpec = entry.getValue().get(0).getActualSpec();
       shardSpecLookups.put(
           entry.getKey(), actualSpec.getLookup(
-          Lists.transform(
-              entry.getValue(), new Function<HadoopyShardSpec, ShardSpec>()
-          {
-            @Override
-            public ShardSpec apply(HadoopyShardSpec input)
-            {
-              return input.getActualSpec();
-            }
-          }
+              Lists.transform(
+                  entry.getValue(), new Function<HadoopyShardSpec, ShardSpec>()
+                  {
+                    @Override
+                    public ShardSpec apply(HadoopyShardSpec input)
+                    {
+                      return input.getActualSpec();
+                    }
+                  }
+              )
           )
-          )
       );
       for (HadoopyShardSpec hadoopyShardSpec : entry.getValue()) {
         hadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec);
@@ -212,7 +214,7 @@ public class HadoopDruidIndexerConfig
     this.rollupGran = schema.getDataSchema().getGranularitySpec().getQueryGranularity();
   }
 
-  @JsonProperty(value="spec")
+  @JsonProperty(value = "spec")
   public HadoopIngestionSpec getSchema()
   {
     return schema;
@@ -333,7 +335,11 @@ public class HadoopDruidIndexerConfig
       return Optional.absent();
     }
 
-    final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart()).getShardSpec(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
+    final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart())
+                                                 .getShardSpec(
+                                                     rollupGran.truncate(inputRow.getTimestampFromEpoch()),
+                                                     inputRow
+                                                 );
     final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(actualSpec);
 
     return Optional.of(
@@ -403,6 +409,12 @@ public class HadoopDruidIndexerConfig
     return schema.getTuningConfig().isPersistInHeap();
   }
 
+  public String getWorkingPath()
+  {
+    final String workingPath = schema.getTuningConfig().getWorkingPath();
+    return workingPath == null ? DEFAULT_WORKING_PATH : workingPath;
+  }
+
   /******************************************
    Path helper logic
    ******************************************/
@@ -418,7 +430,7 @@ public class HadoopDruidIndexerConfig
     return new Path(
         String.format(
             "%s/%s/%s",
-            schema.getTuningConfig().getWorkingPath(),
+            getWorkingPath(),
            schema.getDataSchema().getDataSource(),
            schema.getTuningConfig().getVersion().replace(":", "")
        )
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java
index 9a2787780bc..382d24f0b5b 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java
@@ -36,7 +36,6 @@ import java.util.Map;
 @JsonTypeName("hadoop")
 public class HadoopTuningConfig implements TuningConfig
 {
-  private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";
   private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
   private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
   private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
@@ -46,7 +45,7 @@ public class HadoopTuningConfig implements TuningConfig
   public static HadoopTuningConfig makeDefaultTuningConfig()
   {
     return new HadoopTuningConfig(
-        DEFAULT_WORKING_PATH,
+        null,
         new DateTime().toString(),
         DEFAULT_PARTITIONS_SPEC,
         DEFAULT_SHARD_SPECS,
@@ -99,7 +98,7 @@ public class HadoopTuningConfig implements TuningConfig
       final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
   )
   {
-    this.workingPath = workingPath == null ? DEFAULT_WORKING_PATH : workingPath;
+    this.workingPath = workingPath;
     this.version = version == null ? new DateTime().toString() : version;
     this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec;
     this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs;
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index b1ffabaa1f4..fa0d9b9772d 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -63,7 +63,7 @@ public class JobHelper
 
     final Configuration conf = groupByJob.getConfiguration();
     final FileSystem fs = FileSystem.get(conf);
-    Path distributedClassPath = new Path(config.getSchema().getTuningConfig().getWorkingPath(), "classpath");
+    Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");
 
     if (fs instanceof LocalFileSystem) {
       return;
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 086aef984b4..e27d70648b1 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -111,13 +111,11 @@ public class HadoopIndexTask extends AbstractTask
     this.spec = spec;
 
     // Some HadoopIngestionSpec stuff doesn't make sense in the context of the indexing service
-    if (this.spec.getTuningConfig().getWorkingPath() != null) {
-      log.error("workingPath should be absent in your spec! Ignoring");
-    }
     Preconditions.checkArgument(
         this.spec.getIOConfig().getSegmentOutputPath() == null,
         "segmentOutputPath must be absent"
     );
+    Preconditions.checkArgument(this.spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent");
     Preconditions.checkArgument(
         this.spec.getIOConfig().getMetadataUpdateSpec() == null,
         "metadataUpdateSpec must be absent"
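
Background on the change: before this patch, HadoopTuningConfig silently replaced a null workingPath with "/tmp/druid-indexing" at construction time, so by the time HadoopIndexTask saw the spec it could no longer tell whether the user had actually set workingPath, and could only log an error and ignore it. Moving the default into HadoopDruidIndexerConfig.getWorkingPath() keeps null meaning "unset", applies the default only where a concrete path is needed (getWorkingPath() and JobHelper), and lets the task enforce "workingPath must be absent" with a Preconditions check. The following self-contained sketch illustrates that behavior; WorkingPathDefaultSketch, StubTuningConfig, resolveWorkingPath, and the "/user/wp" path are illustrative stand-ins, not the actual Druid types.

// Illustrative sketch only; stand-in classes, not the actual Druid types.
public class WorkingPathDefaultSketch
{
  private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";

  // Stands in for HadoopTuningConfig after the patch: the constructor stores
  // the raw value, so null still means "the user never set workingPath".
  static class StubTuningConfig
  {
    private final String workingPath;

    StubTuningConfig(String workingPath)
    {
      this.workingPath = workingPath;
    }

    String getWorkingPath()
    {
      return workingPath;
    }
  }

  // Stands in for HadoopDruidIndexerConfig.getWorkingPath(): the default is
  // applied lazily, at the point where a concrete path is actually needed.
  static String resolveWorkingPath(StubTuningConfig tuningConfig)
  {
    final String workingPath = tuningConfig.getWorkingPath();
    return workingPath == null ? DEFAULT_WORKING_PATH : workingPath;
  }

  public static void main(String[] args)
  {
    // Standalone indexer: an unset working path falls back to the default.
    System.out.println(resolveWorkingPath(new StubTuningConfig(null)));       // /tmp/druid-indexing
    System.out.println(resolveWorkingPath(new StubTuningConfig("/user/wp"))); // /user/wp

    // Indexing service: because the tuning config no longer fills in the
    // default itself, HadoopIndexTask can hard-fail on an explicitly set
    // value instead of merely logging and ignoring it, as it did before.
    StubTuningConfig fromTaskSpec = new StubTuningConfig("/user/wp");
    if (fromTaskSpec.getWorkingPath() != null) {
      System.out.println("rejected: workingPath must be absent");
    }
  }
}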