From 454acd3f5ae5d05be17d2899c9df88ec03fd034a Mon Sep 17 00:00:00 2001
From: nishantmonu51
Date: Mon, 13 Oct 2014 19:30:44 +0530
Subject: [PATCH] remove backwards compatible code

1) remove backwards compatible and deprecated code
2) make hashed partitions spec default
---
 .../examples/rand/RandomFirehoseFactory.java | 5 -
 .../TwitterSpritzerFirehoseFactory.java | 5 -
 .../examples/web/WebFirehoseFactory.java | 6 -
 .../indexer/DetermineHashedPartitionsJob.java | 3 +-
 .../indexer/HadoopDruidIndexerConfig.java | 5 -
 .../io/druid/indexer/HadoopIngestionSpec.java | 212 +----------
 .../indexer/partitions/PartitionsSpec.java | 3 +-
 .../partitions/RandomPartitionsSpec.java | 41 ---
 .../path/GranularUnprocessedPathSpec.java | 3 +-
 .../indexer/HadoopDruidIndexerConfigTest.java | 44 ++-
 .../indexer/HadoopIngestionSpecTest.java | 289 +--------
 .../partitions/RandomPartitionsSpecTest.java | 71 ----
 .../common/index/YeOldePlumberSchool.java | 6 -
 .../indexing/common/task/HadoopIndexTask.java | 17 +-
 .../druid/indexing/common/task/IndexTask.java | 51 +--
 .../common/task/RealtimeIndexTask.java | 94 +----
 .../IngestSegmentFirehoseFactory.java | 6 -
 .../indexing/common/TestRealtimeTask.java | 35 +-
 .../indexing/common/task/TaskSerdeTest.java | 113 +++---
 .../indexing/overlord/TaskLifecycleTest.java | 336 +++++++++---------
 .../indexing/worker/TaskAnnouncementTest.java | 33 +-
 .../kafka/KafkaEightFirehoseFactory.java | 14 +-
 .../kafka/KafkaSevenFirehoseFactory.java | 12 +-
 pom.xml | 4 +-
 .../data/input/ProtoBufInputRowParser.java | 22 +-
 .../post/ConstantPostAggregator.java | 5 +-
 .../io/druid/query/groupby/GroupByQuery.java | 9 +-
 .../input/ProtoBufInputRowParserTest.java | 3 +-
 .../test/java/io/druid/query/QueriesTest.java | 8 +-
 .../io/druid/query/QueryRunnerTestHelper.java | 4 +-
 .../query/aggregation/AggregatorUtilTest.java | 8 +-
 .../post/ArithmeticPostAggregatorTest.java | 4 +-
 .../post/ConstantPostAggregatorTest.java | 21 +-
 .../query/groupby/GroupByQueryRunnerTest.java | 12 +-
 .../query/topn/TopNBinaryFnBenchmark.java | 2 +-
 .../io/druid/query/topn/TopNBinaryFnTest.java | 2 +-
 .../java/io/druid/segment/AppendTest.java | 3 +-
 .../io/druid/segment/SchemalessTestFull.java | 2 +-
 .../druid/segment/SchemalessTestSimple.java | 2 +-
 .../test/java/io/druid/segment/TestIndex.java | 3 +-
 .../rabbitmq/RabbitMQFirehoseFactory.java | 13 +-
 .../antlr4/io/druid/sql/antlr4/DruidSQL.g4 | 6 +-
 .../io/druid/segment/indexing/DataSchema.java | 2 +-
 .../granularity/UniformGranularitySpec.java | 9 +-
 .../segment/realtime/FireDepartment.java | 53 +--
 .../io/druid/segment/realtime/Schema.java | 108 ------
 .../firehose/ClippedFirehoseFactory.java | 5 -
 .../firehose/CombiningFirehoseFactory.java | 6 -
 .../EventReceiverFirehoseFactory.java | 9 -
 .../realtime/firehose/IrcFirehoseFactory.java | 8 -
 .../firehose/TimedShutoffFirehoseFactory.java | 6 -
 .../plumber/FlushingPlumberSchool.java | 19 +-
 .../realtime/plumber/PlumberSchool.java | 2 -
 .../plumber/RealtimePlumberSchool.java | 61 +---
 .../client/CachingClusteredClientTest.java | 4 +-
 .../CombiningFirehoseFactoryTest.java | 5 -
 .../granularity/UniformGranularityTest.java | 6 +-
 .../segment/realtime/FireDepartmentTest.java | 10 +-
 .../segment/realtime/RealtimeManagerTest.java | 17 +-
 .../plumber/RealtimePlumberSchoolTest.java | 11 +-
 .../segment/realtime/plumber/SinkTest.java | 2 +-
 .../cli/validate/DruidJsonValidator.java | 3 -
 62 files changed, 460 insertions(+), 1423 deletions(-)
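With the backwards-compatible constructor arguments gone, a HadoopIngestionSpec is assembled solely from the dataSchema/ioConfig/tuningConfig triple. The following minimal Java sketch mirrors the constructor calls exercised in the updated TaskSerdeTest below; the class name and the "example_source"/"example/path" literals are illustrative only, and a null tuningConfig falls back to HadoopTuningConfig.makeDefaultTuningConfig(), which per this commit uses the hashed partitions spec by default:

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    import com.metamx.common.Granularity;
    import io.druid.indexer.HadoopIOConfig;
    import io.druid.indexer.HadoopIngestionSpec;
    import io.druid.query.aggregation.AggregatorFactory;
    import io.druid.segment.indexing.DataSchema;
    import io.druid.segment.indexing.granularity.UniformGranularitySpec;
    import org.joda.time.Interval;

    // Illustrative sketch, not part of this patch: the class name and the
    // "example_source"/"example/path" literals are hypothetical.
    public class HadoopIngestionSpecExample
    {
      public static HadoopIngestionSpec makeSpec()
      {
        return new HadoopIngestionSpec(
            new DataSchema(
                "example_source",          // dataSource
                null,                      // input row parser, omitted for brevity
                new AggregatorFactory[0],  // no aggregators in this sketch
                new UniformGranularitySpec(
                    Granularity.DAY,
                    null,                  // query granularity
                    ImmutableList.of(new Interval("2010-01-01/P1D"))
                )
            ),
            new HadoopIOConfig(ImmutableMap.<String, Object>of("paths", "example/path"), null, null),
            // null tuningConfig -> HadoopTuningConfig.makeDefaultTuningConfig(),
            // which now defaults to the hashed partitions spec
            null
        );
      }
    }

delete mode 100644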
indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java delete mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/partitions/RandomPartitionsSpecTest.java delete mode 100644 server/src/main/java/io/druid/segment/realtime/Schema.java diff --git a/examples/src/main/java/io/druid/examples/rand/RandomFirehoseFactory.java b/examples/src/main/java/io/druid/examples/rand/RandomFirehoseFactory.java index 4f9940a77a7..91deffacc67 100644 --- a/examples/src/main/java/io/druid/examples/rand/RandomFirehoseFactory.java +++ b/examples/src/main/java/io/druid/examples/rand/RandomFirehoseFactory.java @@ -266,9 +266,4 @@ public class RandomFirehoseFactory implements FirehoseFactory }; } - @Override - public InputRowParser getParser() - { - return null; - } } diff --git a/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java b/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java index cc222d8098b..e211bf8ba5c 100644 --- a/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java +++ b/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java @@ -311,9 +311,4 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory }; } - - @Override - public InputRowParser getParser() - { - return null; - } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index 77466b8c0d9..b551aea7108 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -146,8 +146,7 @@ public class DetermineHashedPartitionsJob implements Jobby new UniformGranularitySpec( config.getGranularitySpec().getSegmentGranularity(), config.getGranularitySpec().getQueryGranularity(), - intervals, - config.getGranularitySpec().getSegmentGranularity() + intervals ) ); log.info("Determined Intervals for Job [%s]" + config.getSegmentGranularIntervals()); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index fcf24454118..ba204c9b3cc 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -113,17 +113,12 @@ public class HadoopDruidIndexerConfig public static HadoopDruidIndexerConfig fromMap(Map argSpec) { - //backwards compatibility - if (argSpec.containsKey("schema")) { - return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class); - } else { return new HadoopDruidIndexerConfig( HadoopDruidIndexerConfig.jsonMapper.convertValue( argSpec, HadoopIngestionSpec.class ) ); - } } @SuppressWarnings("unchecked") diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java index fec163e4164..eef6dc822d5 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java @@ -21,26 +21,8 @@ package io.druid.indexer; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; -import 
com.google.common.collect.Lists; -import com.metamx.common.Granularity; -import io.druid.data.input.impl.DataSpec; -import io.druid.data.input.impl.StringInputRowParser; -import io.druid.data.input.impl.TimestampSpec; -import io.druid.indexer.partitions.PartitionsSpec; -import io.druid.indexer.partitions.SingleDimensionPartitionsSpec; -import io.druid.indexer.rollup.DataRollupSpec; -import io.druid.indexer.updater.DbUpdaterJobSpec; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.IngestionSpec; -import io.druid.segment.indexing.granularity.GranularitySpec; -import io.druid.segment.indexing.granularity.UniformGranularitySpec; -import org.joda.time.DateTime; -import org.joda.time.Interval; - -import java.util.List; -import java.util.Map; /** */ @@ -54,122 +36,14 @@ public class HadoopIngestionSpec extends IngestionSpec pathSpec, - final @JsonProperty("workingPath") String workingPath, - final @JsonProperty("segmentOutputPath") String segmentOutputPath, - final @JsonProperty("version") String version, - final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, - final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, - final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure, - final @JsonProperty("shardSpecs") Map> shardSpecs, - final @JsonProperty("overwriteFiles") boolean overwriteFiles, - final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec, - final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec, - final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, - final @JsonProperty("jobProperties") Map jobProperties, - final @JsonProperty("combineText") boolean combineText, - // These fields are deprecated and will be removed in the future - final @JsonProperty("timestampColumn") String timestampColumn, - final @JsonProperty("timestampFormat") String timestampFormat, - final @JsonProperty("intervals") List intervals, - final @JsonProperty("segmentGranularity") Granularity segmentGranularity, - final @JsonProperty("partitionDimension") String partitionDimension, - final @JsonProperty("targetPartitionSize") Long targetPartitionSize + @JsonProperty("tuningConfig") HadoopTuningConfig tuningConfig ) { super(dataSchema, ioConfig, tuningConfig); - if (dataSchema != null) { - this.dataSchema = dataSchema; - this.ioConfig = ioConfig; - this.tuningConfig = tuningConfig == null ? HadoopTuningConfig.makeDefaultTuningConfig() : tuningConfig; - } else { // Backwards compatibility - TimestampSpec theTimestampSpec = (timestampSpec == null) - ? 
new TimestampSpec(timestampColumn, timestampFormat) - : timestampSpec; - List dimensionExclusions = Lists.newArrayList(); - - dimensionExclusions.add(theTimestampSpec.getTimestampColumn()); - if (rollupSpec != null) { - for (AggregatorFactory aggregatorFactory : rollupSpec.getAggs()) { - dimensionExclusions.add(aggregatorFactory.getName()); - } - } - - PartitionsSpec thePartitionSpec; - if (partitionsSpec != null) { - Preconditions.checkArgument( - partitionDimension == null && targetPartitionSize == null, - "Cannot mix partitionsSpec with partitionDimension/targetPartitionSize" - ); - thePartitionSpec = partitionsSpec; - } else { - // Backwards compatibility - thePartitionSpec = new SingleDimensionPartitionsSpec(partitionDimension, targetPartitionSize, null, false); - } - - GranularitySpec theGranularitySpec = null; - if (granularitySpec != null) { - Preconditions.checkArgument( - segmentGranularity == null && intervals == null, - "Cannot mix granularitySpec with segmentGranularity/intervals" - ); - theGranularitySpec = granularitySpec; - if (rollupSpec != null) { - theGranularitySpec = theGranularitySpec.withQueryGranularity(rollupSpec.rollupGranularity); - } - } else { - // Backwards compatibility - if (segmentGranularity != null && intervals != null) { - theGranularitySpec = new UniformGranularitySpec( - segmentGranularity, - rollupSpec == null ? null : rollupSpec.rollupGranularity, - intervals, - segmentGranularity - ); - } - } - - this.dataSchema = new DataSchema( - dataSource, - new StringInputRowParser( - dataSpec == null ? null : dataSpec.toParseSpec(theTimestampSpec, dimensionExclusions), - null, null, null, null - ), - rollupSpec == null - ? new AggregatorFactory[]{} - : rollupSpec.getAggs().toArray(new AggregatorFactory[rollupSpec.getAggs().size()]), - theGranularitySpec - ); - - this.ioConfig = new HadoopIOConfig( - pathSpec, - updaterJobSpec, - segmentOutputPath - ); - - this.tuningConfig = new HadoopTuningConfig( - workingPath, - version, - thePartitionSpec, - shardSpecs, - null, - leaveIntermediate, - cleanupOnFailure, - overwriteFiles, - ignoreInvalidRows, - jobProperties, - combineText, - false, - false - ); - } + this.dataSchema = dataSchema; + this.ioConfig = ioConfig; + this.tuningConfig = tuningConfig == null ? HadoopTuningConfig.makeDefaultTuningConfig() : tuningConfig; } @JsonProperty("dataSchema") @@ -198,31 +72,7 @@ public class HadoopIngestionSpec extends IngestionSpec hadoopDependencyCoordinates, @JsonProperty("classpathPrefix") String classpathPrefix ) { super( - id != null ? id : String.format("index_hadoop_%s_%s", getTheDataSource(spec, config), new DateTime()), - getTheDataSource(spec, config) + id != null ? id : String.format("index_hadoop_%s_%s", getTheDataSource(spec), new DateTime()), + getTheDataSource(spec) ); - this.spec = spec == null ? 
config : spec; + this.spec = spec; // Some HadoopIngestionSpec stuff doesn't make sense in the context of the indexing service Preconditions.checkArgument( @@ -116,7 +112,10 @@ public class HadoopIndexTask extends AbstractTask "segmentOutputPath must be absent" ); Preconditions.checkArgument(this.spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent"); - Preconditions.checkArgument(this.spec.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent"); + Preconditions.checkArgument( + this.spec.getIOConfig().getMetadataUpdateSpec() == null, + "updaterJobSpec must be absent" + ); if (hadoopDependencyCoordinates != null) { this.hadoopDependencyCoordinates = hadoopDependencyCoordinates; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 058b3fd027e..b4506e59750 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -76,32 +76,23 @@ public class IndexTask extends AbstractFixedIntervalTask { private static final Logger log = new Logger(IndexTask.class); - private static String makeId(String id, IndexIngestionSpec ingestionSchema, String dataSource) + private static String makeId(String id, IndexIngestionSpec ingestionSchema) { if (id == null) { - return String.format("index_%s_%s", makeDataSource(ingestionSchema, dataSource), new DateTime().toString()); + return String.format("index_%s_%s", makeDataSource(ingestionSchema), new DateTime().toString()); } return id; } - private static String makeDataSource(IndexIngestionSpec ingestionSchema, String dataSource) + private static String makeDataSource(IndexIngestionSpec ingestionSchema) { - if (ingestionSchema != null) { - return ingestionSchema.getDataSchema().getDataSource(); - } else { // Backwards compatible - return dataSource; - } + return ingestionSchema.getDataSchema().getDataSource(); } - private static Interval makeInterval(IndexIngestionSpec ingestionSchema, GranularitySpec granularitySpec) + private static Interval makeInterval(IndexIngestionSpec ingestionSchema) { - GranularitySpec spec; - if (ingestionSchema != null) { - spec = ingestionSchema.getDataSchema().getGranularitySpec(); - } else { - spec = granularitySpec; - } + GranularitySpec spec = ingestionSchema.getDataSchema().getGranularitySpec(); return new Interval( spec.bucketIntervals().get().first().getStart(), @@ -118,38 +109,18 @@ public class IndexTask extends AbstractFixedIntervalTask public IndexTask( @JsonProperty("id") String id, @JsonProperty("schema") IndexIngestionSpec ingestionSchema, - // Backwards Compatible - @JsonProperty("dataSource") final String dataSource, - @JsonProperty("granularitySpec") final GranularitySpec granularitySpec, - @JsonProperty("aggregators") final AggregatorFactory[] aggregators, - @JsonProperty("indexGranularity") final QueryGranularity indexGranularity, - @JsonProperty("targetPartitionSize") final int targetPartitionSize, - @JsonProperty("firehose") final FirehoseFactory firehoseFactory, - @JsonProperty("rowFlushBoundary") final int rowFlushBoundary, @JacksonInject ObjectMapper jsonMapper ) { super( // _not_ the version, just something uniqueish - makeId(id, ingestionSchema, dataSource), - makeDataSource(ingestionSchema, dataSource), - makeInterval(ingestionSchema, granularitySpec) + makeId(id, ingestionSchema), + makeDataSource(ingestionSchema), + 
makeInterval(ingestionSchema) ); - if (ingestionSchema != null) { - this.ingestionSchema = ingestionSchema; - } else { // Backwards Compatible - this.ingestionSchema = new IndexIngestionSpec( - new DataSchema( - dataSource, - firehoseFactory.getParser(), - aggregators, - granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity) - ), - new IndexIOConfig(firehoseFactory), - new IndexTuningConfig(targetPartitionSize, 0, null) - ); - } + + this.ingestionSchema = ingestionSchema; this.jsonMapper = jsonMapper; } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 906e8e7a901..553dc507fb7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -24,12 +24,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.metamx.common.Granularity; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; @@ -46,15 +44,11 @@ import io.druid.query.QueryToolChest; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; -import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.FireDepartment; -import io.druid.segment.realtime.FireDepartmentConfig; import io.druid.segment.realtime.RealtimeMetricsMonitor; -import io.druid.segment.realtime.Schema; import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.RealtimePlumberSchool; -import io.druid.segment.realtime.plumber.RejectionPolicyFactory; import io.druid.segment.realtime.plumber.VersioningPolicy; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; @@ -69,27 +63,19 @@ public class RealtimeIndexTask extends AbstractTask { private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class); - private static String makeTaskId(FireDepartment fireDepartment, Schema schema) + private static String makeTaskId(FireDepartment fireDepartment) { - // Backwards compatible - if (fireDepartment == null) { - return String.format( - "index_realtime_%s_%d_%s", - schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString() - ); - } else { - return String.format( - "index_realtime_%s_%d_%s", - fireDepartment.getDataSchema().getDataSource(), - fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), - new DateTime().toString() - ); - } + return String.format( + "index_realtime_%s_%d_%s", + fireDepartment.getDataSchema().getDataSource(), + fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), + new DateTime().toString() + ); } - private static String makeDatasource(FireDepartment fireDepartment, Schema schema) + private static String makeDatasource(FireDepartment fireDepartment) { - return (fireDepartment 
!= null) ? fireDepartment.getDataSchema().getDataSource() : schema.getDataSource(); + return fireDepartment.getDataSchema().getDataSource(); } @JsonIgnore @@ -105,51 +91,16 @@ public class RealtimeIndexTask extends AbstractTask public RealtimeIndexTask( @JsonProperty("id") String id, @JsonProperty("resource") TaskResource taskResource, - @JsonProperty("spec") FireDepartment fireDepartment, - // Backwards compatible, to be deprecated - @JsonProperty("schema") Schema spec, - @JsonProperty("firehose") FirehoseFactory firehoseFactory, - @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, - @JsonProperty("windowPeriod") Period windowPeriod, - @JsonProperty("maxPendingPersists") int maxPendingPersists, - @JsonProperty("segmentGranularity") Granularity segmentGranularity, - @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy, - @JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory + @JsonProperty("spec") FireDepartment fireDepartment ) { super( - id == null ? makeTaskId(fireDepartment, spec) : id, - String.format("index_realtime_%s", makeDatasource(fireDepartment, spec)), - taskResource == null ? new TaskResource(makeTaskId(fireDepartment, spec), 1) : taskResource, - makeDatasource(fireDepartment, spec) + id == null ? makeTaskId(fireDepartment) : id, + String.format("index_realtime_%s", makeDatasource(fireDepartment)), + taskResource == null ? new TaskResource(makeTaskId(fireDepartment), 1) : taskResource, + makeDatasource(fireDepartment) ); - - if (fireDepartment != null) { - this.spec = fireDepartment; - } else { - this.spec = new FireDepartment( - new DataSchema( - spec.getDataSource(), - firehoseFactory == null ? null : firehoseFactory.getParser(), - spec.getAggregators(), - new UniformGranularitySpec(segmentGranularity, spec.getIndexGranularity(), null, segmentGranularity) - ), - new RealtimeIOConfig(firehoseFactory, null), - new RealtimeTuningConfig( - fireDepartmentConfig == null ? null : fireDepartmentConfig.getMaxRowsInMemory(), - fireDepartmentConfig == null ? null : fireDepartmentConfig.getIntermediatePersistPeriod(), - windowPeriod, - null, - null, - rejectionPolicy == null ? 
rejectionPolicyFactory : rejectionPolicy, - maxPendingPersists, - spec.getShardSpec(), - false, - false - ), - null, null, null, null - ); - } + this.spec = fireDepartment; } @Override @@ -290,11 +241,7 @@ public class RealtimeIndexTask extends AbstractTask final FireDepartment fireDepartment = new FireDepartment( dataSchema, realtimeIOConfig, - tuningConfig, - null, - null, - null, - null + tuningConfig ); final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment)); this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate(); @@ -310,14 +257,7 @@ public class RealtimeIndexTask extends AbstractTask lockingSegmentAnnouncer, segmentPublisher, toolbox.getNewSegmentServerView(), - toolbox.getQueryExecutorService(), - null, - null, - null, - null, - null, - null, - 0 + toolbox.getQueryExecutorService() ); this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics()); diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 5744fa7f006..dd42bff97ca 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -218,12 +218,6 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory rowYielder; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java index 5280a884a55..b31682e831f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java @@ -22,13 +22,23 @@ package io.druid.indexing.common; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.metamx.common.Granularity; import io.druid.granularity.QueryGranularity; import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.TaskResource; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.segment.realtime.Schema; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.firehose.LocalFirehoseFactory; +import io.druid.segment.realtime.plumber.Plumber; +import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.timeline.partition.NoneShardSpec; +import java.io.File; + /** */ @JsonTypeName("test_realtime") @@ -47,15 +57,20 @@ public class TestRealtimeTask extends RealtimeIndexTask super( id, taskResource, - null, - new Schema(dataSource, null, new AggregatorFactory[]{}, QueryGranularity.NONE, new NoneShardSpec()), - null, - null, - null, - 1, - null, - null, - null + new FireDepartment( + new DataSchema(dataSource, null, new AggregatorFactory[]{}, null), new RealtimeIOConfig( + new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool() + { + @Override + public Plumber findPlumber( + DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + ) + { + return null; + } + } + 
), null + ) ); this.status = status; } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index ed4769644d1..cfcb23df6cf 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -25,19 +25,24 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.metamx.common.Granularity; -import io.druid.data.input.impl.JSONDataSpec; -import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularity; import io.druid.guice.FirehoseModule; +import io.druid.indexer.HadoopIOConfig; import io.druid.indexer.HadoopIngestionSpec; -import io.druid.indexer.rollup.DataRollupSpec; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.granularity.UniformGranularitySpec; -import io.druid.segment.realtime.Schema; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; +import io.druid.segment.realtime.plumber.Plumber; +import io.druid.segment.realtime.plumber.PlumberSchool; +import io.druid.segment.realtime.plumber.RealtimePlumberSchool; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import junit.framework.Assert; @@ -56,19 +61,20 @@ public class TaskSerdeTest { final IndexTask task = new IndexTask( null, - null, - "foo", - new UniformGranularitySpec( - Granularity.DAY, - null, - ImmutableList.of(new Interval("2010-01-01/P2D")), - Granularity.DAY + new IndexTask.IndexIngestionSpec( + new DataSchema( + "foo", + null, + new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, + new UniformGranularitySpec( + Granularity.DAY, + null, + ImmutableList.of(new Interval("2010-01-01/P2D")) + ) + ), + new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)), + new IndexTask.IndexTuningConfig(10000, -1, -1) ), - new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, - QueryGranularity.NONE, - 10000, - new LocalFirehoseFactory(new File("lol"), "rofl", null), - -1, jsonMapper ); @@ -196,18 +202,40 @@ public class TaskSerdeTest @Test public void testRealtimeIndexTaskSerde() throws Exception { + final RealtimeIndexTask task = new RealtimeIndexTask( null, new TaskResource("rofl", 2), - null, - new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()), - null, - null, - new Period("PT10M"), - 1, - Granularity.HOUR, - null, - null + new FireDepartment( + new DataSchema( + "foo", + null, + new AggregatorFactory[0], + new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null) + ), + new RealtimeIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool() + { + @Override + public Plumber findPlumber( + DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + ) + { + return null; + } + }), + new RealtimeTuningConfig( 
+ 1, + new Period("PT10M"), + null, + null, + null, + null, + 1, + new NoneShardSpec(), + false, + false + ) + ) ); final String json = jsonMapper.writeValueAsString(task); @@ -292,7 +320,6 @@ public class TaskSerdeTest Assert.assertEquals(task.getInterval(), task2.getInterval()); } - @Test public void testRestoreTaskSerde() throws Exception { @@ -346,39 +373,15 @@ public class TaskSerdeTest public void testHadoopIndexTaskSerde() throws Exception { final HadoopIndexTask task = new HadoopIndexTask( - null, null, new HadoopIngestionSpec( - null, null, null, - "foo", - new TimestampSpec("timestamp", "auto"), - new JSONDataSpec(ImmutableList.of("foo"), null), - new UniformGranularitySpec( + new DataSchema( + "foo", null, new AggregatorFactory[0], new UniformGranularitySpec( Granularity.DAY, null, - ImmutableList.of(new Interval("2010-01-01/P1D")), - Granularity.DAY - ), - ImmutableMap.of("paths", "bar"), - null, - null, - null, - null, - false, - true, - null, - false, - null, - null, - false, - ImmutableMap.of("foo", "bar"), - false, - null, - null, - null, - null, - null, - null + ImmutableList.of(new Interval("2010-01-01/P1D")) + ) + ), new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null), null ), null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 44bbb4c8f6d..ce7169e94b5 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -65,6 +65,7 @@ import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; @@ -95,16 +96,6 @@ import java.util.Set; public class TaskLifecycleTest { - private File tmp = null; - private TaskStorage ts = null; - private TaskLockbox tl = null; - private TaskQueue tq = null; - private TaskRunner tr = null; - private MockIndexerDBCoordinator mdc = null; - private TaskActionClientFactory tac = null; - private TaskToolboxFactory tb = null; - TaskStorageQueryAdapter tsqa = null; - private static final Ordering byIntervalOrdering = new Ordering() { @Override @@ -113,6 +104,141 @@ public class TaskLifecycleTest return Comparators.intervalsByStartThenEnd().compare(dataSegment.getInterval(), dataSegment2.getInterval()); } }; + TaskStorageQueryAdapter tsqa = null; + private File tmp = null; + private TaskStorage ts = null; + private TaskLockbox tl = null; + private TaskQueue tq = null; + private TaskRunner tr = null; + private MockIndexerDBCoordinator mdc = null; + private TaskActionClientFactory tac = null; + private TaskToolboxFactory tb = null; + + private static MockIndexerDBCoordinator newMockMDC() + { + return new MockIndexerDBCoordinator(); + } + + private static ServiceEmitter newMockEmitter() + { + return new ServiceEmitter(null, null, null) + { + @Override + public void emit(Event event) + { + + } + + @Override + public void emit(ServiceEventBuilder builder) + { + + } + }; + } + + private static InputRow IR(String dt, String dim1, String dim2, float met) + { + return new MapBasedInputRow( + new DateTime(dt).getMillis(), + 
ImmutableList.of("dim1", "dim2"), + ImmutableMap.of( + "dim1", dim1, + "dim2", dim2, + "met", met + ) + ); + } + + private static FirehoseFactory newMockExceptionalFirehoseFactory() + { + return new FirehoseFactory() + { + @Override + public Firehose connect(InputRowParser parser) throws IOException + { + return new Firehose() + { + @Override + public boolean hasMore() + { + return true; + } + + @Override + public InputRow nextRow() + { + throw new RuntimeException("HA HA HA"); + } + + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + + } + }; + } + + @Override + public void close() throws IOException + { + + } + }; + } + }; + } + + private static FirehoseFactory newMockFirehoseFactory(final Iterable inputRows) + { + return new FirehoseFactory() + { + @Override + public Firehose connect(InputRowParser parser) throws IOException + { + final Iterator inputRowIterator = inputRows.iterator(); + + return new Firehose() + { + @Override + public boolean hasMore() + { + return inputRowIterator.hasNext(); + } + + @Override + public InputRow nextRow() + { + return inputRowIterator.next(); + } + + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + + } + }; + } + + @Override + public void close() throws IOException + { + + } + }; + } + }; + } @Before public void setUp() throws Exception @@ -230,26 +356,20 @@ public class TaskLifecycleTest { final Task indexTask = new IndexTask( null, - null, - "foo", - new UniformGranularitySpec( + new IndexTask.IndexIngestionSpec(new DataSchema("foo", null,new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},new UniformGranularitySpec( Granularity.DAY, null, - ImmutableList.of(new Interval("2010-01-01/P2D")), - Granularity.DAY - ), - new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, - QueryGranularity.NONE, - 10000, - newMockFirehoseFactory( - ImmutableList.of( - IR("2010-01-01T01", "x", "y", 1), - IR("2010-01-01T01", "x", "z", 1), - IR("2010-01-02T01", "a", "b", 2), - IR("2010-01-02T01", "a", "c", 1) - ) - ), - -1, + ImmutableList.of(new Interval("2010-01-01/P2D")) + ) ), + new IndexTask.IndexIOConfig(newMockFirehoseFactory( + ImmutableList.of( + IR("2010-01-01T01", "x", "y", 1), + IR("2010-01-01T01", "x", "z", 1), + IR("2010-01-02T01", "a", "b", 2), + IR("2010-01-02T01", "a", "c", 1) + ) + )), + new IndexTask.IndexTuningConfig(10000, -1, -1)), TestUtils.MAPPER ); @@ -291,14 +411,20 @@ public class TaskLifecycleTest { final Task indexTask = new IndexTask( null, - null, - "foo", - new UniformGranularitySpec(Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P1D")), Granularity.DAY), - new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, - QueryGranularity.NONE, - 10000, - newMockExceptionalFirehoseFactory(), - -1, + new IndexTask.IndexIngestionSpec( + new DataSchema( + "foo", + null, + new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, + new UniformGranularitySpec( + Granularity.DAY, + null, + ImmutableList.of(new Interval("2010-01-01/P1D")) + ) + ), + new IndexTask.IndexIOConfig(newMockExceptionalFirehoseFactory()), + new IndexTask.IndexTuningConfig(10000, -1, -1) + ), TestUtils.MAPPER ); @@ -560,142 +686,4 @@ public class TaskLifecycleTest return ImmutableSet.copyOf(nuked); } } - - private static MockIndexerDBCoordinator newMockMDC() - { - return new MockIndexerDBCoordinator(); - } - - private static ServiceEmitter newMockEmitter() - { - return new 
ServiceEmitter(null, null, null) - { - @Override - public void emit(Event event) - { - - } - - @Override - public void emit(ServiceEventBuilder builder) - { - - } - }; - } - - private static InputRow IR(String dt, String dim1, String dim2, float met) - { - return new MapBasedInputRow( - new DateTime(dt).getMillis(), - ImmutableList.of("dim1", "dim2"), - ImmutableMap.of( - "dim1", dim1, - "dim2", dim2, - "met", met - ) - ); - } - - private static FirehoseFactory newMockExceptionalFirehoseFactory() - { - return new FirehoseFactory() - { - @Override - public Firehose connect(InputRowParser parser) throws IOException - { - return new Firehose() - { - @Override - public boolean hasMore() - { - return true; - } - - @Override - public InputRow nextRow() - { - throw new RuntimeException("HA HA HA"); - } - - @Override - public Runnable commit() - { - return new Runnable() - { - @Override - public void run() - { - - } - }; - } - - @Override - public void close() throws IOException - { - - } - }; - } - - @Override - public InputRowParser getParser() - { - return null; - } - }; - } - - private static FirehoseFactory newMockFirehoseFactory(final Iterable inputRows) - { - return new FirehoseFactory() - { - @Override - public Firehose connect(InputRowParser parser) throws IOException - { - final Iterator inputRowIterator = inputRows.iterator(); - - return new Firehose() - { - @Override - public boolean hasMore() - { - return inputRowIterator.hasNext(); - } - - @Override - public InputRow nextRow() - { - return inputRowIterator.next(); - } - - @Override - public Runnable commit() - { - return new Runnable() - { - @Override - public void run() - { - - } - }; - } - - @Override - public void close() throws IOException - { - - } - }; - } - - @Override - public InputRowParser getParser() - { - return null; - } - }; - } } diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java index ec593aeee33..46dd82a93e8 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java @@ -28,12 +28,21 @@ import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.segment.realtime.Schema; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.firehose.LocalFirehoseFactory; +import io.druid.segment.realtime.plumber.Plumber; +import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.timeline.partition.NoneShardSpec; import junit.framework.Assert; import org.joda.time.Period; import org.junit.Test; +import java.io.File; + public class TaskAnnouncementTest { @Test @@ -42,15 +51,19 @@ public class TaskAnnouncementTest final Task task = new RealtimeIndexTask( "theid", new TaskResource("rofl", 2), - null, - new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()), - null, - null, - new Period("PT10M"), - 1, - Granularity.HOUR, - null, - null + new FireDepartment( + new DataSchema("foo", null, new AggregatorFactory[0], null), + new RealtimeIOConfig(new 
LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool() + { + @Override + public Plumber findPlumber( + DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + ) + { + return null; + } + }), null + ) ); final TaskStatus status = TaskStatus.running(task.getId()); final TaskAnnouncement announcement = TaskAnnouncement.create(task, status); diff --git a/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java index a97f18ae1b7..b9d259f0ba6 100644 --- a/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java +++ b/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java @@ -54,20 +54,15 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory UTF-8 - 0.26.7 + 0.26.9 2.6.0 - 0.2.10 + 0.2.16-SNAPSHOT diff --git a/processing/src/main/java/io/druid/data/input/ProtoBufInputRowParser.java b/processing/src/main/java/io/druid/data/input/ProtoBufInputRowParser.java index df8bdb5cef8..fc5bfb635c9 100644 --- a/processing/src/main/java/io/druid/data/input/ProtoBufInputRowParser.java +++ b/processing/src/main/java/io/druid/data/input/ProtoBufInputRowParser.java @@ -57,26 +57,12 @@ public class ProtoBufInputRowParser implements ByteBufferInputRowParser @JsonCreator public ProtoBufInputRowParser( @JsonProperty("parseSpec") ParseSpec parseSpec, - @JsonProperty("descriptor") String descriptorFileInClasspath, - // Backwards compatible - @JsonProperty("timestampSpec") TimestampSpec timestampSpec, - @JsonProperty("dimensions") List dimensions, - @JsonProperty("dimensionExclusions") List dimensionExclusions, - @JsonProperty("spatialDimensions") List spatialDimensions + @JsonProperty("descriptor") String descriptorFileInClasspath ) { - // Backwards Compatible - if (parseSpec == null) { - this.parseSpec = new JSONParseSpec( - timestampSpec, - new DimensionsSpec(dimensions, dimensionExclusions, spatialDimensions) - ); - } else { - this.parseSpec = parseSpec; - } - + this.parseSpec = parseSpec; this.descriptorFileInClasspath = descriptorFileInClasspath; - this.mapParser = new MapInputRowParser(this.parseSpec, null, null, null, null); + this.mapParser = new MapInputRowParser(this.parseSpec); } @Override @@ -88,7 +74,7 @@ public class ProtoBufInputRowParser implements ByteBufferInputRowParser @Override public ProtoBufInputRowParser withParseSpec(ParseSpec parseSpec) { - return new ProtoBufInputRowParser(parseSpec, descriptorFileInClasspath, null, null, null, null); + return new ProtoBufInputRowParser(parseSpec, descriptorFileInClasspath); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java index 6bbacdeba48..25c8474adaf 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java @@ -39,12 +39,11 @@ public class ConstantPostAggregator implements PostAggregator @JsonCreator public ConstantPostAggregator( @JsonProperty("name") String name, - @JsonProperty("value") Number constantValue, - @JsonProperty("constantValue") Number backwardsCompatibleValue + @JsonProperty("value") Number constantValue ) { this.name = name; - this.constantValue = constantValue == null ? 
backwardsCompatibleValue : constantValue; + this.constantValue = constantValue; Preconditions.checkNotNull(this.constantValue); } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index a0fab78da07..c12b6de0f8e 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -84,9 +84,7 @@ public class GroupByQuery extends BaseQuery @JsonProperty("postAggregations") List postAggregatorSpecs, @JsonProperty("having") HavingSpec havingSpec, @JsonProperty("limitSpec") LimitSpec limitSpec, - @JsonProperty("context") Map context, - // Backwards compatible - @JsonProperty("orderBy") LimitSpec orderBySpec + @JsonProperty("context") Map context ) { super(dataSource, querySegmentSpec, context); @@ -96,7 +94,7 @@ public class GroupByQuery extends BaseQuery this.aggregatorSpecs = aggregatorSpecs; this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs; this.havingSpec = havingSpec; - this.limitSpec = (limitSpec == null) ? (orderBySpec == null ? new NoopLimitSpec() : orderBySpec) : limitSpec; + this.limitSpec = (limitSpec == null) ? new NoopLimitSpec() : limitSpec; Preconditions.checkNotNull(this.granularity, "Must specify a granularity"); Preconditions.checkNotNull(this.aggregatorSpecs, "Must specify at least one aggregator"); @@ -523,8 +521,7 @@ public class GroupByQuery extends BaseQuery postAggregatorSpecs, havingSpec, theLimitSpec, - context, - null + context ); } } diff --git a/processing/src/test/java/io/druid/data/input/ProtoBufInputRowParserTest.java b/processing/src/test/java/io/druid/data/input/ProtoBufInputRowParserTest.java index 67d5fdc3574..1981dd51f24 100644 --- a/processing/src/test/java/io/druid/data/input/ProtoBufInputRowParserTest.java +++ b/processing/src/test/java/io/druid/data/input/ProtoBufInputRowParserTest.java @@ -61,8 +61,7 @@ public class ProtoBufInputRowParserTest new TimestampSpec("timestamp", "iso"), new DimensionsSpec(Arrays.asList(DIMENSIONS), Arrays.asList(), null) ), - "prototest.desc", - null, null, null, null + "prototest.desc" ); diff --git a/processing/src/test/java/io/druid/query/QueriesTest.java b/processing/src/test/java/io/druid/query/QueriesTest.java index 763aeb135fd..eaac2446d76 100644 --- a/processing/src/test/java/io/druid/query/QueriesTest.java +++ b/processing/src/test/java/io/druid/query/QueriesTest.java @@ -119,7 +119,7 @@ public class QueriesTest "+", Arrays.asList( new FieldAccessPostAggregator("idx", "idx"), - new ConstantPostAggregator("const", 1, null) + new ConstantPostAggregator("const", 1) ) ), new ArithmeticPostAggregator( @@ -127,7 +127,7 @@ public class QueriesTest "-", Arrays.asList( new FieldAccessPostAggregator("rev", "rev"), - new ConstantPostAggregator("const", 1, null) + new ConstantPostAggregator("const", 1) ) ) ) @@ -173,7 +173,7 @@ public class QueriesTest "+", Arrays.asList( new FieldAccessPostAggregator("idx", "idx"), - new ConstantPostAggregator("const", 1, null) + new ConstantPostAggregator("const", 1) ) ), new ArithmeticPostAggregator( @@ -181,7 +181,7 @@ public class QueriesTest "-", Arrays.asList( new FieldAccessPostAggregator("rev", "rev2"), - new ConstantPostAggregator("const", 1, null) + new ConstantPostAggregator("const", 1) ) ) ) diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 
b47f78cefb1..196a80fd882 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -116,7 +116,7 @@ public class QueryRunnerTestHelper Arrays.asList("quality"), false ); - public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null); + public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L); public static final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows"); public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); public static final ArithmeticPostAggregator addRowsIndexConstant = @@ -138,7 +138,7 @@ public class QueryRunnerTestHelper public static ArithmeticPostAggregator hyperUniqueFinalizingPostAgg = new ArithmeticPostAggregator( hyperUniqueFinalizingPostAggMetric, "+", - Lists.newArrayList(new HyperUniqueFinalizingPostAggregator(uniqueMetric), new ConstantPostAggregator(null, 1, 1)) + Lists.newArrayList(new HyperUniqueFinalizingPostAggregator(uniqueMetric), new ConstantPostAggregator(null, 1)) ); public static final List commonAggregators = Arrays.asList( diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java b/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java index 08c51f9438b..49c1b1fce3e 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java @@ -43,12 +43,12 @@ public class AggregatorUtilTest { PostAggregator agg1 = new ArithmeticPostAggregator( "abc", "+", Lists.newArrayList( - new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("2", 2L, 2L) + new ConstantPostAggregator("1", 1L), new ConstantPostAggregator("2", 2L) ) ); PostAggregator dependency1 = new ArithmeticPostAggregator( "dep1", "+", Lists.newArrayList( - new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("4", 4L, 4L) + new ConstantPostAggregator("1", 1L), new ConstantPostAggregator("4", 4L) ) ); PostAggregator agg2 = new FieldAccessPostAggregator("def", "def"); @@ -78,12 +78,12 @@ public class AggregatorUtilTest { PostAggregator agg1 = new ArithmeticPostAggregator( "abc", "+", Lists.newArrayList( - new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("2", 2L, 2L) + new ConstantPostAggregator("1", 1L), new ConstantPostAggregator("2", 2L) ) ); PostAggregator dependency1 = new ArithmeticPostAggregator( "dep1", "+", Lists.newArrayList( - new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("4", 4L, 4L) + new ConstantPostAggregator("1", 1L), new ConstantPostAggregator("4", 4L) ) ); PostAggregator agg2 = new FieldAccessPostAggregator("def", "def"); diff --git a/processing/src/test/java/io/druid/query/aggregation/post/ArithmeticPostAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/post/ArithmeticPostAggregatorTest.java index e56ccb94ad9..6c0f49e96bd 100644 --- a/processing/src/test/java/io/druid/query/aggregation/post/ArithmeticPostAggregatorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/post/ArithmeticPostAggregatorTest.java @@ -48,7 +48,7 @@ public class ArithmeticPostAggregatorTest List postAggregatorList = Lists.newArrayList( new ConstantPostAggregator( - "roku", 6, null + "roku", 6 ), new FieldAccessPostAggregator( "rows", "rows" @@ -79,7 +79,7 @@ public class 
ArithmeticPostAggregatorTest List postAggregatorList = Lists.newArrayList( new ConstantPostAggregator( - "roku", 6, null + "roku", 6 ), new FieldAccessPostAggregator( "rows", "rows" diff --git a/processing/src/test/java/io/druid/query/aggregation/post/ConstantPostAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/post/ConstantPostAggregatorTest.java index 4108b8d2239..e2ef3ba9cb7 100644 --- a/processing/src/test/java/io/druid/query/aggregation/post/ConstantPostAggregatorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/post/ConstantPostAggregatorTest.java @@ -34,11 +34,11 @@ public class ConstantPostAggregatorTest { ConstantPostAggregator constantPostAggregator; - constantPostAggregator = new ConstantPostAggregator("shichi", 7, null); + constantPostAggregator = new ConstantPostAggregator("shichi", 7); Assert.assertEquals(7, constantPostAggregator.compute(null)); - constantPostAggregator = new ConstantPostAggregator("rei", 0.0, null); + constantPostAggregator = new ConstantPostAggregator("rei", 0.0); Assert.assertEquals(0.0, constantPostAggregator.compute(null)); - constantPostAggregator = new ConstantPostAggregator("ichi", 1.0, null); + constantPostAggregator = new ConstantPostAggregator("ichi", 1.0); Assert.assertNotSame(1, constantPostAggregator.compute(null)); } @@ -46,29 +46,18 @@ public class ConstantPostAggregatorTest public void testComparator() { ConstantPostAggregator constantPostAggregator = - new ConstantPostAggregator("thistestbasicallydoesnothing unhappyface", 1, null); + new ConstantPostAggregator("thistestbasicallydoesnothing unhappyface", 1); Comparator comp = constantPostAggregator.getComparator(); Assert.assertEquals(0, comp.compare(0, constantPostAggregator.compute(null))); Assert.assertEquals(0, comp.compare(0, 1)); Assert.assertEquals(0, comp.compare(1, 0)); } - @Test - public void testSerdeBackwardsCompatible() throws Exception - { - DefaultObjectMapper mapper = new DefaultObjectMapper(); - ConstantPostAggregator aggregator = mapper.readValue( - "{\"type\":\"constant\",\"name\":\"thistestbasicallydoesnothing unhappyface\",\"constantValue\":1}\n", - ConstantPostAggregator.class - ); - Assert.assertEquals(new Integer(1), aggregator.getConstantValue()); - } - @Test public void testSerde() throws Exception { DefaultObjectMapper mapper = new DefaultObjectMapper(); - ConstantPostAggregator aggregator = new ConstantPostAggregator("aggregator", 2, null); + ConstantPostAggregator aggregator = new ConstantPostAggregator("aggregator", 2); ConstantPostAggregator aggregator1 = mapper.readValue( mapper.writeValueAsString(aggregator), ConstantPostAggregator.class diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index b0e76bd1eb7..eb27672ffbf 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -1130,7 +1130,7 @@ public class GroupByQueryRunnerTest new ArithmeticPostAggregator( "idx_subpostagg", "+", Arrays.asList( new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"), - new ConstantPostAggregator("thousand", 1000, 1000) + new ConstantPostAggregator("thousand", 1000) ) ) @@ -1155,7 +1155,7 @@ public class GroupByQueryRunnerTest new ArithmeticPostAggregator( "idx", "+", Arrays.asList( new FieldAccessPostAggregator("the_idx_agg", "idx"), - new ConstantPostAggregator("ten_thousand", 10000, 10000) + new 
ConstantPostAggregator("ten_thousand", 10000) ) ) @@ -1213,7 +1213,7 @@ public class GroupByQueryRunnerTest "+", Arrays.asList( new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"), - new ConstantPostAggregator("thousand", 1000, 1000) + new ConstantPostAggregator("thousand", 1000) ) ) @@ -1255,7 +1255,7 @@ public class GroupByQueryRunnerTest new ArithmeticPostAggregator( "idx", "+", Arrays.asList( new FieldAccessPostAggregator("the_idx_agg", "idx"), - new ConstantPostAggregator("ten_thousand", 10000, 10000) + new ConstantPostAggregator("ten_thousand", 10000) ) ) @@ -1318,7 +1318,7 @@ public class GroupByQueryRunnerTest "+", Arrays.asList( new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"), - new ConstantPostAggregator("thousand", 1000, 1000) + new ConstantPostAggregator("thousand", 1000) ) ) @@ -1361,7 +1361,7 @@ public class GroupByQueryRunnerTest new ArithmeticPostAggregator( "idx", "+", Arrays.asList( new FieldAccessPostAggregator("the_idx_agg", "idx"), - new ConstantPostAggregator("ten_thousand", 10000, 10000) + new ConstantPostAggregator("ten_thousand", 10000) ) ) diff --git a/processing/src/test/java/io/druid/query/topn/TopNBinaryFnBenchmark.java b/processing/src/test/java/io/druid/query/topn/TopNBinaryFnBenchmark.java index 65ab16ad19b..7f3fb891366 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNBinaryFnBenchmark.java +++ b/processing/src/test/java/io/druid/query/topn/TopNBinaryFnBenchmark.java @@ -61,7 +61,7 @@ public class TopNBinaryFnBenchmark extends SimpleBenchmark protected void setUp() throws Exception { - final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null); + final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L); final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows"); final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); diff --git a/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java b/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java index c8958dc6580..af18d9e9839 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java @@ -47,7 +47,7 @@ public class TopNBinaryFnTest { final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); - final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null); + final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L); final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows"); final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); final ArithmeticPostAggregator addrowsindexconstant = new ArithmeticPostAggregator( diff --git a/processing/src/test/java/io/druid/segment/AppendTest.java b/processing/src/test/java/io/druid/segment/AppendTest.java index 740b29315c4..4e9f65e5e9a 100644 --- a/processing/src/test/java/io/druid/segment/AppendTest.java +++ b/processing/src/test/java/io/druid/segment/AppendTest.java @@ -54,7 +54,6 @@ import io.druid.query.topn.TopNResultValue; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.util.Arrays; @@ -88,7 +87,7 @@ public class AppendTest final CountAggregatorFactory rowsCount = new 
diff --git a/processing/src/test/java/io/druid/segment/SchemalessTestFull.java b/processing/src/test/java/io/druid/segment/SchemalessTestFull.java
index 0efa20822ca..c5b95be9919 100644
--- a/processing/src/test/java/io/druid/segment/SchemalessTestFull.java
+++ b/processing/src/test/java/io/druid/segment/SchemalessTestFull.java
@@ -79,7 +79,7 @@ public class SchemalessTestFull
   final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
   final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
   final HyperUniquesAggregatorFactory uniques = new HyperUniquesAggregatorFactory("uniques", "quality_uniques");
-  final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
+  final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
   final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
   final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
   final ArithmeticPostAggregator addRowsIndexConstant =
diff --git a/processing/src/test/java/io/druid/segment/SchemalessTestSimple.java b/processing/src/test/java/io/druid/segment/SchemalessTestSimple.java
index 153a1226619..a2aaf4b9e62 100644
--- a/processing/src/test/java/io/druid/segment/SchemalessTestSimple.java
+++ b/processing/src/test/java/io/druid/segment/SchemalessTestSimple.java
@@ -106,7 +106,7 @@ public class SchemalessTestSimple
   final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
   final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
   final HyperUniquesAggregatorFactory uniques = new HyperUniquesAggregatorFactory("uniques", "quality_uniques");
-  final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
+  final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
   final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
   final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
   final ArithmeticPostAggregator addRowsIndexConstant =
diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java
index 7180a6edb52..0c4355ae7e2 100644
--- a/processing/src/test/java/io/druid/segment/TestIndex.java
+++ b/processing/src/test/java/io/druid/segment/TestIndex.java
@@ -197,8 +197,7 @@ public class TestIndex
             "\t",
             "\u0001",
             Arrays.asList(COLUMNS)
-        ),
-        null, null, null, null
+        )
     );
     boolean runOnce = false;
     int lineCount = 0;
diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java
index 8504c70ed9d..4a4295a2178 100644
--- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java
+++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java
@@ -108,22 +108,17 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory
 _queue;
diff --git a/server/src/main/antlr4/io/druid/sql/antlr4/DruidSQL.g4 b/server/src/main/antlr4/io/druid/sql/antlr4/DruidSQL.g4
index 37716ab6673..1cafead058d 100644
--- a/server/src/main/antlr4/io/druid/sql/antlr4/DruidSQL.g4
+++ b/server/src/main/antlr4/io/druid/sql/antlr4/DruidSQL.g4
@@ -212,12 +212,12 @@ unaryExpression returns [PostAggregator p]
         if($e.p instanceof ConstantPostAggregator) {
             ConstantPostAggregator c = (ConstantPostAggregator)$e.p;
             double v = c.getConstantValue().doubleValue() * -1;
-            $p = new ConstantPostAggregator(Double.toString(v), v, null);
+            $p = new ConstantPostAggregator(Double.toString(v), v);
         } else {
             $p = new ArithmeticPostAggregator(
                 "-"+$e.p.getName(),
                 "*",
-                Lists.newArrayList($e.p, new ConstantPostAggregator("-1", -1.0, null))
+                Lists.newArrayList($e.p, new ConstantPostAggregator("-1", -1.0))
             );
         }
     }
@@ -240,7 +240,7 @@ aggregate returns [AggregatorFactory agg]
     ;
 
 constant returns [ConstantPostAggregator c]
-    : value=NUMBER { double v = Double.parseDouble($value.text); $c = new ConstantPostAggregator(Double.toString(v), v, null); }
+    : value=NUMBER { double v = Double.parseDouble($value.text); $c = new ConstantPostAggregator(Double.toString(v), v); }
     ;
 
 /* time filters must be top level filters */
diff --git a/server/src/main/java/io/druid/segment/indexing/DataSchema.java b/server/src/main/java/io/druid/segment/indexing/DataSchema.java
index 3013c9728f7..f3a4172660c 100644
--- a/server/src/main/java/io/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/io/druid/segment/indexing/DataSchema.java
@@ -74,7 +74,7 @@ public class DataSchema
     this.aggregators = aggregators;
     this.granularitySpec = granularitySpec == null
-                           ? new UniformGranularitySpec(null, null, null, null)
+                           ? new UniformGranularitySpec(null, null, null)
                            : granularitySpec;
   }
diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java
index d83f824366e..9bf64ff8dd4 100644
--- a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java
+++ b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java
@@ -47,16 +47,12 @@ public class UniformGranularitySpec implements GranularitySpec
   public UniformGranularitySpec(
       @JsonProperty("segmentGranularity") Granularity segmentGranularity,
       @JsonProperty("queryGranularity") QueryGranularity queryGranularity,
-      @JsonProperty("intervals") List inputIntervals,
-      // Backwards compatible
-      @JsonProperty("gran") Granularity granularity
+      @JsonProperty("intervals") List inputIntervals
   )
   {
     if (segmentGranularity != null) {
       this.segmentGranularity = segmentGranularity;
-    } else if (granularity != null) { // backwards compatibility
-      this.segmentGranularity = granularity;
     } else {
       this.segmentGranularity = defaultSegmentGranularity;
     }
@@ -111,8 +107,7 @@ public class UniformGranularitySpec implements GranularitySpec
     return new UniformGranularitySpec(
         segmentGranularity,
         queryGranularity,
-        inputIntervals,
-        segmentGranularity
+        inputIntervals
     );
   }
diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java
index 3e2211f51fd..446e518aa9a 100644
--- a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java
+++ b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java
@@ -56,58 +56,17 @@ public class FireDepartment extends IngestionSpec
-              newArrayList(),
-              plumberSchool.getSegmentGranularity()
-          )
-      );
-      this.ioConfig = new RealtimeIOConfig(
-          firehoseFactory,
-          plumberSchool
-      );
-      this.tuningConfig = new RealtimeTuningConfig(
-          config.getMaxRowsInMemory(),
-          config.getIntermediatePersistPeriod(),
-          ((RealtimePlumberSchool) plumberSchool).getWindowPeriod(),
-          ((RealtimePlumberSchool) plumberSchool).getBasePersistDirectory(),
-          ((RealtimePlumberSchool) plumberSchool).getVersioningPolicy(),
-          ((RealtimePlumberSchool) plumberSchool).getRejectionPolicyFactory(),
-          ((RealtimePlumberSchool) plumberSchool).getMaxPendingPersists(),
-          schema.getShardSpec(),
-          false,
-          false
-      );
-    } else {
-      Preconditions.checkNotNull(dataSchema, "dataSchema");
-      Preconditions.checkNotNull(ioConfig, "ioConfig");
-
-      this.dataSchema = dataSchema;
-      this.ioConfig = ioConfig;
-      this.tuningConfig = tuningConfig == null ? RealtimeTuningConfig.makeDefaultTuningConfig() : tuningConfig;
-    }
   }
 
   /**
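Editor's note: UniformGranularitySpec is now exactly (segmentGranularity, queryGranularity, intervals); the deprecated "gran" alias is gone, and a null segmentGranularity falls through to the class default instead of the old compat field. A sketch of the surviving constructor (import paths are assumptions from this branch; the interval is made up):

    import java.util.Arrays;
    import com.metamx.common.Granularity;
    import io.druid.granularity.QueryGranularity;
    import io.druid.segment.indexing.granularity.UniformGranularitySpec;
    import org.joda.time.Interval;

    public class GranularitySpecExample
    {
      public static void main(String[] args)
      {
        UniformGranularitySpec spec = new UniformGranularitySpec(
            Granularity.HOUR,        // segment granularity (null => default)
            QueryGranularity.MINUTE, // query granularity
            Arrays.asList(new Interval("2012-01-01/2012-01-03")) // may be null
        );
      }
    }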
diff --git a/server/src/main/java/io/druid/segment/realtime/Schema.java b/server/src/main/java/io/druid/segment/realtime/Schema.java
deleted file mode 100644
index d63c34efa99..00000000000
--- a/server/src/main/java/io/druid/segment/realtime/Schema.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.segment.realtime;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import io.druid.data.input.impl.SpatialDimensionSchema;
-import io.druid.granularity.QueryGranularity;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.timeline.partition.NoneShardSpec;
-import io.druid.timeline.partition.ShardSpec;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- */
-@Deprecated
-public class Schema
-{
-  private final String dataSource;
-  private final List spatialDimensions;
-  private final AggregatorFactory[] aggregators;
-  private final QueryGranularity indexGranularity;
-  private final ShardSpec shardSpec;
-
-  @JsonCreator
-  public Schema(
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("spatialDimensions") List spatialDimensions,
-      @JsonProperty("aggregators") AggregatorFactory[] aggregators,
-      @JsonProperty("indexGranularity") QueryGranularity indexGranularity,
-      @JsonProperty("shardSpec") ShardSpec shardSpec
-  )
-  {
-    this.dataSource = dataSource;
-    this.spatialDimensions = (spatialDimensions == null) ? Lists.newArrayList()
-                                                         : spatialDimensions;
-    this.aggregators = aggregators;
-    this.indexGranularity = indexGranularity;
-    this.shardSpec = shardSpec == null ? new NoneShardSpec() : shardSpec;
-
-    Preconditions.checkNotNull(dataSource, "dataSource");
-    Preconditions.checkNotNull(aggregators, "aggregators");
-    Preconditions.checkNotNull(indexGranularity, "indexGranularity");
-  }
-
-  @JsonProperty
-  public String getDataSource()
-  {
-    return dataSource;
-  }
-
-  @JsonProperty("spatialDimensions")
-  public List getSpatialDimensions()
-  {
-    return spatialDimensions;
-  }
-
-  @JsonProperty
-  public AggregatorFactory[] getAggregators()
-  {
-    return aggregators;
-  }
-
-  @JsonProperty
-  public QueryGranularity getIndexGranularity()
-  {
-    return indexGranularity;
-  }
-
-  @JsonProperty
-  public ShardSpec getShardSpec()
-  {
-    return shardSpec;
-  }
-
-  @Override
-  public String toString()
-  {
-    return "Schema{" +
-           "dataSource='" + dataSource + '\'' +
-           ", spatialDimensions=" + spatialDimensions +
-           ", aggregators=" + (aggregators == null ? null : Arrays.asList(aggregators)) +
-           ", indexGranularity=" + indexGranularity +
-           ", shardSpec=" + shardSpec +
-           '}';
-  }
-}
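Editor's note: with the @Deprecated Schema class deleted outright, its fields live elsewhere: dataSource, aggregators, and granularity move into DataSchema, while shardSpec moves into RealtimeTuningConfig. A hedged sketch of the DataSchema equivalent; the four-argument shape (dataSource, parser, aggregators, granularitySpec) is inferred from the test call sites later in this patch, not stated by it:

    DataSchema schema = new DataSchema(
        "test",                                                       // dataSource
        null,                                                         // parser, left null as the tests do
        new AggregatorFactory[]{new CountAggregatorFactory("rows")},  // replaces Schema's aggregators
        new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
    );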
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/ClippedFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/ClippedFirehoseFactory.java
index f082883f395..7abdc654dbc 100644
--- a/server/src/main/java/io/druid/segment/realtime/firehose/ClippedFirehoseFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/ClippedFirehoseFactory.java
@@ -76,9 +76,4 @@ public class ClippedFirehoseFactory implements FirehoseFactory
     );
   }
 
-  @Override
-  public InputRowParser getParser()
-  {
-    return delegate.getParser();
-  }
 }
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java
index 745fa768943..faea652273d 100644
--- a/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java
@@ -57,12 +57,6 @@ public class CombiningFirehoseFactory implements FirehoseFactory
     return new CombiningFirehose(parser);
   }
 
-  @Override
-  public InputRowParser getParser()
-  {
-    return delegateFactoryList.get(0).getParser();
-  }
-
   @JsonProperty("delegates")
   public List getDelegateFactoryList()
   {
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
index f4768195d4a..fe3e9126798 100644
--- a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
@@ -59,7 +59,6 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
 chatHandlerProvider;
 
   @JsonCreator
@@ -75,8 +74,6 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
 buffer;
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java
index f4fd3e6fc41..fdb98cf0743 100644
--- a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java
@@ -102,7 +102,6 @@ public class IrcFirehoseFactory implements FirehoseFactory
   private final String host;
   private final List channels;
   private final IrcDecoder decoder;
-  private final IrcParser parser;
 
   @JsonCreator
   public IrcFirehoseFactory(
@@ -116,7 +115,6 @@ public class IrcFirehoseFactory implements FirehoseFactory
     this.host = host;
     this.channels = channels;
     this.decoder = decoder;
-    this.parser = new IrcParser(decoder);
   }
 
   @JsonProperty
@@ -268,11 +266,5 @@ public class IrcFirehoseFactory implements FirehoseFactory
       }
     };
   }
-
-  @Override
-  public IrcParser getParser()
-  {
-    return parser;
-  }
 }
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
index f5ecc0e8622..ce3d728401a 100644
--- a/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
@@ -59,12 +59,6 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory
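Editor's note: the deletion repeated across every factory above is getParser(): a FirehoseFactory no longer owns its parser; the caller hands one to connect(), as the surviving `return new CombiningFirehose(parser)` line shows. A sketch of a trivial factory under the slimmed contract (the generic bound and the exact Firehose method set are assumptions from this branch, shown for illustration only):

    import java.io.IOException;
    import io.druid.data.input.Firehose;
    import io.druid.data.input.FirehoseFactory;
    import io.druid.data.input.InputRow;
    import io.druid.data.input.impl.InputRowParser;

    public class EmptyFirehoseFactory implements FirehoseFactory<InputRowParser>
    {
      @Override
      public Firehose connect(InputRowParser parser) throws IOException
      {
        // The parser arrives from the caller; a real factory would use it to
        // turn raw input into InputRows. This firehose simply yields nothing.
        return new Firehose()
        {
          @Override public boolean hasMore() { return false; }
          @Override public InputRow nextRow() { throw new IllegalStateException("empty firehose"); }
          @Override public Runnable commit() { return new Runnable() { public void run() {} }; }
          @Override public void close() throws IOException {}
        };
      }
    }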
 asList(
     new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"),
-    new ConstantPostAggregator("constant", 2, 2)
+    new ConstantPostAggregator("constant", 2)
 )
 ),
 new ArithmeticPostAggregator(
@@ -181,7 +181,7 @@ public class CachingClusteredClientTest
     "/",
     Arrays.asList(
         new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"),
-        new ConstantPostAggregator("constant", 2, 2)
+        new ConstantPostAggregator("constant", 2)
     )
 )
 );
diff --git a/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java b/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java
index 3efc191ef3b..9089d021bad 100644
--- a/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java
+++ b/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java
@@ -150,10 +150,5 @@ public class CombiningFirehoseFactoryTest
       };
     }
 
-    @Override
-    public InputRowParser getParser()
-    {
-      return null;
-    }
   }
 }
diff --git a/server/src/test/java/io/druid/segment/indexing/granularity/UniformGranularityTest.java b/server/src/test/java/io/druid/segment/indexing/granularity/UniformGranularityTest.java
index 55a529596bf..68333665763 100644
--- a/server/src/test/java/io/druid/segment/indexing/granularity/UniformGranularityTest.java
+++ b/server/src/test/java/io/druid/segment/indexing/granularity/UniformGranularityTest.java
@@ -45,8 +45,7 @@ public class UniformGranularityTest
             new Interval("2012-01-07T00Z/2012-01-08T00Z"),
             new Interval("2012-01-03T00Z/2012-01-04T00Z"),
             new Interval("2012-01-01T00Z/2012-01-03T00Z")
-        ),
-        null
+        )
     );
 
     Assert.assertEquals(
@@ -104,8 +103,7 @@ public class UniformGranularityTest
             new Interval("2012-01-07T00Z/2012-01-08T00Z"),
             new Interval("2012-01-03T00Z/2012-01-04T00Z"),
             new Interval("2012-01-01T00Z/2012-01-03T00Z")
-        ),
-        null
+        )
     );
 
     try {
diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
index 549574b238e..53defa22810 100644
--- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
@@ -62,24 +62,22 @@ public class FireDepartmentTest
                     null,
                     null
                 )
-            ),
-            null, null, null, null
+            )
         ),
         new AggregatorFactory[]{
             new CountAggregatorFactory("count")
         },
-        new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null, Granularity.HOUR)
+        new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null)
     ),
     new RealtimeIOConfig(
         null,
         new RealtimePlumberSchool(
-            null, null, null, null, null, null, null, null, null, null, null, null, null, 0
+            null, null, null, null, null, null, null
        )
     ),
     new RealtimeTuningConfig(
         null, null, null, null, null, null, null, null, false, false
-    ),
-    null, null, null, null
+    )
 );
 
 String json = jsonMapper.writeValueAsString(schema);
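Editor's note: FireDepartmentTest above now exercises the only remaining FireDepartment constructor, a plain (dataSchema, ioConfig, tuningConfig) triple; the legacy Schema/FirehoseFactory/PlumberSchool path is deleted. A condensed sketch mirroring the test's arguments (the dataSource name is illustrative; the nulls are the test's own placeholders):

    FireDepartment department = new FireDepartment(
        new DataSchema(
            "example",
            null,
            new AggregatorFactory[]{new CountAggregatorFactory("count")},
            new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null)
        ),
        new RealtimeIOConfig(
            null,  // firehose factory
            new RealtimePlumberSchool(null, null, null, null, null, null, null)
        ),
        new RealtimeTuningConfig(null, null, null, null, null, null, null, null, false, false)
    );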
     );
     RealtimeIOConfig ioConfig = new RealtimeIOConfig(
         new FirehoseFactory()
@@ -84,12 +84,6 @@ public class RealtimeManagerTest
           {
             return new TestFirehose(rows.iterator());
           }
-
-          @Override
-          public ByteBufferInputRowParser getParser()
-          {
-            throw new UnsupportedOperationException();
-          }
         },
         new PlumberSchool()
         {
@@ -100,12 +94,6 @@ public class RealtimeManagerTest
           {
             return plumber;
           }
-
-          @Override
-          public Granularity getSegmentGranularity()
-          {
-            throw new UnsupportedOperationException();
-          }
         }
     );
     RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
@@ -127,8 +115,7 @@ public class RealtimeManagerTest
         new FireDepartment(
             schema,
             ioConfig,
-            tuningConfig,
-            null, null, null, null
+            tuningConfig
         )
     ),
     null
diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index c037d5b2b13..2e882d6a464 100644
--- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -133,7 +133,7 @@ public class RealtimePlumberSchoolTest
           }
         },
         new AggregatorFactory[]{new CountAggregatorFactory("rows")},
-        new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null, Granularity.HOUR)
+        new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
     );
 
     announcer = EasyMock.createMock(DataSegmentAnnouncer.class);
@@ -174,14 +174,7 @@ public class RealtimePlumberSchoolTest
         announcer,
         segmentPublisher,
         serverView,
-        MoreExecutors.sameThreadExecutor(),
-        new Period("PT10m"),
-        tmpDir,
-        Granularity.HOUR,
-        new IntervalStartVersioningPolicy(),
-        rejectionPolicy,
-        null,
-        0
+        MoreExecutors.sameThreadExecutor()
     );
 
     plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, new FireDepartmentMetrics());
diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java
index 8fed5962f54..d625cf1d1fe 100644
--- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java
@@ -50,7 +50,7 @@ public class SinkTest
         "test",
         null,
         new AggregatorFactory[]{new CountAggregatorFactory("rows")},
-        new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null, Granularity.HOUR)
+        new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null)
     );
 
     final Interval interval = new Interval("2013-01-01/2013-01-02");
diff --git a/services/src/main/java/io/druid/cli/validate/DruidJsonValidator.java b/services/src/main/java/io/druid/cli/validate/DruidJsonValidator.java
index 0f773f7e84b..2784fe0c7a5 100644
--- a/services/src/main/java/io/druid/cli/validate/DruidJsonValidator.java
+++ b/services/src/main/java/io/druid/cli/validate/DruidJsonValidator.java
@@ -28,7 +28,6 @@ import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexing.common.task.Task;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.Query;
-import io.druid.segment.realtime.Schema;
 
 import java.io.File;
@@ -63,8 +62,6 @@ public class DruidJsonValidator implements Runnable
       jsonMapper.readValue(file, HadoopDruidIndexerConfig.class);
     } else if (type.equalsIgnoreCase("task")) {
       jsonMapper.readValue(file, Task.class);
-    } else if (type.equalsIgnoreCase("realtimeSchema")) {
-      jsonMapper.readValue(file, Schema.class);
     } else {
       throw new UOE("Unknown type[%s]", type);
}
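Editor's note: taken together, the constructor shrinkage above (RealtimePlumberSchool down to seven injected dependencies, FireDepartment's compat branch removed) moves the operational knobs into RealtimeTuningConfig. The ten-argument order below is read off the deleted FireDepartment code earlier in this patch; the two trailing booleans are never named anywhere in this diff, so they are left as opaque flags here:

    RealtimeTuningConfig tuning = new RealtimeTuningConfig(
        null,   // maxRowsInMemory
        null,   // intermediatePersistPeriod
        null,   // windowPeriod
        null,   // basePersistDirectory
        null,   // versioningPolicy
        null,   // rejectionPolicyFactory
        null,   // maxPendingPersists
        null,   // shardSpec
        false,  // boolean flag (unnamed in this diff)
        false   // boolean flag (unnamed in this diff)
    );

With every argument null, each field takes the default that RealtimeTuningConfig.makeDefaultTuningConfig() previously supplied on the deleted code path.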