fix working path default bug

fjy 2014-12-15 14:51:58 -08:00
parent a520835972
commit e872952390
4 changed files with 34 additions and 25 deletions
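In short: HadoopTuningConfig used to substitute the default working path ("/tmp/druid-indexing") in its constructor, so the stored value was never null after deserialization and HadoopIndexTask's non-null check could only log "workingPath should be absent in your spec! Ignoring" without being able to tell whether the user had actually set anything. This commit keeps workingPath exactly as supplied (possibly null), moves the default into HadoopDruidIndexerConfig.getWorkingPath(), and lets HadoopIndexTask reject specs that do set workingPath.

Below is a minimal, self-contained sketch of the before/after defaulting strategy. The class and field names are simplified stand-ins rather than the real Druid classes; only the default value and the null-check pattern come from the diff.

// Illustrative sketch only: eager defaulting (old) erases "the user never set it",
// lazy defaulting (new) preserves it until the path is actually needed.
public class WorkingPathSketch
{
  private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";

  static class OldTuningConfig
  {
    final String workingPath;

    OldTuningConfig(String workingPath)
    {
      // default applied at construction time; null can no longer be observed downstream
      this.workingPath = workingPath == null ? DEFAULT_WORKING_PATH : workingPath;
    }
  }

  static class NewTuningConfig
  {
    final String workingPath;

    NewTuningConfig(String workingPath)
    {
      // stored as given; the indexer config applies the default at use time
      this.workingPath = workingPath;
    }

    String resolveWorkingPath()
    {
      return workingPath == null ? DEFAULT_WORKING_PATH : workingPath;
    }
  }

  public static void main(String[] args)
  {
    OldTuningConfig oldCfg = new OldTuningConfig(null);
    NewTuningConfig newCfg = new NewTuningConfig(null);

    System.out.println(oldCfg.workingPath == null);  // false: absence of a user-set value is lost
    System.out.println(newCfg.workingPath == null);  // true: absence is still detectable
    System.out.println(newCfg.resolveWorkingPath()); // /tmp/druid-indexing
  }
}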

HadoopDruidIndexerConfig.java

@@ -83,6 +83,8 @@ public class HadoopDruidIndexerConfig
   public static final Joiner tabJoiner = Joiner.on("\t");
   public static final ObjectMapper jsonMapper;

+  private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";
+
   static {
     injector = Initialization.makeInjectorWithModules(
         GuiceInjectors.makeStartupInjector(),
@@ -117,7 +119,7 @@ public class HadoopDruidIndexerConfig
     // Eventually PathSpec needs to get rid of its Hadoop dependency, then maybe this can be ingested directly without
     // the Map<> intermediary
-    if(argSpec.containsKey("spec")){
+    if (argSpec.containsKey("spec")) {
       return HadoopDruidIndexerConfig.jsonMapper.convertValue(
           argSpec,
           HadoopDruidIndexerConfig.class
@@ -138,8 +140,8 @@ public class HadoopDruidIndexerConfig
     return fromMap(
         (Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
             file, new TypeReference<Map<String, Object>>()
             {
             }
         )
     );
   }
@@ -175,7 +177,7 @@ public class HadoopDruidIndexerConfig
   private volatile HadoopIngestionSpec schema;
   private volatile PathSpec pathSpec;
-  private volatile Map<DateTime,ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
+  private volatile Map<DateTime, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
   private volatile Map<ShardSpec, HadoopyShardSpec> hadoopShardSpecLookup = Maps.newHashMap();
   private final QueryGranularity rollupGran;
@@ -193,17 +195,17 @@ public class HadoopDruidIndexerConfig
       final ShardSpec actualSpec = entry.getValue().get(0).getActualSpec();
       shardSpecLookups.put(
           entry.getKey(), actualSpec.getLookup(
               Lists.transform(
                   entry.getValue(), new Function<HadoopyShardSpec, ShardSpec>()
                   {
                     @Override
                     public ShardSpec apply(HadoopyShardSpec input)
                     {
                       return input.getActualSpec();
                     }
                   }
-              )
               )
+          )
       );
       for (HadoopyShardSpec hadoopyShardSpec : entry.getValue()) {
         hadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec);
@@ -212,7 +214,7 @@ public class HadoopDruidIndexerConfig
     this.rollupGran = schema.getDataSchema().getGranularitySpec().getQueryGranularity();
   }

-  @JsonProperty(value="spec")
+  @JsonProperty(value = "spec")
   public HadoopIngestionSpec getSchema()
   {
     return schema;
@@ -333,7 +335,11 @@ public class HadoopDruidIndexerConfig
       return Optional.absent();
     }

-    final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart()).getShardSpec(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
+    final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart())
+                                                 .getShardSpec(
+                                                     rollupGran.truncate(inputRow.getTimestampFromEpoch()),
+                                                     inputRow
+                                                 );
     final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(actualSpec);

     return Optional.of(
@@ -403,6 +409,12 @@ public class HadoopDruidIndexerConfig
     return schema.getTuningConfig().isPersistInHeap();
   }

+  public String getWorkingPath()
+  {
+    final String workingPath = schema.getTuningConfig().getWorkingPath();
+    return workingPath == null ? DEFAULT_WORKING_PATH : workingPath;
+  }
+
   /******************************************
    Path helper logic
    ******************************************/
@@ -418,7 +430,7 @@ public class HadoopDruidIndexerConfig
     return new Path(
         String.format(
             "%s/%s/%s",
-            schema.getTuningConfig().getWorkingPath(),
+            getWorkingPath(),
             schema.getDataSchema().getDataSource(),
             schema.getTuningConfig().getVersion().replace(":", "")
         )
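For reference, with getWorkingPath() in place the intermediate working directory built just above resolves as follows. This is a hedged worked example: the data source name and version timestamp are hypothetical, and only the default path and the format string come from the diff.

// Worked example of the "%s/%s/%s" path construction shown above (hypothetical inputs).
public class WorkingDirExample
{
  public static void main(String[] args)
  {
    String workingPath = "/tmp/druid-indexing";        // default when the spec omits workingPath
    String dataSource = "wikipedia";                   // hypothetical data source
    String version = "2014-12-15T14:51:58.000-08:00";  // hypothetical version timestamp

    String base = String.format("%s/%s/%s", workingPath, dataSource, version.replace(":", ""));
    System.out.println(base); // /tmp/druid-indexing/wikipedia/2014-12-15T145158.000-0800
  }
}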

HadoopTuningConfig.java

@@ -36,7 +36,6 @@ import java.util.Map;
 @JsonTypeName("hadoop")
 public class HadoopTuningConfig implements TuningConfig
 {
-  private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";
   private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
   private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
   private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
@@ -46,7 +45,7 @@ public class HadoopTuningConfig implements TuningConfig
   public static HadoopTuningConfig makeDefaultTuningConfig()
   {
     return new HadoopTuningConfig(
-        DEFAULT_WORKING_PATH,
+        null,
         new DateTime().toString(),
         DEFAULT_PARTITIONS_SPEC,
         DEFAULT_SHARD_SPECS,
@@ -99,7 +98,7 @@ public class HadoopTuningConfig implements TuningConfig
       final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
   )
   {
-    this.workingPath = workingPath == null ? DEFAULT_WORKING_PATH : workingPath;
+    this.workingPath = workingPath;
     this.version = version == null ? new DateTime().toString() : version;
     this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec;
     this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs;

JobHelper.java

@@ -63,7 +63,7 @@ public class JobHelper
     final Configuration conf = groupByJob.getConfiguration();
     final FileSystem fs = FileSystem.get(conf);
-    Path distributedClassPath = new Path(config.getSchema().getTuningConfig().getWorkingPath(), "classpath");
+    Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");

     if (fs instanceof LocalFileSystem) {
       return;

HadoopIndexTask.java
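In this file, the old code merely logged "workingPath should be absent in your spec! Ignoring" when a spec submitted through the indexing service set workingPath. The hunk below replaces that with a hard precondition, so such a spec now fails fast. A small hedged sketch of the Guava call it relies on; the path value is hypothetical.

import com.google.common.base.Preconditions;

// Illustrative only: Preconditions.checkArgument throws IllegalArgumentException with the
// given message when the condition is false, so a non-null workingPath is rejected outright.
public class WorkingPathPreconditionDemo
{
  public static void main(String[] args)
  {
    String workingPathFromSpec = "/user/example/scratch"; // hypothetical value set in a submitted spec

    Preconditions.checkArgument(workingPathFromSpec == null, "workingPath must be absent");
    // throws: java.lang.IllegalArgumentException: workingPath must be absent
  }
}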

@@ -111,13 +111,11 @@ public class HadoopIndexTask extends AbstractTask
     this.spec = spec;

     // Some HadoopIngestionSpec stuff doesn't make sense in the context of the indexing service
-    if (this.spec.getTuningConfig().getWorkingPath() != null) {
-      log.error("workingPath should be absent in your spec! Ignoring");
-    }
     Preconditions.checkArgument(
         this.spec.getIOConfig().getSegmentOutputPath() == null,
         "segmentOutputPath must be absent"
     );
+    Preconditions.checkArgument(this.spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent");
     Preconditions.checkArgument(
         this.spec.getIOConfig().getMetadataUpdateSpec() == null,
         "metadataUpdateSpec must be absent"