Mirror of https://github.com/apache/druid.git, synced 2025-02-25 20:48:05 +00:00.
commit 6cfaf59d7e (parent b9760e1219)

    add a unique string to intermediate path for the hadoop indexing task
HadoopDruidIndexerConfig.java

@@ -476,10 +476,11 @@ public class HadoopDruidIndexerConfig
   {
     return new Path(
         String.format(
-            "%s/%s/%s",
+            "%s/%s/%s/%s",
             getWorkingPath(),
             schema.getDataSchema().getDataSource(),
-            schema.getTuningConfig().getVersion().replace(":", "")
+            schema.getTuningConfig().getVersion().replace(":", ""),
+            schema.getUniqueId()
         )
     );
   }
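For context, a minimal standalone sketch (not part of this commit) of the intermediate-path layout before and after the change above. The working path, datasource, and version values are hypothetical, and java.util.UUID stands in for schema.getUniqueId():

import java.util.UUID;

public class IntermediatePathSketch
{
  public static void main(String[] args)
  {
    String workingPath = "/tmp/druid-indexing";                    // hypothetical working path
    String dataSource = "wikipedia";                               // hypothetical datasource
    String version = "2015-06-08T15:12:26.145Z".replace(":", "");  // hypothetical version, colons stripped as above
    String uniqueId = UUID.randomUUID().toString();                // stands in for schema.getUniqueId()

    // Before: concurrent tasks over the same datasource and version share this prefix.
    System.out.println(String.format("%s/%s/%s", workingPath, dataSource, version));

    // After: the per-task uniqueId gives each task its own scratch directory.
    System.out.println(String.format("%s/%s/%s/%s", workingPath, dataSource, version, uniqueId));
  }
}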
HadoopIngestionSpec.java

@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
+import io.druid.common.utils.UUIDUtils;
 import io.druid.indexer.hadoop.DatasourceIngestionSpec;
 import io.druid.indexer.hadoop.WindowedDataSegment;
 import io.druid.indexer.path.UsedSegmentLister;
@@ -45,11 +46,15 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
   private final HadoopIOConfig ioConfig;
   private final HadoopTuningConfig tuningConfig;
 
+  //this is used in the temporary paths on HDFS; unique to each Hadoop indexing task
+  private final String uniqueId;
+
   @JsonCreator
   public HadoopIngestionSpec(
       @JsonProperty("dataSchema") DataSchema dataSchema,
       @JsonProperty("ioConfig") HadoopIOConfig ioConfig,
-      @JsonProperty("tuningConfig") HadoopTuningConfig tuningConfig
+      @JsonProperty("tuningConfig") HadoopTuningConfig tuningConfig,
+      @JsonProperty("uniqueId") String uniqueId
   )
   {
     super(dataSchema, ioConfig, tuningConfig);
@@ -57,6 +62,17 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
     this.dataSchema = dataSchema;
     this.ioConfig = ioConfig;
     this.tuningConfig = tuningConfig == null ? HadoopTuningConfig.makeDefaultTuningConfig() : tuningConfig;
+    this.uniqueId = uniqueId == null ? UUIDUtils.generateUuid() : uniqueId;
+  }
+
+  //for unit tests
+  public HadoopIngestionSpec(
+      DataSchema dataSchema,
+      HadoopIOConfig ioConfig,
+      HadoopTuningConfig tuningConfig
+  )
+  {
+    this(dataSchema, ioConfig, tuningConfig, null);
   }
 
   @JsonProperty("dataSchema")
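A minimal standalone sketch of the defaulting rule the constructor above implements: an explicitly supplied uniqueId wins, otherwise one is generated, so specs written before this change deserialize unchanged. java.util.UUID stands in here for io.druid.common.utils.UUIDUtils, whose output format this diff does not show:

import java.util.UUID;

public class UniqueIdDefaultSketch
{
  private final String uniqueId;

  public UniqueIdDefaultSketch(String uniqueId)
  {
    // Mirrors `uniqueId == null ? UUIDUtils.generateUuid() : uniqueId` above.
    this.uniqueId = uniqueId == null ? UUID.randomUUID().toString() : uniqueId;
  }

  public String getUniqueId()
  {
    return uniqueId;
  }

  public static void main(String[] args)
  {
    System.out.println(new UniqueIdDefaultSketch("test_unique_id").getUniqueId()); // prints test_unique_id
    System.out.println(new UniqueIdDefaultSketch(null).getUniqueId());             // prints a random UUID
  }
}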
@@ -80,12 +96,19 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
     return tuningConfig;
   }
 
+  @JsonProperty("uniqueId")
+  public String getUniqueId()
+  {
+    return uniqueId;
+  }
+
   public HadoopIngestionSpec withDataSchema(DataSchema schema)
   {
     return new HadoopIngestionSpec(
         schema,
         ioConfig,
-        tuningConfig
+        tuningConfig,
+        uniqueId
     );
   }
 
@@ -94,7 +117,8 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
     return new HadoopIngestionSpec(
         dataSchema,
         config,
-        tuningConfig
+        tuningConfig,
+        uniqueId
     );
   }
 
@@ -103,7 +127,8 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
     return new HadoopIngestionSpec(
         dataSchema,
         ioConfig,
-        config
+        config,
+        uniqueId
     );
   }
 
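Note that each of the with* copy methods above now passes uniqueId through to the new instance. If one of them dropped it, the copy would hit the null branch of the constructor and receive a fresh UUID, so different copies of the same spec would stop agreeing on the task's intermediate path.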
HadoopIngestionSpecTest.java

@@ -19,6 +19,7 @@ package io.druid.indexer;
 
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import io.druid.indexer.partitions.HashedPartitionsSpec;
@@ -238,6 +239,32 @@ public class HadoopIngestionSpecTest
         schema.getTuningConfig().getPartitionsSpec().isDeterminingPartitions(),
         false
     );
+
+    Assert.assertFalse(Strings.isNullOrEmpty(schema.getUniqueId()));
+  }
+
+  @Test
+  public void testUniqueId() throws Exception
+  {
+    final HadoopIngestionSpec schema = jsonReadWriteRead(
+        "{\"uniqueId\" : \"test_unique_id\"}",
+        HadoopIngestionSpec.class
+    );
+
+    Assert.assertEquals("test_unique_id", schema.getUniqueId());
+
+    //test that a generated uniqueId is really unique
+    final String id1 = jsonReadWriteRead(
+        "{}",
+        HadoopIngestionSpec.class
+    ).getUniqueId();
+
+    final String id2 = jsonReadWriteRead(
+        "{}",
+        HadoopIngestionSpec.class
+    ).getUniqueId();
+
+    Assert.assertNotEquals(id1, id2);
   }
 
   @Test