mirror of https://github.com/apache/druid.git
HadoopIndexTask: Jackson fixes and general overriding of storage-specific stuff
This commit is contained in:
parent f862d9205d
commit e40fba4de2
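The central Jackson change in this commit is that HadoopIndexTask now round-trips its HadoopDruidIndexerConfig through the toolbox's ObjectMapper to obtain a private copy before overriding the storage-specific settings (version, workingPath, segmentOutputPath) at run time. A minimal, self-contained sketch of that copy-via-serde pattern, using a hypothetical JobConfig class in place of HadoopDruidIndexerConfig and Jackson 2 (com.fasterxml) annotations:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SerdeCopyExample
{
  // Hypothetical stand-in for HadoopDruidIndexerConfig: one immutable field plus one
  // field that the task runner fills in later.
  public static class JobConfig
  {
    @JsonProperty
    private final String dataSource;
    @JsonProperty
    private String workingPath;

    @JsonCreator
    public JobConfig(
        @JsonProperty("dataSource") String dataSource,
        @JsonProperty("workingPath") String workingPath
    )
    {
      this.dataSource = dataSource;
      this.workingPath = workingPath;
    }

    public void setWorkingPath(String workingPath)
    {
      this.workingPath = workingPath;
    }
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final JobConfig submitted = new JobConfig("foo", null);

    // Copy via serde, as HadoopIndexTask.run() now does, so the submitted config is never mutated.
    final JobConfig copy = mapper.readValue(mapper.writeValueAsBytes(submitted), JobConfig.class);
    copy.setWorkingPath("/tmp/druid-indexing");

    System.out.println(mapper.writeValueAsString(submitted)); // workingPath is still null here
    System.out.println(mapper.writeValueAsString(copy));      // workingPath has been overridden
  }
}
```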
@@ -198,7 +198,7 @@ public class HadoopDruidIndexerConfig
final @JsonProperty("pathSpec") PathSpec pathSpec,
final @JsonProperty("workingPath") String jobOutputDir,
final @JsonProperty("segmentOutputPath") String segmentOutputDir,
final @JsonProperty("version") DateTime version,
final @JsonProperty("version") String version,
final @JsonProperty("partitionDimension") String partitionDimension,
final @JsonProperty("targetPartitionSize") Long targetPartitionSize,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
@@ -220,7 +220,7 @@ public class HadoopDruidIndexerConfig
this.pathSpec = pathSpec;
this.jobOutputDir = jobOutputDir;
this.segmentOutputDir = segmentOutputDir;
this.version = version == null ? new DateTime().toString() : version.toString();
this.version = version == null ? new DateTime().toString() : version;
this.partitionsSpec = partitionsSpec;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = cleanupOnFailure;
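Accepting the version as a plain String (and only minting a fresh DateTime when none is supplied) appears to be what lets the indexing service inject the task lock's version verbatim later in run(). A small standalone illustration of that null-coalescing default, assuming joda-time on the classpath:

```java
import org.joda.time.DateTime;

public class VersionDefaultExample
{
  // Mirrors the constructor line above: keep a caller-supplied version as-is,
  // otherwise fall back to the current time rendered as an ISO8601 string.
  static String resolveVersion(String version)
  {
    return version == null ? new DateTime().toString() : version;
  }

  public static void main(String[] args)
  {
    System.out.println(resolveVersion(null));                       // e.g. 2013-02-20T12:34:56.789-08:00
    System.out.println(resolveVersion("2013-01-01T00:00:00.000Z")); // passed through unchanged
  }
}
```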
@@ -15,6 +15,9 @@ public abstract class TaskConfig
@Default("500000")
public abstract int getDefaultRowFlushBoundary();

@Config("druid.merger.hadoopWorkingPath")
public abstract String getHadoopWorkingPath();

public File getTaskDir(final Task task) {
return new File(getBaseTaskDir(), task.getId());
}
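TaskConfig already uses org.skife.config-style annotations (the @Default above), so the new getHadoopWorkingPath() is presumably bound the same way, from a druid.merger.hadoopWorkingPath runtime property. A hedged sketch of how such a binding is typically built with config-magic's ConfigurationObjectFactory; the class name and default value here are illustrative, not from the commit:

```java
import java.util.Properties;

import org.skife.config.Config;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.config.Default;

public class HadoopWorkingPathConfigSketch
{
  // Trimmed stand-in for TaskConfig; only the new property is shown, with an assumed default.
  public abstract static class WorkerConfig
  {
    @Config("druid.merger.hadoopWorkingPath")
    @Default("/tmp/druid-indexing")
    public abstract String getHadoopWorkingPath();
  }

  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.setProperty("druid.merger.hadoopWorkingPath", "hdfs://namenode/tmp/druid-indexing");

    final WorkerConfig config = new ConfigurationObjectFactory(props).build(WorkerConfig.class);
    System.out.println(config.getHadoopWorkingPath()); // hdfs://namenode/tmp/druid-indexing
  }
}
```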
@@ -1,6 +1,8 @@
package com.metamx.druid.merger.common.task;

import com.google.common.collect.ImmutableList;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.metamx.common.logger.Logger;
@@ -14,20 +16,17 @@ import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.utils.JodaUtils;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.joda.time.DateTime;

import java.util.List;

public class HadoopIndexTask extends AbstractTask
{
@JsonProperty
private static final Logger log = new Logger(HadoopIndexTask.class);

@JsonProperty
private final HadoopDruidIndexerConfig config;

private static final Logger log = new Logger(HadoopIndexTask.class);

/**
* @param config is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified.
@@ -44,16 +43,16 @@ public class HadoopIndexTask extends AbstractTask
)
{
super(
String.format("index_hadoop_%s_interval_%s", config.getDataSource(), new DateTime()),
String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()),
config.getDataSource(),
JodaUtils.umbrellaInterval(config.getIntervals())
);

if (config.isUpdaterJobSpecSet()) {
throw new IllegalArgumentException(
"The UpdaterJobSpec field of the Hadoop Druid indexer config must be set to null"
);
}
// Some HadoopDruidIndexerConfig stuff doesn't make sense in the context of the indexing service
Preconditions.checkArgument(config.getSegmentOutputDir() == null, "segmentOutputPath must be absent");
Preconditions.checkArgument(config.getJobOutputDir() == null, "workingPath must be absent");
Preconditions.checkArgument(!config.isUpdaterJobSpecSet(), "updaterJobSpec must be absent");

this.config = config;
}

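The constructor now rejects specs that arrive with storage-specific fields already set, since run() fills them in itself; Guava's checkArgument throws IllegalArgumentException carrying the supplied message when the condition is false. A tiny standalone example of that behavior (the path value is illustrative only):

```java
import com.google.common.base.Preconditions;

public class PreconditionsExample
{
  public static void main(String[] args)
  {
    // A spec that wrongly pre-sets the segment output path.
    final String segmentOutputDir = "s3n://example-bucket/prefix";

    try {
      Preconditions.checkArgument(segmentOutputDir == null, "segmentOutputPath must be absent");
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // prints: segmentOutputPath must be absent
    }
  }
}
```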
@@ -66,10 +65,21 @@ public class HadoopIndexTask extends AbstractTask
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Copy config so we don't needlessly modify our provided one
// Also necessary to make constructor validations work upon serde-after-run
final HadoopDruidIndexerConfig configCopy = toolbox.getObjectMapper()
.readValue(
toolbox.getObjectMapper().writeValueAsBytes(config),
HadoopDruidIndexerConfig.class
);

// We should have a lock from before we started running
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this)));
log.info("Setting version to: %s", myLock.getVersion());
config.setVersion(myLock.getVersion());
configCopy.setVersion(myLock.getVersion());

// Set workingPath to some reasonable default
configCopy.setJobOutputDir(toolbox.getConfig().getHadoopWorkingPath());

if (toolbox.getSegmentPusher() instanceof S3DataSegmentPusher) {
// Hack alert! Bypassing DataSegmentPusher...
@@ -82,14 +92,15 @@ public class HadoopIndexTask extends AbstractTask
);

log.info("Setting segment output path to: %s", s3Path);
config.setSegmentOutputDir(s3Path);
configCopy.setSegmentOutputDir(s3Path);
} else {
throw new IllegalStateException("Sorry, we only work with S3DataSegmentPushers! Bummer!");
}

HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
log.debug("Starting a hadoop index generator job...");
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(configCopy);
configCopy.verify();

log.info("Starting a hadoop index generator job...");
if (job.run()) {
List<DataSegment> publishedSegments = job.getPublishedSegments();

@@ -37,7 +37,8 @@ import org.joda.time.Interval;
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class),
@JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class)
@JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class)
})
public interface Task
{
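Registering the "index_hadoop" name is what lets a serialized task carrying {"type": "index_hadoop", ...} deserialize back into a HadoopIndexTask, which the new serde test below exercises. A self-contained sketch of Jackson named-subtype resolution, with hypothetical stand-ins for the Task hierarchy:

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SubtypeExample
{
  // Hypothetical stand-ins for the real Task/HadoopIndexTask types.
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  @JsonSubTypes({
      @JsonSubTypes.Type(name = "index_hadoop", value = HadoopJob.class)
  })
  public interface Job
  {
  }

  public static class HadoopJob implements Job
  {
    @JsonProperty
    public String dataSource;
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final Job job = mapper.readValue("{\"type\": \"index_hadoop\", \"dataSource\": \"foo\"}", Job.class);

    // The "type" name picks the concrete class during deserialization.
    System.out.println(job.getClass().getSimpleName()); // HadoopJob
  }
}
```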
@@ -1,12 +1,17 @@
package com.metamx.druid.merger.common.task;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
import com.metamx.druid.indexer.data.JSONDataSpec;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexer.path.StaticPathSpec;
import com.metamx.druid.indexer.rollup.DataRollupSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
@@ -32,13 +37,17 @@ public class TaskSerdeTest

final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);

Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final Task task2 = jsonMapper.readValue(json, Task.class);

Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P2D")), task.getFixedInterval());

Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
}

@Test
@@ -61,11 +70,13 @@ public class TaskSerdeTest
final String json = jsonMapper.writeValueAsString(task);
final Task task2 = jsonMapper.readValue(json, Task.class);

Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getFixedInterval());

Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
}

@Test
@@ -82,11 +93,13 @@ public class TaskSerdeTest
final String json = jsonMapper.writeValueAsString(task);
final Task task2 = jsonMapper.readValue(json, Task.class);

Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getFixedInterval());

Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
}

@Test
@@ -99,7 +112,6 @@ public class TaskSerdeTest

final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
System.out.println(json);
final Task task2 = jsonMapper.readValue(json, Task.class);

Assert.assertEquals(task.getId(), task2.getId());
@@ -108,4 +120,47 @@ public class TaskSerdeTest
Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
}

@Test
public void testHadoopIndexTaskSerde() throws Exception
{
final HadoopIndexTask task = new HadoopIndexTask(
new HadoopDruidIndexerConfig(
null,
"foo",
"timestamp",
"auto",
new JSONDataSpec(ImmutableList.of("foo")),
null,
new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
new StaticPathSpec("bar"),
null,
null,
null,
null,
null,
null,
false,
true,
null,
false,
new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
null,
false,
ImmutableList.<String>of()
)
);

final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
final Task task2 = jsonMapper.readValue(json, Task.class);

Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getFixedInterval());

Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
}
}
@@ -300,6 +300,12 @@ public class RemoteTaskRunnerTest
{
return 0;
}

@Override
public String getHadoopWorkingPath()
{
return null;
}
}, null, null, null, null, null, jsonMapper
),
Executors.newSingleThreadExecutor()
@@ -110,6 +110,12 @@ public class TaskLifecycleTest
{
return 50000;
}

@Override
public String getHadoopWorkingPath()
{
return null;
}
},
new LocalTaskActionClient(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())),
newMockEmitter(),