unit tests pass at this point

fjy 2014-02-20 15:52:12 -08:00
parent 20cac8c506
commit 5d2367f0fd
34 changed files with 950 additions and 966 deletions

View File

@ -45,7 +45,7 @@ public class DbUpdaterJob implements Jobby
)
{
this.config = config;
this.dbi = new DbConnector(config.getUpdaterJobSpec(), null).getDBI();
this.dbi = new DbConnector(config.getSchema().getIOConfig().getMetadataUpdateSpec(), null).getDBI();
}
@Override
@ -63,7 +63,7 @@ public class DbUpdaterJob implements Jobby
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getUpdaterJobSpec().getSegmentTable()
config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable()
)
);
for (final DataSegment segment : segments) {

View File

@ -131,7 +131,14 @@ public class DetermineHashedPartitionsJob implements Jobby
{
}
);
config.setGranularitySpec(new UniformGranularitySpec(config.getGranularitySpec().getGranularity(), intervals));
config.setGranularitySpec(
new UniformGranularitySpec(
config.getGranularitySpec().getSegmentGranularity(),
config.getGranularitySpec().getQueryGranularity(),
intervals,
config.getGranularitySpec().getSegmentGranularity()
)
);
log.info("Determined Intervals for Job [%s]" + config.getSegmentGranularIntervals());
}
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
@ -201,8 +208,8 @@ public class DetermineHashedPartitionsJob implements Jobby
throws IOException, InterruptedException
{
super.setup(context);
rollupGranularity = getConfig().getRollupSpec().getRollupGranularity();
config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
rollupGranularity = getConfig().getGranularitySpec().getQueryGranularity();
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
if (intervals.isPresent()) {
determineIntervals = false;
@ -231,7 +238,9 @@ public class DetermineHashedPartitionsJob implements Jobby
);
Interval interval;
if (determineIntervals) {
interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
interval = config.getGranularitySpec()
.getSegmentGranularity()
.bucket(new DateTime(inputRow.getTimestampFromEpoch()));
if (!hyperLogLogs.containsKey(interval)) {
hyperLogLogs.put(interval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
@ -282,7 +291,7 @@ public class DetermineHashedPartitionsJob implements Jobby
protected void setup(Context context)
throws IOException, InterruptedException
{
config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
}
@Override
@ -302,7 +311,7 @@ public class DetermineHashedPartitionsJob implements Jobby
e.printStackTrace(); // TODO: check for better handling
}
}
Interval interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(key.get()));
Interval interval = config.getGranularitySpec().getSegmentGranularity().bucket(new DateTime(key.get()));
intervals.add(interval);
final Path outPath = config.makeSegmentPartitionInfoPath(interval);
final OutputStream out = Utils.makePathAndOutputStream(

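The hunks above move DetermineHashedPartitionsJob off the old single-granularity accessor and onto the new UniformGranularitySpec that carries both segment and query granularity. A minimal sketch of the new four-argument constructor and bucketing call, assuming the signature used throughout this commit (the trailing argument repeats the segment granularity for backwards compatibility, and a null query granularity falls back to the default):

import com.metamx.common.Granularity;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Arrays;

public class GranularitySpecSketch
{
  public static void main(String[] args)
  {
    // Segment granularity, query granularity (null here), intervals, legacy granularity field.
    UniformGranularitySpec spec = new UniformGranularitySpec(
        Granularity.HOUR,
        null,
        Arrays.asList(new Interval("2012-01-01/P1D")),
        Granularity.HOUR
    );

    // Callers now ask for the segment granularity explicitly instead of getGranularity().
    Interval bucket = spec.getSegmentGranularity().bucket(new DateTime("2012-01-01T05:30:00Z"));
    System.out.println(bucket);
  }
}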
View File

@ -114,8 +114,11 @@ public class DeterminePartitionsJob implements Jobby
* in the final segment.
*/
if(!(config.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec)){
throw new ISE("DeterminePartitionsJob can only be run for SingleDimensionPartitionsSpec, partitionSpec found [%s]", config.getPartitionsSpec());
if (!(config.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec)) {
throw new ISE(
"DeterminePartitionsJob can only be run for SingleDimensionPartitionsSpec, partitionSpec found [%s]",
config.getPartitionsSpec()
);
}
if (!config.getPartitionsSpec().isAssumeGrouped()) {
@ -250,7 +253,7 @@ public class DeterminePartitionsJob implements Jobby
throws IOException, InterruptedException
{
super.setup(context);
rollupGranularity = getConfig().getRollupSpec().getRollupGranularity();
rollupGranularity = getConfig().getGranularitySpec().getQueryGranularity();
}
@Override
@ -297,7 +300,7 @@ public class DeterminePartitionsJob implements Jobby
protected void setup(Context context)
throws IOException, InterruptedException
{
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
}
@ -329,7 +332,7 @@ public class DeterminePartitionsJob implements Jobby
throws IOException, InterruptedException
{
super.setup(context);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
final SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
}
@ -449,7 +452,7 @@ public class DeterminePartitionsJob implements Jobby
if (config == null) {
synchronized (DeterminePartitionsDimSelectionBaseReducer.class) {
if (config == null) {
config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
}
}
}
@ -724,7 +727,9 @@ public class DeterminePartitionsJob implements Jobby
}
final OutputStream out = Utils.makePathAndOutputStream(
context, config.makeSegmentPartitionInfoPath(config.getGranularitySpec().bucketInterval(bucket).get()), config.isOverwriteFiles()
context,
config.makeSegmentPartitionInfoPath(config.getGranularitySpec().bucketInterval(bucket).get()),
config.isOverwriteFiles()
);
final DimPartitions chosenPartitions = maxCardinality > HIGH_CARDINALITY_THRESHOLD

View File

@ -36,6 +36,7 @@ public class HadoopDriverConfig implements DriverConfig
private final String version;
private final PartitionsSpec partitionsSpec;
private final Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
private final int rowFlushBoundary;
private final boolean leaveIntermediate;
private final Boolean cleanupOnFailure;
private final boolean overwriteFiles;
@ -46,6 +47,7 @@ public class HadoopDriverConfig implements DriverConfig
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("rowFlushBoundary") int rowFlushBoundary,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
@ -56,6 +58,7 @@ public class HadoopDriverConfig implements DriverConfig
this.version = version == null ? new DateTime().toString() : version;
this.partitionsSpec = partitionsSpec;
this.shardSpecs = shardSpecs == null ? ImmutableMap.<DateTime, List<HadoopyShardSpec>>of() : shardSpecs;
this.rowFlushBoundary = rowFlushBoundary;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = (cleanupOnFailure == null ? true : cleanupOnFailure);
this.overwriteFiles = overwriteFiles;
@ -86,6 +89,12 @@ public class HadoopDriverConfig implements DriverConfig
return shardSpecs;
}
@JsonProperty
public int getRowFlushBoundary()
{
return rowFlushBoundary;
}
@JsonProperty
public boolean isLeaveIntermediate()
{
@ -93,7 +102,7 @@ public class HadoopDriverConfig implements DriverConfig
}
@JsonProperty
public Boolean getCleanupOnFailure()
public Boolean isCleanupOnFailure()
{
return cleanupOnFailure;
}
@ -110,6 +119,21 @@ public class HadoopDriverConfig implements DriverConfig
return ignoreInvalidRows;
}
public HadoopDriverConfig withWorkingPath(String path)
{
return new HadoopDriverConfig(
path,
version,
partitionsSpec,
shardSpecs,
rowFlushBoundary,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
ignoreInvalidRows
);
}
public HadoopDriverConfig withVersion(String ver)
{
return new HadoopDriverConfig(
@ -117,6 +141,7 @@ public class HadoopDriverConfig implements DriverConfig
ver,
partitionsSpec,
shardSpecs,
rowFlushBoundary,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
@ -131,6 +156,7 @@ public class HadoopDriverConfig implements DriverConfig
version,
partitionsSpec,
specs,
rowFlushBoundary,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,

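HadoopDriverConfig gains a rowFlushBoundary field and renames getCleanupOnFailure() to isCleanupOnFailure(). A hedged sketch of constructing one directly, mirroring the @JsonCreator parameter order visible above; all values are placeholders, and nulls fall back to the defaults shown in the constructor body:

import io.druid.indexer.HadoopDriverConfig;

public class DriverConfigSketch
{
  public static void main(String[] args)
  {
    HadoopDriverConfig driverConfig = new HadoopDriverConfig(
        "/tmp/druid/working",  // workingPath (placeholder)
        null,                  // version: defaults to the current timestamp
        null,                  // partitionsSpec
        null,                  // shardSpecs: defaults to an empty map
        50000,                 // rowFlushBoundary, the field added in this commit
        false,                 // leaveIntermediate
        null,                  // cleanupOnFailure: defaults to true
        false,                 // overwriteFiles
        false                  // ignoreInvalidRows
    );

    System.out.println(driverConfig.getRowFlushBoundary());  // 50000
    System.out.println(driverConfig.isCleanupOnFailure());   // true (renamed getter)
  }
}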
View File

@ -21,6 +21,7 @@ package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@ -33,21 +34,16 @@ import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DataSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.initialization.Initialization;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
@ -62,6 +58,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
@ -106,75 +103,109 @@ public class HadoopDruidIndexerConfig
INVALID_ROW_COUNTER
}
public static HadoopDruidIndexerConfig fromSchema(HadoopIngestionSchema schema)
{
return new HadoopDruidIndexerConfig(schema);
}
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
{
return new HadoopDruidIndexerConfig(
HadoopDruidIndexerConfig.jsonMapper
.convertValue(argSpec, HadoopIngestionSchema.class)
);
}
@SuppressWarnings("unchecked")
public static HadoopDruidIndexerConfig fromFile(File file)
{
try {
return fromMap(
(Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
file, new TypeReference<Map<String, Object>>()
{
}
)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@SuppressWarnings("unchecked")
public static HadoopDruidIndexerConfig fromString(String str)
{
try {
return fromMap(
(Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
str, new TypeReference<Map<String, Object>>()
{
}
)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
{
final HadoopDruidIndexerConfig retVal = fromString(conf.get(HadoopDruidIndexerConfig.CONFIG_PROPERTY));
retVal.verify();
return retVal;
}
private volatile HadoopIngestionSchema schema;
private volatile PathSpec pathSpec;
@JsonCreator
public HadoopDruidIndexerConfig(
final @JsonProperty("schema") HadoopIngestionSchema schema,
// Backwards compatibility
final @JsonProperty("dataSource") String dataSource,
final @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
final @JsonProperty("dataSpec") DataSpec dataSpec,
final @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
final @JsonProperty("pathSpec") Map<String, Object> pathSpec,
final @JsonProperty("workingPath") String workingPath,
final @JsonProperty("segmentOutputPath") String segmentOutputPath,
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
// These fields are deprecated and will be removed in the future
final @JsonProperty("timestampColumn") String timestampColumn,
final @JsonProperty("timestampFormat") String timestampFormat,
final @JsonProperty("intervals") List<Interval> intervals,
final @JsonProperty("segmentGranularity") Granularity segmentGranularity,
final @JsonProperty("partitionDimension") String partitionDimension,
final @JsonProperty("targetPartitionSize") Long targetPartitionSize
final @JsonProperty("schema") HadoopIngestionSchema schema
)
{
if (schema != null) {
this.schema = schema;
} else {
this.schema = HadoopIngestionSchema.convertLegacy(
dataSource,
timestampSpec,
dataSpec,
granularitySpec,
pathSpec,
workingPath,
segmentOutputPath,
version,
partitionsSpec,
leaveIntermediate,
cleanupOnFailure,
shardSpecs,
overwriteFiles,
rollupSpec,
updaterJobSpec,
ignoreInvalidRows,
timestampColumn,
timestampFormat,
intervals,
segmentGranularity,
partitionDimension,
targetPartitionSize
);
}
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
}
@JsonProperty
public HadoopIngestionSchema getSchema()
{
return schema;
}
public String getDataSource()
{
return schema.getDataSchema().getDataSource();
}
public GranularitySpec getGranularitySpec()
{
return schema.getDataSchema().getGranularitySpec();
}
public void setGranularitySpec(GranularitySpec granularitySpec)
{
this.schema = schema.withDataSchema(schema.getDataSchema().withGranularitySpec(granularitySpec));
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
}
public PartitionsSpec getPartitionsSpec()
{
return schema.getDriverConfig().getPartitionsSpec();
}
public boolean isOverwriteFiles()
{
return schema.getDriverConfig().isOverwriteFiles();
}
public boolean isIgnoreInvalidRows()
{
return schema.getDriverConfig().isIgnoreInvalidRows();
}
public void setVersion(String version)
{
this.schema = schema.withDriverConfig(schema.getDriverConfig().withVersion(version));
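With the builder removed (next file), configs are created through the static factories above and expose their settings via the nested HadoopIngestionSchema. A minimal usage sketch, assuming the JSON keys exercised by the tests later in this commit; the values are placeholders:

import io.druid.indexer.HadoopDruidIndexerConfig;

public class ConfigFactorySketch
{
  public static void main(String[] args)
  {
    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromString(
        "{"
        + "\"dataSource\":\"example_source\","
        + "\"granularitySpec\":{"
        + "  \"type\":\"uniform\","
        + "  \"gran\":\"hour\","
        + "  \"intervals\":[\"2012-01-01/P1D\"]"
        + " }"
        + "}"
    );

    // Top-level getters now delegate into the schema; less common settings are reached
    // through getSchema() and its data/IO/driver sub-configs.
    System.out.println(config.getDataSource());
    System.out.println(config.getGranularitySpec().bucketIntervals());
    System.out.println(config.getSchema().getDriverConfig().getWorkingPath());
  }
}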

View File

@ -1,282 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.impl.DataSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import org.apache.hadoop.conf.Configuration;
import org.joda.time.DateTime;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
*/
public class HadoopDruidIndexerConfigBuilder
{
public static HadoopDruidIndexerConfig fromSchema(HadoopIngestionSchema schema)
{
return HadoopDruidIndexerConfig.jsonMapper.convertValue(schema, HadoopDruidIndexerConfig.class);
}
public static HadoopIngestionSchema toSchema(HadoopDruidIndexerConfig config){
return HadoopDruidIndexerConfig.jsonMapper.convertValue(config, HadoopIngestionSchema.class);
}
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
{
return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
}
@SuppressWarnings("unchecked")
public static HadoopDruidIndexerConfig fromFile(File file)
{
try {
return fromMap(
(Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
file, new TypeReference<Map<String, Object>>()
{
}
)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@SuppressWarnings("unchecked")
public static HadoopDruidIndexerConfig fromString(String str)
{
try {
return fromMap(
(Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
str, new TypeReference<Map<String, Object>>()
{
}
)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
{
final HadoopDruidIndexerConfig retVal = fromString(conf.get(HadoopDruidIndexerConfig.CONFIG_PROPERTY));
retVal.verify();
return retVal;
}
private volatile String dataSource;
private volatile TimestampSpec timestampSpec;
private volatile DataSpec dataSpec;
private volatile GranularitySpec granularitySpec;
private volatile PathSpec pathSpec;
private volatile String workingPath;
private volatile String segmentOutputPath;
private volatile String version;
private volatile PartitionsSpec partitionsSpec;
private volatile boolean leaveIntermediate;
private volatile boolean cleanupOnFailure;
private volatile Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
private volatile boolean overwriteFiles;
private volatile DataRollupSpec rollupSpec;
private volatile DbUpdaterJobSpec updaterJobSpec;
private volatile boolean ignoreInvalidRows;
public HadoopDruidIndexerConfigBuilder()
{
this.dataSource = null;
this.timestampSpec = null;
this.dataSpec = null;
this.granularitySpec = null;
this.pathSpec = null;
this.workingPath = null;
this.segmentOutputPath = null;
this.version = new DateTime().toString();
this.partitionsSpec = null;
this.leaveIntermediate = false;
this.cleanupOnFailure = true;
this.shardSpecs = ImmutableMap.of();
this.overwriteFiles = false;
this.rollupSpec = null;
this.updaterJobSpec = null;
this.ignoreInvalidRows = false;
}
public HadoopDruidIndexerConfigBuilder withDataSource(String dataSource)
{
this.dataSource = dataSource;
return this;
}
public HadoopDruidIndexerConfigBuilder withTimestampSpec(TimestampSpec timestampSpec)
{
this.timestampSpec = timestampSpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withDataSpec(DataSpec dataSpec)
{
this.dataSpec = dataSpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withGranularitySpec(GranularitySpec granularitySpec)
{
this.granularitySpec = granularitySpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withPathSpec(PathSpec pathSpec)
{
this.pathSpec = pathSpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withWorkingPath(String workingPath)
{
this.workingPath = workingPath;
return this;
}
public HadoopDruidIndexerConfigBuilder withSegmentOutputPath(String segmentOutputPath)
{
this.segmentOutputPath = segmentOutputPath;
return this;
}
public HadoopDruidIndexerConfigBuilder withVersion(String version)
{
this.version = version;
return this;
}
public HadoopDruidIndexerConfigBuilder withPartitionsSpec(PartitionsSpec partitionsSpec)
{
this.partitionsSpec = partitionsSpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withLeaveIntermediate(boolean leaveIntermediate)
{
this.leaveIntermediate = leaveIntermediate;
return this;
}
public HadoopDruidIndexerConfigBuilder withCleanupOnFailure(boolean cleanupOnFailure)
{
this.cleanupOnFailure = cleanupOnFailure;
return this;
}
public HadoopDruidIndexerConfigBuilder withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> shardSpecs)
{
this.shardSpecs = shardSpecs;
return this;
}
public HadoopDruidIndexerConfigBuilder withOverwriteFiles(boolean overwriteFiles)
{
this.overwriteFiles = overwriteFiles;
return this;
}
public HadoopDruidIndexerConfigBuilder withRollupSpec(DataRollupSpec rollupSpec)
{
this.rollupSpec = rollupSpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withUpdaterJobSpec(DbUpdaterJobSpec updaterJobSpec)
{
this.updaterJobSpec = updaterJobSpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withIgnoreInvalidRows(boolean ignoreInvalidRows)
{
this.ignoreInvalidRows = ignoreInvalidRows;
return this;
}
public HadoopDruidIndexerConfigBuilder withSchema(HadoopIngestionSchema schema)
{
this.dataSource = schema.getDataSource();
this.timestampSpec = schema.getTimestampSpec();
this.dataSpec = schema.getDataSpec();
this.granularitySpec = schema.getGranularitySpec();
this.pathSpec = HadoopDruidIndexerConfig.jsonMapper.convertValue(schema.getPathSpec(), PathSpec.class);
this.workingPath = schema.getWorkingPath();
this.segmentOutputPath = schema.getSegmentOutputPath();
this.version = schema.getVersion();
this.partitionsSpec = schema.getPartitionsSpec();
this.leaveIntermediate = schema.isLeaveIntermediate();
this.cleanupOnFailure = schema.isCleanupOnFailure();
this.shardSpecs = schema.getShardSpecs();
this.overwriteFiles = schema.isOverwriteFiles();
this.rollupSpec = schema.getRollupSpec();
this.updaterJobSpec = schema.getUpdaterJobSpec();
this.ignoreInvalidRows = schema.isIgnoreInvalidRows();
return this;
}
public HadoopDruidIndexerConfig build()
{
return new HadoopDruidIndexerConfig(
dataSource,
timestampSpec,
dataSpec,
granularitySpec,
pathSpec,
workingPath,
segmentOutputPath,
version,
partitionsSpec,
leaveIntermediate,
cleanupOnFailure,
shardSpecs,
overwriteFiles,
rollupSpec,
updaterJobSpec,
ignoreInvalidRows,
null,
null,
null,
null,
null,
null
);
}
}

View File

@ -39,7 +39,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
protected void setup(Context context)
throws IOException, InterruptedException
{
config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
parser = config.getParser();
}

View File

@ -61,4 +61,9 @@ public class HadoopIOConfig implements IOConfig
{
return segmentOutputPath;
}
public HadoopIOConfig withSegmentOutputPath(String path)
{
return new HadoopIOConfig(pathSpec, metadataUpdateSpec, path);
}
}
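The new withSegmentOutputPath mirrors the other immutable with* helpers added in this commit: callers derive a copy with one field replaced rather than mutating the config. A short hedged sketch, assuming getIOConfig() exposes the Hadoop-specific type as the usages elsewhere in this commit suggest; the path is a placeholder:

import io.druid.indexer.HadoopIngestionSchema;

public class IOConfigSketch
{
  // Derive a schema whose IO config points at a different segment output location,
  // leaving the original schema untouched.
  static HadoopIngestionSchema relocate(HadoopIngestionSchema schema, String newPath)
  {
    return schema.withIOConfig(schema.getIOConfig().withSegmentOutputPath(newPath));
  }
}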

View File

@ -28,6 +28,7 @@ import com.metamx.common.Granularity;
import io.druid.data.input.impl.DataSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.partitions.PartitionsSpec;
@ -146,10 +147,32 @@ public class HadoopIngestionSchema implements IngestionSchema
? new TimestampSpec(timestampColumn, timestampFormat)
: timestampSpec;
List<String> dimensionExclusions = Lists.newArrayList();
dimensionExclusions.add(theTimestampSpec.getTimestampColumn());
if (rollupSpec != null) {
for (AggregatorFactory aggregatorFactory : rollupSpec.getAggs()) {
dimensionExclusions.add(aggregatorFactory.getName());
}
}
//
//if (dataSpec.hasCustomDimensions()) {
// dimensionExclusions = null;
//} else {
// dimensionExclusions = Lists.newArrayList();
// dimensionExclusions.add(theTimestampSpec.getTimestampColumn());
// dimensionExclusions.addAll(
// Lists.transform(
// rollupSpec.getAggs(), new Function<AggregatorFactory, String>()
// {
// @Override
// public String apply(AggregatorFactory aggregatorFactory)
// {
// return aggregatorFactory.getName();
// }
// }
// )
// );
//}
PartitionsSpec thePartitionSpec;
if (partitionsSpec != null) {
@ -163,25 +186,6 @@ public class HadoopIngestionSchema implements IngestionSchema
thePartitionSpec = new SingleDimensionPartitionsSpec(partitionDimension, targetPartitionSize, null, false);
}
if (dataSpec.hasCustomDimensions()) {
dimensionExclusions = null;
} else {
dimensionExclusions = Lists.newArrayList();
dimensionExclusions.add(theTimestampSpec.getTimestampColumn());
dimensionExclusions.addAll(
Lists.transform(
rollupSpec.getAggs(), new Function<AggregatorFactory, String>()
{
@Override
public String apply(AggregatorFactory aggregatorFactory)
{
return aggregatorFactory.getName();
}
}
)
);
}
GranularitySpec theGranularitySpec = null;
if (granularitySpec != null) {
Preconditions.checkArgument(
@ -189,10 +193,18 @@ public class HadoopIngestionSchema implements IngestionSchema
"Cannot mix granularitySpec with segmentGranularity/intervals"
);
theGranularitySpec = granularitySpec;
if (rollupSpec != null) {
theGranularitySpec = theGranularitySpec.withQueryGranularity(rollupSpec.rollupGranularity);
}
} else {
// Backwards compatibility
if (segmentGranularity != null && intervals != null) {
theGranularitySpec = new UniformGranularitySpec(segmentGranularity, null, intervals, segmentGranularity);
theGranularitySpec = new UniformGranularitySpec(
segmentGranularity,
rollupSpec == null ? null : rollupSpec.rollupGranularity,
intervals,
segmentGranularity
);
}
}
@ -202,16 +214,18 @@ public class HadoopIngestionSchema implements IngestionSchema
new ParseSpec(
theTimestampSpec,
new DimensionsSpec(
dataSpec.getDimensions(),
dataSpec == null ? Lists.<String>newArrayList() : dataSpec.getDimensions(),
dimensionExclusions,
dataSpec.getSpatialDimensions()
dataSpec == null ? Lists.<SpatialDimensionSchema>newArrayList() : dataSpec.getSpatialDimensions()
)
)
{
},
null, null, null, null
),
rollupSpec.getAggs().toArray(new AggregatorFactory[rollupSpec.getAggs().size()]),
rollupSpec == null
? new AggregatorFactory[]{}
: rollupSpec.getAggs().toArray(new AggregatorFactory[rollupSpec.getAggs().size()]),
theGranularitySpec
);
@ -226,6 +240,7 @@ public class HadoopIngestionSchema implements IngestionSchema
version,
thePartitionSpec,
shardSpecs,
rollupSpec == null ? 50000 : rollupSpec.rowFlushBoundary,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
@ -255,37 +270,6 @@ public class HadoopIngestionSchema implements IngestionSchema
return driverConfig;
}
public HadoopIngestionSchema withDriverConfig(HadoopDriverConfig config)
{
return new HadoopIngestionSchema(
dataSchema,
ioConfig,
config,
null,
null,
null,
null,
null,
null,
null,
null,
null,
false,
null,
null,
false,
null,
null,
false,
null,
null,
null,
null,
null,
null
);
}
public HadoopIngestionSchema withDataSchema(DataSchema schema)
{
return new HadoopIngestionSchema(
@ -316,4 +300,66 @@ public class HadoopIngestionSchema implements IngestionSchema
null
);
}
public HadoopIngestionSchema withIOConfig(HadoopIOConfig config)
{
return new HadoopIngestionSchema(
dataSchema,
config,
driverConfig,
null,
null,
null,
null,
null,
null,
null,
null,
null,
false,
null,
null,
false,
null,
null,
false,
null,
null,
null,
null,
null,
null
);
}
public HadoopIngestionSchema withDriverConfig(HadoopDriverConfig config)
{
return new HadoopIngestionSchema(
dataSchema,
ioConfig,
config,
null,
null,
null,
null,
null,
null,
null,
null,
null,
false,
null,
null,
false,
null,
null,
false,
null,
null,
null,
null,
null,
null
);
}
}
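These with* copies are how the rest of the commit injects runtime values; for example, HadoopIndexTask stamps a version into the driver config before building the indexer config. A minimal sketch of that pattern, assuming a schema deserialized as in the tests below:

import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopIngestionSchema;

public class SchemaVersionSketch
{
  // Replaces the old builder chain withSchema(...).withVersion(...).build():
  // derive a copy of the schema with the version set, then wrap it in a config.
  static HadoopDruidIndexerConfig configWithVersion(HadoopIngestionSchema schema, String version)
  {
    return HadoopDruidIndexerConfig.fromSchema(
        schema.withDriverConfig(schema.getDriverConfig().withVersion(version))
    );
  }
}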

View File

@ -33,8 +33,8 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
@ -254,9 +254,9 @@ public class IndexGeneratorJob implements Jobby
protected void setup(Context context)
throws IOException, InterruptedException
{
config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
for (AggregatorFactory factory : config.getRollupSpec().getAggs()) {
for (AggregatorFactory factory : config.getSchema().getDataSchema().getAggregators()) {
metricNames.add(factory.getName().toLowerCase());
}
@ -272,10 +272,8 @@ public class IndexGeneratorJob implements Jobby
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
final DataRollupSpec rollupSpec = config.getRollupSpec();
final AggregatorFactory[] aggs = rollupSpec.getAggs().toArray(
new AggregatorFactory[rollupSpec.getAggs().size()]
);
//final DataRollupSpec rollupSpec = config.getRollupSpec();
final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
IncrementalIndex index = makeIncrementalIndex(bucket, aggs);
@ -299,7 +297,7 @@ public class IndexGeneratorJob implements Jobby
int numRows = index.add(inputRow);
++lineCount;
if (numRows >= rollupSpec.rowFlushBoundary) {
if (numRows >= config.getSchema().getDriverConfig().getRowFlushBoundary()) {
log.info(
"%,d lines to %,d rows in %,d millis",
lineCount - runningTotalLineCount,
@ -453,7 +451,7 @@ public class IndexGeneratorJob implements Jobby
DataSegment segment = new DataSegment(
config.getDataSource(),
interval,
config.getVersion(),
config.getSchema().getDriverConfig().getVersion(),
loadSpec,
dimensionNames,
metricNames,
@ -612,11 +610,17 @@ public class IndexGeneratorJob implements Jobby
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs)
{
List<SpatialDimensionSchema> spatialDimensionSchemas = config.getSchema().getDataSchema().getParser() == null
? Lists.<SpatialDimensionSchema>newArrayList()
: config.getSchema().getDataSchema().getParser()
.getParseSpec()
.getDimensionsSpec()
.getSpatialDimensions();
return new IncrementalIndex(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withSpatialDimensions(config.getDataSpec().getSpatialDimensions())
.withQueryGranularity(config.getRollupSpec().getRollupGranularity())
.withSpatialDimensions(spatialDimensionSchemas)
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.build()
);

View File

@ -63,7 +63,7 @@ public class JobHelper
final Configuration conf = groupByJob.getConfiguration();
final FileSystem fs = FileSystem.get(conf);
Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");
Path distributedClassPath = new Path(config.getSchema().getDriverConfig().getWorkingPath(), "classpath");
if (fs instanceof LocalFileSystem) {
return;
@ -74,7 +74,7 @@ public class JobHelper
if (jarFile.getName().endsWith(".jar")) {
final Path hdfsPath = new Path(distributedClassPath, jarFile.getName());
if (! existing.contains(hdfsPath)) {
if (!existing.contains(hdfsPath)) {
if (jarFile.getName().endsWith("SNAPSHOT.jar") || !fs.exists(hdfsPath)) {
log.info("Uploading jar to path[%s]", hdfsPath);
ByteStreams.copy(
@ -127,7 +127,8 @@ public class JobHelper
}
}
public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config){
public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config)
{
String failedMessage = null;
for (Jobby job : jobs) {
if (failedMessage == null) {
@ -137,8 +138,8 @@ public class JobHelper
}
}
if (!config.isLeaveIntermediate()) {
if (failedMessage == null || config.isCleanupOnFailure()) {
if (!config.getSchema().getDriverConfig().isLeaveIntermediate()) {
if (failedMessage == null || config.getSchema().getDriverConfig().isCleanupOnFailure()) {
Path workingPath = config.makeIntermediatePath();
log.info("Deleting path[%s]", workingPath);
try {

View File

@ -26,8 +26,8 @@ import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Comparators;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexer.hadoop.FSSpideringIterator;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -72,7 +72,7 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
final Path betaInput = new Path(getInputPath());
final FileSystem fs = betaInput.getFileSystem(job.getConfiguration());
final Granularity segmentGranularity = ((UniformGranularitySpec) config.getGranularitySpec()).getGranularity();
final Granularity segmentGranularity = ((UniformGranularitySpec) config.getGranularitySpec()).getSegmentGranularity();
Map<DateTime, Long> inputModifiedTimes = new TreeMap<DateTime, Long>(
Comparators.inverse(Comparators.<Comparable>comparable())
@ -93,7 +93,7 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
String bucketOutput = String.format(
"%s/%s",
config.getSegmentOutputPath(),
config.getSchema().getIOConfig().getSegmentOutputPath(),
segmentGranularity.toPath(timeBucket)
);
for (FileStatus fileStatus : FSSpideringIterator.spiderIterable(fs, new Path(bucketOutput))) {
@ -108,7 +108,14 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
}
}
config.setGranularitySpec(new UniformGranularitySpec(segmentGranularity, Lists.newArrayList(bucketsToRun)));
config.setGranularitySpec(
new UniformGranularitySpec(
segmentGranularity,
config.getGranularitySpec().getQueryGranularity(),
Lists.newArrayList(bucketsToRun),
segmentGranularity
)
);
return super.addInputPaths(config, job);
}

View File

@ -21,416 +21,27 @@ package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.db.DbConnectorConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.RandomPartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class HadoopDruidIndexerConfigTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testGranularitySpec()
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ " \"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-01-01/P1D\"]"
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final UniformGranularitySpec granularitySpec = (UniformGranularitySpec) cfg.getGranularitySpec();
Assert.assertEquals(
"getIntervals",
Lists.newArrayList(new Interval("2012-01-01/P1D")),
granularitySpec.getIntervals().get()
);
Assert.assertEquals(
"getSegmentGranularity",
"HOUR",
granularitySpec.getGranularity().toString()
);
}
@Test
public void testGranularitySpecLegacy()
{
// Deprecated and replaced by granularitySpec, but still supported
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"segmentGranularity\":\"day\","
+ "\"intervals\":[\"2012-02-01/P1D\"]"
+ "}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final UniformGranularitySpec granularitySpec = (UniformGranularitySpec) cfg.getGranularitySpec();
Assert.assertEquals(
"getIntervals",
Lists.newArrayList(new Interval("2012-02-01/P1D")),
granularitySpec.getIntervals().get()
);
Assert.assertEquals(
"getSegmentGranularity",
"DAY",
granularitySpec.getGranularity().toString()
);
}
@Test
public void testInvalidGranularityCombination()
{
boolean thrown = false;
try {
final HadoopDruidIndexerConfig cfg = jsonReadWriteRead(
"{"
+ "\"segmentGranularity\":\"day\","
+ "\"intervals\":[\"2012-02-01/P1D\"],"
+ "\"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-01-01/P1D\"]"
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
thrown = true;
}
Assert.assertTrue("Exception thrown", thrown);
}
@Test
public void testPartitionsSpecAutoDimension()
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100"
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertTrue(
"partitionSpec",
partitionsSpec instanceof SingleDimensionPartitionsSpec
);
}
@Test
public void testPartitionsSpecSpecificDimensionLegacy()
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"partitionDimension\":\"foo\""
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
"foo"
);
}
@Test
public void testPartitionsSpecLegacy()
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"targetPartitionSize\":100,"
+ "\"partitionDimension\":\"foo\""
+ "}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
"foo"
);
}
@Test
public void testPartitionsSpecMaxPartitionSize()
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"maxPartitionSize\":200,"
+ " \"partitionDimension\":\"foo\""
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
200
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
"foo"
);
}
@Test
public void testInvalidPartitionsCombination()
{
boolean thrown = false;
try {
final HadoopDruidIndexerConfig cfg = jsonReadWriteRead(
"{"
+ "\"targetPartitionSize\":100,"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100"
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
thrown = true;
}
Assert.assertTrue("Exception thrown", thrown);
}
@Test
public void testDbUpdaterJobSpec() throws Exception
{
final HadoopDruidIndexerConfig cfg;
cfg = jsonReadWriteRead(
"{"
+ "\"updaterJobSpec\":{\n"
+ " \"type\" : \"db\",\n"
+ " \"connectURI\" : \"jdbc:mysql://localhost/druid\",\n"
+ " \"user\" : \"rofl\",\n"
+ " \"password\" : \"p4ssw0rd\",\n"
+ " \"segmentTable\" : \"segments\"\n"
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
final DbUpdaterJobSpec spec = cfg.getUpdaterJobSpec();
final DbConnectorConfig connectorConfig = spec.get();
Assert.assertEquals("segments", spec.getSegmentTable());
Assert.assertEquals("jdbc:mysql://localhost/druid", connectorConfig.getConnectURI());
Assert.assertEquals("rofl", connectorConfig.getUser());
Assert.assertEquals("p4ssw0rd", connectorConfig.getPassword());
Assert.assertEquals(false, connectorConfig.isUseValidationQuery());
}
@Test
public void testDefaultSettings()
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
Assert.assertEquals(
"cleanupOnFailure",
cfg.isCleanupOnFailure(),
true
);
Assert.assertEquals(
"overwriteFiles",
cfg.isOverwriteFiles(),
false
);
Assert.assertEquals(
"isDeterminingPartitions",
cfg.getPartitionsSpec().isDeterminingPartitions(),
false
);
}
@Test
public void testNoCleanupOnFailure()
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{\"cleanupOnFailure\":false}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
Assert.assertEquals(
"cleanupOnFailure",
cfg.isCleanupOnFailure(),
false
);
}
@Test
public void shouldMakeHDFSCompliantSegmentOutputPath()
{
final HadoopDruidIndexerConfig cfg;
HadoopIngestionSchema schema;
try {
cfg = jsonReadWriteRead(
schema = jsonReadWriteRead(
"{"
+ "\"dataSource\": \"source\","
+ " \"granularitySpec\":{"
@ -440,14 +51,21 @@ public class HadoopDruidIndexerConfigTest
+ " },"
+ "\"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\""
+ "}",
HadoopDruidIndexerConfig.class
HadoopIngestionSchema.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
cfg.setVersion("some:brand:new:version");
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
schema.withDriverConfig(
schema.getDriverConfig()
.withVersion(
"some:brand:new:version"
)
)
);
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = cfg.makeSegmentOutputPath(new DistributedFileSystem(), bucket);
@ -460,10 +78,10 @@ public class HadoopDruidIndexerConfigTest
@Test
public void shouldMakeDefaultSegmentOutputPathIfNotHDFS()
{
final HadoopDruidIndexerConfig cfg;
HadoopIngestionSchema schema;
try {
cfg = jsonReadWriteRead(
schema = jsonReadWriteRead(
"{"
+ "\"dataSource\": \"the:data:source\","
+ " \"granularitySpec\":{"
@ -473,15 +91,16 @@ public class HadoopDruidIndexerConfigTest
+ " },"
+ "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
+ "}",
HadoopDruidIndexerConfig.class
HadoopIngestionSchema.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
cfg.setVersion("some:brand:new:version");
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
schema.withDriverConfig(schema.getDriverConfig().withVersion("some:brand:new:version"))
);
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket);
Assert.assertEquals(
@ -501,46 +120,4 @@ public class HadoopDruidIndexerConfigTest
}
}
public void testRandomPartitionsSpec() throws Exception{
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"type\":\"random\""
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof RandomPartitionsSpec);
}
}
}

View File

@ -0,0 +1,479 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.db.DbConnectorConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.RandomPartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
public class HadoopIngestionSchemaTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testGranularitySpec()
{
final HadoopIngestionSchema schema;
try {
schema = jsonReadWriteRead(
"{"
+ " \"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-01-01/P1D\"]"
+ " }"
+ "}",
HadoopIngestionSchema.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final UniformGranularitySpec granularitySpec = (UniformGranularitySpec) schema.getDataSchema().getGranularitySpec();
Assert.assertEquals(
"getIntervals",
Lists.newArrayList(new Interval("2012-01-01/P1D")),
granularitySpec.getIntervals().get()
);
Assert.assertEquals(
"getSegmentGranularity",
"HOUR",
granularitySpec.getSegmentGranularity().toString()
);
}
@Test
public void testGranularitySpecLegacy()
{
// Deprecated and replaced by granularitySpec, but still supported
final HadoopIngestionSchema schema;
try {
schema = jsonReadWriteRead(
"{"
+ "\"segmentGranularity\":\"day\","
+ "\"intervals\":[\"2012-02-01/P1D\"]"
+ "}",
HadoopIngestionSchema.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final UniformGranularitySpec granularitySpec = (UniformGranularitySpec) schema.getDataSchema().getGranularitySpec();
Assert.assertEquals(
"getIntervals",
Lists.newArrayList(new Interval("2012-02-01/P1D")),
granularitySpec.getIntervals().get()
);
Assert.assertEquals(
"getSegmentGranularity",
"DAY",
granularitySpec.getSegmentGranularity().toString()
);
}
@Test
public void testInvalidGranularityCombination()
{
boolean thrown = false;
try {
final HadoopIngestionSchema schema = jsonReadWriteRead(
"{"
+ "\"segmentGranularity\":\"day\","
+ "\"intervals\":[\"2012-02-01/P1D\"],"
+ "\"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-01-01/P1D\"]"
+ " }"
+ "}",
HadoopIngestionSchema.class
);
}
catch (Exception e) {
thrown = true;
}
Assert.assertTrue("Exception thrown", thrown);
}
@Test
public void testPartitionsSpecAutoDimension()
{
final HadoopIngestionSchema schema;
try {
schema = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100"
+ " }"
+ "}",
HadoopIngestionSchema.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getDriverConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertTrue(
"partitionSpec",
partitionsSpec instanceof SingleDimensionPartitionsSpec
);
}
@Test
public void testPartitionsSpecSpecificDimensionLegacy()
{
final HadoopIngestionSchema schema;
try {
schema = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"partitionDimension\":\"foo\""
+ " }"
+ "}",
HadoopIngestionSchema.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getDriverConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
"foo"
);
}
@Test
public void testPartitionsSpecLegacy()
{
final HadoopIngestionSchema schema;
try {
schema = jsonReadWriteRead(
"{"
+ "\"targetPartitionSize\":100,"
+ "\"partitionDimension\":\"foo\""
+ "}",
HadoopIngestionSchema.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getDriverConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
"foo"
);
}
@Test
public void testPartitionsSpecMaxPartitionSize()
{
final HadoopIngestionSchema schema;
try {
schema = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"maxPartitionSize\":200,"
+ " \"partitionDimension\":\"foo\""
+ " }"
+ "}",
HadoopIngestionSchema.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getDriverConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
200
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
"foo"
);
}
@Test
public void testInvalidPartitionsCombination()
{
boolean thrown = false;
try {
final HadoopIngestionSchema schema = jsonReadWriteRead(
"{"
+ "\"targetPartitionSize\":100,"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100"
+ " }"
+ "}",
HadoopIngestionSchema.class
);
}
catch (Exception e) {
thrown = true;
}
Assert.assertTrue("Exception thrown", thrown);
}
@Test
public void testDbUpdaterJobSpec() throws Exception
{
final HadoopIngestionSchema schema;
schema = jsonReadWriteRead(
"{"
+ "\"updaterJobSpec\":{\n"
+ " \"type\" : \"db\",\n"
+ " \"connectURI\" : \"jdbc:mysql://localhost/druid\",\n"
+ " \"user\" : \"rofl\",\n"
+ " \"password\" : \"p4ssw0rd\",\n"
+ " \"segmentTable\" : \"segments\"\n"
+ " }"
+ "}",
HadoopIngestionSchema.class
);
final DbUpdaterJobSpec spec = schema.getIOConfig().getMetadataUpdateSpec();
final DbConnectorConfig connectorConfig = spec.get();
Assert.assertEquals("segments", spec.getSegmentTable());
Assert.assertEquals("jdbc:mysql://localhost/druid", connectorConfig.getConnectURI());
Assert.assertEquals("rofl", connectorConfig.getUser());
Assert.assertEquals("p4ssw0rd", connectorConfig.getPassword());
Assert.assertEquals(false, connectorConfig.isUseValidationQuery());
}
@Test
public void testDefaultSettings()
{
final HadoopIngestionSchema schema;
try {
schema = jsonReadWriteRead(
"{}",
HadoopIngestionSchema.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
Assert.assertEquals(
"cleanupOnFailure",
schema.getDriverConfig().isCleanupOnFailure(),
true
);
Assert.assertEquals(
"overwriteFiles",
schema.getDriverConfig().isOverwriteFiles(),
false
);
Assert.assertEquals(
"isDeterminingPartitions",
schema.getDriverConfig().getPartitionsSpec().isDeterminingPartitions(),
false
);
}
@Test
public void testNoCleanupOnFailure()
{
final HadoopIngestionSchema schema;
try {
schema = jsonReadWriteRead(
"{\"cleanupOnFailure\":false}",
HadoopIngestionSchema.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
Assert.assertEquals(
"cleanupOnFailure",
schema.getDriverConfig().isCleanupOnFailure(),
false
);
}
private <T> T jsonReadWriteRead(String s, Class<T> klass)
{
try {
return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public void testRandomPartitionsSpec() throws Exception{
{
final HadoopIngestionSchema schema;
try {
schema = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"type\":\"random\""
+ " }"
+ "}",
HadoopIngestionSchema.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = schema.getDriverConfig().getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof RandomPartitionsSpec);
}
}
}

View File

@ -38,6 +38,7 @@ import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
@ -87,10 +88,14 @@ public class YeOldePlumberSchool implements PlumberSchool
}
@Override
public Plumber findPlumber(final DataSchema schema, final FireDepartmentMetrics metrics)
public Plumber findPlumber(
final DataSchema schema,
final RealtimeDriverConfig config,
final FireDepartmentMetrics metrics
)
{
// There can be only one.
final Sink theSink = new Sink(interval, schema, version);
final Sink theSink = new Sink(interval, schema, config, version);
// Temporary directory to hold spilled segments.
final File persistDir = new File(tmpSegmentDir, theSink.getSegment().getIdentifier());
@ -139,9 +144,9 @@ public class YeOldePlumberSchool implements PlumberSchool
// User should have persisted everything by now.
Preconditions.checkState(!theSink.swappable(), "All data must be persisted before fininshing the job!");
if(spilled.size() == 0) {
if (spilled.size() == 0) {
throw new IllegalStateException("Nothing indexed?");
} else if(spilled.size() == 1) {
} else if (spilled.size() == 1) {
fileToUpload = Iterables.getOnlyElement(spilled);
} else {
List<QueryableIndex> indexes = Lists.newArrayList();
@ -167,7 +172,8 @@ public class YeOldePlumberSchool implements PlumberSchool
segmentToUpload.getIdentifier()
);
} catch(Exception e) {
}
catch (Exception e) {
log.warn(e, "Failed to merge and upload");
throw Throwables.propagate(e);
}
@ -186,7 +192,7 @@ public class YeOldePlumberSchool implements PlumberSchool
private void spillIfSwappable()
{
if(theSink.swappable()) {
if (theSink.swappable()) {
final FireHydrant indexToPersist = theSink.swap();
final int rowsToPersist = indexToPersist.getIndex().size();
final File dirToPersist = getSpillDir(indexToPersist.getCount());
@@ -206,7 +212,8 @@ public class YeOldePlumberSchool implements PlumberSchool
spilled.add(dirToPersist);
} catch(Exception e) {
}
catch (Exception e) {
log.warn(e, "Failed to spill index[%d]", indexToPersist.getCount());
throw Throwables.propagate(e);
}

View File

@@ -32,7 +32,6 @@ import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopIngestionSchema;
import io.druid.indexer.Jobby;
@@ -95,9 +94,12 @@ public class HadoopIndexTask extends AbstractTask
);
// Some HadoopIngestionSchema stuff doesn't make sense in the context of the indexing service
Preconditions.checkArgument(schema.getSegmentOutputPath() == null, "segmentOutputPath must be absent");
Preconditions.checkArgument(schema.getWorkingPath() == null, "workingPath must be absent");
Preconditions.checkArgument(schema.getUpdaterJobSpec() == null, "updaterJobSpec must be absent");
Preconditions.checkArgument(
schema.getIOConfig().getSegmentOutputPath() == null,
"segmentOutputPath must be absent"
);
Preconditions.checkArgument(schema.getDriverConfig().getWorkingPath() == null, "workingPath must be absent");
Preconditions.checkArgument(schema.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent");
this.schema = schema;
this.hadoopCoordinates = (hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates);
@@ -112,7 +114,7 @@ public class HadoopIndexTask extends AbstractTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
Optional<SortedSet<Interval>> intervals = schema.getGranularitySpec().bucketIntervals();
Optional<SortedSet<Interval>> intervals = schema.getDataSchema().getGranularitySpec().bucketIntervals();
if (intervals.isPresent()) {
Interval interval = JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals(
@@ -171,7 +173,7 @@ public class HadoopIndexTask extends AbstractTask
jobUrls.addAll(extensionURLs);
System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(jobUrls));
boolean determineIntervals = !schema.getGranularitySpec().bucketIntervals().isPresent();
boolean determineIntervals = !schema.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
final Class<?> determineConfigurationMainClass = loader.loadClass(HadoopDetermineConfigInnerProcessing.class.getName());
final Method determineConfigurationMainMethod = determineConfigurationMainClass.getMethod(
@@ -195,7 +197,7 @@ public class HadoopIndexTask extends AbstractTask
if (determineIntervals) {
Interval interval = JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals(
indexerSchema.getGranularitySpec().bucketIntervals().get()
indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
)
);
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval));
@@ -244,10 +246,10 @@ public class HadoopIndexTask extends AbstractTask
schema,
HadoopIngestionSchema.class
);
final HadoopDruidIndexerConfig config =
new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
.withVersion(version)
.build();
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(
theSchema
.withDriverConfig(theSchema.getDriverConfig().withVersion(version))
);
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
@@ -273,17 +275,17 @@ public class HadoopIndexTask extends AbstractTask
schema,
HadoopIngestionSchema.class
);
final HadoopDruidIndexerConfig config =
new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
.withWorkingPath(workingPath)
.withSegmentOutputPath(segmentOutputPath)
.build();
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(
theSchema
.withIOConfig(theSchema.getIOConfig().withSegmentOutputPath(segmentOutputPath))
.withDriverConfig(theSchema.getDriverConfig().withWorkingPath(workingPath))
);
Jobby job = new HadoopDruidDetermineConfigurationJob(config);
log.info("Starting a hadoop index generator job...");
if (job.run()) {
return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(HadoopDruidIndexerConfigBuilder.toSchema(config));
return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(config.getSchema());
}
return null;

View File

@@ -39,13 +39,15 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.Plumber;
@@ -56,6 +58,7 @@ import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.io.File;
import java.io.IOException;
@@ -155,9 +158,9 @@ public class IndexTask extends AbstractFixedIntervalTask
getDataSource(),
firehoseFactory.getParser(),
aggregators,
granularitySpec.withQueryGranularity(indexGranularity),
shardSpec
granularitySpec.withQueryGranularity(indexGranularity)
),
shardSpec,
bucket,
myLock.getVersion()
);
@@ -306,6 +309,7 @@ public class IndexTask extends AbstractFixedIntervalTask
private DataSegment generateSegment(
final TaskToolbox toolbox,
final DataSchema schema,
final ShardSpec shardSpec,
final Interval interval,
final String version
) throws IOException
@@ -319,7 +323,7 @@ public class IndexTask extends AbstractFixedIntervalTask
interval.getStart(),
interval.getEnd(),
version,
schema.getShardSpec().getPartitionNum()
shardSpec.getPartitionNum()
)
);
@@ -350,7 +354,7 @@ public class IndexTask extends AbstractFixedIntervalTask
version,
wrappedDataSegmentPusher,
tmpDir
).findPlumber(schema, metrics);
).findPlumber(schema, new RealtimeDriverConfig(0, new Period(), shardSpec), metrics);
// rowFlushBoundary for this job
final int myRowFlushBoundary = this.rowFlushBoundary > 0
@@ -363,7 +367,7 @@ public class IndexTask extends AbstractFixedIntervalTask
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if (shouldIndex(schema, interval, inputRow)) {
if (shouldIndex(shardSpec, interval, inputRow)) {
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if (sink == null) {
throw new NullPointerException(
@@ -400,7 +404,7 @@ public class IndexTask extends AbstractFixedIntervalTask
+ " and output %,d rows",
getId(),
interval,
schema.getShardSpec().getPartitionNum(),
shardSpec.getPartitionNum(),
metrics.processed() + metrics.unparseable() + metrics.thrownAway(),
metrics.processed(),
metrics.unparseable(),
@@ -420,9 +424,13 @@ public class IndexTask extends AbstractFixedIntervalTask
*
* @return true or false
*/
private boolean shouldIndex(final DataSchema schema, final Interval interval, final InputRow inputRow)
private boolean shouldIndex(
final ShardSpec shardSpec,
final Interval interval,
final InputRow inputRow
)
{
return interval.contains(inputRow.getTimestampFromEpoch()) && schema.getShardSpec().isInChunk(inputRow);
return interval.contains(inputRow.getTimestampFromEpoch()) && shardSpec.isInChunk(inputRow);
}
@JsonProperty

View File

@@ -44,9 +44,9 @@ import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.GranularitySpec;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentConfig;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
@@ -306,13 +306,18 @@ public class RealtimeIndexTask extends AbstractTask
schema.getDataSource(),
firehoseFactory.getParser(),
schema.getAggregators(),
new GranularitySpec(realtimePlumberSchool.getSegmentGranularity(), schema.getIndexGranularity()),
schema.getShardSpec()
new UniformGranularitySpec(
realtimePlumberSchool.getSegmentGranularity(),
schema.getIndexGranularity(),
null,
realtimePlumberSchool.getSegmentGranularity()
)
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(firehoseFactory, realtimePlumberSchool);
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(
fireDepartmentConfig.getMaxRowsInMemory(),
fireDepartmentConfig.getIntermediatePersistPeriod()
fireDepartmentConfig.getIntermediatePersistPeriod(),
schema.getShardSpec()
);
final FireDepartment fireDepartment = new FireDepartment(
@@ -326,7 +331,7 @@ public class RealtimeIndexTask extends AbstractTask
);
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment));
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
this.plumber = realtimePlumberSchool.findPlumber(dataSchema, fireDepartment.getMetrics());
this.plumber = realtimePlumberSchool.findPlumber(dataSchema, driverConfig, fireDepartment.getMetrics());
try {
plumber.startJob();

View File

@@ -54,7 +54,12 @@ public class TaskSerdeTest
final IndexTask task = new IndexTask(
null,
"foo",
new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))),
new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D")),
Granularity.DAY
),
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
@@ -354,10 +359,16 @@ public class TaskSerdeTest
final HadoopIndexTask task = new HadoopIndexTask(
null,
new HadoopIngestionSchema(
null, null, null,
"foo",
new TimestampSpec("timestamp", "auto"),
new JSONDataSpec(ImmutableList.of("foo"), null),
new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P1D")),
Granularity.DAY
),
ImmutableMap.<String, Object>of("paths", "bar"),
null,
null,
@@ -371,6 +382,10 @@ public class TaskSerdeTest
null,
false,
null,
null,
null,
null,
null,
null
),
null

View File

@@ -163,7 +163,8 @@ public class TaskLifecycleTest
new DataSegmentMover()
{
@Override
public DataSegment move(DataSegment dataSegment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
public DataSegment move(DataSegment dataSegment, Map<String, Object> targetLoadSpec)
throws SegmentLoadingException
{
return dataSegment;
}
@@ -223,7 +224,12 @@ public class TaskLifecycleTest
final Task indexTask = new IndexTask(
null,
"foo",
new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))),
new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D")),
Granularity.DAY
),
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
@@ -278,7 +284,7 @@ public class TaskLifecycleTest
final Task indexTask = new IndexTask(
null,
"foo",
new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
new UniformGranularitySpec(Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P1D")), Granularity.DAY),
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,

View File

@@ -43,10 +43,6 @@ public class DataSchema
@JsonProperty("granularitySpec") GranularitySpec granularitySpec
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(aggregators, "metrics");
Preconditions.checkNotNull(granularitySpec, "granularitySpec");
this.dataSource = dataSource;
this.parser = parser;
this.aggregators = aggregators;

View File

@@ -56,7 +56,7 @@ public class UniformGranularitySpec implements GranularitySpec
if (inputIntervals != null) {
List<Interval> granularIntervals = Lists.newArrayList();
for (Interval inputInterval : inputIntervals) {
Iterables.addAll(granularIntervals, granularity.getIterable(inputInterval));
Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval));
}
this.inputIntervals = ImmutableList.copyOf(inputIntervals);
this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals);
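// For example, with segmentGranularity = Granularity.DAY an input interval of
// 2010-01-01/P2D expands into the daily buckets 2010-01-01/2010-01-02 and
// 2010-01-02/2010-01-03 before being wrapped in the ArbitraryGranularitySpec.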

View File

@@ -129,7 +129,7 @@ public class FireDepartment implements IngestionSchema
public Plumber findPlumber()
{
return ioConfig.getPlumberSchool().findPlumber(dataSchema, metrics);
return ioConfig.getPlumberSchool().findPlumber(dataSchema, driverConfig, metrics);
}
public Firehose connect() throws IOException

View File

@@ -9,6 +9,7 @@ import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.DateTime;
@@ -40,6 +41,7 @@ public class FlushingPlumber extends RealtimePlumber
File basePersistDirectory,
Granularity segmentGranularity,
DataSchema schema,
RealtimeDriverConfig config,
FireDepartmentMetrics metrics,
RejectionPolicy rejectionPolicy,
ServiceEmitter emitter,
@@ -55,6 +57,7 @@ public class FlushingPlumber extends RealtimePlumber
basePersistDirectory,
segmentGranularity,
schema,
config,
metrics,
rejectionPolicy,
emitter,
@@ -144,7 +147,7 @@ public class FlushingPlumber extends RealtimePlumber
String.format(
"%s-flusher-%d",
getSchema().getDataSource(),
getSchema().getShardSpec().getPartitionNum()
getConfig().getShardSpec().getPartitionNum()
)
)
{

View File

@@ -29,6 +29,7 @@ import com.metamx.emitter.service.ServiceEmitter;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Duration;
@@ -96,7 +97,11 @@ public class FlushingPlumberSchool implements PlumberSchool
}
@Override
public Plumber findPlumber(final DataSchema schema, final FireDepartmentMetrics metrics)
public Plumber findPlumber(
final DataSchema schema,
final RealtimeDriverConfig config,
final FireDepartmentMetrics metrics
)
{
verifyState();
@@ -109,6 +114,7 @@ public class FlushingPlumberSchool implements PlumberSchool
basePersistDirectory,
segmentGranularity,
schema,
config,
metrics,
rejectionPolicy,
emitter,

View File

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.common.Granularity;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
/**
@@ -39,7 +40,7 @@ public interface PlumberSchool
*
* @return returns a plumber
*/
public Plumber findPlumber(DataSchema schema, FireDepartmentMetrics metrics);
public Plumber findPlumber(DataSchema schema, RealtimeDriverConfig config, FireDepartmentMetrics metrics);
public Granularity getSegmentGranularity();
}
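// A minimal sketch (not part of this change) of how a caller now obtains a plumber.
// The values below are illustrative; the (maxRowsInMemory, intermediatePersistPeriod,
// shardSpec) constructor of RealtimeDriverConfig matches the one used elsewhere in
// this commit:
//
//   RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(
//       500000, new Period("PT10M"), new NoneShardSpec()
//   );
//   Plumber plumber = plumberSchool.findPlumber(dataSchema, driverConfig, new FireDepartmentMetrics());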

View File

@@ -36,6 +36,7 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
@@ -73,6 +74,7 @@ public class RealtimePlumber implements Plumber
private final File basePersistDirectory;
private final Granularity segmentGranularity;
private final DataSchema schema;
private final RealtimeDriverConfig config;
private final FireDepartmentMetrics metrics;
private final RejectionPolicy rejectionPolicy;
private final ServiceEmitter emitter;
@@ -101,6 +103,7 @@ public class RealtimePlumber implements Plumber
File basePersistDirectory,
Granularity segmentGranularity,
DataSchema schema,
RealtimeDriverConfig config,
FireDepartmentMetrics metrics,
RejectionPolicy rejectionPolicy,
ServiceEmitter emitter,
@@ -118,6 +121,7 @@ public class RealtimePlumber implements Plumber
this.basePersistDirectory = basePersistDirectory;
this.segmentGranularity = segmentGranularity;
this.schema = schema;
this.config = config;
this.metrics = metrics;
this.rejectionPolicy = rejectionPolicy;
this.emitter = emitter;
@@ -136,6 +140,11 @@ public class RealtimePlumber implements Plumber
return schema;
}
public RealtimeDriverConfig getConfig()
{
return config;
}
public Period getWindowPeriod()
{
return windowPeriod;
@@ -188,7 +197,7 @@ public class RealtimePlumber implements Plumber
segmentGranularity.increment(new DateTime(truncatedTime))
);
retVal = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval));
retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval));
try {
segmentAnnouncer.announceSegment(retVal.getSegment());
@@ -516,7 +525,7 @@ public class RealtimePlumber implements Plumber
sinkInterval.getStart(),
sinkInterval.getEnd(),
versioningPolicy.getVersion(sinkInterval),
schema.getShardSpec()
config.getShardSpec()
),
IndexIO.loadIndex(segmentDir)
),
@@ -525,7 +534,7 @@ public class RealtimePlumber implements Plumber
);
}
Sink currSink = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval), hydrants);
Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants);
sinks.put(sinkInterval.getStartMillis(), currSink);
sinkTimeline.add(
currSink.getInterval(),
@@ -570,7 +579,7 @@ public class RealtimePlumber implements Plumber
String.format(
"%s-overseer-%d",
schema.getDataSource(),
schema.getShardSpec().getPartitionNum()
config.getShardSpec().getPartitionNum()
)
)
{

View File

@@ -30,6 +30,7 @@ import io.druid.client.ServerView;
import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.SegmentPublisher;
@@ -164,7 +165,11 @@ public class RealtimePlumberSchool implements PlumberSchool
}
@Override
public Plumber findPlumber(final DataSchema schema, final FireDepartmentMetrics metrics)
public Plumber findPlumber(
final DataSchema schema,
final RealtimeDriverConfig config,
final FireDepartmentMetrics metrics
)
{
verifyState();
@@ -176,6 +181,7 @@ public class RealtimePlumberSchool implements PlumberSchool
basePersistDirectory,
segmentGranularity,
schema,
config,
metrics,
rejectionPolicy,
emitter,

View File

@@ -33,6 +33,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.Schema;
import io.druid.timeline.DataSegment;
@@ -54,16 +55,19 @@ public class Sink implements Iterable<FireHydrant>
private final Interval interval;
private final DataSchema schema;
private final RealtimeDriverConfig config;
private final String version;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
public Sink(
Interval interval,
DataSchema schema,
RealtimeDriverConfig config,
String version
)
{
this.schema = schema;
this.config = config;
this.interval = interval;
this.version = version;
@@ -73,11 +77,13 @@ public class Sink implements Iterable<FireHydrant>
public Sink(
Interval interval,
DataSchema schema,
RealtimeDriverConfig config,
String version,
List<FireHydrant> hydrants
)
{
this.schema = schema;
this.config = config;
this.interval = interval;
this.version = version;
@@ -160,7 +166,7 @@ public class Sink implements Iterable<FireHydrant>
}
}
),
schema.getShardSpec(),
config.getShardSpec(),
null,
0
);

View File

@@ -39,12 +39,14 @@ public class UniformGranularityTest
{
final GranularitySpec spec = new UniformGranularitySpec(
Granularity.DAY,
null,
Lists.newArrayList(
new Interval("2012-01-08T00Z/2012-01-11T00Z"),
new Interval("2012-01-07T00Z/2012-01-08T00Z"),
new Interval("2012-01-03T00Z/2012-01-04T00Z"),
new Interval("2012-01-01T00Z/2012-01-03T00Z")
)
),
null
);
Assert.assertEquals(
@@ -96,12 +98,14 @@ public class UniformGranularityTest
{
final GranularitySpec spec = new UniformGranularitySpec(
Granularity.DAY,
null,
Lists.newArrayList(
new Interval("2012-01-08T00Z/2012-01-11T00Z"),
new Interval("2012-01-07T00Z/2012-01-08T00Z"),
new Interval("2012-01-03T00Z/2012-01-04T00Z"),
new Interval("2012-01-01T00Z/2012-01-03T00Z")
)
),
null
);
try {
@@ -113,10 +117,11 @@ public class UniformGranularityTest
);
Assert.assertEquals(
"Round-trip granularity",
((UniformGranularitySpec) spec).getGranularity(),
((UniformGranularitySpec) rtSpec).getGranularity()
spec.getSegmentGranularity(),
rtSpec.getSegmentGranularity()
);
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}

View File

@@ -34,13 +34,12 @@ import io.druid.query.QueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.GranularitySpec;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.utils.Runnables;
import junit.framework.Assert;
import org.joda.time.DateTime;
@@ -74,8 +73,7 @@ public class RealtimeManagerTest
"test",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new GranularitySpec(Granularity.HOUR, QueryGranularity.NONE),
new NoneShardSpec()
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null, Granularity.HOUR)
);
RealtimeIOConfig ioConfig = new RealtimeIOConfig(
new FirehoseFactory()
@@ -96,7 +94,7 @@ public class RealtimeManagerTest
{
@Override
public Plumber findPlumber(
DataSchema schema, FireDepartmentMetrics metrics
DataSchema schema, RealtimeDriverConfig config, FireDepartmentMetrics metrics
)
{
return plumber;
@@ -109,14 +107,15 @@ public class RealtimeManagerTest
}
}
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, new DateTime().toString()));
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(1, new Period("P1Y"), null);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, driverConfig, new DateTime().toString()));
realtimeManager = new RealtimeManager(
Arrays.<FireDepartment>asList(
new FireDepartment(
schema,
ioConfig,
new RealtimeDriverConfig(1, new Period("P1Y")),
driverConfig,
null, null, null, null
)
),

View File

@@ -40,13 +40,13 @@ import io.druid.query.QueryRunnerFactory;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.GranularitySpec;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.easymock.EasyMock;
@@ -105,8 +105,7 @@ public class RealtimePlumberSchoolTest
}
},
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new GranularitySpec(Granularity.HOUR, QueryGranularity.NONE),
new NoneShardSpec()
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null, Granularity.HOUR)
);
RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
@@ -143,7 +142,9 @@ public class RealtimePlumberSchoolTest
realtimePlumberSchool.setEmitter(emitter);
realtimePlumberSchool.setQueryExecutorService(MoreExecutors.sameThreadExecutor());
plumber = realtimePlumberSchool.findPlumber(schema, new FireDepartmentMetrics());
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(1, new Period("P1Y"), null);
plumber = realtimePlumberSchool.findPlumber(schema, driverConfig, new FireDepartmentMetrics());
}
@After

View File

@@ -27,12 +27,14 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.GranularitySpec;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireHydrant;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Test;
import java.util.List;
@@ -48,13 +50,13 @@ public class SinkTest
"test",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new GranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE),
new NoneShardSpec()
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null, Granularity.HOUR)
);
final Interval interval = new Interval("2013-01-01/2013-01-02");
final String version = new DateTime().toString();
final Sink sink = new Sink(interval, schema, version);
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(1, new Period("P1Y"), null);
final Sink sink = new Sink(interval, schema, driverConfig, version);
sink.add(new InputRow()
{

View File

@@ -24,7 +24,6 @@ import com.metamx.common.logger.Logger;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerJob;
import java.io.File;
@@ -58,9 +57,9 @@ public class CliInternalHadoopIndexer implements Runnable
{
try {
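// argumentSpec is either an inline JSON spec (it then starts with "{") or a path
// to a file containing the spec.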
if (argumentSpec.startsWith("{")) {
return HadoopDruidIndexerConfigBuilder.fromString(argumentSpec);
return HadoopDruidIndexerConfig.fromString(argumentSpec);
} else {
return HadoopDruidIndexerConfigBuilder.fromFile(new File(argumentSpec));
return HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
}
}
catch (Exception e) {