Merge pull request #2049 from himanshug/hadoop_indexing_unique_path

add a unique string to intermediate path for the hadoop indexing task
This commit is contained in:
Fangjin Yang 2015-12-07 11:46:16 -08:00
commit d957a6602c
3 changed files with 59 additions and 6 deletions

View File

@ -478,10 +478,11 @@ public class HadoopDruidIndexerConfig
{
return new Path(
String.format(
"%s/%s/%s",
"%s/%s/%s/%s",
getWorkingPath(),
schema.getDataSchema().getDataSource(),
schema.getTuningConfig().getVersion().replace(":", "")
schema.getTuningConfig().getVersion().replace(":", ""),
schema.getUniqueId()
)
);
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import io.druid.common.utils.UUIDUtils;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.indexer.path.UsedSegmentLister;
@ -47,11 +48,15 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
private final HadoopIOConfig ioConfig;
private final HadoopTuningConfig tuningConfig;
//this is used in the temporary paths on the hdfs unique to an hadoop indexing task
private final String uniqueId;
@JsonCreator
public HadoopIngestionSpec(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") HadoopIOConfig ioConfig,
@JsonProperty("tuningConfig") HadoopTuningConfig tuningConfig
@JsonProperty("tuningConfig") HadoopTuningConfig tuningConfig,
@JsonProperty("uniqueId") String uniqueId
)
{
super(dataSchema, ioConfig, tuningConfig);
@ -59,6 +64,17 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null ? HadoopTuningConfig.makeDefaultTuningConfig() : tuningConfig;
this.uniqueId = uniqueId == null ? UUIDUtils.generateUuid() : uniqueId;
}
//for unit tests
public HadoopIngestionSpec(
DataSchema dataSchema,
HadoopIOConfig ioConfig,
HadoopTuningConfig tuningConfig
)
{
this(dataSchema, ioConfig, tuningConfig, null);
}
@JsonProperty("dataSchema")
@ -82,12 +98,19 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
return tuningConfig;
}
@JsonProperty("uniqueId")
public String getUniqueId()
{
return uniqueId;
}
public HadoopIngestionSpec withDataSchema(DataSchema schema)
{
return new HadoopIngestionSpec(
schema,
ioConfig,
tuningConfig
tuningConfig,
uniqueId
);
}
@ -96,7 +119,8 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
return new HadoopIngestionSpec(
dataSchema,
config,
tuningConfig
tuningConfig,
uniqueId
);
}
@ -105,7 +129,8 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
return new HadoopIngestionSpec(
dataSchema,
ioConfig,
config
config,
uniqueId
);
}

View File

@ -21,6 +21,7 @@ package io.druid.indexer;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.indexer.partitions.HashedPartitionsSpec;
@ -240,6 +241,32 @@ public class HadoopIngestionSpecTest
schema.getTuningConfig().getPartitionsSpec().isDeterminingPartitions(),
false
);
Assert.assertFalse(Strings.isNullOrEmpty(schema.getUniqueId()));
}
@Test
public void testUniqueId() throws Exception
{
final HadoopIngestionSpec schema = jsonReadWriteRead(
"{\"uniqueId\" : \"test_unique_id\"}",
HadoopIngestionSpec.class
);
Assert.assertEquals("test_unique_id", schema.getUniqueId());
//test uniqueId assigned is really unique
final String id1 = jsonReadWriteRead(
"{}",
HadoopIngestionSpec.class
).getUniqueId();
final String id2 = jsonReadWriteRead(
"{}",
HadoopIngestionSpec.class
).getUniqueId();
Assert.assertNotEquals(id1, id2);
}
@Test