rename configs to make a bit more sense

This commit is contained in:
fjy 2014-05-06 14:52:50 -07:00
parent b6fb4245aa
commit 1100d2f2a1
27 changed files with 249 additions and 258 deletions

View File

@ -103,7 +103,7 @@ public class HadoopDruidIndexerConfig
INVALID_ROW_COUNTER
}
public static HadoopDruidIndexerConfig fromSchema(HadoopIngestionSchema schema)
public static HadoopDruidIndexerConfig fromSchema(HadoopIngestionSpec schema)
{
return new HadoopDruidIndexerConfig(schema);
}
@ -154,12 +154,12 @@ public class HadoopDruidIndexerConfig
return retVal;
}
private volatile HadoopIngestionSchema schema;
private volatile HadoopIngestionSpec schema;
private volatile PathSpec pathSpec;
@JsonCreator
public HadoopDruidIndexerConfig(
final @JsonProperty("schema") HadoopIngestionSchema schema
final @JsonProperty("schema") HadoopIngestionSpec schema
)
{
this.schema = schema;
@ -167,7 +167,7 @@ public class HadoopDruidIndexerConfig
}
@JsonProperty
public HadoopIngestionSchema getSchema()
public HadoopIngestionSpec getSchema()
{
return schema;
}
@ -190,28 +190,28 @@ public class HadoopDruidIndexerConfig
public PartitionsSpec getPartitionsSpec()
{
return schema.getDriverConfig().getPartitionsSpec();
return schema.getTuningConfig().getPartitionsSpec();
}
public boolean isOverwriteFiles()
{
return schema.getDriverConfig().isOverwriteFiles();
return schema.getTuningConfig().isOverwriteFiles();
}
public boolean isIgnoreInvalidRows()
{
return schema.getDriverConfig().isIgnoreInvalidRows();
return schema.getTuningConfig().isIgnoreInvalidRows();
}
public void setVersion(String version)
{
this.schema = schema.withDriverConfig(schema.getDriverConfig().withVersion(version));
this.schema = schema.withTuningConfig(schema.getTuningConfig().withVersion(version));
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
}
public void setShardSpecs(Map<DateTime, List<HadoopyShardSpec>> shardSpecs)
{
this.schema = schema.withDriverConfig(schema.getDriverConfig().withShardSpecs(shardSpecs));
this.schema = schema.withTuningConfig(schema.getTuningConfig().withShardSpecs(shardSpecs));
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
}
@ -227,17 +227,17 @@ public class HadoopDruidIndexerConfig
public boolean isDeterminingPartitions()
{
return schema.getDriverConfig().getPartitionsSpec().isDeterminingPartitions();
return schema.getTuningConfig().getPartitionsSpec().isDeterminingPartitions();
}
public Long getTargetPartitionSize()
{
return schema.getDriverConfig().getPartitionsSpec().getTargetPartitionSize();
return schema.getTuningConfig().getPartitionsSpec().getTargetPartitionSize();
}
public long getMaxPartitionSize()
{
return schema.getDriverConfig().getPartitionsSpec().getMaxPartitionSize();
return schema.getTuningConfig().getPartitionsSpec().getMaxPartitionSize();
}
public boolean isUpdaterJobSpecSet()
@ -252,7 +252,7 @@ public class HadoopDruidIndexerConfig
public HadoopyShardSpec getShardSpec(Bucket bucket)
{
return schema.getDriverConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
return schema.getTuningConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
}
public Job addInputPaths(Job job) throws IOException
@ -282,7 +282,7 @@ public class HadoopDruidIndexerConfig
return Optional.absent();
}
final List<HadoopyShardSpec> shards = schema.getDriverConfig().getShardSpecs().get(timeBucket.get().getStart());
final List<HadoopyShardSpec> shards = schema.getTuningConfig().getShardSpecs().get(timeBucket.get().getStart());
if (shards == null || shards.isEmpty()) {
return Optional.absent();
}
@ -327,7 +327,7 @@ public class HadoopDruidIndexerConfig
public Iterable<Bucket> apply(Interval input)
{
final DateTime bucketTime = input.getStart();
final List<HadoopyShardSpec> specs = schema.getDriverConfig().getShardSpecs().get(bucketTime);
final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime);
if (specs == null) {
return ImmutableList.of();
}
@ -370,9 +370,9 @@ public class HadoopDruidIndexerConfig
return new Path(
String.format(
"%s/%s/%s",
schema.getDriverConfig().getWorkingPath(),
schema.getTuningConfig().getWorkingPath(),
schema.getDataSchema().getDataSource(),
schema.getDriverConfig().getVersion().replace(":", "")
schema.getTuningConfig().getVersion().replace(":", "")
)
);
}
@ -425,7 +425,7 @@ public class HadoopDruidIndexerConfig
schema.getDataSchema().getDataSource(),
bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()),
bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
schema.getDriverConfig().getVersion().replace(":", "_"),
schema.getTuningConfig().getVersion().replace(":", "_"),
bucket.partitionNum
)
);
@ -437,7 +437,7 @@ public class HadoopDruidIndexerConfig
schema.getDataSchema().getDataSource(),
bucketInterval.getStart().toString(),
bucketInterval.getEnd().toString(),
schema.getDriverConfig().getVersion(),
schema.getTuningConfig().getVersion(),
bucket.partitionNum
)
);
@ -447,7 +447,7 @@ public class HadoopDruidIndexerConfig
{
Configuration conf = job.getConfiguration();
for (final Map.Entry<String, String> entry : schema.getDriverConfig().getJobProperties().entrySet()) {
for (final Map.Entry<String, String> entry : schema.getTuningConfig().getJobProperties().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
}
@ -478,8 +478,8 @@ public class HadoopDruidIndexerConfig
Preconditions.checkNotNull(schema.getDataSchema().getParser().getParseSpec().getTimestampSpec(), "timestampSpec");
Preconditions.checkNotNull(schema.getDataSchema().getGranularitySpec(), "granularitySpec");
Preconditions.checkNotNull(pathSpec, "pathSpec");
Preconditions.checkNotNull(schema.getDriverConfig().getWorkingPath(), "workingPath");
Preconditions.checkNotNull(schema.getTuningConfig().getWorkingPath(), "workingPath");
Preconditions.checkNotNull(schema.getIOConfig().getSegmentOutputPath(), "segmentOutputPath");
Preconditions.checkNotNull(schema.getDriverConfig().getVersion(), "version");
Preconditions.checkNotNull(schema.getTuningConfig().getVersion(), "version");
}
}

View File

@ -33,7 +33,7 @@ import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSchema;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.DateTime;
@ -44,17 +44,17 @@ import java.util.Map;
/**
*/
public class HadoopIngestionSchema extends IngestionSchema<HadoopIOConfig, HadoopDriverConfig>
public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTuningConfig>
{
private final DataSchema dataSchema;
private final HadoopIOConfig ioConfig;
private final HadoopDriverConfig driverConfig;
private final HadoopTuningConfig tuningConfig;
@JsonCreator
public HadoopIngestionSchema(
public HadoopIngestionSpec(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") HadoopIOConfig ioConfig,
@JsonProperty("driverConfig") HadoopDriverConfig driverConfig,
@JsonProperty("tuningConfig") HadoopTuningConfig tuningConfig,
// All deprecated
final @JsonProperty("dataSource") String dataSource,
final @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@ -82,12 +82,12 @@ public class HadoopIngestionSchema extends IngestionSchema<HadoopIOConfig, Hadoo
final @JsonProperty("targetPartitionSize") Long targetPartitionSize
)
{
super(dataSchema, ioConfig, driverConfig);
super(dataSchema, ioConfig, tuningConfig);
if (dataSchema != null) {
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.driverConfig = driverConfig == null ? HadoopDriverConfig.makeDefaultDriverConfig() : driverConfig;
this.tuningConfig = tuningConfig == null ? HadoopTuningConfig.makeDefaultTuningConfig() : tuningConfig;
} else { // Backwards compatibility
TimestampSpec theTimestampSpec = (timestampSpec == null)
? new TimestampSpec(timestampColumn, timestampFormat)
@ -153,7 +153,7 @@ public class HadoopIngestionSchema extends IngestionSchema<HadoopIOConfig, Hadoo
segmentOutputPath
);
this.driverConfig = new HadoopDriverConfig(
this.tuningConfig = new HadoopTuningConfig(
workingPath,
version,
thePartitionSpec,
@ -182,19 +182,19 @@ public class HadoopIngestionSchema extends IngestionSchema<HadoopIOConfig, Hadoo
return ioConfig;
}
@JsonProperty("driverConfig")
@JsonProperty("tuningConfig")
@Override
public HadoopDriverConfig getDriverConfig()
public HadoopTuningConfig getTuningConfig()
{
return driverConfig;
return tuningConfig;
}
public HadoopIngestionSchema withDataSchema(DataSchema schema)
public HadoopIngestionSpec withDataSchema(DataSchema schema)
{
return new HadoopIngestionSchema(
return new HadoopIngestionSpec(
schema,
ioConfig,
driverConfig,
tuningConfig,
null,
null,
null,
@ -221,12 +221,12 @@ public class HadoopIngestionSchema extends IngestionSchema<HadoopIOConfig, Hadoo
);
}
public HadoopIngestionSchema withIOConfig(HadoopIOConfig config)
public HadoopIngestionSpec withIOConfig(HadoopIOConfig config)
{
return new HadoopIngestionSchema(
return new HadoopIngestionSpec(
dataSchema,
config,
driverConfig,
tuningConfig,
null,
null,
null,
@ -253,9 +253,9 @@ public class HadoopIngestionSchema extends IngestionSchema<HadoopIOConfig, Hadoo
);
}
public HadoopIngestionSchema withDriverConfig(HadoopDriverConfig config)
public HadoopIngestionSpec withTuningConfig(HadoopTuningConfig config)
{
return new HadoopIngestionSchema(
return new HadoopIngestionSpec(
dataSchema,
ioConfig,
config,

View File

@ -25,7 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.segment.indexing.DriverConfig;
import io.druid.segment.indexing.TuningConfig;
import org.joda.time.DateTime;
import java.util.List;
@ -34,15 +34,15 @@ import java.util.Map;
/**
*/
@JsonTypeName("hadoop")
public class HadoopDriverConfig implements DriverConfig
public class HadoopTuningConfig implements TuningConfig
{
private static final PartitionsSpec defaultPartitionsSpec = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
private static final Map<DateTime, List<HadoopyShardSpec>> defaultShardSpecs = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
private static final int defaultRowFlushBoundary = 80000;
public static HadoopDriverConfig makeDefaultDriverConfig()
public static HadoopTuningConfig makeDefaultTuningConfig()
{
return new HadoopDriverConfig(
return new HadoopTuningConfig(
null,
new DateTime().toString(),
defaultPartitionsSpec,
@ -68,7 +68,7 @@ public class HadoopDriverConfig implements DriverConfig
private final Map<String, String> jobProperties;
@JsonCreator
public HadoopDriverConfig(
public HadoopTuningConfig(
final @JsonProperty("workingPath") String workingPath,
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
@ -155,9 +155,9 @@ public class HadoopDriverConfig implements DriverConfig
return jobProperties;
}
public HadoopDriverConfig withWorkingPath(String path)
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopDriverConfig(
return new HadoopTuningConfig(
path,
version,
partitionsSpec,
@ -171,9 +171,9 @@ public class HadoopDriverConfig implements DriverConfig
);
}
public HadoopDriverConfig withVersion(String ver)
public HadoopTuningConfig withVersion(String ver)
{
return new HadoopDriverConfig(
return new HadoopTuningConfig(
workingPath,
ver,
partitionsSpec,
@ -187,9 +187,9 @@ public class HadoopDriverConfig implements DriverConfig
);
}
public HadoopDriverConfig withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> specs)
public HadoopTuningConfig withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> specs)
{
return new HadoopDriverConfig(
return new HadoopTuningConfig(
workingPath,
version,
partitionsSpec,

View File

@ -298,7 +298,7 @@ public class IndexGeneratorJob implements Jobby
int numRows = index.add(inputRow);
++lineCount;
if (numRows >= config.getSchema().getDriverConfig().getRowFlushBoundary()) {
if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
log.info(
"%,d lines to %,d rows in %,d millis",
lineCount - runningTotalLineCount,
@ -452,7 +452,7 @@ public class IndexGeneratorJob implements Jobby
DataSegment segment = new DataSegment(
config.getDataSource(),
interval,
config.getSchema().getDriverConfig().getVersion(),
config.getSchema().getTuningConfig().getVersion(),
loadSpec,
dimensionNames,
metricNames,

View File

@ -63,7 +63,7 @@ public class JobHelper
final Configuration conf = groupByJob.getConfiguration();
final FileSystem fs = FileSystem.get(conf);
Path distributedClassPath = new Path(config.getSchema().getDriverConfig().getWorkingPath(), "classpath");
Path distributedClassPath = new Path(config.getSchema().getTuningConfig().getWorkingPath(), "classpath");
if (fs instanceof LocalFileSystem) {
return;
@ -138,8 +138,8 @@ public class JobHelper
}
}
if (!config.getSchema().getDriverConfig().isLeaveIntermediate()) {
if (failedMessage == null || config.getSchema().getDriverConfig().isCleanupOnFailure()) {
if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) {
if (failedMessage == null || config.getSchema().getTuningConfig().isCleanupOnFailure()) {
Path workingPath = config.makeIntermediatePath();
log.info("Deleting path[%s]", workingPath);
try {

View File

@ -21,7 +21,6 @@ package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@ -50,7 +49,7 @@ public class HadoopDruidIndexerConfigTest
@Test
public void shouldMakeHDFSCompliantSegmentOutputPath()
{
HadoopIngestionSchema schema;
HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
@ -63,7 +62,7 @@ public class HadoopDruidIndexerConfigTest
+ " },"
+ "\"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\""
+ "}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
@ -71,8 +70,8 @@ public class HadoopDruidIndexerConfigTest
}
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
schema.withDriverConfig(
schema.getDriverConfig()
schema.withTuningConfig(
schema.getTuningConfig()
.withVersion(
"some:brand:new:version"
)
@ -90,7 +89,7 @@ public class HadoopDruidIndexerConfigTest
@Test
public void shouldMakeDefaultSegmentOutputPathIfNotHDFS()
{
final HadoopIngestionSchema schema;
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
@ -103,7 +102,7 @@ public class HadoopDruidIndexerConfigTest
+ " },"
+ "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
+ "}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
@ -111,8 +110,8 @@ public class HadoopDruidIndexerConfigTest
}
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
schema.withDriverConfig(
schema.getDriverConfig()
schema.withTuningConfig(
schema.getTuningConfig()
.withVersion(
"some:brand:new:version"
)

View File

@ -29,22 +29,18 @@ import io.druid.indexer.partitions.RandomPartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
public class HadoopIngestionSchemaTest
public class HadoopIngestionSpecTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testGranularitySpec()
{
final HadoopIngestionSchema schema;
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
@ -55,7 +51,7 @@ public class HadoopIngestionSchemaTest
+ " \"intervals\":[\"2012-01-01/P1D\"]"
+ " }"
+ "}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
@ -81,7 +77,7 @@ public class HadoopIngestionSchemaTest
public void testGranularitySpecLegacy()
{
// Deprecated and replaced by granularitySpec, but still supported
final HadoopIngestionSchema schema;
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
@ -89,7 +85,7 @@ public class HadoopIngestionSchemaTest
+ "\"segmentGranularity\":\"day\","
+ "\"intervals\":[\"2012-02-01/P1D\"]"
+ "}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
@ -116,7 +112,7 @@ public class HadoopIngestionSchemaTest
{
boolean thrown = false;
try {
final HadoopIngestionSchema schema = jsonReadWriteRead(
final HadoopIngestionSpec schema = jsonReadWriteRead(
"{"
+ "\"segmentGranularity\":\"day\","
+ "\"intervals\":[\"2012-02-01/P1D\"],"
@ -126,7 +122,7 @@ public class HadoopIngestionSchemaTest
+ " \"intervals\":[\"2012-01-01/P1D\"]"
+ " }"
+ "}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
@ -139,7 +135,7 @@ public class HadoopIngestionSchemaTest
@Test
public void testPartitionsSpecAutoDimension()
{
final HadoopIngestionSchema schema;
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
@ -148,14 +144,14 @@ public class HadoopIngestionSchemaTest
+ " \"targetPartitionSize\":100"
+ " }"
+ "}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getDriverConfig().getPartitionsSpec();
final PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
@ -178,7 +174,7 @@ public class HadoopIngestionSchemaTest
@Test
public void testPartitionsSpecSpecificDimensionLegacy()
{
final HadoopIngestionSchema schema;
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
@ -188,14 +184,14 @@ public class HadoopIngestionSchemaTest
+ " \"partitionDimension\":\"foo\""
+ " }"
+ "}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getDriverConfig().getPartitionsSpec();
final PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
@ -226,7 +222,7 @@ public class HadoopIngestionSchemaTest
@Test
public void testPartitionsSpecLegacy()
{
final HadoopIngestionSchema schema;
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
@ -234,14 +230,14 @@ public class HadoopIngestionSchemaTest
+ "\"targetPartitionSize\":100,"
+ "\"partitionDimension\":\"foo\""
+ "}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getDriverConfig().getPartitionsSpec();
final PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
@ -272,7 +268,7 @@ public class HadoopIngestionSchemaTest
@Test
public void testPartitionsSpecMaxPartitionSize()
{
final HadoopIngestionSchema schema;
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
@ -283,14 +279,14 @@ public class HadoopIngestionSchemaTest
+ " \"partitionDimension\":\"foo\""
+ " }"
+ "}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getDriverConfig().getPartitionsSpec();
final PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
@ -323,14 +319,14 @@ public class HadoopIngestionSchemaTest
{
boolean thrown = false;
try {
final HadoopIngestionSchema schema = jsonReadWriteRead(
final HadoopIngestionSpec schema = jsonReadWriteRead(
"{"
+ "\"targetPartitionSize\":100,"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100"
+ " }"
+ "}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
@ -343,7 +339,7 @@ public class HadoopIngestionSchemaTest
@Test
public void testDbUpdaterJobSpec() throws Exception
{
final HadoopIngestionSchema schema;
final HadoopIngestionSpec schema;
schema = jsonReadWriteRead(
"{"
@ -355,7 +351,7 @@ public class HadoopIngestionSchemaTest
+ " \"segmentTable\" : \"segments\"\n"
+ " }"
+ "}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
final DbUpdaterJobSpec spec = schema.getIOConfig().getMetadataUpdateSpec();
@ -371,12 +367,12 @@ public class HadoopIngestionSchemaTest
@Test
public void testDefaultSettings()
{
final HadoopIngestionSchema schema;
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
"{}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
@ -385,19 +381,19 @@ public class HadoopIngestionSchemaTest
Assert.assertEquals(
"cleanupOnFailure",
schema.getDriverConfig().isCleanupOnFailure(),
schema.getTuningConfig().isCleanupOnFailure(),
true
);
Assert.assertEquals(
"overwriteFiles",
schema.getDriverConfig().isOverwriteFiles(),
schema.getTuningConfig().isOverwriteFiles(),
false
);
Assert.assertEquals(
"isDeterminingPartitions",
schema.getDriverConfig().getPartitionsSpec().isDeterminingPartitions(),
schema.getTuningConfig().getPartitionsSpec().isDeterminingPartitions(),
false
);
}
@ -405,12 +401,12 @@ public class HadoopIngestionSchemaTest
@Test
public void testNoCleanupOnFailure()
{
final HadoopIngestionSchema schema;
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
"{\"cleanupOnFailure\":false}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
@ -419,7 +415,7 @@ public class HadoopIngestionSchemaTest
Assert.assertEquals(
"cleanupOnFailure",
schema.getDriverConfig().isCleanupOnFailure(),
schema.getTuningConfig().isCleanupOnFailure(),
false
);
}
@ -436,7 +432,7 @@ public class HadoopIngestionSchemaTest
public void testRandomPartitionsSpec() throws Exception{
{
final HadoopIngestionSchema schema;
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
@ -446,14 +442,14 @@ public class HadoopIngestionSchemaTest
+ " \"type\":\"random\""
+ " }"
+ "}",
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getDriverConfig().getPartitionsSpec();
final PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",

View File

@ -38,7 +38,7 @@ import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
@ -90,7 +90,7 @@ public class YeOldePlumberSchool implements PlumberSchool
@Override
public Plumber findPlumber(
final DataSchema schema,
final RealtimeDriverConfig config,
final RealtimeTuningConfig config,
final FireDepartmentMetrics metrics
)
{

View File

@ -33,7 +33,7 @@ import io.druid.common.utils.JodaUtils;
import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopIngestionSchema;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.Jobby;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
@ -68,15 +68,15 @@ public class HadoopIndexTask extends AbstractTask
public static String DEFAULT_HADOOP_COORDINATES = "org.apache.hadoop:hadoop-client:2.3.0";
@JsonIgnore
private final HadoopIngestionSchema schema;
private final HadoopIngestionSpec spec;
@JsonIgnore
private final List<String> hadoopDependencyCoordinates;
/**
* @param schema is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified.
* <p/>
* Here, we will ensure that the DbConnectorConfig field of the schema is set to null, such that the
* Here, we will ensure that the DbConnectorConfig field of the spec is set to null, such that the
* job does not push a list of published segments the database. Instead, we will use the method
* IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
* segments, and let the indexing service report these segments to the database.
@ -85,25 +85,25 @@ public class HadoopIndexTask extends AbstractTask
@JsonCreator
public HadoopIndexTask(
@JsonProperty("id") String id,
@JsonProperty("schema") HadoopIngestionSchema schema,
@JsonProperty("spec") HadoopIngestionSpec spec,
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
)
{
super(
id != null ? id : String.format("index_hadoop_%s_%s", schema.getDataSchema().getDataSource(), new DateTime()),
schema.getDataSchema().getDataSource()
id != null ? id : String.format("index_hadoop_%s_%s", spec.getDataSchema().getDataSource(), new DateTime()),
spec.getDataSchema().getDataSource()
);
// Some HadoopIngestionSchema stuff doesn't make sense in the context of the indexing service
// Some HadoopIngestionSpec stuff doesn't make sense in the context of the indexing service
Preconditions.checkArgument(
schema.getIOConfig().getSegmentOutputPath() == null,
spec.getIOConfig().getSegmentOutputPath() == null,
"segmentOutputPath must be absent"
);
Preconditions.checkArgument(schema.getDriverConfig().getWorkingPath() == null, "workingPath must be absent");
Preconditions.checkArgument(schema.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent");
Preconditions.checkArgument(spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent");
Preconditions.checkArgument(spec.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent");
this.schema = schema;
this.spec = spec;
this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.<String>asList(
hadoopCoordinates == null ? DEFAULT_HADOOP_COORDINATES : hadoopCoordinates
) : hadoopDependencyCoordinates;
@ -118,7 +118,7 @@ public class HadoopIndexTask extends AbstractTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
Optional<SortedSet<Interval>> intervals = schema.getDataSchema().getGranularitySpec().bucketIntervals();
Optional<SortedSet<Interval>> intervals = spec.getDataSchema().getGranularitySpec().bucketIntervals();
if (intervals.isPresent()) {
Interval interval = JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals(
@ -132,9 +132,9 @@ public class HadoopIndexTask extends AbstractTask
}
@JsonProperty("schema")
public HadoopIngestionSchema getSchema()
public HadoopIngestionSpec getSpec()
{
return schema;
return spec;
}
@JsonProperty
@ -178,7 +178,7 @@ public class HadoopIndexTask extends AbstractTask
jobUrls.addAll(extensionURLs);
System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(jobUrls));
boolean determineIntervals = !schema.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
final Class<?> determineConfigurationMainClass = loader.loadClass(HadoopDetermineConfigInnerProcessing.class.getName());
final Method determineConfigurationMainMethod = determineConfigurationMainClass.getMethod(
@ -187,14 +187,14 @@ public class HadoopIndexTask extends AbstractTask
);
String[] determineConfigArgs = new String[]{
toolbox.getObjectMapper().writeValueAsString(schema),
toolbox.getObjectMapper().writeValueAsString(spec),
toolbox.getConfig().getHadoopWorkingPath(),
toolbox.getSegmentPusher().getPathForHadoop(getDataSource())
};
String config = (String) determineConfigurationMainMethod.invoke(null, new Object[]{determineConfigArgs});
HadoopIngestionSchema indexerSchema = toolbox.getObjectMapper()
.readValue(config, HadoopIngestionSchema.class);
HadoopIngestionSpec indexerSchema = toolbox.getObjectMapper()
.readValue(config, HadoopIngestionSpec.class);
// We should have a lock from before we started running only if interval was specified
@ -246,14 +246,14 @@ public class HadoopIndexTask extends AbstractTask
final String schema = args[0];
String version = args[1];
final HadoopIngestionSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.jsonMapper
.readValue(
schema,
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(
theSchema
.withDriverConfig(theSchema.getDriverConfig().withVersion(version))
.withTuningConfig(theSchema.getTuningConfig().withVersion(version))
);
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
@ -275,15 +275,15 @@ public class HadoopIndexTask extends AbstractTask
final String workingPath = args[1];
final String segmentOutputPath = args[2];
final HadoopIngestionSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.jsonMapper
.readValue(
schema,
HadoopIngestionSchema.class
HadoopIngestionSpec.class
);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(
theSchema
.withIOConfig(theSchema.getIOConfig().withSegmentOutputPath(segmentOutputPath))
.withDriverConfig(theSchema.getDriverConfig().withWorkingPath(workingPath))
.withTuningConfig(theSchema.getTuningConfig().withWorkingPath(workingPath))
);
Jobby job = new HadoopDruidDetermineConfigurationJob(config);

View File

@ -44,10 +44,10 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.DriverConfig;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.segment.indexing.TuningConfig;
import io.druid.segment.indexing.IOConfig;
import io.druid.segment.indexing.IngestionSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
@ -72,7 +72,7 @@ public class IndexTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(IndexTask.class);
private static String makeId(String id, IndexIngestionSchema ingestionSchema, String dataSource)
private static String makeId(String id, IndexIngestionSpec ingestionSchema, String dataSource)
{
if (id == null) {
return String.format("index_%s_%s", makeDataSource(ingestionSchema, dataSource), new DateTime().toString());
@ -81,7 +81,7 @@ public class IndexTask extends AbstractFixedIntervalTask
return id;
}
private static String makeDataSource(IndexIngestionSchema ingestionSchema, String dataSource)
private static String makeDataSource(IndexIngestionSpec ingestionSchema, String dataSource)
{
if (ingestionSchema != null) {
return ingestionSchema.getDataSchema().getDataSource();
@ -90,7 +90,7 @@ public class IndexTask extends AbstractFixedIntervalTask
}
}
private static Interval makeInterval(IndexIngestionSchema ingestionSchema, GranularitySpec granularitySpec)
private static Interval makeInterval(IndexIngestionSpec ingestionSchema, GranularitySpec granularitySpec)
{
GranularitySpec spec;
if (ingestionSchema != null) {
@ -106,12 +106,12 @@ public class IndexTask extends AbstractFixedIntervalTask
}
@JsonIgnore
private final IndexIngestionSchema ingestionSchema;
private final IndexIngestionSpec ingestionSchema;
@JsonCreator
public IndexTask(
@JsonProperty("id") String id,
@JsonProperty("schema") IndexIngestionSchema ingestionSchema,
@JsonProperty("schema") IndexIngestionSpec ingestionSchema,
// Backwards Compatible
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("granularitySpec") final GranularitySpec granularitySpec,
@ -132,7 +132,7 @@ public class IndexTask extends AbstractFixedIntervalTask
if (ingestionSchema != null) {
this.ingestionSchema = ingestionSchema;
} else { // Backwards Compatible
this.ingestionSchema = new IndexIngestionSchema(
this.ingestionSchema = new IndexIngestionSpec(
new DataSchema(
dataSource,
firehoseFactory.getParser(),
@ -140,7 +140,7 @@ public class IndexTask extends AbstractFixedIntervalTask
granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity)
),
new IndexIOConfig(firehoseFactory),
new IndexDriverConfig(targetPartitionSize, rowFlushBoundary)
new IndexTuningConfig(targetPartitionSize, rowFlushBoundary)
);
}
}
@ -152,7 +152,7 @@ public class IndexTask extends AbstractFixedIntervalTask
}
@JsonProperty("schema")
public IndexIngestionSchema getIngestionSchema()
public IndexIngestionSpec getIngestionSchema()
{
return ingestionSchema;
}
@ -161,7 +161,7 @@ public class IndexTask extends AbstractFixedIntervalTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
final int targetPartitionSize = ingestionSchema.getDriverConfig().getTargetPartitionSize();
final int targetPartitionSize = ingestionSchema.getTuningConfig().getTargetPartitionSize();
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final Set<DataSegment> segments = Sets.newHashSet();
@ -355,7 +355,7 @@ public class IndexTask extends AbstractFixedIntervalTask
);
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
final int rowFlushBoundary = ingestionSchema.getDriverConfig().getRowFlushBoundary();
final int rowFlushBoundary = ingestionSchema.getTuningConfig().getRowFlushBoundary();
// We need to track published segments.
final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
@ -384,7 +384,7 @@ public class IndexTask extends AbstractFixedIntervalTask
version,
wrappedDataSegmentPusher,
tmpDir
).findPlumber(schema, new RealtimeDriverConfig(null, null, null, null, null, null, null, shardSpec), metrics);
).findPlumber(schema, new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec), metrics);
// rowFlushBoundary for this job
final int myRowFlushBoundary = rowFlushBoundary > 0
@ -463,24 +463,24 @@ public class IndexTask extends AbstractFixedIntervalTask
return interval.contains(inputRow.getTimestampFromEpoch()) && shardSpec.isInChunk(inputRow);
}
public static class IndexIngestionSchema extends IngestionSchema<IndexIOConfig, IndexDriverConfig>
public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig>
{
private final DataSchema dataSchema;
private final IndexIOConfig ioConfig;
private final IndexDriverConfig driverConfig;
private final IndexTuningConfig tuningConfig;
@JsonCreator
public IndexIngestionSchema(
public IndexIngestionSpec(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") IndexIOConfig ioConfig,
@JsonProperty("driverConfig") IndexDriverConfig driverConfig
@JsonProperty("tuningConfig") IndexTuningConfig tuningConfig
)
{
super(dataSchema, ioConfig, driverConfig);
super(dataSchema, ioConfig, tuningConfig);
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.driverConfig = driverConfig;
this.tuningConfig = tuningConfig;
}
@Override
@ -498,10 +498,10 @@ public class IndexTask extends AbstractFixedIntervalTask
}
@Override
@JsonProperty("driverConfig")
public IndexDriverConfig getDriverConfig()
@JsonProperty("tuningConfig")
public IndexTuningConfig getTuningConfig()
{
return driverConfig;
return tuningConfig;
}
}
@ -526,13 +526,13 @@ public class IndexTask extends AbstractFixedIntervalTask
}
@JsonTypeName("index")
public static class IndexDriverConfig implements DriverConfig
public static class IndexTuningConfig implements TuningConfig
{
private final int targetPartitionSize;
private final int rowFlushBoundary;
@JsonCreator
public IndexDriverConfig(
public IndexTuningConfig(
@JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)

View File

@ -44,7 +44,7 @@ import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireDepartment;
@ -82,7 +82,7 @@ public class RealtimeIndexTask extends AbstractTask
return String.format(
"index_realtime_%s_%d_%s",
fireDepartment.getDataSchema().getDataSource(),
fireDepartment.getDriverConfig().getShardSpec().getPartitionNum(),
fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(),
new DateTime().toString()
);
}
@ -94,7 +94,7 @@ public class RealtimeIndexTask extends AbstractTask
}
@JsonIgnore
private final FireDepartment schema;
private final FireDepartment spec;
@JsonIgnore
private volatile Plumber plumber = null;
@ -106,9 +106,9 @@ public class RealtimeIndexTask extends AbstractTask
public RealtimeIndexTask(
@JsonProperty("id") String id,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("config") FireDepartment fireDepartment,
@JsonProperty("spec") FireDepartment fireDepartment,
// Backwards compatible, to be deprecated
@JsonProperty("schema") Schema schema,
@JsonProperty("schema") Schema spec,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@JsonProperty("windowPeriod") Period windowPeriod,
@ -118,24 +118,24 @@ public class RealtimeIndexTask extends AbstractTask
)
{
super(
id == null ? makeTaskId(fireDepartment, schema) : id,
String.format("index_realtime_%s", makeDatasource(fireDepartment, schema)),
taskResource == null ? new TaskResource(makeTaskId(fireDepartment, schema), 1) : taskResource,
makeDatasource(fireDepartment, schema)
id == null ? makeTaskId(fireDepartment, spec) : id,
String.format("index_realtime_%s", makeDatasource(fireDepartment, spec)),
taskResource == null ? new TaskResource(makeTaskId(fireDepartment, spec), 1) : taskResource,
makeDatasource(fireDepartment, spec)
);
if (fireDepartment != null) {
this.schema = fireDepartment;
this.spec = fireDepartment;
} else {
this.schema = new FireDepartment(
this.spec = new FireDepartment(
new DataSchema(
schema.getDataSource(),
spec.getDataSource(),
firehoseFactory == null ? null : firehoseFactory.getParser(),
schema.getAggregators(),
new UniformGranularitySpec(segmentGranularity, schema.getIndexGranularity(), null, segmentGranularity)
spec.getAggregators(),
new UniformGranularitySpec(segmentGranularity, spec.getIndexGranularity(), null, segmentGranularity)
),
new RealtimeIOConfig(firehoseFactory, null),
new RealtimeDriverConfig(
new RealtimeTuningConfig(
fireDepartmentConfig == null ? null : fireDepartmentConfig.getMaxRowsInMemory(),
fireDepartmentConfig == null ? null : fireDepartmentConfig.getIntermediatePersistPeriod(),
windowPeriod,
@ -143,7 +143,7 @@ public class RealtimeIndexTask extends AbstractTask
null,
rejectionPolicyFactory,
maxPendingPersists,
schema.getShardSpec()
spec.getShardSpec()
),
null, null, null, null
);
@ -197,8 +197,8 @@ public class RealtimeIndexTask extends AbstractTask
boolean normalExit = true;
// Set up firehose
final Period intermediatePersistPeriod = schema.getDriverConfig().getIntermediatePersistPeriod();
final Firehose firehose = schema.getIOConfig().getFirehoseFactory().connect(schema.getDataSchema().getParser());
final Period intermediatePersistPeriod = spec.getTuningConfig().getIntermediatePersistPeriod();
final Firehose firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser());
// It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
// stuff like the ServerView, which seems kind of odd? Perhaps revisit this when Guice has been introduced.
@ -279,16 +279,16 @@ public class RealtimeIndexTask extends AbstractTask
}
};
DataSchema dataSchema = schema.getDataSchema();
RealtimeIOConfig realtimeIOConfig = schema.getIOConfig();
RealtimeDriverConfig driverConfig = schema.getDriverConfig()
DataSchema dataSchema = spec.getDataSchema();
RealtimeIOConfig realtimeIOConfig = spec.getIOConfig();
RealtimeTuningConfig tuningConfig = spec.getTuningConfig()
.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist"))
.withVersioningPolicy(versioningPolicy);
final FireDepartment fireDepartment = new FireDepartment(
dataSchema,
realtimeIOConfig,
driverConfig,
tuningConfig,
null,
null,
null,
@ -317,7 +317,7 @@ public class RealtimeIndexTask extends AbstractTask
0
);
this.plumber = plumberSchool.findPlumber(dataSchema, driverConfig, fireDepartment.getMetrics());
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics());
try {
plumber.startJob();
@ -354,7 +354,7 @@ public class RealtimeIndexTask extends AbstractTask
int currCount = sink.add(inputRow);
fireDepartment.getMetrics().incrementProcessed();
if (currCount >= driverConfig.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
if (currCount >= tuningConfig.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
@ -390,10 +390,10 @@ public class RealtimeIndexTask extends AbstractTask
return TaskStatus.success(getId());
}
@JsonProperty("config")
@JsonProperty("spec")
public FireDepartment getRealtimeIngestionSchema()
{
return schema;
return spec;
}
public static class TaskActionSegmentPublisher implements SegmentPublisher

View File

@ -28,7 +28,7 @@ import io.druid.data.input.impl.JSONDataSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.FirehoseModule;
import io.druid.indexer.HadoopIngestionSchema;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
@ -220,7 +220,7 @@ public class TaskSerdeTest
Assert.assertEquals(
new Period("PT10M"),
task.getRealtimeIngestionSchema()
.getDriverConfig().getWindowPeriod()
.getTuningConfig().getWindowPeriod()
);
Assert.assertEquals(
Granularity.HOUR,
@ -233,8 +233,8 @@ public class TaskSerdeTest
Assert.assertEquals(task.getTaskResource().getRequiredCapacity(), task2.getTaskResource().getRequiredCapacity());
Assert.assertEquals(task.getTaskResource().getAvailabilityGroup(), task2.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(
task.getRealtimeIngestionSchema().getDriverConfig().getWindowPeriod(),
task2.getRealtimeIngestionSchema().getDriverConfig().getWindowPeriod()
task.getRealtimeIngestionSchema().getTuningConfig().getWindowPeriod(),
task2.getRealtimeIngestionSchema().getTuningConfig().getWindowPeriod()
);
Assert.assertEquals(
task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity(),
@ -397,7 +397,7 @@ public class TaskSerdeTest
{
final HadoopIndexTask task = new HadoopIndexTask(
null,
new HadoopIngestionSchema(
new HadoopIngestionSpec(
null, null, null,
"foo",
new TimestampSpec("timestamp", "auto"),
@ -441,6 +441,9 @@ public class TaskSerdeTest
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getSchema().getDriverConfig().getJobProperties(), task2.getSchema().getDriverConfig().getJobProperties());
Assert.assertEquals(
task.getSpec().getTuningConfig().getJobProperties(),
task2.getSpec().getTuningConfig().getJobProperties()
);
}
}

View File

@ -24,22 +24,22 @@ import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public abstract class IngestionSchema<IOConfigType extends IOConfig, DriverConfigType extends DriverConfig>
public abstract class IngestionSpec<IOConfigType extends IOConfig, TuningConfigType extends TuningConfig>
{
private final DataSchema dataSchema;
private final IOConfigType ioConfig;
private final DriverConfigType driverConfig;
private final TuningConfigType tuningConfig;
@JsonCreator
public IngestionSchema(
public IngestionSpec(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") IOConfigType ioConfig,
@JsonProperty("driverConfig") DriverConfigType driverConfig
@JsonProperty("tuningConfig") TuningConfigType tuningConfig
)
{
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.driverConfig = driverConfig;
this.tuningConfig = tuningConfig;
}
@JsonProperty("dataSchema")
@ -54,9 +54,9 @@ public abstract class IngestionSchema<IOConfigType extends IOConfig, DriverConfi
return ioConfig;
}
@JsonProperty("driverConfig")
public DriverConfigType getDriverConfig()
@JsonProperty("tuningConfig")
public TuningConfigType getTuningConfig()
{
return driverConfig;
return tuningConfig;
}
}

View File

@ -34,7 +34,7 @@ import java.io.File;
/**
*/
public class RealtimeDriverConfig implements DriverConfig
public class RealtimeTuningConfig implements TuningConfig
{
private static final int defaultMaxRowsInMemory = 500000;
private static final Period defaultIntermediatePersistPeriod = new Period("PT10M");
@ -46,9 +46,9 @@ public class RealtimeDriverConfig implements DriverConfig
private static final ShardSpec defaultShardSpec = new NoneShardSpec();
// Might make sense for this to be a builder
public static RealtimeDriverConfig makeDefaultDriverConfig()
public static RealtimeTuningConfig makeDefaultTuningConfig()
{
return new RealtimeDriverConfig(
return new RealtimeTuningConfig(
defaultMaxRowsInMemory,
defaultIntermediatePersistPeriod,
defaultWindowPeriod,
@ -70,7 +70,7 @@ public class RealtimeDriverConfig implements DriverConfig
private final ShardSpec shardSpec;
@JsonCreator
public RealtimeDriverConfig(
public RealtimeTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("windowPeriod") Period windowPeriod,
@ -143,9 +143,9 @@ public class RealtimeDriverConfig implements DriverConfig
return shardSpec;
}
public RealtimeDriverConfig withVersioningPolicy(VersioningPolicy policy)
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeDriverConfig(
return new RealtimeTuningConfig(
maxRowsInMemory,
intermediatePersistPeriod,
windowPeriod,
@ -157,9 +157,9 @@ public class RealtimeDriverConfig implements DriverConfig
);
}
public RealtimeDriverConfig withBasePersistDirectory(File dir)
public RealtimeTuningConfig withBasePersistDirectory(File dir)
{
return new RealtimeDriverConfig(
return new RealtimeTuningConfig(
maxRowsInMemory,
intermediatePersistPeriod,
windowPeriod,

View File

@ -26,8 +26,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "realtime", value = RealtimeDriverConfig.class)
@JsonSubTypes.Type(name = "realtime", value = RealtimeTuningConfig.class)
})
public interface DriverConfig
public interface TuningConfig
{
}

View File

@ -26,8 +26,8 @@ import com.google.common.base.Preconditions;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.plumber.Plumber;
@ -44,11 +44,11 @@ import java.io.IOException;
* realtime stream of data. The Plumber directs each drop of water from the firehose into the correct sink and makes
* sure that the sinks don't overflow.
*/
public class FireDepartment extends IngestionSchema<RealtimeIOConfig, RealtimeDriverConfig>
public class FireDepartment extends IngestionSpec<RealtimeIOConfig, RealtimeTuningConfig>
{
private final DataSchema dataSchema;
private final RealtimeIOConfig ioConfig;
private final RealtimeDriverConfig driverConfig;
private final RealtimeTuningConfig tuningConfig;
private final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
@ -56,7 +56,7 @@ public class FireDepartment extends IngestionSchema<RealtimeIOConfig, RealtimeDr
public FireDepartment(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") RealtimeIOConfig ioConfig,
@JsonProperty("driverConfig") RealtimeDriverConfig driverConfig,
@JsonProperty("tuningConfig") RealtimeTuningConfig tuningConfig,
// Backwards Compatability
@JsonProperty("schema") Schema schema,
@JsonProperty("config") FireDepartmentConfig config,
@ -64,7 +64,7 @@ public class FireDepartment extends IngestionSchema<RealtimeIOConfig, RealtimeDr
@JsonProperty("plumber") PlumberSchool plumberSchool
)
{
super(dataSchema, ioConfig, driverConfig);
super(dataSchema, ioConfig, tuningConfig);
// Backwards compatibility
if (dataSchema == null) {
@ -88,7 +88,7 @@ public class FireDepartment extends IngestionSchema<RealtimeIOConfig, RealtimeDr
firehoseFactory,
plumberSchool
);
this.driverConfig = new RealtimeDriverConfig(
this.tuningConfig = new RealtimeTuningConfig(
config.getMaxRowsInMemory(),
config.getIntermediatePersistPeriod(),
((RealtimePlumberSchool) plumberSchool).getWindowPeriod(),
@ -104,7 +104,7 @@ public class FireDepartment extends IngestionSchema<RealtimeIOConfig, RealtimeDr
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.driverConfig = driverConfig == null ? RealtimeDriverConfig.makeDefaultDriverConfig() : driverConfig;
this.tuningConfig = tuningConfig == null ? RealtimeTuningConfig.makeDefaultTuningConfig() : tuningConfig;
}
}
@ -127,16 +127,16 @@ public class FireDepartment extends IngestionSchema<RealtimeIOConfig, RealtimeDr
return ioConfig;
}
@JsonProperty("driverConfig")
@JsonProperty("tuningConfig")
@Override
public RealtimeDriverConfig getDriverConfig()
public RealtimeTuningConfig getTuningConfig()
{
return driverConfig;
return tuningConfig;
}
public Plumber findPlumber()
{
return ioConfig.getPlumberSchool().findPlumber(dataSchema, driverConfig, metrics);
return ioConfig.getPlumberSchool().findPlumber(dataSchema, tuningConfig, metrics);
}
public Firehose connect() throws IOException

View File

@ -31,7 +31,6 @@ import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.query.DataSource;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
@ -41,9 +40,8 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink;
import org.joda.time.DateTime;
@ -138,7 +136,7 @@ public class RealtimeManager implements QuerySegmentWalker
private final FireDepartment fireDepartment;
private final FireDepartmentMetrics metrics;
private volatile RealtimeDriverConfig config = null;
private volatile RealtimeTuningConfig config = null;
private volatile Firehose firehose = null;
private volatile Plumber plumber = null;
private volatile boolean normalExit = true;
@ -154,7 +152,7 @@ public class RealtimeManager implements QuerySegmentWalker
public void init() throws IOException
{
config = fireDepartment.getDriverConfig();
config = fireDepartment.getTuningConfig();
synchronized (this) {
try {

View File

@ -28,7 +28,7 @@ import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.DateTime;
@ -48,7 +48,7 @@ public class FlushingPlumber extends RealtimePlumber
private static final EmittingLogger log = new EmittingLogger(FlushingPlumber.class);
private final DataSchema schema;
private final RealtimeDriverConfig config;
private final RealtimeTuningConfig config;
private final Duration flushDuration;
private volatile ScheduledExecutorService flushScheduledExec = null;
@ -57,7 +57,7 @@ public class FlushingPlumber extends RealtimePlumber
public FlushingPlumber(
Duration flushDuration,
DataSchema schema,
RealtimeDriverConfig config,
RealtimeTuningConfig config,
FireDepartmentMetrics metrics,
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,

View File

@ -28,7 +28,7 @@ import com.metamx.emitter.service.ServiceEmitter;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Duration;
@ -94,7 +94,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@Override
public Plumber findPlumber(
final DataSchema schema,
final RealtimeDriverConfig config,
final RealtimeTuningConfig config,
final FireDepartmentMetrics metrics
)
{

View File

@ -23,11 +23,9 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.common.Granularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
import java.io.File;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RealtimePlumberSchool.class)
@ -42,7 +40,7 @@ public interface PlumberSchool
*
* @return returns a plumber
*/
public Plumber findPlumber(DataSchema schema, RealtimeDriverConfig config, FireDepartmentMetrics metrics);
public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics);
@Deprecated
public Granularity getSegmentGranularity();

View File

@ -36,7 +36,7 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
@ -71,7 +71,7 @@ public class RealtimePlumber implements Plumber
private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
private final DataSchema schema;
private final RealtimeDriverConfig config;
private final RealtimeTuningConfig config;
private final RejectionPolicy rejectionPolicy;
private final FireDepartmentMetrics metrics;
private final ServiceEmitter emitter;
@ -94,7 +94,7 @@ public class RealtimePlumber implements Plumber
public RealtimePlumber(
DataSchema schema,
RealtimeDriverConfig config,
RealtimeTuningConfig config,
FireDepartmentMetrics metrics,
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
@ -125,7 +125,7 @@ public class RealtimePlumber implements Plumber
return schema;
}
public RealtimeDriverConfig getConfig()
public RealtimeTuningConfig getConfig()
{
return config;
}

View File

@ -29,7 +29,7 @@ import io.druid.client.ServerView;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.SegmentPublisher;
@ -132,7 +132,7 @@ public class RealtimePlumberSchool implements PlumberSchool
@Override
public Plumber findPlumber(
final DataSchema schema,
final RealtimeDriverConfig config,
final RealtimeTuningConfig config,
final FireDepartmentMetrics metrics
)
{

View File

@ -33,9 +33,8 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.Schema;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
@ -55,14 +54,14 @@ public class Sink implements Iterable<FireHydrant>
private final Interval interval;
private final DataSchema schema;
private final RealtimeDriverConfig config;
private final RealtimeTuningConfig config;
private final String version;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
public Sink(
Interval interval,
DataSchema schema,
RealtimeDriverConfig config,
RealtimeTuningConfig config,
String version
)
{
@ -77,7 +76,7 @@ public class Sink implements Iterable<FireHydrant>
public Sink(
Interval interval,
DataSchema schema,
RealtimeDriverConfig config,
RealtimeTuningConfig config,
String version,
List<FireHydrant> hydrants
)

View File

@ -30,7 +30,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
@ -76,7 +76,7 @@ public class FireDepartmentTest
null, null, null, null, null, null, null, null, null, null, null, null, 0
)
),
new RealtimeDriverConfig(
new RealtimeTuningConfig(
null, null, null, null, null, null, null, null
),
null, null, null, null

View File

@ -34,7 +34,7 @@ import io.druid.query.QueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.plumber.Plumber;
@ -48,7 +48,6 @@ import org.joda.time.Period;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
@ -95,7 +94,7 @@ public class RealtimeManagerTest
{
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeDriverConfig config, FireDepartmentMetrics metrics
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return plumber;
@ -108,7 +107,7 @@ public class RealtimeManagerTest
}
}
);
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
1,
new Period("P1Y"),
null,
@ -118,14 +117,14 @@ public class RealtimeManagerTest
null,
null
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, driverConfig, new DateTime().toString()));
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));
realtimeManager = new RealtimeManager(
Arrays.<FireDepartment>asList(
new FireDepartment(
schema,
ioConfig,
driverConfig,
tuningConfig,
null, null, null, null
)
),

View File

@ -33,7 +33,6 @@ import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
@ -42,7 +41,7 @@ import io.druid.query.QueryRunnerFactory;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
@ -126,7 +125,7 @@ public class RealtimePlumberSchoolTest
EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
1,
null,
null,
@ -153,7 +152,7 @@ public class RealtimePlumberSchoolTest
0
);
plumber = realtimePlumberSchool.findPlumber(schema, driverConfig, new FireDepartmentMetrics());
plumber = realtimePlumberSchool.findPlumber(schema, tuningConfig, new FireDepartmentMetrics());
}
@After

View File

@ -27,7 +27,7 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireHydrant;
import junit.framework.Assert;
@ -54,7 +54,7 @@ public class SinkTest
final Interval interval = new Interval("2013-01-01/2013-01-02");
final String version = new DateTime().toString();
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
1,
new Period("P1Y"),
null,
@ -64,7 +64,7 @@ public class SinkTest
null,
null
);
final Sink sink = new Sink(interval, schema, driverConfig, version);
final Sink sink = new Sink(interval, schema, tuningConfig, version);
sink.add(
new InputRow()