not compiling yet but close

This commit is contained in:
fjy 2014-02-19 15:54:27 -08:00
parent 4b7c76762d
commit 20cac8c506
40 changed files with 726 additions and 1164 deletions

View File

@ -85,7 +85,7 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
"type": "random"
"targetPartitionSize": 5000000
},
"updaterJobSpec": {
"metadataUpdateSpec": {
"type": "db",
"connectURI": "jdbc:mysql:\/\/localhost:7980\/test_db",
"user": "username",
@ -110,7 +110,7 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
|segmentOutputPath|the path to dump segments into.|yes|
|leaveIntermediate|leave behind files in the workingPath when job completes or fails (debugging tool).|no|
|partitionsSpec|a specification of how to partition each time bucket into segments, absence of this property means no partitioning will occur.|no|
|updaterJobSpec|a specification of how to update the metadata for the druid cluster these segments belong to.|yes|
|metadataUpdateSpec|a specification of how to update the metadata for the druid cluster these segments belong to.|yes|
### Path specification
@ -242,7 +242,7 @@ The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Co
|config|A Hadoop Index Config (see above).|yes|
|hadoopCoordinates|The Maven `<groupId>:<artifactId>:<version>` of Hadoop to use. The default is "org.apache.hadoop:hadoop-core:1.0.3".|no|
The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally.
The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `metadataUpdateSpec`. The Indexing Service takes care of setting these fields internally.
To run the task:

View File

@ -77,7 +77,7 @@ The Hadoop Index Task is used to index larger data sets that require the paralle
|hadoopCoordinates|The Maven \<groupId\>:\<artifactId\>:\<version\> of Hadoop to use. The default is "org.apache.hadoop:hadoop-core:1.0.3".|no|
The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally.
The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `metadataUpdateSpec`. The Indexing Service takes care of setting these fields internally.
##### Using your own Hadoop distribution

View File

@ -41,7 +41,7 @@
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"updaterJobSpec": {
"metadataUpdateSpec": {
"type": "db",
"connectURI": "jdbc:mysql:\/\/localhost:3306\/druid",
"user": "druid",

View File

@ -35,7 +35,7 @@ import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;

View File

@ -19,15 +19,10 @@
package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.impl.DataSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.segment.indexing.DriverConfig;
import org.joda.time.DateTime;
import java.util.List;
@ -35,109 +30,44 @@ import java.util.Map;
/**
*/
public class HadoopDruidIndexerSchema
public class HadoopDriverConfig implements DriverConfig
{
private final String dataSource;
private final TimestampSpec timestampSpec;
private final DataSpec dataSpec;
private final GranularitySpec granularitySpec;
private final Map<String, Object> pathSpec; // This cannot just be a PathSpec object
private final String workingPath;
private final String segmentOutputPath;
private final String version;
private final PartitionsSpec partitionsSpec;
private final boolean leaveIntermediate;
private final boolean cleanupOnFailure;
private final Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
private final boolean leaveIntermediate;
private final Boolean cleanupOnFailure;
private final boolean overwriteFiles;
private final DataRollupSpec rollupSpec;
private final DbUpdaterJobSpec updaterJobSpec;
private final boolean ignoreInvalidRows;
@JsonCreator
public HadoopDruidIndexerSchema(
final @JsonProperty("dataSource") String dataSource,
final @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
final @JsonProperty("dataSpec") DataSpec dataSpec,
final @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
final @JsonProperty("pathSpec") Map<String, Object> pathSpec,
public HadoopDriverConfig(
final @JsonProperty("workingPath") String workingPath,
final @JsonProperty("segmentOutputPath") String segmentOutputPath,
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
// These fields are deprecated and will be removed in the future
final @JsonProperty("timestampColumn") String timestampColumn,
final @JsonProperty("timestampFormat") String timestampFormat
)
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows
)
{
this.dataSource = dataSource;
this.timestampSpec = (timestampSpec == null) ? new TimestampSpec(timestampColumn, timestampFormat) : timestampSpec;
this.dataSpec = dataSpec;
this.granularitySpec = granularitySpec;
this.pathSpec = pathSpec;
this.workingPath = workingPath;
this.segmentOutputPath = segmentOutputPath;
this.version = version == null ? new DateTime().toString() : version;
this.partitionsSpec = partitionsSpec;
this.shardSpecs = shardSpecs == null ? ImmutableMap.<DateTime, List<HadoopyShardSpec>>of() : shardSpecs;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = (cleanupOnFailure == null ? true : cleanupOnFailure);
this.shardSpecs = (shardSpecs == null ? ImmutableMap.<DateTime, List<HadoopyShardSpec>>of() : shardSpecs);
this.overwriteFiles = overwriteFiles;
this.rollupSpec = rollupSpec;
this.updaterJobSpec = updaterJobSpec;
this.ignoreInvalidRows = ignoreInvalidRows;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public TimestampSpec getTimestampSpec()
{
return timestampSpec;
}
@JsonProperty
public DataSpec getDataSpec()
{
return dataSpec;
}
@JsonProperty
public GranularitySpec getGranularitySpec()
{
return granularitySpec;
}
@JsonProperty
public Map<String, Object> getPathSpec()
{
return pathSpec;
}
@JsonProperty
public String getWorkingPath()
{
return workingPath;
}
@JsonProperty
public String getSegmentOutputPath()
{
return segmentOutputPath;
}
@JsonProperty
public String getVersion()
{
@ -150,6 +80,12 @@ public class HadoopDruidIndexerSchema
return partitionsSpec;
}
@JsonProperty
public Map<DateTime, List<HadoopyShardSpec>> getShardSpecs()
{
return shardSpecs;
}
@JsonProperty
public boolean isLeaveIntermediate()
{
@ -157,38 +93,48 @@ public class HadoopDruidIndexerSchema
}
@JsonProperty
public boolean isCleanupOnFailure()
public Boolean getCleanupOnFailure()
{
return cleanupOnFailure;
}
@JsonProperty
public Map<DateTime, List<HadoopyShardSpec>> getShardSpecs()
{
return shardSpecs;
}
@JsonProperty
public boolean isOverwriteFiles()
{
return overwriteFiles;
}
@JsonProperty
public DataRollupSpec getRollupSpec()
{
return rollupSpec;
}
@JsonProperty
public DbUpdaterJobSpec getUpdaterJobSpec()
{
return updaterJobSpec;
}
@JsonProperty
public boolean isIgnoreInvalidRows()
{
return ignoreInvalidRows;
}
public HadoopDriverConfig withVersion(String ver)
{
return new HadoopDriverConfig(
workingPath,
ver,
partitionsSpec,
shardSpecs,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
ignoreInvalidRows
);
}
public HadoopDriverConfig withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> specs)
{
return new HadoopDriverConfig(
workingPath,
version,
partitionsSpec,
specs,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
ignoreInvalidRows
);
}
}

View File

@ -29,8 +29,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
@ -44,18 +42,14 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DataSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.data.input.impl.ToLowercaseDataSpec;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.initialization.Initialization;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
@ -70,7 +64,6 @@ import org.joda.time.format.ISODateTimeFormat;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -113,30 +106,18 @@ public class HadoopDruidIndexerConfig
INVALID_ROW_COUNTER
}
private volatile String dataSource;
private volatile TimestampSpec timestampSpec;
private volatile DataSpec dataSpec;
private volatile GranularitySpec granularitySpec;
private volatile HadoopIngestionSchema schema;
private volatile PathSpec pathSpec;
private volatile String workingPath;
private volatile String segmentOutputPath;
private volatile String version;
private volatile PartitionsSpec partitionsSpec;
private volatile boolean leaveIntermediate;
private volatile boolean cleanupOnFailure;
private volatile Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
private volatile boolean overwriteFiles;
private volatile DataRollupSpec rollupSpec;
private volatile DbUpdaterJobSpec updaterJobSpec;
private volatile boolean ignoreInvalidRows;
@JsonCreator
public HadoopDruidIndexerConfig(
final @JsonProperty("schema") HadoopIngestionSchema schema,
// Backwards compatibility
final @JsonProperty("dataSource") String dataSource,
final @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
final @JsonProperty("dataSpec") DataSpec dataSpec,
final @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
final @JsonProperty("pathSpec") PathSpec pathSpec,
final @JsonProperty("pathSpec") Map<String, Object> pathSpec,
final @JsonProperty("workingPath") String workingPath,
final @JsonProperty("segmentOutputPath") String segmentOutputPath,
final @JsonProperty("version") String version,
@ -157,232 +138,58 @@ public class HadoopDruidIndexerConfig
final @JsonProperty("targetPartitionSize") Long targetPartitionSize
)
{
this.dataSource = dataSource;
this.timestampSpec = (timestampSpec == null) ? new TimestampSpec(timestampColumn, timestampFormat) : timestampSpec;
this.dataSpec = dataSpec;
this.pathSpec = pathSpec;
this.workingPath = workingPath;
this.segmentOutputPath = segmentOutputPath;
this.version = version == null ? new DateTime().toString() : version;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = (cleanupOnFailure == null ? true : cleanupOnFailure);
this.shardSpecs = (shardSpecs == null ? ImmutableMap.<DateTime, List<HadoopyShardSpec>>of() : shardSpecs);
this.overwriteFiles = overwriteFiles;
this.rollupSpec = rollupSpec;
this.updaterJobSpec = updaterJobSpec;
this.ignoreInvalidRows = ignoreInvalidRows;
if (partitionsSpec != null) {
Preconditions.checkArgument(
partitionDimension == null && targetPartitionSize == null,
"Cannot mix partitionsSpec with partitionDimension/targetPartitionSize"
);
this.partitionsSpec = partitionsSpec;
if (schema != null) {
this.schema = schema;
} else {
// Backwards compatibility
this.partitionsSpec = new SingleDimensionPartitionsSpec(partitionDimension, targetPartitionSize, null, false);
}
if (granularitySpec != null) {
Preconditions.checkArgument(
segmentGranularity == null && intervals == null,
"Cannot mix granularitySpec with segmentGranularity/intervals"
this.schema = HadoopIngestionSchema.convertLegacy(
dataSource,
timestampSpec,
dataSpec,
granularitySpec,
pathSpec,
workingPath,
segmentOutputPath,
version,
partitionsSpec,
leaveIntermediate,
cleanupOnFailure,
shardSpecs,
overwriteFiles,
rollupSpec,
updaterJobSpec,
ignoreInvalidRows,
timestampColumn,
timestampFormat,
intervals,
segmentGranularity,
partitionDimension,
targetPartitionSize
);
this.granularitySpec = granularitySpec;
} else {
// Backwards compatibility
if (segmentGranularity != null && intervals != null) {
this.granularitySpec = new UniformGranularitySpec(segmentGranularity, intervals);
}
}
}
/**
* Default constructor does nothing. The caller is expected to use the various setX methods.
*/
public HadoopDruidIndexerConfig()
{
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
public void setDataSource(String dataSource)
{
this.dataSource = dataSource.toLowerCase();
}
@JsonProperty
public TimestampSpec getTimestampSpec()
{
return timestampSpec;
}
public void setTimestampSpec(TimestampSpec timestampSpec)
{
this.timestampSpec = timestampSpec;
}
@JsonProperty
public DataSpec getDataSpec()
{
return dataSpec;
}
public void setDataSpec(DataSpec dataSpec)
{
this.dataSpec = new ToLowercaseDataSpec(dataSpec);
}
@JsonProperty
public GranularitySpec getGranularitySpec()
{
return granularitySpec;
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
}
public void setGranularitySpec(GranularitySpec granularitySpec)
{
this.granularitySpec = granularitySpec;
}
@JsonProperty
public PathSpec getPathSpec()
{
return pathSpec;
}
public void setPathSpec(PathSpec pathSpec)
{
this.pathSpec = pathSpec;
}
@JsonProperty
public String getWorkingPath()
{
return workingPath;
}
public void setWorkingPath(String workingPath)
{
this.workingPath = workingPath;
}
@JsonProperty
public String getSegmentOutputPath()
{
return segmentOutputPath;
}
public void setSegmentOutputPath(String segmentOutputPath)
{
this.segmentOutputPath = segmentOutputPath;
}
@JsonProperty
public String getVersion()
{
return version;
this.schema = schema.withDataSchema(schema.getDataSchema().withGranularitySpec(granularitySpec));
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
}
public void setVersion(String version)
{
this.version = version;
}
@JsonProperty
public PartitionsSpec getPartitionsSpec()
{
return partitionsSpec;
}
public void setPartitionsSpec(PartitionsSpec partitionsSpec)
{
this.partitionsSpec = partitionsSpec;
}
@JsonProperty
public boolean isLeaveIntermediate()
{
return leaveIntermediate;
}
public void setLeaveIntermediate(boolean leaveIntermediate)
{
this.leaveIntermediate = leaveIntermediate;
}
@JsonProperty
public boolean isCleanupOnFailure()
{
return cleanupOnFailure;
}
public void setCleanupOnFailure(boolean cleanupOnFailure)
{
this.cleanupOnFailure = cleanupOnFailure;
}
@JsonProperty
public Map<DateTime, List<HadoopyShardSpec>> getShardSpecs()
{
return shardSpecs;
this.schema = schema.withDriverConfig(schema.getDriverConfig().withVersion(version));
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
}
public void setShardSpecs(Map<DateTime, List<HadoopyShardSpec>> shardSpecs)
{
this.shardSpecs = Collections.unmodifiableMap(shardSpecs);
}
@JsonProperty
public boolean isOverwriteFiles()
{
return overwriteFiles;
}
public void setOverwriteFiles(boolean overwriteFiles)
{
this.overwriteFiles = overwriteFiles;
}
@JsonProperty
public DataRollupSpec getRollupSpec()
{
return rollupSpec;
}
public void setRollupSpec(DataRollupSpec rollupSpec)
{
this.rollupSpec = rollupSpec;
}
@JsonProperty
public DbUpdaterJobSpec getUpdaterJobSpec()
{
return updaterJobSpec;
}
public void setUpdaterJobSpec(DbUpdaterJobSpec updaterJobSpec)
{
this.updaterJobSpec = updaterJobSpec;
}
@JsonProperty
public boolean isIgnoreInvalidRows()
{
return ignoreInvalidRows;
}
public void setIgnoreInvalidRows(boolean ignoreInvalidRows)
{
this.ignoreInvalidRows = ignoreInvalidRows;
this.schema = schema.withDriverConfig(schema.getDriverConfig().withShardSpecs(shardSpecs));
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
}
public Optional<List<Interval>> getIntervals()
{
Optional<SortedSet<Interval>> setOptional = getGranularitySpec().bucketIntervals();
Optional<SortedSet<Interval>> setOptional = schema.getDataSchema().getGranularitySpec().bucketIntervals();
if (setOptional.isPresent()) {
return Optional.of((List<Interval>) JodaUtils.condenseIntervals(setOptional.get()));
} else {
@ -392,58 +199,37 @@ public class HadoopDruidIndexerConfig
public boolean isDeterminingPartitions()
{
return partitionsSpec.isDeterminingPartitions();
return schema.getDriverConfig().getPartitionsSpec().isDeterminingPartitions();
}
public Long getTargetPartitionSize()
{
return partitionsSpec.getTargetPartitionSize();
return schema.getDriverConfig().getPartitionsSpec().getTargetPartitionSize();
}
public long getMaxPartitionSize()
{
return partitionsSpec.getMaxPartitionSize();
return schema.getDriverConfig().getPartitionsSpec().getMaxPartitionSize();
}
public boolean isUpdaterJobSpecSet()
{
return (updaterJobSpec != null);
return (schema.getIOConfig().getMetadataUpdateSpec() != null);
}
public StringInputRowParser getParser()
{
final List<String> dimensionExclusions;
if (getDataSpec().hasCustomDimensions()) {
dimensionExclusions = null;
} else {
dimensionExclusions = Lists.newArrayList();
dimensionExclusions.add(timestampSpec.getTimestampColumn());
dimensionExclusions.addAll(
Lists.transform(
getRollupSpec().getAggs(), new Function<AggregatorFactory, String>()
{
@Override
public String apply(AggregatorFactory aggregatorFactory)
{
return aggregatorFactory.getName();
}
}
)
);
}
return new StringInputRowParser(null, getTimestampSpec(), getDataSpec(), dimensionExclusions, null);
return (StringInputRowParser) schema.getDataSchema().getParser();
}
public HadoopyShardSpec getShardSpec(Bucket bucket)
{
return shardSpecs.get(bucket.time).get(bucket.partitionNum);
return schema.getDriverConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
}
public Job addInputPaths(Job job) throws IOException
{
return getPathSpec().addInputPaths(this, job);
return pathSpec.addInputPaths(this, job);
}
/********************************************
@ -459,12 +245,16 @@ public class HadoopDruidIndexerConfig
*/
public Optional<Bucket> getBucket(InputRow inputRow)
{
final Optional<Interval> timeBucket = getGranularitySpec().bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()));
final Optional<Interval> timeBucket = schema.getDataSchema().getGranularitySpec().bucketInterval(
new DateTime(
inputRow.getTimestampFromEpoch()
)
);
if (!timeBucket.isPresent()) {
return Optional.absent();
}
final List<HadoopyShardSpec> shards = shardSpecs.get(timeBucket.get().getStart());
final List<HadoopyShardSpec> shards = schema.getDriverConfig().getShardSpecs().get(timeBucket.get().getStart());
if (shards == null || shards.isEmpty()) {
return Optional.absent();
}
@ -487,7 +277,12 @@ public class HadoopDruidIndexerConfig
public Optional<Set<Interval>> getSegmentGranularIntervals()
{
return Optional.fromNullable((Set<Interval>) granularitySpec.bucketIntervals().orNull());
return Optional.fromNullable(
(Set<Interval>) schema.getDataSchema()
.getGranularitySpec()
.bucketIntervals()
.orNull()
);
}
public Optional<Iterable<Bucket>> getAllBuckets()
@ -504,7 +299,7 @@ public class HadoopDruidIndexerConfig
public Iterable<Bucket> apply(Interval input)
{
final DateTime bucketTime = input.getStart();
final List<HadoopyShardSpec> specs = shardSpecs.get(bucketTime);
final List<HadoopyShardSpec> specs = schema.getDriverConfig().getShardSpecs().get(bucketTime);
if (specs == null) {
return ImmutableList.of();
}
@ -532,19 +327,26 @@ public class HadoopDruidIndexerConfig
}
}
/******************************************
Path helper logic
******************************************/
/******************************************
Path helper logic
******************************************/
/**
* Make the intermediate path for this job run.
*
* @return the intermediate path for this job run.
*/
/**
* Make the intermediate path for this job run.
*
* @return the intermediate path for this job run.
*/
public Path makeIntermediatePath()
{
return new Path(String.format("%s/%s/%s", getWorkingPath(), getDataSource(), getVersion().replace(":", "")));
return new Path(
String.format(
"%s/%s/%s",
schema.getDriverConfig().getWorkingPath(),
schema.getDataSchema().getDataSource(),
schema.getDriverConfig().getVersion().replace(":", "")
)
);
}
public Path makeSegmentPartitionInfoPath(Interval bucketInterval)
@ -586,16 +388,16 @@ public class HadoopDruidIndexerConfig
public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket)
{
final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
final Interval bucketInterval = schema.getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get();
if (fileSystem instanceof DistributedFileSystem) {
return new Path(
String.format(
"%s/%s/%s_%s/%s/%s",
getSegmentOutputPath(),
getDataSource(),
schema.getIOConfig().getSegmentOutputPath(),
schema.getDataSchema().getDataSource(),
bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()),
bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
getVersion().replace(":", "_"),
schema.getDriverConfig().getVersion().replace(":", "_"),
bucket.partitionNum
)
);
@ -603,11 +405,11 @@ public class HadoopDruidIndexerConfig
return new Path(
String.format(
"%s/%s/%s_%s/%s/%s",
getSegmentOutputPath(),
getDataSource(),
schema.getIOConfig().getSegmentOutputPath(),
schema.getDataSchema().getDataSource(),
bucketInterval.getStart().toString(),
bucketInterval.getEnd().toString(),
getVersion(),
schema.getDriverConfig().getVersion(),
bucket.partitionNum
)
);
@ -634,14 +436,13 @@ public class HadoopDruidIndexerConfig
throw Throwables.propagate(e);
}
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(dataSpec, "dataSpec");
Preconditions.checkNotNull(timestampSpec, "timestampSpec");
Preconditions.checkNotNull(granularitySpec, "granularitySpec");
Preconditions.checkNotNull(schema.getDataSchema().getDataSource(), "dataSource");
Preconditions.checkNotNull(schema.getDataSchema().getParser().getParseSpec(), "parseSpec");
Preconditions.checkNotNull(schema.getDataSchema().getParser().getParseSpec().getTimestampSpec(), "timestampSpec");
Preconditions.checkNotNull(schema.getDataSchema().getGranularitySpec(), "granularitySpec");
Preconditions.checkNotNull(pathSpec, "pathSpec");
Preconditions.checkNotNull(workingPath, "workingPath");
Preconditions.checkNotNull(segmentOutputPath, "segmentOutputPath");
Preconditions.checkNotNull(version, "version");
Preconditions.checkNotNull(rollupSpec, "rollupSpec");
Preconditions.checkNotNull(schema.getDriverConfig().getWorkingPath(), "workingPath");
Preconditions.checkNotNull(schema.getIOConfig().getSegmentOutputPath(), "segmentOutputPath");
Preconditions.checkNotNull(schema.getDriverConfig().getVersion(), "version");
}
}

View File

@ -24,7 +24,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.impl.DataSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.rollup.DataRollupSpec;
@ -41,13 +41,13 @@ import java.util.Map;
*/
public class HadoopDruidIndexerConfigBuilder
{
public static HadoopDruidIndexerConfig fromSchema(HadoopDruidIndexerSchema schema)
public static HadoopDruidIndexerConfig fromSchema(HadoopIngestionSchema schema)
{
return HadoopDruidIndexerConfig.jsonMapper.convertValue(schema, HadoopDruidIndexerConfig.class);
}
public static HadoopDruidIndexerSchema toSchema(HadoopDruidIndexerConfig config){
return HadoopDruidIndexerConfig.jsonMapper.convertValue(config, HadoopDruidIndexerSchema.class);
public static HadoopIngestionSchema toSchema(HadoopDruidIndexerConfig config){
return HadoopDruidIndexerConfig.jsonMapper.convertValue(config, HadoopIngestionSchema.class);
}
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
@ -230,7 +230,7 @@ public class HadoopDruidIndexerConfigBuilder
return this;
}
public HadoopDruidIndexerConfigBuilder withSchema(HadoopDruidIndexerSchema schema)
public HadoopDruidIndexerConfigBuilder withSchema(HadoopIngestionSchema schema)
{
this.dataSource = schema.getDataSource();
this.timestampSpec = schema.getTimestampSpec();

View File

@ -22,7 +22,7 @@ package io.druid.indexer;
import com.metamx.common.RE;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

View File

@ -0,0 +1,64 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.segment.indexing.IOConfig;
import java.util.Map;
/**
*/
public class HadoopIOConfig implements IOConfig
{
private final Map<String, Object> pathSpec;
private final DbUpdaterJobSpec metadataUpdateSpec;
private final String segmentOutputPath;
public HadoopIOConfig(
final @JsonProperty("inputSpec") Map<String, Object> pathSpec,
final @JsonProperty("metadataUpdateSpec") DbUpdaterJobSpec metadataUpdateSpec,
final @JsonProperty("segmentOutputPath") String segmentOutputPath
)
{
this.pathSpec = pathSpec;
this.metadataUpdateSpec = metadataUpdateSpec;
this.segmentOutputPath = segmentOutputPath;
}
@JsonProperty("inputSpec")
public Map<String, Object> getPathSpec()
{
return pathSpec;
}
@JsonProperty("metadataUpdateSpec")
public DbUpdaterJobSpec getMetadataUpdateSpec()
{
return metadataUpdateSpec;
}
@JsonProperty
public String getSegmentOutputPath()
{
return segmentOutputPath;
}
}

View File

@ -0,0 +1,319 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.DataSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSchema;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
/**
*/
public class HadoopIngestionSchema implements IngestionSchema
{
public static HadoopIngestionSchema convertLegacy(
String dataSource,
TimestampSpec timestampSpec,
DataSpec dataSpec,
GranularitySpec granularitySpec,
Map<String, Object> pathSpec,
String workingPath,
String segmentOutputPath,
String version,
PartitionsSpec partitionsSpec,
boolean leaveIntermediate,
Boolean cleanupOnFailure,
Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
boolean overwriteFiles,
DataRollupSpec rollupSpec,
DbUpdaterJobSpec updaterJobSpec,
boolean ignoreInvalidRows,
// These fields are deprecated and will be removed in the future
String timestampColumn,
String timestampFormat,
List<Interval> intervals,
Granularity segmentGranularity,
String partitionDimension,
Long targetPartitionSize
)
{
return new HadoopIngestionSchema(
null, null, null,
dataSource,
timestampSpec,
dataSpec,
granularitySpec,
pathSpec,
workingPath,
segmentOutputPath,
version,
partitionsSpec,
leaveIntermediate,
cleanupOnFailure,
shardSpecs,
overwriteFiles,
rollupSpec,
updaterJobSpec,
ignoreInvalidRows,
timestampColumn,
timestampFormat,
intervals,
segmentGranularity,
partitionDimension,
targetPartitionSize
);
}
private final DataSchema dataSchema;
private final HadoopIOConfig ioConfig;
private final HadoopDriverConfig driverConfig;
@JsonCreator
public HadoopIngestionSchema(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") HadoopIOConfig ioConfig,
@JsonProperty("driverConfig") HadoopDriverConfig driverConfig,
// All deprecated
final @JsonProperty("dataSource") String dataSource,
final @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
final @JsonProperty("dataSpec") DataSpec dataSpec,
final @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
final @JsonProperty("pathSpec") Map<String, Object> pathSpec,
final @JsonProperty("workingPath") String workingPath,
final @JsonProperty("segmentOutputPath") String segmentOutputPath,
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
// These fields are deprecated and will be removed in the future
final @JsonProperty("timestampColumn") String timestampColumn,
final @JsonProperty("timestampFormat") String timestampFormat,
final @JsonProperty("intervals") List<Interval> intervals,
final @JsonProperty("segmentGranularity") Granularity segmentGranularity,
final @JsonProperty("partitionDimension") String partitionDimension,
final @JsonProperty("targetPartitionSize") Long targetPartitionSize
)
{
if (dataSchema != null) {
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.driverConfig = driverConfig;
} else { // Backwards compatibility
TimestampSpec theTimestampSpec = (timestampSpec == null)
? new TimestampSpec(timestampColumn, timestampFormat)
: timestampSpec;
List<String> dimensionExclusions = Lists.newArrayList();
dimensionExclusions.add(theTimestampSpec.getTimestampColumn());
for (AggregatorFactory aggregatorFactory : rollupSpec.getAggs()) {
dimensionExclusions.add(aggregatorFactory.getName());
}
PartitionsSpec thePartitionSpec;
if (partitionsSpec != null) {
Preconditions.checkArgument(
partitionDimension == null && targetPartitionSize == null,
"Cannot mix partitionsSpec with partitionDimension/targetPartitionSize"
);
thePartitionSpec = partitionsSpec;
} else {
// Backwards compatibility
thePartitionSpec = new SingleDimensionPartitionsSpec(partitionDimension, targetPartitionSize, null, false);
}
if (dataSpec.hasCustomDimensions()) {
dimensionExclusions = null;
} else {
dimensionExclusions = Lists.newArrayList();
dimensionExclusions.add(theTimestampSpec.getTimestampColumn());
dimensionExclusions.addAll(
Lists.transform(
rollupSpec.getAggs(), new Function<AggregatorFactory, String>()
{
@Override
public String apply(AggregatorFactory aggregatorFactory)
{
return aggregatorFactory.getName();
}
}
)
);
}
GranularitySpec theGranularitySpec = null;
if (granularitySpec != null) {
Preconditions.checkArgument(
segmentGranularity == null && intervals == null,
"Cannot mix granularitySpec with segmentGranularity/intervals"
);
theGranularitySpec = granularitySpec;
} else {
// Backwards compatibility
if (segmentGranularity != null && intervals != null) {
theGranularitySpec = new UniformGranularitySpec(segmentGranularity, null, intervals, segmentGranularity);
}
}
this.dataSchema = new DataSchema(
dataSource,
new StringInputRowParser(
new ParseSpec(
theTimestampSpec,
new DimensionsSpec(
dataSpec.getDimensions(),
dimensionExclusions,
dataSpec.getSpatialDimensions()
)
)
{
},
null, null, null, null
),
rollupSpec.getAggs().toArray(new AggregatorFactory[rollupSpec.getAggs().size()]),
theGranularitySpec
);
this.ioConfig = new HadoopIOConfig(
pathSpec,
updaterJobSpec,
segmentOutputPath
);
this.driverConfig = new HadoopDriverConfig(
workingPath,
version,
thePartitionSpec,
shardSpecs,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
ignoreInvalidRows
);
}
}
@JsonProperty("dataSchema")
@Override
public DataSchema getDataSchema()
{
return dataSchema;
}
@JsonProperty("ioConfig")
@Override
public HadoopIOConfig getIOConfig()
{
return ioConfig;
}
@JsonProperty("driverConfig")
@Override
public HadoopDriverConfig getDriverConfig()
{
return driverConfig;
}
public HadoopIngestionSchema withDriverConfig(HadoopDriverConfig config)
{
return new HadoopIngestionSchema(
dataSchema,
ioConfig,
config,
null,
null,
null,
null,
null,
null,
null,
null,
null,
false,
null,
null,
false,
null,
null,
false,
null,
null,
null,
null,
null,
null
);
}
public HadoopIngestionSchema withDataSchema(DataSchema schema)
{
return new HadoopIngestionSchema(
schema,
ioConfig,
driverConfig,
null,
null,
null,
null,
null,
null,
null,
null,
null,
false,
null,
null,
false,
null,
null,
false,
null,
null,
null,
null,
null,
null
);
}
}

View File

@ -26,7 +26,7 @@ import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Comparators;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexer.hadoop.FSSpideringIterator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;

View File

@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.db.DbConnectorConfig;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.RandomPartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
@ -71,7 +71,7 @@ public class HadoopDruidIndexerConfigTest
);
Assert.assertEquals(
"getGranularity",
"getSegmentGranularity",
"HOUR",
granularitySpec.getGranularity().toString()
);
@ -105,7 +105,7 @@ public class HadoopDruidIndexerConfigTest
);
Assert.assertEquals(
"getGranularity",
"getSegmentGranularity",
"DAY",
granularitySpec.getGranularity().toString()
);

View File

@ -29,19 +29,18 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.logger.Logger;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.SegmentUtils;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.Sink;
@ -82,7 +81,7 @@ public class YeOldePlumberSchool implements PlumberSchool
}
@Override
public SegmentGranularity getSegmentGranularity()
public Granularity getSegmentGranularity()
{
throw new UnsupportedOperationException();
}

View File

@ -34,7 +34,7 @@ import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopDruidIndexerSchema;
import io.druid.indexer.HadoopIngestionSchema;
import io.druid.indexer.Jobby;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
@ -68,7 +68,7 @@ public class HadoopIndexTask extends AbstractTask
private static String defaultHadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";
@JsonIgnore
private final HadoopDruidIndexerSchema schema;
private final HadoopIngestionSchema schema;
@JsonIgnore
private final String hadoopCoordinates;
@ -85,16 +85,16 @@ public class HadoopIndexTask extends AbstractTask
@JsonCreator
public HadoopIndexTask(
@JsonProperty("id") String id,
@JsonProperty("config") HadoopDruidIndexerSchema schema,
@JsonProperty("config") HadoopIngestionSchema schema,
@JsonProperty("hadoopCoordinates") String hadoopCoordinates
)
{
super(
id != null ? id : String.format("index_hadoop_%s_%s", schema.getDataSource(), new DateTime()),
schema.getDataSource()
id != null ? id : String.format("index_hadoop_%s_%s", schema.getDataSchema().getDataSource(), new DateTime()),
schema.getDataSchema().getDataSource()
);
// Some HadoopDruidIndexerSchema stuff doesn't make sense in the context of the indexing service
// Some HadoopIngestionSchema stuff doesn't make sense in the context of the indexing service
Preconditions.checkArgument(schema.getSegmentOutputPath() == null, "segmentOutputPath must be absent");
Preconditions.checkArgument(schema.getWorkingPath() == null, "workingPath must be absent");
Preconditions.checkArgument(schema.getUpdaterJobSpec() == null, "updaterJobSpec must be absent");
@ -126,7 +126,7 @@ public class HadoopIndexTask extends AbstractTask
}
@JsonProperty("config")
public HadoopDruidIndexerSchema getSchema()
public HadoopIngestionSchema getSchema()
{
return schema;
}
@ -186,8 +186,8 @@ public class HadoopIndexTask extends AbstractTask
};
String config = (String) determineConfigurationMainMethod.invoke(null, new Object[]{determineConfigArgs});
HadoopDruidIndexerSchema indexerSchema = toolbox.getObjectMapper()
.readValue(config, HadoopDruidIndexerSchema.class);
HadoopIngestionSchema indexerSchema = toolbox.getObjectMapper()
.readValue(config, HadoopIngestionSchema.class);
// We should have a lock from before we started running only if interval was specified
@ -239,10 +239,10 @@ public class HadoopIndexTask extends AbstractTask
final String schema = args[0];
String version = args[1];
final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
final HadoopIngestionSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
.readValue(
schema,
HadoopDruidIndexerSchema.class
HadoopIngestionSchema.class
);
final HadoopDruidIndexerConfig config =
new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
@ -268,10 +268,10 @@ public class HadoopIndexTask extends AbstractTask
final String workingPath = args[1];
final String segmentOutputPath = args[2];
final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
final HadoopIngestionSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
.readValue(
schema,
HadoopDruidIndexerSchema.class
HadoopIngestionSchema.class
);
final HadoopDruidIndexerConfig config =
new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)

View File

@ -39,17 +39,15 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.timeline.DataSegment;
@ -157,7 +155,7 @@ public class IndexTask extends AbstractFixedIntervalTask
getDataSource(),
firehoseFactory.getParser(),
aggregators,
new io.druid.segment.indexing.GranularitySpec(null, indexGranularity),
granularitySpec.withQueryGranularity(indexGranularity),
shardSpec
),
bucket,
@ -176,7 +174,8 @@ public class IndexTask extends AbstractFixedIntervalTask
try (Firehose firehose = firehoseFactory.connect(firehoseFactory.getParser())) {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
Interval interval = granularitySpec.getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
Interval interval = granularitySpec.getSegmentGranularity()
.bucket(new DateTime(inputRow.getTimestampFromEpoch()));
retVal.add(interval);
}
}

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closeables;
import com.metamx.common.Granularity;
import com.metamx.common.exception.FormattedException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
@ -42,9 +43,7 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.DriverConfig;
import io.druid.segment.indexing.GranularitySpec;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
@ -95,7 +94,7 @@ public class RealtimeIndexTask extends AbstractTask
private final int maxPendingPersists;
@JsonIgnore
private final SegmentGranularity segmentGranularity;
private final Granularity segmentGranularity;
@JsonIgnore
private final RejectionPolicyFactory rejectionPolicyFactory;
@ -115,7 +114,7 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists,
@JsonProperty("segmentGranularity") SegmentGranularity segmentGranularity,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
)
{
@ -316,7 +315,15 @@ public class RealtimeIndexTask extends AbstractTask
fireDepartmentConfig.getIntermediatePersistPeriod()
);
final FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, driverConfig, null, null, null, null);
final FireDepartment fireDepartment = new FireDepartment(
dataSchema,
realtimeIOConfig,
driverConfig,
null,
null,
null,
null
);
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment));
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
this.plumber = realtimePlumberSchool.findPlumber(dataSchema, fireDepartment.getMetrics());
@ -417,7 +424,7 @@ public class RealtimeIndexTask extends AbstractTask
}
@JsonProperty
public SegmentGranularity getSegmentGranularity()
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}

View File

@ -28,14 +28,13 @@ import io.druid.data.input.impl.JSONDataSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.FirehoseModule;
import io.druid.indexer.HadoopDruidIndexerSchema;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexer.HadoopIngestionSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.timeline.DataSegment;
@ -199,7 +198,7 @@ public class TaskSerdeTest
null,
new Period("PT10M"),
1,
SegmentGranularity.HOUR,
Granularity.HOUR,
null
);
@ -213,7 +212,7 @@ public class TaskSerdeTest
Assert.assertEquals(2, task.getTaskResource().getRequiredCapacity());
Assert.assertEquals("rofl", task.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(new Period("PT10M"), task.getWindowPeriod());
Assert.assertEquals(SegmentGranularity.HOUR, task.getSegmentGranularity());
Assert.assertEquals(Granularity.HOUR, task.getSegmentGranularity());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
@ -354,7 +353,7 @@ public class TaskSerdeTest
{
final HadoopIndexTask task = new HadoopIndexTask(
null,
new HadoopDruidIndexerSchema(
new HadoopIngestionSchema(
"foo",
new TimestampSpec("timestamp", "auto"),
new JSONDataSpec(ImmutableList.of("foo"), null),

View File

@ -43,7 +43,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;

View File

@ -20,6 +20,7 @@
package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.Granularity;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.RealtimeIndexTask;
@ -27,7 +28,6 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.realtime.Schema;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
@ -47,7 +47,7 @@ public class TaskAnnouncementTest
null,
new Period("PT10M"),
1,
SegmentGranularity.HOUR,
Granularity.HOUR,
null
);
final TaskStatus status = TaskStatus.running(task.getId());

View File

@ -1,549 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.druid.granularity.QueryGranularity;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Hours;
import org.joda.time.Minutes;
import org.joda.time.Months;
import org.joda.time.MutableDateTime;
import org.joda.time.ReadableInterval;
import org.joda.time.Weeks;
import org.joda.time.Years;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
/**
*/
public enum SegmentGranularity
{
MINUTE
{
DateTimeFormatter format = DateTimeFormat.forPattern("'y'=yyyy/'m'=MM/'d'=dd/'H'=HH/'M'=mm");
private final int MILLIS_IN = 60 * 1000;
@Override
public String toPath(DateTime time)
{
return format.print(time);
}
@Override
public DateTime increment(DateTime time)
{
return time.plus(Minutes.ONE);
}
@Override
public DateTime increment(DateTime time, int count)
{
return time.plus(Minutes.minutes(count));
}
@Override
public long increment(long timeMillis)
{
return timeMillis + MILLIS_IN;
}
@Override
public DateTime decrement(DateTime time)
{
return time.minus(Minutes.ONE);
}
@Override
public DateTime decrement(DateTime time, int count)
{
return time.minus(Minutes.minutes(count));
}
@Override
public long decrement(long timeMillis)
{
return timeMillis - MILLIS_IN;
}
@Override
public long truncate(long timeMillis)
{
return QueryGranularity.MINUTE.truncate(timeMillis);
}
@Override
public DateTime truncate(DateTime time)
{
final MutableDateTime mutableDateTime = time.toMutableDateTime();
mutableDateTime.setMillisOfSecond(0);
mutableDateTime.setSecondOfMinute(0);
return mutableDateTime.toDateTime();
}
@Override
public int numIn(ReadableInterval interval)
{
return Minutes.minutesIn(interval).getMinutes();
}
},
HOUR
{
DateTimeFormatter format = DateTimeFormat.forPattern("'y'=yyyy/'m'=MM/'d'=dd/'H'=HH");
private final int MILLIS_IN = 60 * 60 * 1000;
@Override
public String toPath(DateTime time)
{
return format.print(time);
}
@Override
public DateTime increment(DateTime time)
{
return time.plus(Hours.ONE);
}
@Override
public DateTime increment(DateTime time, int count)
{
return time.plus(Hours.hours(count));
}
@Override
public long increment(long timeMillis)
{
return timeMillis + MILLIS_IN;
}
@Override
public DateTime decrement(DateTime time)
{
return time.minus(Hours.ONE);
}
@Override
public DateTime decrement(DateTime time, int count)
{
return time.minus(Hours.hours(count));
}
@Override
public long decrement(long timeMillis)
{
return timeMillis - MILLIS_IN;
}
@Override
public long truncate(long timeMillis)
{
return QueryGranularity.HOUR.truncate(timeMillis);
}
@Override
public DateTime truncate(DateTime time)
{
final MutableDateTime mutableDateTime = time.toMutableDateTime();
mutableDateTime.setMillisOfSecond(0);
mutableDateTime.setSecondOfMinute(0);
mutableDateTime.setMinuteOfHour(0);
return mutableDateTime.toDateTime();
}
@Override
public int numIn(ReadableInterval interval)
{
return Hours.hoursIn(interval).getHours();
}
},
DAY
{
DateTimeFormatter format = DateTimeFormat.forPattern("'y'=yyyy/'m'=MM/'d'=dd");
private final int MILLIS_IN = 24 * 60 * 60 * 1000;
@Override
public String toPath(DateTime time)
{
return format.print(time);
}
@Override
public DateTime increment(DateTime time)
{
return time.plus(Days.ONE);
}
@Override
public DateTime increment(DateTime time, int count)
{
return time.plus(Days.days(count));
}
@Override
public long increment(long timeMillis)
{
return timeMillis + MILLIS_IN;
}
@Override
public DateTime decrement(DateTime time)
{
return time.minus(Days.ONE);
}
@Override
public DateTime decrement(DateTime time, int count)
{
return time.minus(Days.days(count));
}
@Override
public long decrement(long timeMillis)
{
return timeMillis - MILLIS_IN;
}
@Override
public long truncate(long timeMillis)
{
return QueryGranularity.DAY.truncate(timeMillis);
}
@Override
public DateTime truncate(DateTime time)
{
final MutableDateTime mutableDateTime = time.toMutableDateTime();
mutableDateTime.setMillisOfDay(0);
return mutableDateTime.toDateTime();
}
@Override
public int numIn(ReadableInterval interval)
{
return Days.daysIn(interval).getDays();
}
},
WEEK
{
DateTimeFormatter format = DateTimeFormat.forPattern("'y'=yyyy/'m'=MM/'d'=dd");
private final int MILLIS_IN = 7 * 24 * 60 * 60 * 1000;
@Override
public String toPath(DateTime time)
{
throw new UnsupportedOperationException("Not Implemented Yet.");
}
@Override
public DateTime increment(DateTime time)
{
return time.plus(Weeks.ONE);
}
@Override
public DateTime increment(DateTime time, int count)
{
return time.plus(Weeks.weeks(count));
}
@Override
public long increment(long timeMillis)
{
return timeMillis + MILLIS_IN;
}
@Override
public DateTime decrement(DateTime time)
{
return time.minus(Weeks.ONE);
}
@Override
public DateTime decrement(DateTime time, int count)
{
return time.minus(Weeks.weeks(count));
}
@Override
public long decrement(long timeMillis)
{
return timeMillis - MILLIS_IN;
}
@Override
public long truncate(long timeMillis)
{
return truncate(new DateTime(timeMillis)).getMillis();
}
@Override
public DateTime truncate(DateTime time)
{
final MutableDateTime mutableDateTime = time.toMutableDateTime();
mutableDateTime.setMillisOfDay(0);
mutableDateTime.setDayOfWeek(1);
return mutableDateTime.toDateTime();
}
@Override
public int numIn(ReadableInterval interval)
{
return Weeks.weeksIn(interval).getWeeks();
}
},
MONTH
{
DateTimeFormatter format = DateTimeFormat.forPattern("'y'=yyyy/'m'=MM");
@Override
public String toPath(DateTime time)
{
return format.print(time);
}
@Override
public DateTime increment(DateTime time)
{
return time.plus(Months.ONE);
}
@Override
public DateTime increment(DateTime time, int count)
{
return time.plus(Months.months(count));
}
@Override
public long increment(long timeMillis)
{
return new DateTime(timeMillis).plus(Months.ONE).getMillis();
}
@Override
public DateTime decrement(DateTime time)
{
return time.minus(Months.ONE);
}
@Override
public DateTime decrement(DateTime time, int count)
{
return time.minus(Months.months(count));
}
@Override
public long decrement(long timeMillis)
{
return new DateTime(timeMillis).minus(Months.ONE).getMillis();
}
@Override
public long truncate(long timeMillis)
{
return truncate(new DateTime(timeMillis)).getMillis();
}
@Override
public DateTime truncate(DateTime time)
{
final MutableDateTime mutableDateTime = time.toMutableDateTime();
mutableDateTime.setMillisOfDay(0);
mutableDateTime.setDayOfMonth(1);
return mutableDateTime.toDateTime();
}
@Override
public int numIn(ReadableInterval interval)
{
return Months.monthsIn(interval).getMonths();
}
},
YEAR
{
DateTimeFormatter format = DateTimeFormat.forPattern("'y'=yyyy");
@Override
public String toPath(DateTime time)
{
return format.print(time);
}
@Override
public DateTime increment(DateTime time)
{
return time.plus(Years.ONE);
}
@Override
public DateTime increment(DateTime time, int count)
{
return time.plus(Years.years(count));
}
@Override
public long increment(long timeMillis)
{
return new DateTime(timeMillis).plus(Years.ONE).getMillis();
}
@Override
public DateTime decrement(DateTime time)
{
return time.minus(Years.ONE);
}
@Override
public DateTime decrement(DateTime time, int count)
{
return time.minus(Years.years(count));
}
@Override
public long decrement(long timeMillis)
{
return new DateTime(timeMillis).minus(Years.ONE).getMillis();
}
@Override
public long truncate(long timeMillis)
{
return truncate(new DateTime(timeMillis)).getMillis();
}
@Override
public DateTime truncate(DateTime time)
{
final MutableDateTime mutableDateTime = time.toMutableDateTime();
mutableDateTime.setMillisOfDay(0);
mutableDateTime.setDayOfMonth(1);
mutableDateTime.setMonthOfYear(1);
return mutableDateTime.toDateTime();
}
@Override
public int numIn(ReadableInterval interval)
{
return Years.yearsIn(interval).getYears();
}
},
LATEST
{
DateTimeFormatter format = DateTimeFormat.forPattern("'y'=yyyy/'m'=MM/'d'=dd/'H'=HH/'M'=mm");
@Override
public String toPath(DateTime time)
{
return format.print(time);
}
@Override
public DateTime increment(DateTime time)
{
throw new UnsupportedOperationException("LATEST exists for client-side only");
}
@Override
public DateTime increment(DateTime time, int count)
{
throw new UnsupportedOperationException("LATEST exists for client-side only");
}
@Override
public long increment(long timeMillis)
{
throw new UnsupportedOperationException("LATEST exists for client-side only");
}
@Override
public DateTime decrement(DateTime time)
{
throw new UnsupportedOperationException("LATEST exists for client-side only");
}
@Override
public DateTime decrement(DateTime time, int count)
{
throw new UnsupportedOperationException("LATEST exists for client-side only");
}
@Override
public long decrement(long timeMillis)
{
throw new UnsupportedOperationException("LATEST exists for client-side only");
}
@Override
public long truncate(long timeMillis)
{
return 0;
}
@Override
public DateTime truncate(DateTime time)
{
return new DateTime(0L);
}
@Override
public int numIn(ReadableInterval interval)
{
throw new UnsupportedOperationException("LATEST exists for client-side only");
}
};
public abstract String toPath(DateTime time);
public abstract DateTime increment(DateTime time);
public abstract DateTime increment(DateTime time, int count);
public abstract long increment(long timeMillis);
public abstract DateTime decrement(DateTime time);
public abstract DateTime decrement(DateTime time, int count);
public abstract long decrement(long timeMillis);
public abstract long truncate(long timeMillis);
public abstract DateTime truncate(DateTime time);
public abstract int numIn(ReadableInterval interval);
@JsonCreator
public static SegmentGranularity fromString(String s)
{
return SegmentGranularity.valueOf(s.toUpperCase());
}
}

View File

@ -22,11 +22,9 @@ package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.impl.InputRowParser;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
/**
*/
@ -36,16 +34,14 @@ public class DataSchema
private final InputRowParser parser;
private final AggregatorFactory[] aggregators;
private final GranularitySpec granularitySpec;
private final ShardSpec shardSpec;
@JsonCreator
public DataSchema(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("parser") InputRowParser parser,
@JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
@JsonProperty("granularitySpec") GranularitySpec granularitySpec,
@JsonProperty("shardSpec") ShardSpec shardSpec
)
@JsonProperty("granularitySpec") GranularitySpec granularitySpec
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(aggregators, "metrics");
@ -55,7 +51,6 @@ public class DataSchema
this.parser = parser;
this.aggregators = aggregators;
this.granularitySpec = granularitySpec;
this.shardSpec = shardSpec == null ? new NoneShardSpec() : shardSpec;
}
@JsonProperty
@ -82,9 +77,8 @@ public class DataSchema
return granularitySpec;
}
@JsonProperty
public ShardSpec getShardSpec()
public DataSchema withGranularitySpec(GranularitySpec granularitySpec)
{
return shardSpec;
return new DataSchema(dataSource, parser, aggregators, granularitySpec);
}
}

View File

@ -1,55 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.granularity.QueryGranularity;
import io.druid.segment.SegmentGranularity;
/**
*/
public class GranularitySpec
{
private final SegmentGranularity segmentGranularity;
private final QueryGranularity queryGranularity;
@JsonCreator
public GranularitySpec(
@JsonProperty("segmentGranularity") SegmentGranularity segmentGranularity,
@JsonProperty("queryGranularity") QueryGranularity queryGranularity
)
{
this.segmentGranularity = segmentGranularity;
this.queryGranularity = queryGranularity;
}
@JsonProperty
public SegmentGranularity getSegmentGranularity()
{
return segmentGranularity;
}
@JsonProperty
public QueryGranularity getQueryGranularity()
{
return queryGranularity;
}
}

View File

@ -19,44 +19,13 @@
package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class IngestionSchema
public interface IngestionSchema
{
private final DataSchema dataSchema;
private final IOConfig ioConfig;
private final DriverConfig driverConfig;
public DataSchema getDataSchema();
@JsonCreator
public IngestionSchema(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("io") IOConfig ioConfig,
@JsonProperty("driverConfig") DriverConfig driverConfig
)
{
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.driverConfig = driverConfig;
}
public IOConfig getIOConfig();
@JsonProperty("dataSchema")
public DataSchema getDataSchema()
{
return dataSchema;
}
@JsonProperty("io")
public IOConfig getIoConfig()
{
return ioConfig;
}
@JsonProperty("config")
public DriverConfig getDriverConfig()
{
return driverConfig;
}
public DriverConfig getDriverConfig();
}

View File

@ -21,6 +21,8 @@ package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import org.joda.time.Period;
/**
@ -29,15 +31,18 @@ public class RealtimeDriverConfig implements DriverConfig
{
private final int maxRowsInMemory;
private final Period intermediatePersistPeriod;
private final ShardSpec shardSpec;
@JsonCreator
public RealtimeDriverConfig(
@JsonProperty("maxRowsInMemory") int maxRowsInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("shardSpec") ShardSpec shardSpec
)
{
this.maxRowsInMemory = maxRowsInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod;
this.shardSpec = shardSpec == null ? new NoneShardSpec() : shardSpec;
}
@JsonProperty
@ -51,4 +56,10 @@ public class RealtimeDriverConfig implements DriverConfig
{
return intermediatePersistPeriod;
}
@JsonProperty
public ShardSpec getShardSpec()
{
return shardSpec;
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.granularity;
package io.druid.segment.indexing.granularity;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -27,6 +27,7 @@ import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Comparators;
import io.druid.granularity.QueryGranularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -91,7 +92,19 @@ public class ArbitraryGranularitySpec implements GranularitySpec
}
@Override
public Granularity getGranularity()
public Granularity getSegmentGranularity()
{
throw new UnsupportedOperationException();
}
@Override
public QueryGranularity getQueryGranularity()
{
throw new UnsupportedOperationException();
}
@Override
public GranularitySpec withQueryGranularity(QueryGranularity queryGranularity)
{
throw new UnsupportedOperationException();
}

View File

@ -17,12 +17,13 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.granularity;
package io.druid.segment.indexing.granularity;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import com.metamx.common.Granularity;
import io.druid.granularity.QueryGranularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -45,5 +46,10 @@ public interface GranularitySpec
/** Time-grouping interval corresponding to some instant, if any. */
public Optional<Interval> bucketInterval(DateTime dt);
public Granularity getGranularity();
public Granularity getSegmentGranularity();
public QueryGranularity getQueryGranularity();
@Deprecated
public GranularitySpec withQueryGranularity(QueryGranularity queryGranularity);
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.granularity;
package io.druid.segment.indexing.granularity;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.granularity.QueryGranularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -34,17 +35,24 @@ import java.util.SortedSet;
public class UniformGranularitySpec implements GranularitySpec
{
final private Granularity granularity;
final private Iterable<Interval> inputIntervals;
private final Granularity segmentGranularity;
private final QueryGranularity queryGranularity;
final private List<Interval> inputIntervals;
final private ArbitraryGranularitySpec wrappedSpec;
@JsonCreator
public UniformGranularitySpec(
@JsonProperty("gran") Granularity granularity,
@JsonProperty("intervals") List<Interval> inputIntervals
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("queryGranularity") QueryGranularity queryGranularity,
@JsonProperty("intervals") List<Interval> inputIntervals,
// Backwards compatible
@JsonProperty("gran") Granularity granularity
)
{
this.granularity = granularity;
this.segmentGranularity = segmentGranularity == null ? granularity : segmentGranularity;
this.queryGranularity = queryGranularity;
if (inputIntervals != null) {
List<Interval> granularIntervals = Lists.newArrayList();
for (Interval inputInterval : inputIntervals) {
@ -75,14 +83,31 @@ public class UniformGranularitySpec implements GranularitySpec
}
@Override
@JsonProperty("gran")
public Granularity getGranularity()
@JsonProperty("segmentGranularity")
public Granularity getSegmentGranularity()
{
return granularity;
return segmentGranularity;
}
@Override
public QueryGranularity getQueryGranularity()
{
return queryGranularity;
}
@Override
public GranularitySpec withQueryGranularity(QueryGranularity queryGranularity)
{
return new UniformGranularitySpec(
segmentGranularity,
queryGranularity,
inputIntervals,
segmentGranularity
);
}
@JsonProperty("intervals")
public Optional<Iterable<Interval>> getIntervals()
public Optional<List<Interval>> getIntervals()
{
return Optional.fromNullable(inputIntervals);
}

View File

@ -21,16 +21,18 @@ package io.druid.segment.realtime;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.util.Lists;
import com.google.common.base.Preconditions;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.GranularitySpec;
import io.druid.segment.indexing.IngestionSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import org.joda.time.Interval;
import java.io.IOException;
@ -41,7 +43,7 @@ import java.io.IOException;
* realtime stream of data. The Plumber directs each drop of water from the firehose into the correct sink and makes
* sure that the sinks don't overflow.
*/
public class FireDepartment extends IngestionSchema
public class FireDepartment implements IngestionSchema
{
private final DataSchema dataSchema;
private final RealtimeIOConfig ioConfig;
@ -52,7 +54,7 @@ public class FireDepartment extends IngestionSchema
@JsonCreator
public FireDepartment(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("io") RealtimeIOConfig ioConfig,
@JsonProperty("ioConfig") RealtimeIOConfig ioConfig,
@JsonProperty("driverConfig") RealtimeDriverConfig driverConfig,
// Backwards Compatability
@JsonProperty("schema") Schema schema,
@ -61,8 +63,6 @@ public class FireDepartment extends IngestionSchema
@JsonProperty("plumber") PlumberSchool plumberSchool
)
{
super(dataSchema, ioConfig, driverConfig);
// Backwards compatibility
if (dataSchema == null) {
Preconditions.checkNotNull(schema, "schema");
@ -74,11 +74,12 @@ public class FireDepartment extends IngestionSchema
schema.getDataSource(),
firehoseFactory.getParser(),
schema.getAggregators(),
new GranularitySpec(
new UniformGranularitySpec(
plumberSchool.getSegmentGranularity(),
schema.getIndexGranularity()
),
schema.getShardSpec()
schema.getIndexGranularity(),
Lists.<Interval>newArrayList(),
plumberSchool.getSegmentGranularity()
)
);
this.ioConfig = new RealtimeIOConfig(
firehoseFactory,
@ -86,7 +87,8 @@ public class FireDepartment extends IngestionSchema
);
this.driverConfig = new RealtimeDriverConfig(
config.getMaxRowsInMemory(),
config.getIntermediatePersistPeriod()
config.getIntermediatePersistPeriod(),
schema.getShardSpec()
);
} else {
Preconditions.checkNotNull(dataSchema, "dataSchema");
@ -104,12 +106,23 @@ public class FireDepartment extends IngestionSchema
*
* @return the Schema for this feed.
*/
public DataSchema getSchema()
@JsonProperty("dataSchema")
@Override
public DataSchema getDataSchema()
{
return dataSchema;
}
public RealtimeDriverConfig getConfig()
@JsonProperty("ioConfig")
@Override
public RealtimeIOConfig getIOConfig()
{
return ioConfig;
}
@JsonProperty("driverConfig")
@Override
public RealtimeDriverConfig getDriverConfig()
{
return driverConfig;
}

View File

@ -40,7 +40,6 @@ import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink;
@ -80,7 +79,7 @@ public class RealtimeManager implements QuerySegmentWalker
public void start() throws IOException
{
for (final FireDepartment fireDepartment : fireDepartments) {
DataSchema schema = fireDepartment.getSchema();
DataSchema schema = fireDepartment.getDataSchema();
final FireChief chief = new FireChief(fireDepartment);
chiefs.put(schema.getDataSource(), chief);
@ -145,7 +144,7 @@ public class RealtimeManager implements QuerySegmentWalker
public void init() throws IOException
{
config = fireDepartment.getConfig();
config = fireDepartment.getDriverConfig();
synchronized (this) {
try {
@ -217,12 +216,12 @@ public class RealtimeManager implements QuerySegmentWalker
}
}
} catch (RuntimeException e) {
log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource())
log.makeAlert(e, "RuntimeException aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource())
.emit();
normalExit = false;
throw e;
} catch (Error e) {
log.makeAlert(e, "Exception aborted realtime processing[%s]", fireDepartment.getSchema().getDataSource())
log.makeAlert(e, "Exception aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource())
.emit();
normalExit = false;
throw e;
@ -243,7 +242,7 @@ public class RealtimeManager implements QuerySegmentWalker
Preconditions.checkNotNull(firehose, "firehose is null, init() must be called first.");
Preconditions.checkNotNull(plumber, "plumber is null, init() must be called first.");
log.info("FireChief[%s] state ok.", fireDepartment.getSchema().getDataSource());
log.info("FireChief[%s] state ok.", fireDepartment.getDataSchema().getDataSource());
}
public <T> QueryRunner<T> getQueryRunner(Query<T> query)

View File

@ -54,7 +54,7 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
}
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()
.setUser2(fireDepartment.getSchema().getDataSource());
.setUser2(fireDepartment.getDataSchema().getDataSource());
emitter.emit(builder.build("events/thrownAway", metrics.thrownAway() - previous.thrownAway()));
emitter.emit(builder.build("events/unparseable", metrics.unparseable() - previous.unparseable()));

View File

@ -2,15 +2,14 @@ package io.druid.segment.realtime.plumber;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.Granularity;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@ -39,7 +38,7 @@ public class FlushingPlumber extends RealtimePlumber
Duration flushDuration,
Period windowPeriod,
File basePersistDirectory,
SegmentGranularity segmentGranularity,
Granularity segmentGranularity,
DataSchema schema,
FireDepartmentMetrics metrics,
RejectionPolicy rejectionPolicy,
@ -120,20 +119,26 @@ public class FlushingPlumber extends RealtimePlumber
private void startFlushThread()
{
final long truncatedNow = getSegmentGranularity().truncate(new DateTime()).getMillis();
final DateTime truncatedNow = getSegmentGranularity().truncate(new DateTime());
final long windowMillis = getWindowPeriod().toStandardDuration().getMillis();
log.info(
"Expect to run at [%s]",
new DateTime().plus(
new Duration(System.currentTimeMillis(), getSegmentGranularity().increment(truncatedNow) + windowMillis)
new Duration(
System.currentTimeMillis(),
getSegmentGranularity().increment(truncatedNow).getMillis() + windowMillis
)
)
);
ScheduledExecutors
.scheduleAtFixedRate(
flushScheduledExec,
new Duration(System.currentTimeMillis(), getSegmentGranularity().increment(truncatedNow) + windowMillis),
new Duration(
System.currentTimeMillis(),
getSegmentGranularity().increment(truncatedNow).getMillis() + windowMillis
),
new Duration(truncatedNow, getSegmentGranularity().increment(truncatedNow)),
new ThreadRenamingCallable<ScheduledExecutors.Signal>(
String.format(

View File

@ -23,14 +23,13 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.metamx.common.Granularity;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -50,7 +49,7 @@ public class FlushingPlumberSchool implements PlumberSchool
private final Duration flushDuration;
private final Period windowPeriod;
private final File basePersistDirectory;
private final SegmentGranularity segmentGranularity;
private final Granularity segmentGranularity;
private final int maxPendingPersists;
@JacksonInject
@ -78,7 +77,7 @@ public class FlushingPlumberSchool implements PlumberSchool
@JsonProperty("flushDuration") Duration flushDuration,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") SegmentGranularity segmentGranularity
@JsonProperty("segmentGranularity") Granularity segmentGranularity
)
{
this.flushDuration = flushDuration;
@ -122,7 +121,7 @@ public class FlushingPlumberSchool implements PlumberSchool
}
@Override
public SegmentGranularity getSegmentGranularity()
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}

View File

@ -21,13 +21,9 @@ package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.segment.SegmentGranularity;
import com.metamx.common.Granularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import org.joda.time.Period;
import java.io.File;
/**
*/
@ -45,5 +41,5 @@ public interface PlumberSchool
*/
public Plumber findPlumber(DataSchema schema, FireDepartmentMetrics metrics);
public SegmentGranularity getSegmentGranularity();
public Granularity getSegmentGranularity();
}

View File

@ -9,6 +9,7 @@ import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.Granularity;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
@ -29,7 +30,6 @@ import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
@ -39,7 +39,6 @@ import io.druid.segment.indexing.DataSchema;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
@ -72,7 +71,7 @@ public class RealtimePlumber implements Plumber
private final Period windowPeriod;
private final File basePersistDirectory;
private final SegmentGranularity segmentGranularity;
private final Granularity segmentGranularity;
private final DataSchema schema;
private final FireDepartmentMetrics metrics;
private final RejectionPolicy rejectionPolicy;
@ -100,7 +99,7 @@ public class RealtimePlumber implements Plumber
public RealtimePlumber(
Period windowPeriod,
File basePersistDirectory,
SegmentGranularity segmentGranularity,
Granularity segmentGranularity,
DataSchema schema,
FireDepartmentMetrics metrics,
RejectionPolicy rejectionPolicy,
@ -142,7 +141,7 @@ public class RealtimePlumber implements Plumber
return windowPeriod;
}
public SegmentGranularity getSegmentGranularity()
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}
@ -179,7 +178,7 @@ public class RealtimePlumber implements Plumber
return null;
}
final long truncatedTime = segmentGranularity.truncate(timestamp);
final long truncatedTime = segmentGranularity.truncate(new DateTime(timestamp)).getMillis();
Sink retVal = sinks.get(truncatedTime);
@ -546,20 +545,26 @@ public class RealtimePlumber implements Plumber
protected void startPersistThread()
{
final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
final DateTime truncatedNow = segmentGranularity.truncate(new DateTime());
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
log.info(
"Expect to run at [%s]",
new DateTime().plus(
new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis)
new Duration(
System.currentTimeMillis(),
segmentGranularity.increment(truncatedNow).getMillis() + windowMillis
)
)
);
ScheduledExecutors
.scheduleAtFixedRate(
scheduledExecutor,
new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis),
new Duration(
System.currentTimeMillis(),
segmentGranularity.increment(truncatedNow).getMillis() + windowMillis
),
new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)),
new ThreadRenamingCallable<ScheduledExecutors.Signal>(
String.format(

View File

@ -23,16 +23,15 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.metamx.common.Granularity;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.ServerView;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Period;
@ -51,7 +50,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final Period windowPeriod;
private final File basePersistDirectory;
private final SegmentGranularity segmentGranularity;
private final Granularity segmentGranularity;
@JacksonInject
@NotNull
@ -90,7 +89,7 @@ public class RealtimePlumberSchool implements PlumberSchool
public RealtimePlumberSchool(
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") SegmentGranularity segmentGranularity
@JsonProperty("segmentGranularity") Granularity segmentGranularity
)
{
this.windowPeriod = windowPeriod;
@ -159,7 +158,7 @@ public class RealtimePlumberSchool implements PlumberSchool
}
@Override
public SegmentGranularity getSegmentGranularity()
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.granularity;
package io.druid.segment.indexing.granularity;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.granularity;
package io.druid.segment.indexing.granularity;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;

View File

@ -21,6 +21,7 @@ package io.druid.segment.realtime;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
@ -32,7 +33,6 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.GranularitySpec;
import io.druid.segment.indexing.RealtimeDriverConfig;
@ -74,7 +74,7 @@ public class RealtimeManagerTest
"test",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new GranularitySpec(null, QueryGranularity.NONE),
new GranularitySpec(Granularity.HOUR, QueryGranularity.NONE),
new NoneShardSpec()
);
RealtimeIOConfig ioConfig = new RealtimeIOConfig(
@ -103,7 +103,7 @@ public class RealtimeManagerTest
}
@Override
public SegmentGranularity getSegmentGranularity()
public Granularity getSegmentGranularity()
{
throw new UnsupportedOperationException();
}

View File

@ -20,10 +20,10 @@
package io.druid.segment.realtime.plumber;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.exception.FormattedException;
import com.metamx.emitter.service.ServiceEmitter;
@ -32,7 +32,6 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
@ -40,12 +39,10 @@ import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.SegmentGranularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.GranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
@ -108,14 +105,14 @@ public class RealtimePlumberSchoolTest
}
},
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new GranularitySpec(null, QueryGranularity.NONE),
new GranularitySpec(Granularity.HOUR, QueryGranularity.NONE),
new NoneShardSpec()
);
RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
new Period("PT10m"),
tmpDir,
SegmentGranularity.HOUR
Granularity.HOUR
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);

View File

@ -21,6 +21,7 @@ package io.druid.segment.realtime.plumber;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.data.input.InputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
@ -47,7 +48,7 @@ public class SinkTest
"test",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new GranularitySpec(null, QueryGranularity.MINUTE),
new GranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE),
new NoneShardSpec()
);