remove backwards compatible code

1) remove backwards compatible and deprecated code
2) make hashed partitions spec default
This commit is contained in:
nishantmonu51 2014-10-13 19:30:44 +05:30
parent c7b4d5b7b4
commit 454acd3f5a
62 changed files with 460 additions and 1423 deletions

View File

@ -266,9 +266,4 @@ public class RandomFirehoseFactory implements FirehoseFactory<InputRowParser>
};
}
@Override
public InputRowParser getParser()
{
return null;
}
}

View File

@ -311,9 +311,4 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
};
}
@Override
public InputRowParser getParser()
{
return null;
}
}

View File

@ -133,10 +133,4 @@ public class WebFirehoseFactory implements FirehoseFactory<InputRowParser>
};
}
@Override
public InputRowParser getParser()
{
return null;
}
}

View File

@ -146,8 +146,7 @@ public class DetermineHashedPartitionsJob implements Jobby
new UniformGranularitySpec(
config.getGranularitySpec().getSegmentGranularity(),
config.getGranularitySpec().getQueryGranularity(),
intervals,
config.getGranularitySpec().getSegmentGranularity()
intervals
)
);
log.info("Determined Intervals for Job [%s]" + config.getSegmentGranularIntervals());

View File

@ -113,17 +113,12 @@ public class HadoopDruidIndexerConfig
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
{
//backwards compatibility
if (argSpec.containsKey("schema")) {
return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
} else {
return new HadoopDruidIndexerConfig(
HadoopDruidIndexerConfig.jsonMapper.convertValue(
argSpec,
HadoopIngestionSpec.class
)
);
}
}
@SuppressWarnings("unchecked")

View File

@ -21,26 +21,8 @@ package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.DataSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
/**
*/
@ -54,122 +36,14 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
public HadoopIngestionSpec(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") HadoopIOConfig ioConfig,
@JsonProperty("tuningConfig") HadoopTuningConfig tuningConfig,
// All deprecated
final @JsonProperty("dataSource") String dataSource,
final @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
final @JsonProperty("dataSpec") DataSpec dataSpec,
final @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
final @JsonProperty("pathSpec") Map<String, Object> pathSpec,
final @JsonProperty("workingPath") String workingPath,
final @JsonProperty("segmentOutputPath") String segmentOutputPath,
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
final @JsonProperty("combineText") boolean combineText,
// These fields are deprecated and will be removed in the future
final @JsonProperty("timestampColumn") String timestampColumn,
final @JsonProperty("timestampFormat") String timestampFormat,
final @JsonProperty("intervals") List<Interval> intervals,
final @JsonProperty("segmentGranularity") Granularity segmentGranularity,
final @JsonProperty("partitionDimension") String partitionDimension,
final @JsonProperty("targetPartitionSize") Long targetPartitionSize
@JsonProperty("tuningConfig") HadoopTuningConfig tuningConfig
)
{
super(dataSchema, ioConfig, tuningConfig);
if (dataSchema != null) {
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null ? HadoopTuningConfig.makeDefaultTuningConfig() : tuningConfig;
} else { // Backwards compatibility
TimestampSpec theTimestampSpec = (timestampSpec == null)
? new TimestampSpec(timestampColumn, timestampFormat)
: timestampSpec;
List<String> dimensionExclusions = Lists.newArrayList();
dimensionExclusions.add(theTimestampSpec.getTimestampColumn());
if (rollupSpec != null) {
for (AggregatorFactory aggregatorFactory : rollupSpec.getAggs()) {
dimensionExclusions.add(aggregatorFactory.getName());
}
}
PartitionsSpec thePartitionSpec;
if (partitionsSpec != null) {
Preconditions.checkArgument(
partitionDimension == null && targetPartitionSize == null,
"Cannot mix partitionsSpec with partitionDimension/targetPartitionSize"
);
thePartitionSpec = partitionsSpec;
} else {
// Backwards compatibility
thePartitionSpec = new SingleDimensionPartitionsSpec(partitionDimension, targetPartitionSize, null, false);
}
GranularitySpec theGranularitySpec = null;
if (granularitySpec != null) {
Preconditions.checkArgument(
segmentGranularity == null && intervals == null,
"Cannot mix granularitySpec with segmentGranularity/intervals"
);
theGranularitySpec = granularitySpec;
if (rollupSpec != null) {
theGranularitySpec = theGranularitySpec.withQueryGranularity(rollupSpec.rollupGranularity);
}
} else {
// Backwards compatibility
if (segmentGranularity != null && intervals != null) {
theGranularitySpec = new UniformGranularitySpec(
segmentGranularity,
rollupSpec == null ? null : rollupSpec.rollupGranularity,
intervals,
segmentGranularity
);
}
}
this.dataSchema = new DataSchema(
dataSource,
new StringInputRowParser(
dataSpec == null ? null : dataSpec.toParseSpec(theTimestampSpec, dimensionExclusions),
null, null, null, null
),
rollupSpec == null
? new AggregatorFactory[]{}
: rollupSpec.getAggs().toArray(new AggregatorFactory[rollupSpec.getAggs().size()]),
theGranularitySpec
);
this.ioConfig = new HadoopIOConfig(
pathSpec,
updaterJobSpec,
segmentOutputPath
);
this.tuningConfig = new HadoopTuningConfig(
workingPath,
version,
thePartitionSpec,
shardSpecs,
null,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
ignoreInvalidRows,
jobProperties,
combineText,
false,
false
);
}
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null ? HadoopTuningConfig.makeDefaultTuningConfig() : tuningConfig;
}
@JsonProperty("dataSchema")
@ -198,31 +72,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
return new HadoopIngestionSpec(
schema,
ioConfig,
tuningConfig,
null,
null,
null,
null,
null,
null,
null,
null,
null,
false,
null,
null,
false,
null,
null,
false,
null,
false,
null,
null,
null,
null,
null,
null
tuningConfig
);
}
@ -231,31 +81,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
return new HadoopIngestionSpec(
dataSchema,
config,
tuningConfig,
null,
null,
null,
null,
null,
null,
null,
null,
null,
false,
null,
null,
false,
null,
null,
false,
null,
false,
null,
null,
null,
null,
null,
null
tuningConfig
);
}
@ -264,31 +90,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
return new HadoopIngestionSpec(
dataSchema,
ioConfig,
config,
null,
null,
null,
null,
null,
null,
null,
null,
null,
false,
null,
null,
false,
null,
null,
false,
null,
false,
null,
null,
null,
null,
null,
null
config
);
}
}

View File

@ -26,10 +26,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleDimensionPartitionsSpec.class)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class),
@JsonSubTypes.Type(name = "random", value = RandomPartitionsSpec.class),
@JsonSubTypes.Type(name = "hashed", value = HashedPartitionsSpec.class)
})
public interface PartitionsSpec

View File

@ -1,41 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
// for backward compatibility
@Deprecated
public class RandomPartitionsSpec extends HashedPartitionsSpec
{
@JsonCreator
public RandomPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
@JsonProperty("numShards") @Nullable Integer numShards
)
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
}
}

View File

@ -112,8 +112,7 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
new UniformGranularitySpec(
segmentGranularity,
config.getGranularitySpec().getQueryGranularity(),
Lists.newArrayList(bucketsToRun),
segmentGranularity
Lists.newArrayList(bucketsToRun)
)
);

View File

@ -52,14 +52,20 @@ public class HadoopDruidIndexerConfigTest
try {
schema = jsonReadWriteRead(
"{"
+ "\"dataSource\": \"source\","
+ " \"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-07-10/P1D\"]"
+ " },"
+ "\"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\""
"{\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"source\",\n"
+ " \"metricsSpec\": [],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"hour\",\n"
+ " \"intervals\": [\"2012-07-10/P1D\"]\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"type\": \"hadoop\",\n"
+ " \"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\"\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);
@ -92,14 +98,20 @@ public class HadoopDruidIndexerConfigTest
try {
schema = jsonReadWriteRead(
"{"
+ "\"dataSource\": \"the:data:source\","
+ " \"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-07-10/P1D\"]"
+ " },"
+ "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
"{\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"the:data:source\",\n"
+ " \"metricsSpec\": [],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"hour\",\n"
+ " \"intervals\": [\"2012-07-10/P1D\"]\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"type\": \"hadoop\",\n"
+ " \"segmentOutputPath\": \"/tmp/dru:id/data:test\"\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);

View File

@ -23,8 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.db.DbConnectorConfig;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.RandomPartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
@ -44,12 +44,15 @@ public class HadoopIngestionSpecTest
try {
schema = jsonReadWriteRead(
"{"
+ " \"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-01-01/P1D\"]"
+ " }"
"{\n"
+ " \"dataSchema\": {\n"
+ " \"metricsSpec\": [],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"hour\",\n"
+ " \"intervals\": [\"2012-01-01/P1D\"]\n"
+ " }\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);
@ -73,65 +76,6 @@ public class HadoopIngestionSpecTest
);
}
@Test
public void testGranularitySpecLegacy()
{
// Deprecated and replaced by granularitySpec, but still supported
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
"{"
+ "\"segmentGranularity\":\"day\","
+ "\"intervals\":[\"2012-02-01/P1D\"]"
+ "}",
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final UniformGranularitySpec granularitySpec = (UniformGranularitySpec) schema.getDataSchema().getGranularitySpec();
Assert.assertEquals(
"getIntervals",
Lists.newArrayList(new Interval("2012-02-01/P1D")),
granularitySpec.getIntervals().get()
);
Assert.assertEquals(
"getSegmentGranularity",
"DAY",
granularitySpec.getSegmentGranularity().toString()
);
}
@Test
public void testInvalidGranularityCombination()
{
boolean thrown = false;
try {
final HadoopIngestionSpec schema = jsonReadWriteRead(
"{"
+ "\"segmentGranularity\":\"day\","
+ "\"intervals\":[\"2012-02-01/P1D\"],"
+ "\"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-01-01/P1D\"]"
+ " }"
+ "}",
HadoopIngestionSpec.class
);
}
catch (Exception e) {
thrown = true;
}
Assert.assertTrue("Exception thrown", thrown);
}
@Test
public void testPartitionsSpecAutoDimension()
{
@ -139,10 +83,13 @@ public class HadoopIngestionSpecTest
try {
schema = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100"
+ " }"
"{\n"
+ " \"tuningConfig\": {\n"
+ " \"type\": \"hadoop\",\n"
+ " \"partitionsSpec\": {\n"
+ " \"targetPartitionSize\": 100\n"
+ " }\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);
@ -167,101 +114,7 @@ public class HadoopIngestionSpecTest
Assert.assertTrue(
"partitionSpec",
partitionsSpec instanceof SingleDimensionPartitionsSpec
);
}
@Test
public void testPartitionsSpecSpecificDimensionLegacy()
{
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"partitionDimension\":\"foo\""
+ " }"
+ "}",
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
"foo"
);
}
@Test
public void testPartitionsSpecLegacy()
{
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
"{"
+ "\"targetPartitionSize\":100,"
+ "\"partitionDimension\":\"foo\""
+ "}",
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
"foo"
partitionsSpec instanceof HashedPartitionsSpec
);
}
@ -272,12 +125,16 @@ public class HadoopIngestionSpecTest
try {
schema = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"maxPartitionSize\":200,"
+ " \"partitionDimension\":\"foo\""
+ " }"
"{\n"
+ " \"tuningConfig\": {\n"
+ " \"type\": \"hadoop\",\n"
+ " \"partitionsSpec\": {\n"
+ " \"type\": \"dimension\",\n"
+ " \"targetPartitionSize\": 100,\n"
+ " \"maxPartitionSize\" : 200,\n"
+ " \"partitionDimension\" : \"foo\"\n"
+ " }\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);
@ -314,42 +171,23 @@ public class HadoopIngestionSpecTest
);
}
@Test
public void testInvalidPartitionsCombination()
{
boolean thrown = false;
try {
final HadoopIngestionSpec schema = jsonReadWriteRead(
"{"
+ "\"targetPartitionSize\":100,"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100"
+ " }"
+ "}",
HadoopIngestionSpec.class
);
}
catch (Exception e) {
thrown = true;
}
Assert.assertTrue("Exception thrown", thrown);
}
@Test
public void testDbUpdaterJobSpec() throws Exception
{
final HadoopIngestionSpec schema;
schema = jsonReadWriteRead(
"{"
+ "\"updaterJobSpec\":{\n"
+ " \"type\" : \"db\",\n"
+ " \"connectURI\" : \"jdbc:mysql://localhost/druid\",\n"
+ " \"user\" : \"rofl\",\n"
+ " \"password\" : \"p4ssw0rd\",\n"
+ " \"segmentTable\" : \"segments\"\n"
+ " }"
"{\n"
+ " \"ioConfig\": {\n"
+ " \"type\": \"hadoop\",\n"
+ " \"metadataUpdateSpec\": {\n"
+ " \"type\": \"db\",\n"
+ " \"connectURI\": \"jdbc:mysql://localhost/druid\",\n"
+ " \"user\": \"rofl\",\n"
+ " \"password\": \"p4ssw0rd\",\n"
+ " \"segmentTable\": \"segments\"\n"
+ " }\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);
@ -405,7 +243,12 @@ public class HadoopIngestionSpecTest
try {
schema = jsonReadWriteRead(
"{\"cleanupOnFailure\":false}",
"{\n"
+ " \"tuningConfig\" : {\n"
+ " \"type\" : \"hadoop\", \n"
+ " \"cleanupOnFailure\" : \"false\"\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);
}
@ -430,46 +273,4 @@ public class HadoopIngestionSpecTest
}
}
public void testRandomPartitionsSpec() throws Exception{
{
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"type\":\"random\""
+ " }"
+ "}",
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof RandomPartitionsSpec);
}
}
}

View File

@ -1,71 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.partitions;
import com.google.common.base.Throwables;
import io.druid.indexer.HadoopDruidIndexerConfigTest;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class RandomPartitionsSpecTest
{
@Test
public void testRandomPartitionsSpec() throws Exception
{
{
final PartitionsSpec partitionsSpec;
try {
partitionsSpec = HadoopDruidIndexerConfigTest.jsonReadWriteRead(
"{"
+ " \"targetPartitionSize\":100,"
+ " \"type\":\"random\""
+ "}",
PartitionsSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertTrue("partitionsSpec", partitionsSpec instanceof RandomPartitionsSpec);
}
}
}

View File

@ -82,12 +82,6 @@ public class YeOldePlumberSchool implements PlumberSchool
this.tmpSegmentDir = tmpSegmentDir;
}
@Override
public Granularity getSegmentGranularity()
{
throw new UnsupportedOperationException();
}
@Override
public Plumber findPlumber(
final DataSchema schema,

View File

@ -67,12 +67,9 @@ public class HadoopIndexTask extends AbstractTask
extensionsConfig = GuiceInjectors.makeStartupInjector().getInstance(ExtensionsConfig.class);
}
private static String getTheDataSource(HadoopIngestionSpec spec, HadoopIngestionSpec config)
private static String getTheDataSource(HadoopIngestionSpec spec)
{
if (spec != null) {
return spec.getDataSchema().getDataSource();
}
return config.getDataSchema().getDataSource();
}
@JsonIgnore
@ -96,19 +93,18 @@ public class HadoopIndexTask extends AbstractTask
public HadoopIndexTask(
@JsonProperty("id") String id,
@JsonProperty("spec") HadoopIngestionSpec spec,
@JsonProperty("config") HadoopIngestionSpec config, // backwards compat
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
@JsonProperty("classpathPrefix") String classpathPrefix
)
{
super(
id != null ? id : String.format("index_hadoop_%s_%s", getTheDataSource(spec, config), new DateTime()),
getTheDataSource(spec, config)
id != null ? id : String.format("index_hadoop_%s_%s", getTheDataSource(spec), new DateTime()),
getTheDataSource(spec)
);
this.spec = spec == null ? config : spec;
this.spec = spec;
// Some HadoopIngestionSpec stuff doesn't make sense in the context of the indexing service
Preconditions.checkArgument(
@ -116,7 +112,10 @@ public class HadoopIndexTask extends AbstractTask
"segmentOutputPath must be absent"
);
Preconditions.checkArgument(this.spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent");
Preconditions.checkArgument(this.spec.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent");
Preconditions.checkArgument(
this.spec.getIOConfig().getMetadataUpdateSpec() == null,
"updaterJobSpec must be absent"
);
if (hadoopDependencyCoordinates != null) {
this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;

View File

@ -76,32 +76,23 @@ public class IndexTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(IndexTask.class);
private static String makeId(String id, IndexIngestionSpec ingestionSchema, String dataSource)
private static String makeId(String id, IndexIngestionSpec ingestionSchema)
{
if (id == null) {
return String.format("index_%s_%s", makeDataSource(ingestionSchema, dataSource), new DateTime().toString());
return String.format("index_%s_%s", makeDataSource(ingestionSchema), new DateTime().toString());
}
return id;
}
private static String makeDataSource(IndexIngestionSpec ingestionSchema, String dataSource)
private static String makeDataSource(IndexIngestionSpec ingestionSchema)
{
if (ingestionSchema != null) {
return ingestionSchema.getDataSchema().getDataSource();
} else { // Backwards compatible
return dataSource;
}
return ingestionSchema.getDataSchema().getDataSource();
}
private static Interval makeInterval(IndexIngestionSpec ingestionSchema, GranularitySpec granularitySpec)
private static Interval makeInterval(IndexIngestionSpec ingestionSchema)
{
GranularitySpec spec;
if (ingestionSchema != null) {
spec = ingestionSchema.getDataSchema().getGranularitySpec();
} else {
spec = granularitySpec;
}
GranularitySpec spec = ingestionSchema.getDataSchema().getGranularitySpec();
return new Interval(
spec.bucketIntervals().get().first().getStart(),
@ -118,38 +109,18 @@ public class IndexTask extends AbstractFixedIntervalTask
public IndexTask(
@JsonProperty("id") String id,
@JsonProperty("schema") IndexIngestionSpec ingestionSchema,
// Backwards Compatible
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("granularitySpec") final GranularitySpec granularitySpec,
@JsonProperty("aggregators") final AggregatorFactory[] aggregators,
@JsonProperty("indexGranularity") final QueryGranularity indexGranularity,
@JsonProperty("targetPartitionSize") final int targetPartitionSize,
@JsonProperty("firehose") final FirehoseFactory firehoseFactory,
@JsonProperty("rowFlushBoundary") final int rowFlushBoundary,
@JacksonInject ObjectMapper jsonMapper
)
{
super(
// _not_ the version, just something uniqueish
makeId(id, ingestionSchema, dataSource),
makeDataSource(ingestionSchema, dataSource),
makeInterval(ingestionSchema, granularitySpec)
makeId(id, ingestionSchema),
makeDataSource(ingestionSchema),
makeInterval(ingestionSchema)
);
if (ingestionSchema != null) {
this.ingestionSchema = ingestionSchema;
} else { // Backwards Compatible
this.ingestionSchema = new IndexIngestionSpec(
new DataSchema(
dataSource,
firehoseFactory.getParser(),
aggregators,
granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity)
),
new IndexIOConfig(firehoseFactory),
new IndexTuningConfig(targetPartitionSize, 0, null)
);
}
this.ingestionSchema = ingestionSchema;
this.jsonMapper = jsonMapper;
}

View File

@ -24,12 +24,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
@ -46,15 +44,11 @@ import io.druid.query.QueryToolChest;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentConfig;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import io.druid.segment.realtime.plumber.RejectionPolicyFactory;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
@ -69,27 +63,19 @@ public class RealtimeIndexTask extends AbstractTask
{
private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
private static String makeTaskId(FireDepartment fireDepartment, Schema schema)
private static String makeTaskId(FireDepartment fireDepartment)
{
// Backwards compatible
if (fireDepartment == null) {
return String.format(
"index_realtime_%s_%d_%s",
schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()
);
} else {
return String.format(
"index_realtime_%s_%d_%s",
fireDepartment.getDataSchema().getDataSource(),
fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(),
new DateTime().toString()
);
}
return String.format(
"index_realtime_%s_%d_%s",
fireDepartment.getDataSchema().getDataSource(),
fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(),
new DateTime().toString()
);
}
private static String makeDatasource(FireDepartment fireDepartment, Schema schema)
private static String makeDatasource(FireDepartment fireDepartment)
{
return (fireDepartment != null) ? fireDepartment.getDataSchema().getDataSource() : schema.getDataSource();
return fireDepartment.getDataSchema().getDataSource();
}
@JsonIgnore
@ -105,51 +91,16 @@ public class RealtimeIndexTask extends AbstractTask
public RealtimeIndexTask(
@JsonProperty("id") String id,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("spec") FireDepartment fireDepartment,
// Backwards compatible, to be deprecated
@JsonProperty("schema") Schema spec,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory
@JsonProperty("spec") FireDepartment fireDepartment
)
{
super(
id == null ? makeTaskId(fireDepartment, spec) : id,
String.format("index_realtime_%s", makeDatasource(fireDepartment, spec)),
taskResource == null ? new TaskResource(makeTaskId(fireDepartment, spec), 1) : taskResource,
makeDatasource(fireDepartment, spec)
id == null ? makeTaskId(fireDepartment) : id,
String.format("index_realtime_%s", makeDatasource(fireDepartment)),
taskResource == null ? new TaskResource(makeTaskId(fireDepartment), 1) : taskResource,
makeDatasource(fireDepartment)
);
if (fireDepartment != null) {
this.spec = fireDepartment;
} else {
this.spec = new FireDepartment(
new DataSchema(
spec.getDataSource(),
firehoseFactory == null ? null : firehoseFactory.getParser(),
spec.getAggregators(),
new UniformGranularitySpec(segmentGranularity, spec.getIndexGranularity(), null, segmentGranularity)
),
new RealtimeIOConfig(firehoseFactory, null),
new RealtimeTuningConfig(
fireDepartmentConfig == null ? null : fireDepartmentConfig.getMaxRowsInMemory(),
fireDepartmentConfig == null ? null : fireDepartmentConfig.getIntermediatePersistPeriod(),
windowPeriod,
null,
null,
rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
maxPendingPersists,
spec.getShardSpec(),
false,
false
),
null, null, null, null
);
}
this.spec = fireDepartment;
}
@Override
@ -290,11 +241,7 @@ public class RealtimeIndexTask extends AbstractTask
final FireDepartment fireDepartment = new FireDepartment(
dataSchema,
realtimeIOConfig,
tuningConfig,
null,
null,
null,
null
tuningConfig
);
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment));
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
@ -310,14 +257,7 @@ public class RealtimeIndexTask extends AbstractTask
lockingSegmentAnnouncer,
segmentPublisher,
toolbox.getNewSegmentServerView(),
toolbox.getQueryExecutorService(),
null,
null,
null,
null,
null,
null,
0
toolbox.getQueryExecutorService()
);
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics());

View File

@ -218,12 +218,6 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
}
@Override
public InputRowParser getParser()
{
return null;
}
public class IngestSegmentFirehose implements Firehose
{
private volatile Yielder<InputRow> rowYielder;

View File

@ -22,13 +22,23 @@ package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.metamx.common.Granularity;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.realtime.Schema;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.timeline.partition.NoneShardSpec;
import java.io.File;
/**
*/
@JsonTypeName("test_realtime")
@ -47,15 +57,20 @@ public class TestRealtimeTask extends RealtimeIndexTask
super(
id,
taskResource,
null,
new Schema(dataSource, null, new AggregatorFactory[]{}, QueryGranularity.NONE, new NoneShardSpec()),
null,
null,
null,
1,
null,
null,
null
new FireDepartment(
new DataSchema(dataSource, null, new AggregatorFactory[]{}, null), new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
{
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return null;
}
}
), null
)
);
this.status = status;
}

View File

@ -25,19 +25,24 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.JSONDataSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.FirehoseModule;
import io.druid.indexer.HadoopIOConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
@ -56,19 +61,20 @@ public class TaskSerdeTest
{
final IndexTask task = new IndexTask(
null,
null,
"foo",
new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D")),
Granularity.DAY
new IndexTask.IndexIngestionSpec(
new DataSchema(
"foo",
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
)
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, -1, -1)
),
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
new LocalFirehoseFactory(new File("lol"), "rofl", null),
-1,
jsonMapper
);
@ -196,18 +202,40 @@ public class TaskSerdeTest
@Test
public void testRealtimeIndexTaskSerde() throws Exception
{
final RealtimeIndexTask task = new RealtimeIndexTask(
null,
new TaskResource("rofl", 2),
null,
new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
null,
null,
new Period("PT10M"),
1,
Granularity.HOUR,
null,
null
new FireDepartment(
new DataSchema(
"foo",
null,
new AggregatorFactory[0],
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
),
new RealtimeIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
{
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return null;
}
}),
new RealtimeTuningConfig(
1,
new Period("PT10M"),
null,
null,
null,
null,
1,
new NoneShardSpec(),
false,
false
)
)
);
final String json = jsonMapper.writeValueAsString(task);
@ -292,7 +320,6 @@ public class TaskSerdeTest
Assert.assertEquals(task.getInterval(), task2.getInterval());
}
@Test
public void testRestoreTaskSerde() throws Exception
{
@ -346,39 +373,15 @@ public class TaskSerdeTest
public void testHadoopIndexTaskSerde() throws Exception
{
final HadoopIndexTask task = new HadoopIndexTask(
null,
null,
new HadoopIngestionSpec(
null, null, null,
"foo",
new TimestampSpec("timestamp", "auto"),
new JSONDataSpec(ImmutableList.of("foo"), null),
new UniformGranularitySpec(
new DataSchema(
"foo", null, new AggregatorFactory[0], new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P1D")),
Granularity.DAY
),
ImmutableMap.<String, Object>of("paths", "bar"),
null,
null,
null,
null,
false,
true,
null,
false,
null,
null,
false,
ImmutableMap.of("foo", "bar"),
false,
null,
null,
null,
null,
null,
null
ImmutableList.of(new Interval("2010-01-01/P1D"))
)
), new HadoopIOConfig(ImmutableMap.<String, Object>of("paths", "bar"), null, null), null
),
null,
null,

View File

@ -65,6 +65,7 @@ import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
@ -95,16 +96,6 @@ import java.util.Set;
public class TaskLifecycleTest
{
private File tmp = null;
private TaskStorage ts = null;
private TaskLockbox tl = null;
private TaskQueue tq = null;
private TaskRunner tr = null;
private MockIndexerDBCoordinator mdc = null;
private TaskActionClientFactory tac = null;
private TaskToolboxFactory tb = null;
TaskStorageQueryAdapter tsqa = null;
private static final Ordering<DataSegment> byIntervalOrdering = new Ordering<DataSegment>()
{
@Override
@ -113,6 +104,141 @@ public class TaskLifecycleTest
return Comparators.intervalsByStartThenEnd().compare(dataSegment.getInterval(), dataSegment2.getInterval());
}
};
TaskStorageQueryAdapter tsqa = null;
private File tmp = null;
private TaskStorage ts = null;
private TaskLockbox tl = null;
private TaskQueue tq = null;
private TaskRunner tr = null;
private MockIndexerDBCoordinator mdc = null;
private TaskActionClientFactory tac = null;
private TaskToolboxFactory tb = null;
private static MockIndexerDBCoordinator newMockMDC()
{
return new MockIndexerDBCoordinator();
}
private static ServiceEmitter newMockEmitter()
{
return new ServiceEmitter(null, null, null)
{
@Override
public void emit(Event event)
{
}
@Override
public void emit(ServiceEventBuilder builder)
{
}
};
}
private static InputRow IR(String dt, String dim1, String dim2, float met)
{
return new MapBasedInputRow(
new DateTime(dt).getMillis(),
ImmutableList.of("dim1", "dim2"),
ImmutableMap.<String, Object>of(
"dim1", dim1,
"dim2", dim2,
"met", met
)
);
}
private static FirehoseFactory newMockExceptionalFirehoseFactory()
{
return new FirehoseFactory()
{
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
return new Firehose()
{
@Override
public boolean hasMore()
{
return true;
}
@Override
public InputRow nextRow()
{
throw new RuntimeException("HA HA HA");
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
}
};
}
};
}
private static FirehoseFactory newMockFirehoseFactory(final Iterable<InputRow> inputRows)
{
return new FirehoseFactory()
{
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
final Iterator<InputRow> inputRowIterator = inputRows.iterator();
return new Firehose()
{
@Override
public boolean hasMore()
{
return inputRowIterator.hasNext();
}
@Override
public InputRow nextRow()
{
return inputRowIterator.next();
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
}
};
}
};
}
@Before
public void setUp() throws Exception
@ -230,26 +356,20 @@ public class TaskLifecycleTest
{
final Task indexTask = new IndexTask(
null,
null,
"foo",
new UniformGranularitySpec(
new IndexTask.IndexIngestionSpec(new DataSchema("foo", null,new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D")),
Granularity.DAY
),
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
newMockFirehoseFactory(
ImmutableList.of(
IR("2010-01-01T01", "x", "y", 1),
IR("2010-01-01T01", "x", "z", 1),
IR("2010-01-02T01", "a", "b", 2),
IR("2010-01-02T01", "a", "c", 1)
)
),
-1,
ImmutableList.of(new Interval("2010-01-01/P2D"))
) ),
new IndexTask.IndexIOConfig(newMockFirehoseFactory(
ImmutableList.of(
IR("2010-01-01T01", "x", "y", 1),
IR("2010-01-01T01", "x", "z", 1),
IR("2010-01-02T01", "a", "b", 2),
IR("2010-01-02T01", "a", "c", 1)
)
)),
new IndexTask.IndexTuningConfig(10000, -1, -1)),
TestUtils.MAPPER
);
@ -291,14 +411,20 @@ public class TaskLifecycleTest
{
final Task indexTask = new IndexTask(
null,
null,
"foo",
new UniformGranularitySpec(Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P1D")), Granularity.DAY),
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
newMockExceptionalFirehoseFactory(),
-1,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"foo",
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P1D"))
)
),
new IndexTask.IndexIOConfig(newMockExceptionalFirehoseFactory()),
new IndexTask.IndexTuningConfig(10000, -1, -1)
),
TestUtils.MAPPER
);
@ -560,142 +686,4 @@ public class TaskLifecycleTest
return ImmutableSet.copyOf(nuked);
}
}
private static MockIndexerDBCoordinator newMockMDC()
{
return new MockIndexerDBCoordinator();
}
private static ServiceEmitter newMockEmitter()
{
return new ServiceEmitter(null, null, null)
{
@Override
public void emit(Event event)
{
}
@Override
public void emit(ServiceEventBuilder builder)
{
}
};
}
private static InputRow IR(String dt, String dim1, String dim2, float met)
{
return new MapBasedInputRow(
new DateTime(dt).getMillis(),
ImmutableList.of("dim1", "dim2"),
ImmutableMap.<String, Object>of(
"dim1", dim1,
"dim2", dim2,
"met", met
)
);
}
private static FirehoseFactory newMockExceptionalFirehoseFactory()
{
return new FirehoseFactory()
{
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
return new Firehose()
{
@Override
public boolean hasMore()
{
return true;
}
@Override
public InputRow nextRow()
{
throw new RuntimeException("HA HA HA");
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
}
};
}
@Override
public InputRowParser getParser()
{
return null;
}
};
}
private static FirehoseFactory newMockFirehoseFactory(final Iterable<InputRow> inputRows)
{
return new FirehoseFactory()
{
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
final Iterator<InputRow> inputRowIterator = inputRows.iterator();
return new Firehose()
{
@Override
public boolean hasMore()
{
return inputRowIterator.hasNext();
}
@Override
public InputRow nextRow()
{
return inputRowIterator.next();
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
}
};
}
@Override
public InputRowParser getParser()
{
return null;
}
};
}
}

View File

@ -28,12 +28,21 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.realtime.Schema;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
import org.joda.time.Period;
import org.junit.Test;
import java.io.File;
public class TaskAnnouncementTest
{
@Test
@ -42,15 +51,19 @@ public class TaskAnnouncementTest
final Task task = new RealtimeIndexTask(
"theid",
new TaskResource("rofl", 2),
null,
new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
null,
null,
new Period("PT10M"),
1,
Granularity.HOUR,
null,
null
new FireDepartment(
new DataSchema("foo", null, new AggregatorFactory[0], null),
new RealtimeIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
{
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return null;
}
}), null
)
);
final TaskStatus status = TaskStatus.running(task.getId());
final TaskAnnouncement announcement = TaskAnnouncement.create(task, status);

View File

@ -54,20 +54,15 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInpu
@JsonProperty
private final String feed;
@JsonProperty
private final ByteBufferInputRowParser parser;
@JsonCreator
public KafkaEightFirehoseFactory(
@JsonProperty("consumerProps") Properties consumerProps,
@JsonProperty("feed") String feed,
// backwards compatible
@JsonProperty("parser") ByteBufferInputRowParser parser
@JsonProperty("feed") String feed
)
{
this.consumerProps = consumerProps;
this.feed = feed;
this.parser = (parser == null) ? null : parser;
}
@Override
@ -154,9 +149,4 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInpu
};
}
@Override
public ByteBufferInputRowParser getParser()
{
return parser;
}
}

View File

@ -50,19 +50,15 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory<ByteBufferInpu
private final Properties consumerProps;
private final String feed;
private final ByteBufferInputRowParser parser;
@JsonCreator
public KafkaSevenFirehoseFactory(
@JsonProperty("consumerProps") Properties consumerProps,
@JsonProperty("feed") String feed,
// backwards compatible
@JsonProperty("parser") ByteBufferInputRowParser parser
@JsonProperty("feed") String feed
)
{
this.consumerProps = consumerProps;
this.feed = feed;
this.parser = (parser == null) ? null : parser;
}
@JsonProperty
@ -77,12 +73,6 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory<ByteBufferInpu
return feed;
}
@JsonProperty
public ByteBufferInputRowParser getParser()
{
return parser;
}
@Override
public Firehose connect(final ByteBufferInputRowParser firehoseParser) throws IOException
{

View File

@ -39,9 +39,9 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.26.7</metamx.java-util.version>
<metamx.java-util.version>0.26.9</metamx.java-util.version>
<apache.curator.version>2.6.0</apache.curator.version>
<druid.api.version>0.2.10</druid.api.version>
<druid.api.version>0.2.16-SNAPSHOT</druid.api.version>
</properties>
<modules>

View File

@ -57,26 +57,12 @@ public class ProtoBufInputRowParser implements ByteBufferInputRowParser
@JsonCreator
public ProtoBufInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("descriptor") String descriptorFileInClasspath,
// Backwards compatible
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
@JsonProperty("descriptor") String descriptorFileInClasspath
)
{
// Backwards Compatible
if (parseSpec == null) {
this.parseSpec = new JSONParseSpec(
timestampSpec,
new DimensionsSpec(dimensions, dimensionExclusions, spatialDimensions)
);
} else {
this.parseSpec = parseSpec;
}
this.parseSpec = parseSpec;
this.descriptorFileInClasspath = descriptorFileInClasspath;
this.mapParser = new MapInputRowParser(this.parseSpec, null, null, null, null);
this.mapParser = new MapInputRowParser(this.parseSpec);
}
@Override
@ -88,7 +74,7 @@ public class ProtoBufInputRowParser implements ByteBufferInputRowParser
@Override
public ProtoBufInputRowParser withParseSpec(ParseSpec parseSpec)
{
return new ProtoBufInputRowParser(parseSpec, descriptorFileInClasspath, null, null, null, null);
return new ProtoBufInputRowParser(parseSpec, descriptorFileInClasspath);
}
@Override

View File

@ -39,12 +39,11 @@ public class ConstantPostAggregator implements PostAggregator
@JsonCreator
public ConstantPostAggregator(
@JsonProperty("name") String name,
@JsonProperty("value") Number constantValue,
@JsonProperty("constantValue") Number backwardsCompatibleValue
@JsonProperty("value") Number constantValue
)
{
this.name = name;
this.constantValue = constantValue == null ? backwardsCompatibleValue : constantValue;
this.constantValue = constantValue;
Preconditions.checkNotNull(this.constantValue);
}

View File

@ -84,9 +84,7 @@ public class GroupByQuery extends BaseQuery<Row>
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
@JsonProperty("having") HavingSpec havingSpec,
@JsonProperty("limitSpec") LimitSpec limitSpec,
@JsonProperty("context") Map<String, Object> context,
// Backwards compatible
@JsonProperty("orderBy") LimitSpec orderBySpec
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
@ -96,7 +94,7 @@ public class GroupByQuery extends BaseQuery<Row>
this.aggregatorSpecs = aggregatorSpecs;
this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.<PostAggregator>of() : postAggregatorSpecs;
this.havingSpec = havingSpec;
this.limitSpec = (limitSpec == null) ? (orderBySpec == null ? new NoopLimitSpec() : orderBySpec) : limitSpec;
this.limitSpec = (limitSpec == null) ? new NoopLimitSpec() : limitSpec;
Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
Preconditions.checkNotNull(this.aggregatorSpecs, "Must specify at least one aggregator");
@ -523,8 +521,7 @@ public class GroupByQuery extends BaseQuery<Row>
postAggregatorSpecs,
havingSpec,
theLimitSpec,
context,
null
context
);
}
}

View File

@ -61,8 +61,7 @@ public class ProtoBufInputRowParserTest
new TimestampSpec("timestamp", "iso"),
new DimensionsSpec(Arrays.asList(DIMENSIONS), Arrays.<String>asList(), null)
),
"prototest.desc",
null, null, null, null
"prototest.desc"
);

View File

@ -119,7 +119,7 @@ public class QueriesTest
"+",
Arrays.asList(
new FieldAccessPostAggregator("idx", "idx"),
new ConstantPostAggregator("const", 1, null)
new ConstantPostAggregator("const", 1)
)
),
new ArithmeticPostAggregator(
@ -127,7 +127,7 @@ public class QueriesTest
"-",
Arrays.asList(
new FieldAccessPostAggregator("rev", "rev"),
new ConstantPostAggregator("const", 1, null)
new ConstantPostAggregator("const", 1)
)
)
)
@ -173,7 +173,7 @@ public class QueriesTest
"+",
Arrays.asList(
new FieldAccessPostAggregator("idx", "idx"),
new ConstantPostAggregator("const", 1, null)
new ConstantPostAggregator("const", 1)
)
),
new ArithmeticPostAggregator(
@ -181,7 +181,7 @@ public class QueriesTest
"-",
Arrays.asList(
new FieldAccessPostAggregator("rev", "rev2"),
new ConstantPostAggregator("const", 1, null)
new ConstantPostAggregator("const", 1)
)
)
)

View File

@ -116,7 +116,7 @@ public class QueryRunnerTestHelper
Arrays.asList("quality"),
false
);
public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
public static final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
public static final ArithmeticPostAggregator addRowsIndexConstant =
@ -138,7 +138,7 @@ public class QueryRunnerTestHelper
public static ArithmeticPostAggregator hyperUniqueFinalizingPostAgg = new ArithmeticPostAggregator(
hyperUniqueFinalizingPostAggMetric,
"+",
Lists.newArrayList(new HyperUniqueFinalizingPostAggregator(uniqueMetric), new ConstantPostAggregator(null, 1, 1))
Lists.newArrayList(new HyperUniqueFinalizingPostAggregator(uniqueMetric), new ConstantPostAggregator(null, 1))
);
public static final List<AggregatorFactory> commonAggregators = Arrays.asList(

View File

@ -43,12 +43,12 @@ public class AggregatorUtilTest
{
PostAggregator agg1 = new ArithmeticPostAggregator(
"abc", "+", Lists.<PostAggregator>newArrayList(
new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("2", 2L, 2L)
new ConstantPostAggregator("1", 1L), new ConstantPostAggregator("2", 2L)
)
);
PostAggregator dependency1 = new ArithmeticPostAggregator(
"dep1", "+", Lists.<PostAggregator>newArrayList(
new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("4", 4L, 4L)
new ConstantPostAggregator("1", 1L), new ConstantPostAggregator("4", 4L)
)
);
PostAggregator agg2 = new FieldAccessPostAggregator("def", "def");
@ -78,12 +78,12 @@ public class AggregatorUtilTest
{
PostAggregator agg1 = new ArithmeticPostAggregator(
"abc", "+", Lists.<PostAggregator>newArrayList(
new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("2", 2L, 2L)
new ConstantPostAggregator("1", 1L), new ConstantPostAggregator("2", 2L)
)
);
PostAggregator dependency1 = new ArithmeticPostAggregator(
"dep1", "+", Lists.<PostAggregator>newArrayList(
new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("4", 4L, 4L)
new ConstantPostAggregator("1", 1L), new ConstantPostAggregator("4", 4L)
)
);
PostAggregator agg2 = new FieldAccessPostAggregator("def", "def");

View File

@ -48,7 +48,7 @@ public class ArithmeticPostAggregatorTest
List<PostAggregator> postAggregatorList =
Lists.newArrayList(
new ConstantPostAggregator(
"roku", 6, null
"roku", 6
),
new FieldAccessPostAggregator(
"rows", "rows"
@ -79,7 +79,7 @@ public class ArithmeticPostAggregatorTest
List<PostAggregator> postAggregatorList =
Lists.newArrayList(
new ConstantPostAggregator(
"roku", 6, null
"roku", 6
),
new FieldAccessPostAggregator(
"rows", "rows"

View File

@ -34,11 +34,11 @@ public class ConstantPostAggregatorTest
{
ConstantPostAggregator constantPostAggregator;
constantPostAggregator = new ConstantPostAggregator("shichi", 7, null);
constantPostAggregator = new ConstantPostAggregator("shichi", 7);
Assert.assertEquals(7, constantPostAggregator.compute(null));
constantPostAggregator = new ConstantPostAggregator("rei", 0.0, null);
constantPostAggregator = new ConstantPostAggregator("rei", 0.0);
Assert.assertEquals(0.0, constantPostAggregator.compute(null));
constantPostAggregator = new ConstantPostAggregator("ichi", 1.0, null);
constantPostAggregator = new ConstantPostAggregator("ichi", 1.0);
Assert.assertNotSame(1, constantPostAggregator.compute(null));
}
@ -46,29 +46,18 @@ public class ConstantPostAggregatorTest
public void testComparator()
{
ConstantPostAggregator constantPostAggregator =
new ConstantPostAggregator("thistestbasicallydoesnothing unhappyface", 1, null);
new ConstantPostAggregator("thistestbasicallydoesnothing unhappyface", 1);
Comparator comp = constantPostAggregator.getComparator();
Assert.assertEquals(0, comp.compare(0, constantPostAggregator.compute(null)));
Assert.assertEquals(0, comp.compare(0, 1));
Assert.assertEquals(0, comp.compare(1, 0));
}
@Test
public void testSerdeBackwardsCompatible() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
ConstantPostAggregator aggregator = mapper.readValue(
"{\"type\":\"constant\",\"name\":\"thistestbasicallydoesnothing unhappyface\",\"constantValue\":1}\n",
ConstantPostAggregator.class
);
Assert.assertEquals(new Integer(1), aggregator.getConstantValue());
}
@Test
public void testSerde() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
ConstantPostAggregator aggregator = new ConstantPostAggregator("aggregator", 2, null);
ConstantPostAggregator aggregator = new ConstantPostAggregator("aggregator", 2);
ConstantPostAggregator aggregator1 = mapper.readValue(
mapper.writeValueAsString(aggregator),
ConstantPostAggregator.class

View File

@ -1130,7 +1130,7 @@ public class GroupByQueryRunnerTest
new ArithmeticPostAggregator(
"idx_subpostagg", "+", Arrays.<PostAggregator>asList(
new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"),
new ConstantPostAggregator("thousand", 1000, 1000)
new ConstantPostAggregator("thousand", 1000)
)
)
@ -1155,7 +1155,7 @@ public class GroupByQueryRunnerTest
new ArithmeticPostAggregator(
"idx", "+", Arrays.asList(
new FieldAccessPostAggregator("the_idx_agg", "idx"),
new ConstantPostAggregator("ten_thousand", 10000, 10000)
new ConstantPostAggregator("ten_thousand", 10000)
)
)
@ -1213,7 +1213,7 @@ public class GroupByQueryRunnerTest
"+",
Arrays.asList(
new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"),
new ConstantPostAggregator("thousand", 1000, 1000)
new ConstantPostAggregator("thousand", 1000)
)
)
@ -1255,7 +1255,7 @@ public class GroupByQueryRunnerTest
new ArithmeticPostAggregator(
"idx", "+", Arrays.asList(
new FieldAccessPostAggregator("the_idx_agg", "idx"),
new ConstantPostAggregator("ten_thousand", 10000, 10000)
new ConstantPostAggregator("ten_thousand", 10000)
)
)
@ -1318,7 +1318,7 @@ public class GroupByQueryRunnerTest
"+",
Arrays.asList(
new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"),
new ConstantPostAggregator("thousand", 1000, 1000)
new ConstantPostAggregator("thousand", 1000)
)
)
@ -1361,7 +1361,7 @@ public class GroupByQueryRunnerTest
new ArithmeticPostAggregator(
"idx", "+", Arrays.asList(
new FieldAccessPostAggregator("the_idx_agg", "idx"),
new ConstantPostAggregator("ten_thousand", 10000, 10000)
new ConstantPostAggregator("ten_thousand", 10000)
)
)

View File

@ -61,7 +61,7 @@ public class TopNBinaryFnBenchmark extends SimpleBenchmark
protected void setUp() throws Exception
{
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");

View File

@ -47,7 +47,7 @@ public class TopNBinaryFnTest
{
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final ArithmeticPostAggregator addrowsindexconstant = new ArithmeticPostAggregator(

View File

@ -54,7 +54,6 @@ import io.druid.query.topn.TopNResultValue;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.util.Arrays;
@ -88,7 +87,7 @@ public class AppendTest
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
final HyperUniquesAggregatorFactory uniques = new HyperUniquesAggregatorFactory("uniques", "quality_uniques");
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final ArithmeticPostAggregator addRowsIndexConstant =

View File

@ -79,7 +79,7 @@ public class SchemalessTestFull
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
final HyperUniquesAggregatorFactory uniques = new HyperUniquesAggregatorFactory("uniques", "quality_uniques");
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final ArithmeticPostAggregator addRowsIndexConstant =

View File

@ -106,7 +106,7 @@ public class SchemalessTestSimple
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
final HyperUniquesAggregatorFactory uniques = new HyperUniquesAggregatorFactory("uniques", "quality_uniques");
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final ArithmeticPostAggregator addRowsIndexConstant =

View File

@ -197,8 +197,7 @@ public class TestIndex
"\t",
"\u0001",
Arrays.asList(COLUMNS)
),
null, null, null, null
)
);
boolean runOnce = false;
int lineCount = 0;

View File

@ -108,22 +108,17 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
@JsonProperty
private final RabbitMQFirehoseConfig config;
@JsonProperty
private final StringInputRowParser parser;
@JsonProperty
private final ConnectionFactory connectionFactory;
@JsonCreator
public RabbitMQFirehoseFactory(
@JsonProperty("connection") JacksonifiedConnectionFactory connectionFactory,
@JsonProperty("config") RabbitMQFirehoseConfig config,
@JsonProperty("parser") StringInputRowParser parser
@JsonProperty("config") RabbitMQFirehoseConfig config
)
{
this.connectionFactory = connectionFactory;
this.config = config;
this.parser = parser;
}
@Override
@ -270,12 +265,6 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
};
}
@Override
public ByteBufferInputRowParser getParser()
{
return parser;
}
private static class QueueingConsumer extends DefaultConsumer
{
private final BlockingQueue<Delivery> _queue;

View File

@ -212,12 +212,12 @@ unaryExpression returns [PostAggregator p]
if($e.p instanceof ConstantPostAggregator) {
ConstantPostAggregator c = (ConstantPostAggregator)$e.p;
double v = c.getConstantValue().doubleValue() * -1;
$p = new ConstantPostAggregator(Double.toString(v), v, null);
$p = new ConstantPostAggregator(Double.toString(v), v);
} else {
$p = new ArithmeticPostAggregator(
"-"+$e.p.getName(),
"*",
Lists.newArrayList($e.p, new ConstantPostAggregator("-1", -1.0, null))
Lists.newArrayList($e.p, new ConstantPostAggregator("-1", -1.0))
);
}
}
@ -240,7 +240,7 @@ aggregate returns [AggregatorFactory agg]
;
constant returns [ConstantPostAggregator c]
: value=NUMBER { double v = Double.parseDouble($value.text); $c = new ConstantPostAggregator(Double.toString(v), v, null); }
: value=NUMBER { double v = Double.parseDouble($value.text); $c = new ConstantPostAggregator(Double.toString(v), v); }
;
/* time filters must be top level filters */

View File

@ -74,7 +74,7 @@ public class DataSchema
this.aggregators = aggregators;
this.granularitySpec = granularitySpec == null
? new UniformGranularitySpec(null, null, null, null)
? new UniformGranularitySpec(null, null, null)
: granularitySpec;
}

View File

@ -47,16 +47,12 @@ public class UniformGranularitySpec implements GranularitySpec
public UniformGranularitySpec(
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("queryGranularity") QueryGranularity queryGranularity,
@JsonProperty("intervals") List<Interval> inputIntervals,
// Backwards compatible
@JsonProperty("gran") Granularity granularity
@JsonProperty("intervals") List<Interval> inputIntervals
)
{
if (segmentGranularity != null) {
this.segmentGranularity = segmentGranularity;
} else if (granularity != null) { // backwards compatibility
this.segmentGranularity = granularity;
} else {
this.segmentGranularity = defaultSegmentGranularity;
}
@ -111,8 +107,7 @@ public class UniformGranularitySpec implements GranularitySpec
return new UniformGranularitySpec(
segmentGranularity,
queryGranularity,
inputIntervals,
segmentGranularity
inputIntervals
);
}

View File

@ -56,58 +56,17 @@ public class FireDepartment extends IngestionSpec<RealtimeIOConfig, RealtimeTuni
public FireDepartment(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") RealtimeIOConfig ioConfig,
@JsonProperty("tuningConfig") RealtimeTuningConfig tuningConfig,
// Backwards Compatability
@JsonProperty("schema") Schema schema,
@JsonProperty("config") FireDepartmentConfig config,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("plumber") PlumberSchool plumberSchool
@JsonProperty("tuningConfig") RealtimeTuningConfig tuningConfig
)
{
super(dataSchema, ioConfig, tuningConfig);
Preconditions.checkNotNull(dataSchema, "dataSchema");
Preconditions.checkNotNull(ioConfig, "ioConfig");
// Backwards compatibility
if (dataSchema == null) {
Preconditions.checkNotNull(schema, "schema");
Preconditions.checkNotNull(config, "config");
Preconditions.checkNotNull(firehoseFactory, "firehoseFactory");
Preconditions.checkNotNull(plumberSchool, "plumberSchool");
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null ? RealtimeTuningConfig.makeDefaultTuningConfig() : tuningConfig;
this.dataSchema = new DataSchema(
schema.getDataSource(),
firehoseFactory.getParser(),
schema.getAggregators(),
new UniformGranularitySpec(
plumberSchool.getSegmentGranularity(),
schema.getIndexGranularity(),
Lists.<Interval>newArrayList(),
plumberSchool.getSegmentGranularity()
)
);
this.ioConfig = new RealtimeIOConfig(
firehoseFactory,
plumberSchool
);
this.tuningConfig = new RealtimeTuningConfig(
config.getMaxRowsInMemory(),
config.getIntermediatePersistPeriod(),
((RealtimePlumberSchool) plumberSchool).getWindowPeriod(),
((RealtimePlumberSchool) plumberSchool).getBasePersistDirectory(),
((RealtimePlumberSchool) plumberSchool).getVersioningPolicy(),
((RealtimePlumberSchool) plumberSchool).getRejectionPolicyFactory(),
((RealtimePlumberSchool) plumberSchool).getMaxPendingPersists(),
schema.getShardSpec(),
false,
false
);
} else {
Preconditions.checkNotNull(dataSchema, "dataSchema");
Preconditions.checkNotNull(ioConfig, "ioConfig");
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null ? RealtimeTuningConfig.makeDefaultTuningConfig() : tuningConfig;
}
}
/**

View File

@ -1,108 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import java.util.Arrays;
import java.util.List;
/**
*/
@Deprecated
public class Schema
{
private final String dataSource;
private final List<SpatialDimensionSchema> spatialDimensions;
private final AggregatorFactory[] aggregators;
private final QueryGranularity indexGranularity;
private final ShardSpec shardSpec;
@JsonCreator
public Schema(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions,
@JsonProperty("aggregators") AggregatorFactory[] aggregators,
@JsonProperty("indexGranularity") QueryGranularity indexGranularity,
@JsonProperty("shardSpec") ShardSpec shardSpec
)
{
this.dataSource = dataSource;
this.spatialDimensions = (spatialDimensions == null) ? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
this.aggregators = aggregators;
this.indexGranularity = indexGranularity;
this.shardSpec = shardSpec == null ? new NoneShardSpec() : shardSpec;
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(aggregators, "aggregators");
Preconditions.checkNotNull(indexGranularity, "indexGranularity");
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty("spatialDimensions")
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
@JsonProperty
public AggregatorFactory[] getAggregators()
{
return aggregators;
}
@JsonProperty
public QueryGranularity getIndexGranularity()
{
return indexGranularity;
}
@JsonProperty
public ShardSpec getShardSpec()
{
return shardSpec;
}
@Override
public String toString()
{
return "Schema{" +
"dataSource='" + dataSource + '\'' +
", spatialDimensions=" + spatialDimensions +
", aggregators=" + (aggregators == null ? null : Arrays.asList(aggregators)) +
", indexGranularity=" + indexGranularity +
", shardSpec=" + shardSpec +
'}';
}
}

View File

@ -76,9 +76,4 @@ public class ClippedFirehoseFactory implements FirehoseFactory
);
}
@Override
public InputRowParser getParser()
{
return delegate.getParser();
}
}

View File

@ -57,12 +57,6 @@ public class CombiningFirehoseFactory implements FirehoseFactory<InputRowParser>
return new CombiningFirehose(parser);
}
@Override
public InputRowParser getParser()
{
return delegateFactoryList.get(0).getParser();
}
@JsonProperty("delegates")
public List<FirehoseFactory> getDelegateFactoryList()
{

View File

@ -59,7 +59,6 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
private final String serviceName;
private final int bufferSize;
private final MapInputRowParser parser;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
@JsonCreator
@ -75,8 +74,6 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
this.serviceName = serviceName;
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
// this is really for backwards compatibility
this.parser = new MapInputRowParser(parser.getParseSpec(), null, null, null, null);
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
}
@ -112,12 +109,6 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
return bufferSize;
}
@JsonProperty
public MapInputRowParser getParser()
{
return parser;
}
public class EventReceiverFirehose implements ChatHandler, Firehose
{
private final BlockingQueue<InputRow> buffer;

View File

@ -102,7 +102,6 @@ public class IrcFirehoseFactory implements FirehoseFactory<IrcParser>
private final String host;
private final List<String> channels;
private final IrcDecoder decoder;
private final IrcParser parser;
@JsonCreator
public IrcFirehoseFactory(
@ -116,7 +115,6 @@ public class IrcFirehoseFactory implements FirehoseFactory<IrcParser>
this.host = host;
this.channels = channels;
this.decoder = decoder;
this.parser = new IrcParser(decoder);
}
@JsonProperty
@ -268,11 +266,5 @@ public class IrcFirehoseFactory implements FirehoseFactory<IrcParser>
}
};
}
@Override
public IrcParser getParser()
{
return parser;
}
}

View File

@ -59,12 +59,6 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
return new TimedShutoffFirehose(parser);
}
@Override
public InputRowParser getParser()
{
return delegateFactory.getParser();
}
public class TimedShutoffFirehose implements Firehose
{
private final Firehose firehose;

View File

@ -58,15 +58,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@JacksonInject ServiceEmitter emitter,
@JacksonInject QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject @Processing ExecutorService queryExecutorService,
// Backwards compatible
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory,
@JsonProperty("maxPendingPersists") int maxPendingPersists
@JacksonInject @Processing ExecutorService queryExecutorService
)
{
super(
@ -76,14 +68,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
segmentAnnouncer,
null,
null,
queryExecutorService,
windowPeriod,
basePersistDirectory,
segmentGranularity,
versioningPolicy,
rejectionPolicy,
rejectionPolicyFactory,
maxPendingPersists
queryExecutorService
);
this.flushDuration = flushDuration == null ? defaultFlushDuration : flushDuration;

View File

@ -42,6 +42,4 @@ public interface PlumberSchool
*/
public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics);
@Deprecated
public Granularity getSegmentGranularity();
}

View File

@ -51,14 +51,6 @@ public class RealtimePlumberSchool implements PlumberSchool
private final FilteredServerView serverView;
private final ExecutorService queryExecutorService;
// Backwards compatible
private final Period windowPeriod;
private final File basePersistDirectory;
private final Granularity segmentGranularity;
private final VersioningPolicy versioningPolicy;
private final RejectionPolicyFactory rejectionPolicyFactory;
private final int maxPendingPersists;
@JsonCreator
public RealtimePlumberSchool(
@JacksonInject ServiceEmitter emitter,
@ -67,15 +59,7 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject SegmentPublisher segmentPublisher,
@JacksonInject FilteredServerView serverView,
@JacksonInject @Processing ExecutorService executorService,
// Backwards compatible
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory,
@JsonProperty("maxPendingPersists") int maxPendingPersists
@JacksonInject @Processing ExecutorService executorService
)
{
this.emitter = emitter;
@ -85,49 +69,6 @@ public class RealtimePlumberSchool implements PlumberSchool
this.segmentPublisher = segmentPublisher;
this.serverView = serverView;
this.queryExecutorService = executorService;
this.windowPeriod = windowPeriod;
this.basePersistDirectory = basePersistDirectory;
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = versioningPolicy;
this.rejectionPolicyFactory = (rejectionPolicy == null) ? rejectionPolicyFactory : rejectionPolicy;
this.maxPendingPersists = maxPendingPersists;
}
@Deprecated
public Period getWindowPeriod()
{
return windowPeriod;
}
@Deprecated
public File getBasePersistDirectory()
{
return basePersistDirectory;
}
@Override
@JsonProperty
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}
@Deprecated
public VersioningPolicy getVersioningPolicy()
{
return versioningPolicy;
}
@Deprecated
public RejectionPolicyFactory getRejectionPolicyFactory()
{
return rejectionPolicyFactory;
}
@Deprecated
public int getMaxPendingPersists()
{
return maxPendingPersists;
}
@Override

View File

@ -173,7 +173,7 @@ public class CachingClusteredClientTest
"*",
Arrays.<PostAggregator>asList(
new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"),
new ConstantPostAggregator("constant", 2, 2)
new ConstantPostAggregator("constant", 2)
)
),
new ArithmeticPostAggregator(
@ -181,7 +181,7 @@ public class CachingClusteredClientTest
"/",
Arrays.<PostAggregator>asList(
new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"),
new ConstantPostAggregator("constant", 2, 2)
new ConstantPostAggregator("constant", 2)
)
)
);

View File

@ -150,10 +150,5 @@ public class CombiningFirehoseFactoryTest
};
}
@Override
public InputRowParser getParser()
{
return null;
}
}
}

View File

@ -45,8 +45,7 @@ public class UniformGranularityTest
new Interval("2012-01-07T00Z/2012-01-08T00Z"),
new Interval("2012-01-03T00Z/2012-01-04T00Z"),
new Interval("2012-01-01T00Z/2012-01-03T00Z")
),
null
)
);
Assert.assertEquals(
@ -104,8 +103,7 @@ public class UniformGranularityTest
new Interval("2012-01-07T00Z/2012-01-08T00Z"),
new Interval("2012-01-03T00Z/2012-01-04T00Z"),
new Interval("2012-01-01T00Z/2012-01-03T00Z")
),
null
)
);
try {

View File

@ -62,24 +62,22 @@ public class FireDepartmentTest
null,
null
)
),
null, null, null, null
)
),
new AggregatorFactory[]{
new CountAggregatorFactory("count")
},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null, Granularity.HOUR)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null)
),
new RealtimeIOConfig(
null,
new RealtimePlumberSchool(
null, null, null, null, null, null, null, null, null, null, null, null, null, 0
null, null, null, null, null, null, null
)
),
new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, false, false
),
null, null, null, null
)
);
String json = jsonMapper.writeValueAsString(schema);

View File

@ -74,7 +74,7 @@ public class RealtimeManagerTest
"test",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null, Granularity.HOUR)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
);
RealtimeIOConfig ioConfig = new RealtimeIOConfig(
new FirehoseFactory()
@ -84,12 +84,6 @@ public class RealtimeManagerTest
{
return new TestFirehose(rows.iterator());
}
@Override
public ByteBufferInputRowParser getParser()
{
throw new UnsupportedOperationException();
}
},
new PlumberSchool()
{
@ -100,12 +94,6 @@ public class RealtimeManagerTest
{
return plumber;
}
@Override
public Granularity getSegmentGranularity()
{
throw new UnsupportedOperationException();
}
}
);
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
@ -127,8 +115,7 @@ public class RealtimeManagerTest
new FireDepartment(
schema,
ioConfig,
tuningConfig,
null, null, null, null
tuningConfig
)
),
null

View File

@ -133,7 +133,7 @@ public class RealtimePlumberSchoolTest
}
},
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null, Granularity.HOUR)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);
@ -174,14 +174,7 @@ public class RealtimePlumberSchoolTest
announcer,
segmentPublisher,
serverView,
MoreExecutors.sameThreadExecutor(),
new Period("PT10m"),
tmpDir,
Granularity.HOUR,
new IntervalStartVersioningPolicy(),
rejectionPolicy,
null,
0
MoreExecutors.sameThreadExecutor()
);
plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, new FireDepartmentMetrics());

View File

@ -50,7 +50,7 @@ public class SinkTest
"test",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null, Granularity.HOUR)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null)
);
final Interval interval = new Interval("2013-01-01/2013-01-02");

View File

@ -28,7 +28,6 @@ import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexing.common.task.Task;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Query;
import io.druid.segment.realtime.Schema;
import java.io.File;
@ -63,8 +62,6 @@ public class DruidJsonValidator implements Runnable
jsonMapper.readValue(file, HadoopDruidIndexerConfig.class);
} else if (type.equalsIgnoreCase("task")) {
jsonMapper.readValue(file, Task.class);
} else if (type.equalsIgnoreCase("realtimeSchema")) {
jsonMapper.readValue(file, Schema.class);
} else {
throw new UOE("Unknown type[%s]", type);
}