fix more serde

This commit is contained in:
fjy 2014-05-06 15:17:38 -07:00
parent 1100d2f2a1
commit d75cc7b9b8
3 changed files with 36 additions and 24 deletions

View File

@ -67,6 +67,14 @@ public class HadoopIndexTask extends AbstractTask
public static String DEFAULT_HADOOP_COORDINATES = "org.apache.hadoop:hadoop-client:2.3.0"; public static String DEFAULT_HADOOP_COORDINATES = "org.apache.hadoop:hadoop-client:2.3.0";
private static String getTheDataSource(HadoopIngestionSpec spec, HadoopIngestionSpec config)
{
if (spec != null) {
return spec.getDataSchema().getDataSource();
}
return config.getDataSchema().getDataSource();
}
@JsonIgnore @JsonIgnore
private final HadoopIngestionSpec spec; private final HadoopIngestionSpec spec;
@JsonIgnore @JsonIgnore
@ -74,36 +82,39 @@ public class HadoopIndexTask extends AbstractTask
/** /**
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters * @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified. * for creating Druid index segments. It may be modified.
* <p/> * <p/>
* Here, we will ensure that the DbConnectorConfig field of the spec is set to null, such that the * Here, we will ensure that the DbConnectorConfig field of the spec is set to null, such that the
* job does not push a list of published segments the database. Instead, we will use the method * job does not push a list of published segments the database. Instead, we will use the method
* IndexGeneratorJob.getPublishedSegments() to simply return a list of the published * IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
* segments, and let the indexing service report these segments to the database. * segments, and let the indexing service report these segments to the database.
*/ */
@JsonCreator @JsonCreator
public HadoopIndexTask( public HadoopIndexTask(
@JsonProperty("id") String id, @JsonProperty("id") String id,
@JsonProperty("spec") HadoopIngestionSpec spec, @JsonProperty("spec") HadoopIngestionSpec spec,
@JsonProperty("config") HadoopIngestionSpec config, // backwards compat
@JsonProperty("hadoopCoordinates") String hadoopCoordinates, @JsonProperty("hadoopCoordinates") String hadoopCoordinates,
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
) )
{ {
super( super(
id != null ? id : String.format("index_hadoop_%s_%s", spec.getDataSchema().getDataSource(), new DateTime()), id != null ? id : String.format("index_hadoop_%s_%s", getTheDataSource(spec, config), new DateTime()),
spec.getDataSchema().getDataSource() getTheDataSource(spec, config)
); );
this.spec = spec == null ? config : spec;
// Some HadoopIngestionSpec stuff doesn't make sense in the context of the indexing service // Some HadoopIngestionSpec stuff doesn't make sense in the context of the indexing service
Preconditions.checkArgument( Preconditions.checkArgument(
spec.getIOConfig().getSegmentOutputPath() == null, this.spec.getIOConfig().getSegmentOutputPath() == null,
"segmentOutputPath must be absent" "segmentOutputPath must be absent"
); );
Preconditions.checkArgument(spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent"); Preconditions.checkArgument(this.spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent");
Preconditions.checkArgument(spec.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent"); Preconditions.checkArgument(this.spec.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent");
this.spec = spec;
this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.<String>asList( this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.<String>asList(
hadoopCoordinates == null ? DEFAULT_HADOOP_COORDINATES : hadoopCoordinates hadoopCoordinates == null ? DEFAULT_HADOOP_COORDINATES : hadoopCoordinates
) : hadoopDependencyCoordinates; ) : hadoopDependencyCoordinates;
@ -131,7 +142,7 @@ public class HadoopIndexTask extends AbstractTask
} }
} }
@JsonProperty("schema") @JsonProperty("spec")
public HadoopIngestionSpec getSpec() public HadoopIngestionSpec getSpec()
{ {
return spec; return spec;
@ -194,7 +205,7 @@ public class HadoopIndexTask extends AbstractTask
String config = (String) determineConfigurationMainMethod.invoke(null, new Object[]{determineConfigArgs}); String config = (String) determineConfigurationMainMethod.invoke(null, new Object[]{determineConfigArgs});
HadoopIngestionSpec indexerSchema = toolbox.getObjectMapper() HadoopIngestionSpec indexerSchema = toolbox.getObjectMapper()
.readValue(config, HadoopIngestionSpec.class); .readValue(config, HadoopIngestionSpec.class);
// We should have a lock from before we started running only if interval was specified // We should have a lock from before we started running only if interval was specified
@ -247,10 +258,10 @@ public class HadoopIndexTask extends AbstractTask
String version = args[1]; String version = args[1];
final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.jsonMapper final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.jsonMapper
.readValue( .readValue(
schema, schema,
HadoopIngestionSpec.class HadoopIngestionSpec.class
); );
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema( final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(
theSchema theSchema
.withTuningConfig(theSchema.getTuningConfig().withVersion(version)) .withTuningConfig(theSchema.getTuningConfig().withVersion(version))
@ -276,10 +287,10 @@ public class HadoopIndexTask extends AbstractTask
final String segmentOutputPath = args[2]; final String segmentOutputPath = args[2];
final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.jsonMapper final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.jsonMapper
.readValue( .readValue(
schema, schema,
HadoopIngestionSpec.class HadoopIngestionSpec.class
); );
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema( final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(
theSchema theSchema
.withIOConfig(theSchema.getIOConfig().withSegmentOutputPath(segmentOutputPath)) .withIOConfig(theSchema.getIOConfig().withSegmentOutputPath(segmentOutputPath))

View File

@ -396,6 +396,7 @@ public class TaskSerdeTest
public void testHadoopIndexTaskSerde() throws Exception public void testHadoopIndexTaskSerde() throws Exception
{ {
final HadoopIndexTask task = new HadoopIndexTask( final HadoopIndexTask task = new HadoopIndexTask(
null,
null, null,
new HadoopIngestionSpec( new HadoopIngestionSpec(
null, null, null, null, null, null,

View File

@ -30,7 +30,7 @@
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection> <connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection> <developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url> <url>http://www.github.com/metamx/druid</url>
<tag>druid-0.6.100-SNAPSHOT</tag> <tag>druid-0.6.107-SNAPSHOT</tag>
</scm> </scm>
<prerequisites> <prerequisites>
@ -41,7 +41,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.25.5</metamx.java-util.version> <metamx.java-util.version>0.25.5</metamx.java-util.version>
<apache.curator.version>2.4.0</apache.curator.version> <apache.curator.version>2.4.0</apache.curator.version>
<druid.api.version>0.2.0-SNAPSHOT</druid.api.version> <druid.api.version>0.2.0</druid.api.version>
</properties> </properties>
<modules> <modules>