merge with 0.7.x and resolve any conflicts

fjy 2014-10-23 17:24:06 -07:00
parent f790a05bd7
commit bef74104d9
295 changed files with 29010 additions and 2080 deletions

View File

@ -28,7 +28,7 @@
 <parent>
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
-<version>0.6.161-SNAPSHOT</version>
+<version>0.7.0-SNAPSHOT</version>
 </parent>
 <dependencies>

View File

@ -28,7 +28,7 @@
 <parent>
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
-<version>0.6.161-SNAPSHOT</version>
+<version>0.7.0-SNAPSHOT</version>
 </parent>
 <dependencies>

View File

@ -71,7 +71,8 @@ public class DruidSecondaryModule implements Module
 binder.install(new DruidGuiceExtensions());
 binder.bind(Properties.class).toInstance(properties);
 binder.bind(ConfigurationObjectFactory.class).toInstance(factory);
-binder.bind(ObjectMapper.class).to(Key.get(ObjectMapper.class, Json.class));
+// make objectMapper eager to ensure jackson gets setup with guice injection for JsonConfigurator
+binder.bind(ObjectMapper.class).to(Key.get(ObjectMapper.class, Json.class)).asEagerSingleton();
 binder.bind(Validator.class).toInstance(validator);
 binder.bind(JsonConfigurator.class).toInstance(jsonConfigurator);
 }
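Note: the hunk above switches the Json-annotated ObjectMapper binding to an eager singleton so that Jackson is fully wired during injector creation, before JsonConfigurator asks for it. A minimal, self-contained illustration of what asEagerSingleton() means in Guice (hypothetical example class, not Druid code):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;

public class EagerSingletonSketch
{
  public static void main(String[] args)
  {
    Injector injector = Guice.createInjector(
        new AbstractModule()
        {
          @Override
          protected void configure()
          {
            // Built once, at createInjector() time, instead of lazily on first use.
            bind(ObjectMapper.class).asEagerSingleton();
          }
        }
    );
    // Both lookups return the same, already-constructed instance.
    System.out.println(injector.getInstance(ObjectMapper.class) == injector.getInstance(ObjectMapper.class));
  }
}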

View File

@ -58,6 +58,7 @@ DRUID_CP=${EXAMPLE_LOC}
 DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/../config/realtime
 #For the kit
 DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/lib/*
+DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/_global
 DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/realtime
 echo "Running command:"

View File

@ -0,0 +1,23 @@
# Extensions
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.7.0","io.druid.extensions:druid-kafka-seven:0.7.0","io.druid.extensions:druid-rabbitmq:0.7.0", "io.druid.extensions:druid-s3-extensions:0.7.0"]
# Zookeeper
druid.zk.service.host=localhost
# Metadata Storage
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
# Deep storage
druid.storage.type=local
druid.storage.storage.storageDirectory=/tmp/druid/localStorage
# Indexing service discovery
druid.selectors.indexing.serviceName=overlord
# Monitoring (disabled for examples)
# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"]
# Metrics logging (disabled for examples)
druid.emitter=noop

View File

@ -2,8 +2,6 @@ druid.host=localhost
 druid.service=broker
 druid.port=8080
-druid.zk.service.host=localhost
-# Change these to make Druid faster
+# Bump these up only for faster nested groupBy
 druid.processing.buffer.sizeBytes=100000000
 druid.processing.numThreads=1

View File

@ -2,10 +2,4 @@ druid.host=localhost
 druid.service=coordinator
 druid.port=8082
-druid.zk.service.host=localhost
-druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
-druid.db.connector.user=druid
-druid.db.connector.password=diurd
 druid.coordinator.startDelay=PT70s

View File

@ -2,9 +2,7 @@ druid.host=localhost
 druid.service=historical
 druid.port=8081
-druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.147"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.7.0"]
 # Dummy read only AWS account (used to download example data)
 druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -16,4 +14,4 @@ druid.server.maxSize=10000000000
 druid.processing.buffer.sizeBytes=100000000
 druid.processing.numThreads=1
 druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]

View File

@ -1,18 +1,8 @@
 druid.host=localhost
-druid.port=8087
+druid.port=8080
 druid.service=overlord
-druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.147"]
-druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-druid.db.connector.user=druid
-druid.db.connector.password=diurd
-druid.selectors.indexing.serviceName=overlord
 druid.indexer.queue.startDelay=PT0M
 druid.indexer.runner.javaOpts="-server -Xmx256m"
-druid.indexer.runner.startPort=8088
 druid.indexer.fork.property.druid.processing.numThreads=1
 druid.indexer.fork.property.druid.computation.buffer.size=100000000

View File

@ -2,19 +2,11 @@ druid.host=localhost
 druid.service=realtime
 druid.port=8083
-druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.147","io.druid.extensions:druid-kafka-seven:0.6.147","io.druid.extensions:druid-rabbitmq:0.6.147"]
 # Change this config to db to hand off to the rest of the Druid cluster
 druid.publish.type=noop
-# These configs are only required for real hand off
-# druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
-# druid.db.connector.user=druid
-# druid.db.connector.password=diurd
 druid.processing.buffer.sizeBytes=100000000
 druid.processing.numThreads=1
-druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
+# Enable Real monitoring
+# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor","io.druid.segment.realtime.RealtimeMetricsMonitor"]

View File

@ -28,7 +28,7 @@
 <parent>
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
-<version>0.6.161-SNAPSHOT</version>
+<version>0.7.0-SNAPSHOT</version>
 </parent>
 <dependencies>

View File

@ -28,7 +28,7 @@
 <parent>
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
-<version>0.6.161-SNAPSHOT</version>
+<version>0.7.0-SNAPSHOT</version>
 </parent>
 <dependencies>

View File

@ -27,7 +27,7 @@
 <parent>
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
-<version>0.6.161-SNAPSHOT</version>
+<version>0.7.0-SNAPSHOT</version>
 </parent>
 <dependencies>

View File

@ -87,6 +87,13 @@ public class ApproximateHistogramBufferAggregator implements BufferAggregator
 throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getFloat()");
 }
+@Override
+public long getLong(ByteBuffer buf, int position)
+{
+throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getLong()");
+}
 @Override
 public void close()
 {

View File

@ -91,6 +91,12 @@ public class ApproximateHistogramFoldingBufferAggregator implements BufferAggreg
 throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getFloat()");
 }
+@Override
+public long getLong(ByteBuffer buf, int position)
+{
+throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getLong()");
+}
 @Override
 public void close()
 {
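Note: both hunks above fill in the getLong() accessor that the 0.7.x BufferAggregator interface adds; an aggregator whose intermediate state is a histogram rather than a primitive number rejects the primitive getters. A standalone sketch of that pattern against a hypothetical, simplified interface (not the actual Druid BufferAggregator):

import java.nio.ByteBuffer;

public class NonNumericAggregatorSketch
{
  // Hypothetical, trimmed-down contract with typed accessors for the intermediate state.
  interface BufferAgg
  {
    Object get(ByteBuffer buf, int position);
    float getFloat(ByteBuffer buf, int position);
    long getLong(ByteBuffer buf, int position);
  }

  // State is a complex object, so only get() is meaningful; the primitive
  // accessors fail fast instead of returning a meaningless number.
  static class HistogramLikeAgg implements BufferAgg
  {
    @Override
    public Object get(ByteBuffer buf, int position)
    {
      return "decoded histogram state at offset " + position; // placeholder for a real decode
    }

    @Override
    public float getFloat(ByteBuffer buf, int position)
    {
      throw new UnsupportedOperationException("does not support getFloat()");
    }

    @Override
    public long getLong(ByteBuffer buf, int position)
    {
      throw new UnsupportedOperationException("does not support getLong()");
    }
  }
}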

View File

@ -86,7 +86,8 @@ public class ApproximateHistogramGroupByQueryTest
 engine,
 QueryRunnerTestHelper.NOOP_QUERYWATCHER,
 configSupplier,
-new GroupByQueryQueryToolChest(configSupplier, mapper, engine)
+new GroupByQueryQueryToolChest(configSupplier, mapper, engine, pool),
+pool
 );
 GroupByQueryConfig singleThreadedConfig = new GroupByQueryConfig()
@ -106,7 +107,8 @@ public class ApproximateHistogramGroupByQueryTest
 singleThreadEngine,
 QueryRunnerTestHelper.NOOP_QUERYWATCHER,
 singleThreadedConfigSupplier,
-new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine)
+new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool),
+pool
 );

View File

@ -48,6 +48,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@ -240,7 +241,8 @@ public class ApproximateHistogramTopNQueryTest
 )
 )
 );
-TestHelper.assertExpectedResults(expectedResults, runner.run(query));
+HashMap<String,Object> context = new HashMap<String, Object>();
+TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
 }
 }

View File

@ -28,7 +28,7 @@
 <parent>
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
-<version>0.6.161-SNAPSHOT</version>
+<version>0.7.0-SNAPSHOT</version>
 </parent>
 <dependencies>

View File

@ -48,7 +48,6 @@ import io.druid.guice.annotations.Self;
 import io.druid.indexer.partitions.PartitionsSpec;
 import io.druid.indexer.path.PathSpec;
 import io.druid.initialization.Initialization;
-import io.druid.segment.column.ColumnConfig;
 import io.druid.segment.indexing.granularity.GranularitySpec;
 import io.druid.server.DruidNode;
 import io.druid.timeline.DataSegment;
@ -394,6 +393,11 @@ public class HadoopDruidIndexerConfig
 }
 }
+public boolean isPersistInHeap()
+{
+return schema.getTuningConfig().isPersistInHeap();
+}
 /******************************************
 Path helper logic
 ******************************************/

View File

@ -57,7 +57,11 @@ public class HadoopDruidIndexerJob implements Jobby
 List<Jobby> jobs = Lists.newArrayList();
 JobHelper.ensurePaths(config);
-indexJob = new IndexGeneratorJob(config);
+if (config.isPersistInHeap()) {
+indexJob = new IndexGeneratorJob(config);
+} else {
+indexJob = new LegacyIndexGeneratorJob(config);
+}
 jobs.add(indexJob);
 if (dbUpdaterJob != null) {
@ -66,15 +70,17 @@
 log.info("No updaterJobSpec set, not uploading to database");
 }
-jobs.add(new Jobby()
-{
-@Override
-public boolean run()
-{
-publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
-return true;
-}
-});
+jobs.add(
+new Jobby()
+{
+@Override
+public boolean run()
+{
+publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
+return true;
+}
+}
+);
 JobHelper.runJobs(jobs, config);

View File

@ -159,13 +159,15 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
 version,
 thePartitionSpec,
 shardSpecs,
-rollupSpec == null ? 50000 : rollupSpec.rowFlushBoundary,
+null,
 leaveIntermediate,
 cleanupOnFailure,
 overwriteFiles,
 ignoreInvalidRows,
 jobProperties,
-combineText
+combineText,
+false,
+false
 );
 }
 }
@ -289,4 +291,4 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
 null
 );
 }
 }

View File

@ -53,6 +53,8 @@ public class HadoopTuningConfig implements TuningConfig
 false,
 false,
 null,
+false,
+false,
 false
 );
 }
@ -68,6 +70,8 @@
 private final boolean ignoreInvalidRows;
 private final Map<String, String> jobProperties;
 private final boolean combineText;
+private final boolean persistInHeap;
+private final boolean ingestOffheap;
 @JsonCreator
 public HadoopTuningConfig(
@ -81,7 +85,9 @@
 final @JsonProperty("overwriteFiles") boolean overwriteFiles,
 final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
 final @JsonProperty("jobProperties") Map<String, String> jobProperties,
-final @JsonProperty("combineText") boolean combineText
+final @JsonProperty("combineText") boolean combineText,
+final @JsonProperty("persistInHeap") boolean persistInHeap,
+final @JsonProperty("ingestOffheap") boolean ingestOffheap
 )
 {
 this.workingPath = workingPath == null ? null : workingPath;
@ -97,6 +103,8 @@
 ? ImmutableMap.<String, String>of()
 : ImmutableMap.copyOf(jobProperties));
 this.combineText = combineText;
+this.persistInHeap = persistInHeap;
+this.ingestOffheap = ingestOffheap;
 }
 @JsonProperty
@ -165,6 +173,17 @@
 return combineText;
 }
+@JsonProperty
+public boolean isPersistInHeap()
+{
+return persistInHeap;
+}
+@JsonProperty
+public boolean isIngestOffheap(){
+return ingestOffheap;
+}
 public HadoopTuningConfig withWorkingPath(String path)
 {
 return new HadoopTuningConfig(
@ -178,7 +197,9 @@
 overwriteFiles,
 ignoreInvalidRows,
 jobProperties,
-combineText
+combineText,
+persistInHeap,
+ingestOffheap
 );
 }
@ -195,7 +216,9 @@
 overwriteFiles,
 ignoreInvalidRows,
 jobProperties,
-combineText
+combineText,
+persistInHeap,
+ingestOffheap
 );
 }
@ -212,7 +235,9 @@
 overwriteFiles,
 ignoreInvalidRows,
 jobProperties,
-combineText
+combineText,
+persistInHeap,
+ingestOffheap
 );
 }
 }
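Note: the two new tuning flags are plain @JsonProperty booleans wired through a @JsonCreator constructor, so they default to false when absent from the JSON spec. A small self-contained sketch of that Jackson pattern (hypothetical class, not the real HadoopTuningConfig):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TuningFlagsSketch
{
  public static class TuningFlags
  {
    private final boolean persistInHeap;
    private final boolean ingestOffheap;

    @JsonCreator
    public TuningFlags(
        @JsonProperty("persistInHeap") boolean persistInHeap,
        @JsonProperty("ingestOffheap") boolean ingestOffheap
    )
    {
      this.persistInHeap = persistInHeap;
      this.ingestOffheap = ingestOffheap;
    }

    @JsonProperty
    public boolean isPersistInHeap() { return persistInHeap; }

    @JsonProperty
    public boolean isIngestOffheap() { return ingestOffheap; }
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // A property that is missing from the JSON binds to the primitive default, i.e. false.
    TuningFlags flags = mapper.readValue("{\"ingestOffheap\": true}", TuningFlags.class);
    System.out.println(flags.isPersistInHeap() + " " + flags.isIngestOffheap()); // false true
  }
}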

View File

@ -35,13 +35,17 @@ import com.metamx.common.guava.CloseQuietly;
 import com.metamx.common.logger.Logger;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.offheap.OffheapBufferPool;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexMaker;
+import io.druid.segment.LoggingProgressIndicator;
+import io.druid.segment.ProgressIndicator;
 import io.druid.segment.QueryableIndex;
 import io.druid.segment.SegmentUtils;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
 import io.druid.timeline.DataSegment;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configurable;
@ -86,20 +90,9 @@ import java.util.zip.ZipOutputStream;
public class IndexGeneratorJob implements Jobby public class IndexGeneratorJob implements Jobby
{ {
private static final Logger log = new Logger(IndexGeneratorJob.class); private static final Logger log = new Logger(IndexGeneratorJob.class);
private final HadoopDruidIndexerConfig config;
private IndexGeneratorStats jobStats;
public IndexGeneratorJob(
HadoopDruidIndexerConfig config
)
{
this.config = config;
this.jobStats = new IndexGeneratorStats();
}
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config) public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
{ {
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper; final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
@ -130,6 +123,22 @@ public class IndexGeneratorJob implements Jobby
return publishedSegments; return publishedSegments;
} }
private final HadoopDruidIndexerConfig config;
private IndexGeneratorStats jobStats;
public IndexGeneratorJob(
HadoopDruidIndexerConfig config
)
{
this.config = config;
this.jobStats = new IndexGeneratorStats();
}
protected void setReducerClass(final Job job)
{
job.setReducerClass(IndexGeneratorReducer.class);
}
public IndexGeneratorStats getJobStats() public IndexGeneratorStats getJobStats()
{ {
return jobStats; return jobStats;
@ -161,7 +170,7 @@ public class IndexGeneratorJob implements Jobby
job.setNumReduceTasks(Iterables.size(config.getAllBuckets().get())); job.setNumReduceTasks(Iterables.size(config.getAllBuckets().get()));
job.setPartitionerClass(IndexGeneratorPartitioner.class); job.setPartitionerClass(IndexGeneratorPartitioner.class);
job.setReducerClass(IndexGeneratorReducer.class); setReducerClass(job);
job.setOutputKeyClass(BytesWritable.class); job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(Text.class); job.setOutputValueClass(Text.class);
job.setOutputFormatClass(IndexGeneratorOutputFormat.class); job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
@ -190,7 +199,6 @@ public class IndexGeneratorJob implements Jobby
} }
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text> public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
{ {
@Override @Override
protected void innerMap( protected void innerMap(
@ -256,6 +264,42 @@ public class IndexGeneratorJob implements Jobby
private List<String> metricNames = Lists.newArrayList(); private List<String> metricNames = Lists.newArrayList();
private StringInputRowParser parser; private StringInputRowParser parser;
protected ProgressIndicator makeProgressIndicator(final Context context)
{
return new LoggingProgressIndicator("IndexGeneratorJob")
{
@Override
public void progress()
{
context.progress();
}
};
}
protected File persist(
final IncrementalIndex index,
final Interval interval,
final File file,
final ProgressIndicator progressIndicator
) throws IOException
{
return IndexMaker.persist(
index, interval, file, progressIndicator
);
}
protected File mergeQueryableIndex(
final List<QueryableIndex> indexes,
final AggregatorFactory[] aggs,
final File file,
ProgressIndicator progressIndicator
) throws IOException
{
return IndexMaker.mergeQueryableIndex(
indexes, aggs, file, progressIndicator
);
}
@Override @Override
protected void setup(Context context) protected void setup(Context context)
throws IOException, InterruptedException throws IOException, InterruptedException
@ -282,113 +326,84 @@ public class IndexGeneratorJob implements Jobby
final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators(); final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
IncrementalIndex index = makeIncrementalIndex(bucket, aggs); IncrementalIndex index = makeIncrementalIndex(bucket, aggs);
try {
File baseFlushFile = File.createTempFile("base", "flush");
baseFlushFile.delete();
baseFlushFile.mkdirs();
File baseFlushFile = File.createTempFile("base", "flush"); Set<File> toMerge = Sets.newTreeSet();
baseFlushFile.delete(); int indexCount = 0;
baseFlushFile.mkdirs(); int lineCount = 0;
int runningTotalLineCount = 0;
long startTime = System.currentTimeMillis();
Set<File> toMerge = Sets.newTreeSet(); Set<String> allDimensionNames = Sets.newHashSet();
int indexCount = 0; final ProgressIndicator progressIndicator = makeProgressIndicator(context);
int lineCount = 0;
int runningTotalLineCount = 0;
long startTime = System.currentTimeMillis();
Set<String> allDimensionNames = Sets.newHashSet();
for (final Text value : values) {
context.progress();
final InputRow inputRow = index.getSpatialDimensionRowFormatter().formatRow(parser.parse(value.toString()));
allDimensionNames.addAll(inputRow.getDimensions());
int numRows = index.add(inputRow);
++lineCount;
if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
log.info(
"%,d lines to %,d rows in %,d millis",
lineCount - runningTotalLineCount,
numRows,
System.currentTimeMillis() - startTime
);
runningTotalLineCount = lineCount;
final File file = new File(baseFlushFile, String.format("index%,05d", indexCount));
toMerge.add(file);
for (final Text value : values) {
context.progress(); context.progress();
IndexMerger.persist( final InputRow inputRow = index.formatRow(parser.parse(value.toString()));
index, interval, file, new IndexMerger.ProgressIndicator() allDimensionNames.addAll(inputRow.getDimensions());
{
@Override
public void progress()
{
context.progress();
}
}
);
index = makeIncrementalIndex(bucket, aggs);
startTime = System.currentTimeMillis(); int numRows = index.add(inputRow);
++indexCount; ++lineCount;
}
}
log.info("%,d lines completed.", lineCount); if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
log.info(
"%,d lines to %,d rows in %,d millis",
lineCount - runningTotalLineCount,
numRows,
System.currentTimeMillis() - startTime
);
runningTotalLineCount = lineCount;
List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount); final File file = new File(baseFlushFile, String.format("index%,05d", indexCount));
final File mergedBase; toMerge.add(file);
if (toMerge.size() == 0) {
if (index.isEmpty()) {
throw new IAE("If you try to persist empty indexes you are going to have a bad time");
}
mergedBase = new File(baseFlushFile, "merged");
IndexMerger.persist(
index, interval, mergedBase, new IndexMerger.ProgressIndicator()
{
@Override
public void progress()
{
context.progress(); context.progress();
persist(index, interval, file, progressIndicator);
// close this index and make a new one
index.close();
index = makeIncrementalIndex(bucket, aggs);
startTime = System.currentTimeMillis();
++indexCount;
} }
} }
);
} else {
if (!index.isEmpty()) {
final File finalFile = new File(baseFlushFile, "final");
IndexMerger.persist(
index, interval, finalFile, new IndexMerger.ProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
);
toMerge.add(finalFile);
}
log.info("%,d lines completed.", lineCount);
List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
final File mergedBase;
if (toMerge.size() == 0) {
if (index.isEmpty()) {
throw new IAE("If you try to persist empty indexes you are going to have a bad time");
}
mergedBase = new File(baseFlushFile, "merged");
persist(index, interval, mergedBase, progressIndicator);
} else {
if (!index.isEmpty()) {
final File finalFile = new File(baseFlushFile, "final");
persist(index, interval, finalFile, progressIndicator);
toMerge.add(finalFile);
}
for (File file : toMerge) {
indexes.add(IndexIO.loadIndex(file));
}
mergedBase = mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator
);
}
serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));
for (File file : toMerge) { for (File file : toMerge) {
indexes.add(IndexIO.loadIndex(file)); FileUtils.deleteDirectory(file);
} }
mergedBase = IndexMerger.mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), new IndexMerger.ProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
);
} }
finally {
serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames)); index.close();
for (File file : toMerge) {
FileUtils.deleteDirectory(file);
} }
} }
@ -616,14 +631,29 @@
 private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs)
 {
-return new IncrementalIndex(
-new IncrementalIndexSchema.Builder()
-.withMinTimestamp(theBucket.time.getMillis())
-.withSpatialDimensions(config.getSchema().getDataSchema().getParser())
-.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
-.withMetrics(aggs)
-.build()
-);
+int aggsSize = 0;
+for (AggregatorFactory agg : aggs) {
+aggsSize += agg.getMaxIntermediateSize();
+}
+final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
+int bufferSize = aggsSize * tuningConfig.getRowFlushBoundary();
+final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
+.withMinTimestamp(theBucket.time.getMillis())
+.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
+.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
+.withMetrics(aggs)
+.build();
+if (tuningConfig.isIngestOffheap()) {
+return new OffheapIncrementalIndex(
+indexSchema,
+new OffheapBufferPool(bufferSize)
+);
+} else {
+return new IncrementalIndex(
+indexSchema,
+new OffheapBufferPool(bufferSize)
+);
+}
 }

 private void createNewZipEntry(ZipOutputStream out, String name) throws IOException
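Note: the reducer in this file follows a spill-and-merge pattern: rows accumulate in an in-memory incremental index, the index is persisted to disk whenever rowFlushBoundary is reached, and the spilled parts are merged into one segment at the end, with the index closed in a finally block. A minimal standalone sketch of that flow, using hypothetical interfaces rather than the Druid classes:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SpillAndMergeSketch
{
  interface InMemoryIndex
  {
    int add(String row);      // returns the current row count
    boolean isEmpty();
    void close();
  }

  interface Persister
  {
    File persist(InMemoryIndex index, File dir) throws IOException;
    File merge(List<File> parts, File dir) throws IOException;
  }

  public static File index(
      Iterable<String> rows,
      int rowFlushBoundary,
      Supplier<InMemoryIndex> indexFactory,
      Persister persister
  ) throws IOException
  {
    File baseDir = Files.createTempDirectory("spill").toFile();
    List<File> toMerge = new ArrayList<>();
    InMemoryIndex index = indexFactory.get();
    try {
      int spillCount = 0;
      for (String row : rows) {
        if (index.add(row) >= rowFlushBoundary) {
          // Spill the full index to disk and start a fresh, empty one.
          toMerge.add(persister.persist(index, new File(baseDir, "spill" + spillCount++)));
          index.close();
          index = indexFactory.get();
        }
      }
      if (toMerge.isEmpty()) {
        // Nothing was spilled; persist whatever is in memory as the final result.
        return persister.persist(index, new File(baseDir, "merged"));
      }
      if (!index.isEmpty()) {
        toMerge.add(persister.persist(index, new File(baseDir, "final")));
      }
      return persister.merge(toMerge, new File(baseDir, "merged"));
    }
    finally {
      index.close();
    }
  }
}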

View File

@ -0,0 +1,86 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.BaseProgressIndicator;
import io.druid.segment.IndexMerger;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.incremental.IncrementalIndex;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.List;
/**
*/
public class LegacyIndexGeneratorJob extends IndexGeneratorJob
{
public LegacyIndexGeneratorJob(
HadoopDruidIndexerConfig config
)
{
super(config);
}
@Override
protected void setReducerClass(Job job)
{
job.setReducerClass(LegacyIndexGeneratorReducer.class);
}
public static class LegacyIndexGeneratorReducer extends IndexGeneratorJob.IndexGeneratorReducer
{
@Override
protected ProgressIndicator makeProgressIndicator(final Context context)
{
return new BaseProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
};
}
@Override
protected File persist(
IncrementalIndex index, Interval interval, File file, ProgressIndicator progressIndicator
) throws IOException
{
return IndexMerger.persist(index, interval, file, progressIndicator);
}
@Override
protected File mergeQueryableIndex(
List<QueryableIndex> indexes,
AggregatorFactory[] aggs,
File file,
ProgressIndicator progressIndicator
) throws IOException
{
return IndexMerger.mergeQueryableIndex(indexes, aggs, file, progressIndicator);
}
}
}

View File

@ -20,6 +20,7 @@
 package io.druid.indexer.rollup;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.api.client.util.Lists;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
@ -30,32 +31,17 @@ import java.util.List;
 *
 * Adjust to JsonCreator and final fields when resolved.
 */
+@Deprecated
 public class DataRollupSpec
 {
 @JsonProperty
-public List<AggregatorFactory> aggs;
+public List<AggregatorFactory> aggs = Lists.newArrayList();
 @JsonProperty
 public QueryGranularity rollupGranularity = QueryGranularity.NONE;
-@JsonProperty
-public int rowFlushBoundary = 500000;
-public DataRollupSpec() {}
-public DataRollupSpec(List<AggregatorFactory> aggs, QueryGranularity rollupGranularity)
-{
-this.aggs = aggs;
-this.rollupGranularity = rollupGranularity;
-}
 public List<AggregatorFactory> getAggs()
 {
 return aggs;
 }
-public QueryGranularity getRollupGranularity()
-{
-return rollupGranularity;
-}
 }

View File

@ -152,6 +152,10 @@ public class HadoopDruidIndexerConfigTest
 for (int i = 0; i < partitionCount; i++) {
 specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, partitionCount, new DefaultObjectMapper()), i));
 }
+// Backwards compatibility
+DataRollupSpec rollupSpec = new DataRollupSpec();
+rollupSpec.rollupGranularity = QueryGranularity.MINUTE;
 HadoopIngestionSpec spec = new HadoopIngestionSpec(
 null, null, null,
 "foo",
@ -172,7 +176,7 @@
 true,
 ImmutableMap.of(new DateTime("2010-01-01T01:00:00"), specs),
 false,
-new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.MINUTE),
+rollupSpec,
 null,
 false,
 ImmutableMap.of("foo", "bar"),

View File

@ -28,7 +28,7 @@
 <parent>
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
-<version>0.6.161-SNAPSHOT</version>
+<version>0.7.0-SNAPSHOT</version>
 </parent>
 <dependencies>

View File

@ -35,7 +35,7 @@ import io.druid.data.input.InputRow;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexMaker;
 import io.druid.segment.QueryableIndex;
 import io.druid.segment.SegmentUtils;
 import io.druid.segment.indexing.DataSchema;
@ -166,7 +166,7 @@
 }
 fileToUpload = new File(tmpSegmentDir, "merged");
-IndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
+IndexMaker.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
 }
 // Map merged segment so we can extract dimensions
@ -211,8 +211,7 @@
 log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist);
 try {
-IndexMerger.persist(
+IndexMaker.persist(
 indexToPersist.getIndex(),
 dirToPersist
 );

View File

@ -27,7 +27,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexMaker;
 import io.druid.segment.IndexableAdapter;
 import io.druid.segment.QueryableIndexIndexableAdapter;
 import io.druid.segment.Rowboat;
@ -106,7 +106,7 @@ public class AppendTask extends MergeTaskBase
 );
 }
-return IndexMerger.append(adapters, outDir);
+return IndexMaker.append(adapters, outDir);
 }
 @Override

View File

@ -1,109 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;
public class DeleteTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(DeleteTask.class);
@JsonCreator
public DeleteTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(
id != null ? id : String.format(
"delete_%s_%s_%s_%s",
dataSource,
interval.getStart(),
interval.getEnd(),
new DateTime().toString()
),
dataSource,
Preconditions.checkNotNull(interval, "interval")
);
}
@Override
public String getType()
{
return "delete";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Strategy: Create an empty segment covering the interval to be deleted
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty);
// Create DataSegment
final DataSegment segment =
DataSegment.builder()
.dataSource(this.getDataSource())
.interval(getInterval())
.version(myLock.getVersion())
.shardSpec(new NoneShardSpec())
.build();
final File outDir = new File(toolbox.getTaskWorkDir(), segment.getIdentifier());
final File fileToUpload = IndexMerger.merge(Lists.newArrayList(emptyAdapter), new AggregatorFactory[0], outDir);
// Upload the segment
final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, segment);
log.info(
"Uploaded tombstone segment for[%s] interval[%s] with version[%s]",
segment.getDataSource(),
segment.getInterval(),
segment.getVersion()
);
toolbox.pushSegments(ImmutableList.of(uploadedSegment));
return TaskStatus.success(getId());
}
}

View File

@ -147,7 +147,7 @@ public class IndexTask extends AbstractFixedIntervalTask
 granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity)
 ),
 new IndexIOConfig(firehoseFactory),
-new IndexTuningConfig(targetPartitionSize, rowFlushBoundary, null)
+new IndexTuningConfig(targetPartitionSize, 0, null)
 );
 }
 this.jsonMapper = jsonMapper;
@ -401,7 +401,11 @@
 version,
 wrappedDataSegmentPusher,
 tmpDir
-).findPlumber(schema, new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec), metrics);
+).findPlumber(
+schema,
+new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null),
+metrics
+);
 // rowFlushBoundary for this job
 final int myRowFlushBoundary = rowFlushBoundary > 0
@ -557,7 +561,7 @@
 @JsonProperty("targetPartitionSize") int targetPartitionSize,
 @JsonProperty("rowFlushBoundary") int rowFlushBoundary,
 @JsonProperty("numShards") @Nullable Integer numShards
 )
 {
 this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
 this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;

View File

@ -28,7 +28,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexMaker;
 import io.druid.segment.QueryableIndex;
 import io.druid.timeline.DataSegment;
@ -60,7 +60,7 @@ public class MergeTask extends MergeTaskBase
 public File merge(final Map<DataSegment, File> segments, final File outDir)
 throws Exception
 {
-return IndexMerger.mergeQueryableIndex(
+return IndexMaker.mergeQueryableIndex(
 Lists.transform(
 ImmutableList.copyOf(segments.values()),
 new Function<File, QueryableIndex>()

View File

@ -143,7 +143,9 @@ public class RealtimeIndexTask extends AbstractTask
 null,
 rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
 maxPendingPersists,
-spec.getShardSpec()
+spec.getShardSpec(),
+false,
+false
 ),
 null, null, null, null
 );

View File

@ -43,7 +43,6 @@ import io.druid.query.QueryRunner;
 @JsonSubTypes(value = {
 @JsonSubTypes.Type(name = "append", value = AppendTask.class),
 @JsonSubTypes.Type(name = "merge", value = MergeTask.class),
-@JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
 @JsonSubTypes.Type(name = "kill", value = KillTask.class),
 @JsonSubTypes.Type(name = "move", value = MoveTask.class),
 @JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),

View File

@ -51,10 +51,11 @@ import io.druid.query.select.EventHolder;
 import io.druid.segment.Cursor;
 import io.druid.segment.DimensionSelector;
 import io.druid.segment.IndexIO;
+import io.druid.segment.LongColumnSelector;
 import io.druid.segment.ObjectColumnSelector;
 import io.druid.segment.QueryableIndexStorageAdapter;
 import io.druid.segment.StorageAdapter;
-import io.druid.segment.TimestampColumnSelector;
+import io.druid.segment.column.Column;
 import io.druid.segment.data.IndexedInts;
 import io.druid.segment.filter.Filters;
 import io.druid.segment.loading.SegmentLoadingException;
@ -250,7 +251,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
 @Override
 public Sequence<InputRow> apply(final Cursor cursor)
 {
-final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
+final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
 final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
 for (String dim : dims) {
@ -287,7 +288,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
 public InputRow next()
 {
 final Map<String, Object> theEvent = Maps.newLinkedHashMap();
-final long timestamp = timestampColumnSelector.getTimestamp();
+final long timestamp = timestampColumnSelector.get();
 theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
 for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {

View File

@ -49,7 +49,7 @@ public class ForkingTaskRunnerConfig
 @JsonProperty
 @Min(1024)
 @Max(65535)
-private int startPort = 8081;
+private int startPort = 8100;
 @JsonProperty
 @NotNull

View File

@ -243,51 +243,6 @@ public class TaskSerdeTest
); );
} }
@Test
public void testDeleteTaskSerde() throws Exception
{
final DeleteTask task = new DeleteTask(
null,
"foo",
new Interval("2010-01-01/P1D")
);
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final DeleteTask task2 = (DeleteTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
}
@Test
public void testDeleteTaskFromJson() throws Exception
{
final DeleteTask task = (DeleteTask) jsonMapper.readValue(
"{\"type\":\"delete\",\"dataSource\":\"foo\",\"interval\":\"2010-01-01/P1D\"}",
Task.class
);
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final DeleteTask task2 = (DeleteTask) jsonMapper.readValue(json, Task.class);
Assert.assertNotNull(task.getId());
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
}
@Test @Test
public void testAppendTaskSerde() throws Exception public void testAppendTaskSerde() throws Exception
{ {
@ -413,7 +368,7 @@
 true,
 null,
 false,
-new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
+null,
 null,
 false,
 ImmutableMap.of("foo", "bar"),

View File

@ -28,7 +28,7 @@
 <parent>
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
-<version>0.6.161-SNAPSHOT</version>
+<version>0.7.0-SNAPSHOT</version>
 </parent>
 <dependencies>

View File

@ -28,7 +28,7 @@
 <parent>
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
-<version>0.6.161-SNAPSHOT</version>
+<version>0.7.0-SNAPSHOT</version>
 </parent>
 <dependencies>

pom.xml
View File

@ -18,19 +18,20 @@
 ~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
 <packaging>pom</packaging>
-<version>0.6.161-SNAPSHOT</version>
+<version>0.7.0-SNAPSHOT</version>
 <name>druid</name>
 <description>druid</description>
 <scm>
 <connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
 <developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
 <url>http://www.github.com/metamx/druid</url>
-<tag>druid-0.6.159-SNAPSHOT</tag>
+<tag>druid-0.7.0-SNAPSHOT</tag>
 </scm>
 <prerequisites>
@ -41,7 +42,7 @@
 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 <metamx.java-util.version>0.26.9</metamx.java-util.version>
 <apache.curator.version>2.6.0</apache.curator.version>
-<druid.api.version>0.2.15</druid.api.version>
+<druid.api.version>0.2.16</druid.api.version>
 </properties>
 <modules>
@ -74,7 +75,7 @@
 <dependency>
 <groupId>com.metamx</groupId>
 <artifactId>emitter</artifactId>
-<version>0.2.11</version>
+<version>0.2.12</version>
 </dependency>
 <dependency>
 <groupId>com.metamx</groupId>
@ -389,7 +390,7 @@
 <dependency>
 <groupId>net.jpountz.lz4</groupId>
 <artifactId>lz4</artifactId>
-<version>1.1.2</version>
+<version>1.2.0</version>
 </dependency>
 <dependency>
 <groupId>com.google.protobuf</groupId>
@ -434,6 +435,11 @@
 <version>2.3.0</version>
 <scope>provided</scope>
 </dependency>
+<dependency>
+<groupId>org.mapdb</groupId>
+<artifactId>mapdb</artifactId>
+<version>1.0.6</version>
+</dependency>
 <!-- Test Scope -->
 <dependency>

View File

@ -18,7 +18,8 @@
 ~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>io.druid</groupId>
 <artifactId>druid-processing</artifactId>
@ -28,7 +29,7 @@
 <parent>
 <groupId>io.druid</groupId>
 <artifactId>druid</artifactId>
-<version>0.6.161-SNAPSHOT</version>
+<version>0.7.0-SNAPSHOT</version>
 </parent>
 <dependencies>
@ -82,6 +83,14 @@
 <groupId>com.davekoelle</groupId>
 <artifactId>alphanum</artifactId>
 </dependency>
+<dependency>
+<groupId>net.jpountz.lz4</groupId>
+<artifactId>lz4</artifactId>
+</dependency>
+<dependency>
+<groupId>org.mapdb</groupId>
+<artifactId>mapdb</artifactId>
+</dependency>
 <!-- Tests -->
@ -95,10 +104,10 @@
 <artifactId>easymock</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>com.google.caliper</groupId>
 <artifactId>caliper</artifactId>
 </dependency>
 </dependencies>
 <build>

View File

@ -26,6 +26,7 @@ import com.google.inject.Injector;
 import com.google.inject.Module;
 import io.druid.jackson.JacksonModule;
+import java.util.Arrays;
 import java.util.List;
 /**
@ -37,7 +38,7 @@ public class GuiceInjectors
 return Guice.createInjector(
 new DruidGuiceExtensions(),
 new JacksonModule(),
-new PropertiesModule("runtime.properties"),
+new PropertiesModule(Arrays.asList("global.runtime.properties", "runtime.properties")),
 new ConfigModule(),
 new Module()
 {
@ -56,7 +57,7 @@ public class GuiceInjectors
 List<Module> theModules = Lists.newArrayList();
 theModules.add(new DruidGuiceExtensions());
 theModules.add(new JacksonModule());
-theModules.add(new PropertiesModule("runtime.properties"));
+theModules.add(new PropertiesModule(Arrays.asList("global.runtime.properties", "runtime.properties")));
 theModules.add(new ConfigModule());
 theModules.add(
 new Module()

View File

@ -17,10 +17,11 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.guice; package io.druid.guice;;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Module; import com.google.inject.Module;
import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.CloseQuietly;
@ -33,7 +34,7 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.Reader; import java.util.List;
import java.util.Properties; import java.util.Properties;
/** /**
@ -42,11 +43,11 @@ public class PropertiesModule implements Module
{ {
private static final Logger log = new Logger(PropertiesModule.class); private static final Logger log = new Logger(PropertiesModule.class);
private final String propertiesFile; private final List<String> propertiesFiles;
public PropertiesModule(String propertiesFile) public PropertiesModule(List<String> propertiesFiles)
{ {
this.propertiesFile = propertiesFile; this.propertiesFiles = propertiesFiles;
} }
@Override @Override
@ -58,30 +59,32 @@ public class PropertiesModule implements Module
Properties props = new Properties(fileProps); Properties props = new Properties(fileProps);
props.putAll(systemProps); props.putAll(systemProps);
InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile); for (String propertiesFile : propertiesFiles) {
try { InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile);
if (stream == null) { try {
File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile)); if (stream == null) {
if (workingDirectoryFile.exists()) { File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile));
stream = new BufferedInputStream(new FileInputStream(workingDirectoryFile)); if (workingDirectoryFile.exists()) {
stream = new BufferedInputStream(new FileInputStream(workingDirectoryFile));
}
} }
}
if (stream != null) { if (stream != null) {
log.info("Loading properties from %s", propertiesFile); log.info("Loading properties from %s", propertiesFile);
try(Reader reader = new InputStreamReader(stream, Charsets.UTF_8)) { try {
fileProps.load(reader); fileProps.load(new InputStreamReader(stream, Charsets.UTF_8));
} }
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
}
} }
} }
} catch (FileNotFoundException e) {
catch (FileNotFoundException e) { log.wtf(e, "This can only happen if the .exists() call lied. That's f'd up.");
log.wtf(e, "This can only happen if the .exists() call lied. That's f'd up."); }
} finally {
finally { CloseQuietly.close(stream);
CloseQuietly.close(stream); }
} }
binder.bind(Properties.class).toInstance(props); binder.bind(Properties.class).toInstance(props);
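Taken together with the GuiceInjectors change above, the lookup order becomes: global.runtime.properties is loaded first, runtime.properties second (so its duplicate keys win), and system properties override both. A self-contained sketch of just that ordering, assuming the two classpath resource names wired up above; the class name and the final listing call are illustrative only:

import com.google.common.base.Charsets;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class PropertiesOrderSketch
{
  public static void main(String[] args) throws Exception
  {
    List<String> files = Arrays.asList("global.runtime.properties", "runtime.properties");

    Properties fileProps = new Properties();
    for (String file : files) {
      InputStream stream = ClassLoader.getSystemResourceAsStream(file);
      if (stream != null) {
        try (Reader reader = new InputStreamReader(stream, Charsets.UTF_8)) {
          // Properties.load overwrites existing keys, so later files in the list win.
          fileProps.load(reader);
        }
      }
    }

    // System properties still take precedence over anything read from the files.
    Properties props = new Properties(fileProps);
    props.putAll(System.getProperties());

    props.list(System.out);
  }
}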
View File
@ -70,14 +70,14 @@ public abstract class BaseQuery<T> implements Query<T>
} }
@Override @Override
public Sequence<T> run(QuerySegmentWalker walker) public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context)
{ {
return run(querySegmentSpec.lookup(this, walker)); return run(querySegmentSpec.lookup(this, walker), context);
} }
public Sequence<T> run(QueryRunner<T> runner) public Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context)
{ {
return runner.run(this); return runner.run(this, context);
} }
@Override @Override
View File
@ -26,6 +26,7 @@ import org.joda.time.DateTime;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
*/ */
@ -48,11 +49,11 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query, Map<String, Object> context)
{ {
if (query.getContextBySegment(false)) { if (query.getContextBySegment(false)) {
final Sequence<T> baseSequence = base.run(query);
final Sequence<T> baseSequence = base.run(query, context);
final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList()); final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
return Sequences.simple( return Sequences.simple(
Arrays.asList( Arrays.asList(
@ -67,7 +68,6 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
) )
); );
} }
return base.run(query, context);
return base.run(query);
} }
} }
View File
@ -21,6 +21,8 @@ package io.druid.query;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import java.util.Map;
/** /**
*/ */
public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T> public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
@ -35,14 +37,14 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(Query<T> query) public Sequence<T> run(Query<T> query, Map<String, Object> context)
{ {
if (query.getContextBySegment(false)) { if (query.getContextBySegment(false)) {
return baseRunner.run(query); return baseRunner.run(query, context);
} }
return doRun(baseRunner, query); return doRun(baseRunner, query, context);
} }
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query); protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context);
} }
View File
@ -39,6 +39,7 @@ import com.metamx.common.logger.Logger;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -93,7 +94,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{ {
final int priority = query.getContextPriority(0); final int priority = query.getContextPriority(0);
@ -124,7 +125,11 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public Iterable<T> call() throws Exception public Iterable<T> call() throws Exception
{ {
try { try {
Sequence<T> result = input.run(query); if (input == null) {
throw new ISE("Input is null?! How is this possible?!");
}
Sequence<T> result = input.run(query, context);
if (result == null) { if (result == null) {
throw new ISE("Got a null result! Segments are missing!"); throw new ISE("Got a null result! Segments are missing!");
} }
View File
@ -23,6 +23,9 @@ import com.google.common.base.Function;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import java.util.List;
import java.util.Map;
/** /**
*/ */
public class ConcatQueryRunner<T> implements QueryRunner<T> public class ConcatQueryRunner<T> implements QueryRunner<T>
@ -36,7 +39,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{ {
return Sequences.concat( return Sequences.concat(
Sequences.map( Sequences.map(
@ -46,7 +49,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
@Override @Override
public Sequence<T> apply(final QueryRunner<T> input) public Sequence<T> apply(final QueryRunner<T> input)
{ {
return input.run(query); return input.run(query, context);
} }
} }
) )
View File
@ -28,6 +28,10 @@ import com.metamx.common.guava.Sequences;
import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.MetricManipulatorFns; import io.druid.query.aggregation.MetricManipulatorFns;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
/** /**
*/ */
public class FinalizeResultsQueryRunner<T> implements QueryRunner<T> public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
@ -45,7 +49,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query, Map<String, Object> context)
{ {
final boolean isBySegment = query.getContextBySegment(false); final boolean isBySegment = query.getContextBySegment(false);
final boolean shouldFinalize = query.getContextFinalize(true); final boolean shouldFinalize = query.getContextFinalize(true);
@ -98,7 +102,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
return Sequences.map( return Sequences.map(
baseRunner.run(queryToRun), baseRunner.run(queryToRun, context),
finalizerFn finalizerFn
); );
View File
@ -32,16 +32,20 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryHelper; import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -56,11 +60,13 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
private final ListeningExecutorService exec; private final ListeningExecutorService exec;
private final Supplier<GroupByQueryConfig> configSupplier; private final Supplier<GroupByQueryConfig> configSupplier;
private final QueryWatcher queryWatcher; private final QueryWatcher queryWatcher;
private final StupidPool<ByteBuffer> bufferPool;
public GroupByParallelQueryRunner( public GroupByParallelQueryRunner(
ExecutorService exec, ExecutorService exec,
Supplier<GroupByQueryConfig> configSupplier, Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher, QueryWatcher queryWatcher,
StupidPool<ByteBuffer> bufferPool,
Iterable<QueryRunner<T>> queryables Iterable<QueryRunner<T>> queryables
) )
{ {
@ -68,15 +74,17 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
this.queryWatcher = queryWatcher; this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier; this.configSupplier = configSupplier;
this.bufferPool = bufferPool;
} }
@Override @Override
public Sequence<T> run(final Query<T> queryParam) public Sequence<T> run(final Query<T> queryParam, final Map<String, Object> context)
{ {
final GroupByQuery query = (GroupByQuery) queryParam; final GroupByQuery query = (GroupByQuery) queryParam;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query, query,
configSupplier.get() configSupplier.get(),
bufferPool
); );
final Pair<List, Accumulator<List, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); final Pair<List, Accumulator<List, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final boolean bySegment = query.getContextBySegment(false); final boolean bySegment = query.getContextBySegment(false);
@ -103,10 +111,11 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
{ {
try { try {
if (bySegment) { if (bySegment) {
input.run(queryParam) input.run(queryParam, context)
.accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs); .accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
} else { } else {
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); input.run(queryParam, context)
.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
} }
return null; return null;
@ -140,17 +149,21 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
catch (InterruptedException e) { catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true); futures.cancel(true);
indexAccumulatorPair.lhs.close();
throw new QueryInterruptedException("Query interrupted"); throw new QueryInterruptedException("Query interrupted");
} }
catch (CancellationException e) { catch (CancellationException e) {
indexAccumulatorPair.lhs.close();
throw new QueryInterruptedException("Query cancelled"); throw new QueryInterruptedException("Query cancelled");
} }
catch (TimeoutException e) { catch (TimeoutException e) {
indexAccumulatorPair.lhs.close();
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true); futures.cancel(true);
throw new QueryInterruptedException("Query timeout"); throw new QueryInterruptedException("Query timeout");
} }
catch (ExecutionException e) { catch (ExecutionException e) {
indexAccumulatorPair.lhs.close();
throw Throwables.propagate(e.getCause()); throw Throwables.propagate(e.getCause());
} }
@ -158,18 +171,20 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
return Sequences.simple(bySegmentAccumulatorPair.lhs); return Sequences.simple(bySegmentAccumulatorPair.lhs);
} }
return Sequences.simple( return new ResourceClosingSequence<T>(
Iterables.transform( Sequences.simple(
indexAccumulatorPair.lhs.iterableWithPostAggregations(null), Iterables.transform(
new Function<Row, T>() indexAccumulatorPair.lhs.iterableWithPostAggregations(null),
{ new Function<Row, T>()
@Override {
public T apply(Row input) @Override
{ public T apply(Row input)
return (T) input; {
} return (T) input;
} }
) }
)
), indexAccumulatorPair.lhs
); );
} }
} }
View File
@ -33,6 +33,7 @@ import javax.annotation.Nullable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
*/ */
@ -48,10 +49,10 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{ {
if (period.getMillis() == 0) { if (period.getMillis() == 0) {
return baseRunner.run(query); return baseRunner.run(query, context);
} }
return Sequences.concat( return Sequences.concat(
@ -74,7 +75,8 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
public Sequence<T> apply(Interval singleInterval) public Sequence<T> apply(Interval singleInterval)
{ {
return baseRunner.run( return baseRunner.run(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))) query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))),
context
); );
} }
} }
View File
@ -28,6 +28,8 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.Map;
/** /**
*/ */
@ -82,7 +84,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{ {
final ServiceMetricEvent.Builder builder = builderFn.apply(query); final ServiceMetricEvent.Builder builder = builderFn.apply(query);
String queryId = query.getId(); String queryId = query.getId();
@ -100,7 +102,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
try { try {
retVal = queryRunner.run(query).accumulate(outType, accumulator); retVal = queryRunner.run(query, context).accumulate(outType, accumulator);
} }
catch (RuntimeException e) { catch (RuntimeException e) {
builder.setUser10("failed"); builder.setUser10("failed");
@ -130,7 +132,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
try { try {
retVal = queryRunner.run(query).toYielder(initValue, accumulator); retVal = queryRunner.run(query, context).toYielder(initValue, accumulator);
} }
catch (RuntimeException e) { catch (RuntimeException e) {
builder.setUser10("failed"); builder.setUser10("failed");
View File
@ -22,12 +22,15 @@ package io.druid.query;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import java.util.List;
import java.util.Map;
/** /**
*/ */
public class NoopQueryRunner<T> implements QueryRunner<T> public class NoopQueryRunner<T> implements QueryRunner<T>
{ {
@Override @Override
public Sequence<T> run(Query query) public Sequence<T> run(Query<T> query, Map<String, Object> context)
{ {
return Sequences.empty(); return Sequences.empty();
} }
View File
@ -62,9 +62,9 @@ public interface Query<T>
public String getType(); public String getType();
public Sequence<T> run(QuerySegmentWalker walker); public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context);
public Sequence<T> run(QueryRunner<T> runner); public Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context);
public List<Interval> getIntervals(); public List<Interval> getIntervals();
View File
@ -21,9 +21,11 @@ package io.druid.query;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import java.util.Map;
/** /**
*/ */
public interface QueryRunner<T> public interface QueryRunner<T>
{ {
public Sequence<T> run(Query<T> query); public Sequence<T> run(Query<T> query, Map<String, Object> context);
} }
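Every runner in this commit now threads a mutable context map through run(). A minimal sketch of a conforming decorator under the new signature; the class name and the context key it writes are illustrative, not part of the commit:

import com.metamx.common.guava.Sequence;
import io.druid.query.Query;
import io.druid.query.QueryRunner;

import java.util.Map;

public class ContextStampingQueryRunner<T> implements QueryRunner<T>
{
  private final QueryRunner<T> baseRunner;

  public ContextStampingQueryRunner(QueryRunner<T> baseRunner)
  {
    this.baseRunner = baseRunner;
  }

  @Override
  public Sequence<T> run(Query<T> query, Map<String, Object> context)
  {
    // The shared map is how runners communicate out-of-band (e.g. RetryQueryRunner's
    // "missingSegments" list); this decorator just leaves a marker and delegates.
    context.put("sketchTouchedBy", getClass().getSimpleName());
    return baseRunner.run(query, context);
  }
}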
View File
@ -25,6 +25,8 @@ import com.metamx.common.guava.Sequence;
import io.druid.segment.ReferenceCountingSegment; import io.druid.segment.ReferenceCountingSegment;
import java.io.Closeable; import java.io.Closeable;
import java.util.List;
import java.util.Map;
/** /**
*/ */
@ -43,11 +45,11 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query, Map<String, Object> context)
{ {
final Closeable closeable = adapter.increment(); final Closeable closeable = adapter.increment();
try { try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query); final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, context);
return new ResourceClosingSequence<T>(baseSequence, closeable); return new ResourceClosingSequence<T>(baseSequence, closeable);
} }
View File
@ -24,6 +24,9 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn; import com.metamx.common.guava.nary.BinaryFn;
import io.druid.common.guava.CombiningSequence; import io.druid.common.guava.CombiningSequence;
import java.util.List;
import java.util.Map;
/** /**
*/ */
public abstract class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRunner<T> public abstract class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRunner<T>
@ -36,9 +39,9 @@ public abstract class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRu
} }
@Override @Override
public Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query) public Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context)
{ {
return CombiningSequence.create(baseRunner.run(query), makeOrdering(query), createMergeFn(query)); return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query));
} }
protected abstract Ordering<T> makeOrdering(Query<T> query); protected abstract Ordering<T> makeOrdering(Query<T> query);
View File
@ -0,0 +1,113 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.segment.SegmentMissingException;
import java.util.List;
import java.util.Map;
public class RetryQueryRunner<T> implements QueryRunner<T>
{
public static String MISSING_SEGMENTS_KEY = "missingSegments";
private final QueryRunner<T> baseRunner;
private final RetryQueryRunnerConfig config;
private final ObjectMapper jsonMapper;
public RetryQueryRunner(
QueryRunner<T> baseRunner,
RetryQueryRunnerConfig config,
ObjectMapper jsonMapper
)
{
this.baseRunner = baseRunner;
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
final Sequence<T> returningSeq = baseRunner.run(query, context);
return new YieldingSequenceBase<T>()
{
@Override
public <OutType> Yielder<OutType> toYielder(
OutType initValue, YieldingAccumulator<OutType, T> accumulator
)
{
Yielder<OutType> yielder = returningSeq.toYielder(initValue, accumulator);
final List<SegmentDescriptor> missingSegments = getMissingSegments(context);
if (missingSegments.isEmpty()) {
return yielder;
}
for (int i = 0; i < config.numTries(); i++) {
context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList());
final Query<T> retryQuery = query.withQuerySegmentSpec(
new MultipleSpecificSegmentSpec(
missingSegments
)
);
yielder = baseRunner.run(retryQuery, context).toYielder(initValue, accumulator);
if (getMissingSegments(context).isEmpty()) {
break;
}
}
final List<SegmentDescriptor> finalMissingSegs = getMissingSegments(context);
if (!config.returnPartialResults() && !finalMissingSegs.isEmpty()) {
throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs);
}
return yielder;
}
};
}
private List<SegmentDescriptor> getMissingSegments(final Map<String, Object> context)
{
final Object maybeMissingSegments = context.get(MISSING_SEGMENTS_KEY);
if (maybeMissingSegments == null) {
return Lists.newArrayList();
}
return jsonMapper.convertValue(
maybeMissingSegments,
new TypeReference<List<SegmentDescriptor>>()
{
}
);
}
}
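A hedged usage sketch for the new runner: the caller must supply a mutable context map, because downstream runners report missing segments by writing MISSING_SEGMENTS_KEY into it and the retry loop reads it back. The helper class and its parameter names are illustrative; only the constructor and the key come from the class above:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.RetryQueryRunner;
import io.druid.query.RetryQueryRunnerConfig;

import java.util.Map;

public class RetryRunnerUsageSketch
{
  public static <T> Sequence<T> runWithRetries(
      Query<T> query,
      QueryRunner<T> baseRunner,
      RetryQueryRunnerConfig config,
      ObjectMapper jsonMapper
  )
  {
    QueryRunner<T> retryingRunner = new RetryQueryRunner<T>(baseRunner, config, jsonMapper);

    // Must be mutable: segment-missing runners put RetryQueryRunner.MISSING_SEGMENTS_KEY here.
    Map<String, Object> context = Maps.newHashMap();
    return retryingRunner.run(query, context);
  }
}

Note that the retries only fire once the returned Sequence is actually consumed, since the retry loop lives inside toYielder().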
View File
@ -0,0 +1,33 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonProperty;
public class RetryQueryRunnerConfig
{
@JsonProperty
private int numTries = 0;
@JsonProperty
private boolean returnPartialResults = false;
public int numTries() { return numTries; }
public boolean returnPartialResults() { return returnPartialResults; }
}
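Because the two knobs are private @JsonProperty fields defaulting to 0 and false, Jackson can bind them straight from JSON without setters. A small illustrative sketch (the surrounding class and the literal JSON string are assumptions, not part of the commit):

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.query.RetryQueryRunnerConfig;

public class RetryConfigBindingSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Keys match the @JsonProperty fields; anything omitted keeps its default (0 tries, no partial results).
    RetryQueryRunnerConfig config = mapper.readValue(
        "{\"numTries\": 2, \"returnPartialResults\": true}",
        RetryQueryRunnerConfig.class
    );
    System.out.println(config.numTries() + ", " + config.returnPartialResults());
  }
}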
View File
@ -23,6 +23,9 @@ package io.druid.query;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import java.util.List;
import java.util.Map;
/** /**
* If there's a subquery, run it instead of the outer query * If there's a subquery, run it instead of the outer query
*/ */
@ -36,13 +39,13 @@ public class SubqueryQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query, Map<String, Object> context)
{ {
DataSource dataSource = query.getDataSource(); DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) { if (dataSource instanceof QueryDataSource) {
return run((Query<T>) ((QueryDataSource) dataSource).getQuery()); return run((Query<T>) ((QueryDataSource) dataSource).getQuery(), context);
} else { } else {
return baseRunner.run(query); return baseRunner.run(query, context);
} }
} }
} }
View File
@ -33,6 +33,8 @@ import org.joda.time.Interval;
import org.joda.time.Period; import org.joda.time.Period;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map;
/** /**
* TimewarpOperator is an example post-processing operator that maps current time * TimewarpOperator is an example post-processing operator that maps current time
@ -79,7 +81,7 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
return new QueryRunner<T>() return new QueryRunner<T>()
{ {
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{ {
final long offset = computeOffset(now); final long offset = computeOffset(now);
@ -90,7 +92,8 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
); );
return Sequences.map( return Sequences.map(
baseRunner.run( baseRunner.run(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))) query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))),
context
), ),
new Function<T, T>() new Function<T, T>()
{ {
View File
@ -26,6 +26,9 @@ import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import java.util.List;
import java.util.Map;
public class UnionQueryRunner<T> implements QueryRunner<T> public class UnionQueryRunner<T> implements QueryRunner<T>
{ {
private final QueryRunner<T> baseRunner; private final QueryRunner<T> baseRunner;
@ -41,7 +44,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
} }
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{ {
DataSource dataSource = query.getDataSource(); DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) { if (dataSource instanceof UnionDataSource) {
@ -55,7 +58,8 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
public Sequence<T> apply(DataSource singleSource) public Sequence<T> apply(DataSource singleSource)
{ {
return baseRunner.run( return baseRunner.run(
query.withDataSource(singleSource) query.withDataSource(singleSource),
context
); );
} }
} }
@ -63,7 +67,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
) )
); );
} else { } else {
return baseRunner.run(query); return baseRunner.run(query, context);
} }
} }
View File
@ -95,6 +95,13 @@ public class Aggregators
return 0; return 0;
} }
@Override
public long getLong(ByteBuffer buf, int position)
{
return 0L;
}
@Override @Override
public void close() public void close()
{ {
View File
@ -89,6 +89,23 @@ public interface BufferAggregator
*/ */
float getFloat(ByteBuffer buf, int position); float getFloat(ByteBuffer buf, int position);
/**
* Returns the long representation of the given aggregate byte array
*
* Converts the given byte buffer representation into the intermediate aggregate value.
*
* <b>Implementations must not change the position, limit or mark of the given buffer</b>
*
* Implementations are only required to support this method if they are aggregations which
* have an {@link AggregatorFactory#getTypeName()} of "long".
* If unimplemented, throwing an {@link UnsupportedOperationException} is common and recommended.
*
* @param buf byte buffer storing the byte array representation of the aggregate
* @param position offset within the byte buffer at which the aggregate value is stored
* @return the long representation of the aggregate
*/
long getLong(ByteBuffer buf, int position);
/** /**
* Release any resources used by the aggregator * Release any resources used by the aggregator
*/ */
View File
@ -50,6 +50,13 @@ public class CountBufferAggregator implements BufferAggregator
return buf.getLong(position); return buf.getLong(position);
} }
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override @Override
public void close() public void close()
{ {
View File
@ -60,6 +60,13 @@ public class DoubleSumBufferAggregator implements BufferAggregator
return (float) buf.getDouble(position); return (float) buf.getDouble(position);
} }
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}
@Override @Override
public void close() public void close()
{ {
View File
@ -19,12 +19,9 @@
package io.druid.query.aggregation; package io.druid.query.aggregation;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import io.druid.segment.DimensionSelector; import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedInts;
import javax.annotation.Nullable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
public class FilteredBufferAggregator implements BufferAggregator public class FilteredBufferAggregator implements BufferAggregator
@ -65,6 +62,12 @@ public class FilteredBufferAggregator implements BufferAggregator
return delegate.get(buf, position); return delegate.get(buf, position);
} }
@Override
public long getLong(ByteBuffer buf, int position)
{
return delegate.getLong(buf, position);
}
@Override @Override
public float getFloat(ByteBuffer buf, int position) public float getFloat(ByteBuffer buf, int position)
{ {
View File
@ -90,6 +90,12 @@ public class HistogramBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("HistogramBufferAggregator does not support getFloat()"); throw new UnsupportedOperationException("HistogramBufferAggregator does not support getFloat()");
} }
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("HistogramBufferAggregator does not support getLong()");
}
@Override @Override
public void close() public void close()
{ {
View File
@ -63,6 +63,13 @@ public class JavaScriptBufferAggregator implements BufferAggregator
return (float)buf.getDouble(position); return (float)buf.getDouble(position);
} }
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}
@Override @Override
public void close() { public void close() {
script.close(); script.close();
View File
@ -21,6 +21,7 @@ package io.druid.query.aggregation;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import io.druid.segment.FloatColumnSelector; import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import java.util.Comparator; import java.util.Comparator;
@ -41,12 +42,12 @@ public class LongSumAggregator implements Aggregator
return ((Number) lhs).longValue() + ((Number) rhs).longValue(); return ((Number) lhs).longValue() + ((Number) rhs).longValue();
} }
private final FloatColumnSelector selector; private final LongColumnSelector selector;
private final String name; private final String name;
private long sum; private long sum;
public LongSumAggregator(String name, FloatColumnSelector selector) public LongSumAggregator(String name, LongColumnSelector selector)
{ {
this.name = name; this.name = name;
this.selector = selector; this.selector = selector;
@ -57,7 +58,7 @@ public class LongSumAggregator implements Aggregator
@Override @Override
public void aggregate() public void aggregate()
{ {
sum += (long) selector.get(); sum += selector.get();
} }
@Override @Override
View File
@ -58,14 +58,14 @@ public class LongSumAggregatorFactory implements AggregatorFactory
{ {
return new LongSumAggregator( return new LongSumAggregator(
name, name,
metricFactory.makeFloatColumnSelector(fieldName) metricFactory.makeLongColumnSelector(fieldName)
); );
} }
@Override @Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{ {
return new LongSumBufferAggregator(metricFactory.makeFloatColumnSelector(fieldName)); return new LongSumBufferAggregator(metricFactory.makeLongColumnSelector(fieldName));
} }
@Override @Override
@ -134,7 +134,7 @@ public class LongSumAggregatorFactory implements AggregatorFactory
@Override @Override
public String getTypeName() public String getTypeName()
{ {
return "float"; return "long";
} }
@Override @Override
View File
@ -20,6 +20,7 @@
package io.druid.query.aggregation; package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector; import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -27,10 +28,10 @@ import java.nio.ByteBuffer;
*/ */
public class LongSumBufferAggregator implements BufferAggregator public class LongSumBufferAggregator implements BufferAggregator
{ {
private final FloatColumnSelector selector; private final LongColumnSelector selector;
public LongSumBufferAggregator( public LongSumBufferAggregator(
FloatColumnSelector selector LongColumnSelector selector
) )
{ {
this.selector = selector; this.selector = selector;
@ -45,7 +46,7 @@ public class LongSumBufferAggregator implements BufferAggregator
@Override @Override
public void aggregate(ByteBuffer buf, int position) public void aggregate(ByteBuffer buf, int position)
{ {
buf.putLong(position, buf.getLong(position) + (long) selector.get()); buf.putLong(position, buf.getLong(position) + selector.get());
} }
@Override @Override
@ -60,6 +61,13 @@ public class LongSumBufferAggregator implements BufferAggregator
return (float) buf.getLong(position); return (float) buf.getLong(position);
} }
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override @Override
public void close() public void close()
{ {
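Putting the new BufferAggregator.getLong() contract together with the selector change above: long sums now stay longs end to end. A self-contained sketch exercising the LongSumBufferAggregator above with a stubbed selector (the constant 7L and the class name are illustrative):

import io.druid.query.aggregation.LongSumBufferAggregator;
import io.druid.segment.LongColumnSelector;

import java.nio.ByteBuffer;

public class LongSumGetLongSketch
{
  public static void main(String[] args)
  {
    LongColumnSelector selector = new LongColumnSelector()
    {
      @Override
      public long get()
      {
        return 7L; // stand-in for a real long column value
      }
    };

    LongSumBufferAggregator agg = new LongSumBufferAggregator(selector);
    ByteBuffer buf = ByteBuffer.allocate(8);

    agg.init(buf, 0);
    agg.aggregate(buf, 0);
    agg.aggregate(buf, 0);

    // getLong() reads the running sum back without the old float round-trip: prints 14
    System.out.println(agg.getLong(buf, 0));
  }
}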
View File
@ -58,6 +58,12 @@ public class MaxBufferAggregator implements BufferAggregator
return (float) buf.getDouble(position); return (float) buf.getDouble(position);
} }
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}
@Override @Override
public void close() public void close()
{ {
View File
@ -58,6 +58,13 @@ public class MinBufferAggregator implements BufferAggregator
return (float) buf.getDouble(position); return (float) buf.getDouble(position);
} }
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}
@Override @Override
public void close() public void close()
{ {
View File
@ -79,7 +79,14 @@ public class CardinalityBufferAggregator implements BufferAggregator
@Override @Override
public float getFloat(ByteBuffer buf, int position) public float getFloat(ByteBuffer buf, int position)
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("CardinalityBufferAggregator does not support getFloat()");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("CardinalityBufferAggregator does not support getLong()");
} }
@Override @Override
View File
@ -79,7 +79,14 @@ public class HyperUniquesBufferAggregator implements BufferAggregator
@Override @Override
public float getFloat(ByteBuffer buf, int position) public float getFloat(ByteBuffer buf, int position)
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("HyperUniquesBufferAggregator does not support getFloat()");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("HyperUniquesBufferAggregator does not support getLong()");
} }
@Override @Override
View File
@ -24,20 +24,25 @@ import com.google.common.collect.Lists;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Accumulator;
import io.druid.collections.StupidPool;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.data.input.Rows; import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
public class GroupByQueryHelper public class GroupByQueryHelper
{ {
public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair( public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
final GroupByQuery query, final GroupByQuery query,
final GroupByQueryConfig config final GroupByQueryConfig config,
StupidPool<ByteBuffer> bufferPool
) )
{ {
final QueryGranularity gran = query.getGranularity(); final QueryGranularity gran = query.getGranularity();
@ -69,21 +74,37 @@ public class GroupByQueryHelper
} }
} }
); );
IncrementalIndex index = new IncrementalIndex( final IncrementalIndex index;
if(query.getContextValue("useOffheap", false)){
index = new OffheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
bufferPool,
false
);
} else {
index = new IncrementalIndex(
// use granularity truncated min timestamp // use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart // since incoming truncated timestamps may precede timeStart
granTimeStart, granTimeStart,
gran, gran,
aggs.toArray(new AggregatorFactory[aggs.size()]) aggs.toArray(new AggregatorFactory[aggs.size()]),
bufferPool,
false
); );
}
Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>() Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
{ {
@Override @Override
public IncrementalIndex accumulate(IncrementalIndex accumulated, T in) public IncrementalIndex accumulate(IncrementalIndex accumulated, T in)
{ {
if (in instanceof Row) { if (in instanceof Row) {
if (accumulated.add(Rows.toCaseInsensitiveInputRow((Row) in, dimensions), false) if (accumulated.add(Rows.toCaseInsensitiveInputRow((Row) in, dimensions))
> config.getMaxResults()) { > config.getMaxResults()) {
throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults()); throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
} }
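The off-heap path is opted into purely through the query context: GroupByQueryHelper reads "useOffheap" with a default of false. A hedged sketch of switching it on for an existing query, using the same withOverriddenContext call the toolchest uses below (the wrapper class and method name are illustrative):

import com.google.common.collect.ImmutableMap;
import io.druid.query.groupby.GroupByQuery;

public class OffheapGroupBySketch
{
  // Returns a copy of the query whose context asks the helper above for an OffheapIncrementalIndex.
  public static GroupByQuery withOffheapAggregation(GroupByQuery query)
  {
    return (GroupByQuery) query.withOverriddenContext(
        ImmutableMap.<String, Object>of("useOffheap", true)
    );
  }
}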
View File
@ -32,13 +32,16 @@ import com.metamx.common.ISE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.MergeSequence; import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence; import io.druid.collections.OrderedMergeSequence;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedRow; import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.guice.annotations.Global;
import io.druid.query.CacheStrategy; import io.druid.query.CacheStrategy;
import io.druid.query.DataSource; import io.druid.query.DataSource;
import io.druid.query.IntervalChunkingQueryRunner; import io.druid.query.IntervalChunkingQueryRunner;
@ -82,19 +85,24 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
); );
private final Supplier<GroupByQueryConfig> configSupplier; private final Supplier<GroupByQueryConfig> configSupplier;
private final StupidPool<ByteBuffer> bufferPool;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private GroupByQueryEngine engine; // For running the outer query around a subquery private GroupByQueryEngine engine; // For running the outer query around a subquery
@Inject @Inject
public GroupByQueryQueryToolChest( public GroupByQueryQueryToolChest(
Supplier<GroupByQueryConfig> configSupplier, Supplier<GroupByQueryConfig> configSupplier,
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
GroupByQueryEngine engine GroupByQueryEngine engine,
@Global StupidPool<ByteBuffer> bufferPool
) )
{ {
this.configSupplier = configSupplier; this.configSupplier = configSupplier;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.engine = engine; this.engine = engine;
this.bufferPool = bufferPool;
} }
@Override @Override
@ -103,24 +111,26 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
return new QueryRunner<Row>() return new QueryRunner<Row>()
{ {
@Override @Override
public Sequence<Row> run(Query<Row> input) public Sequence<Row> run(Query<Row> input, Map<String, Object> context)
{ {
if (input.getContextBySegment(false)) { if (input.getContextBySegment(false)) {
return runner.run(input); return runner.run(input, context);
} }
if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) { if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner); return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner, context);
} }
return runner.run(input); return runner.run(input, context);
} }
}; };
} }
private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner) private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner, Map<String, Object> context)
{ {
// If there's a subquery, merge subquery results and then apply the aggregator // If there's a subquery, merge subquery results and then apply the aggregator
final DataSource dataSource = query.getDataSource(); final DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) { if (dataSource instanceof QueryDataSource) {
GroupByQuery subquery; GroupByQuery subquery;
try { try {
@ -129,7 +139,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
catch (ClassCastException e) { catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries must be of type 'group by'"); throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
} }
final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner);
final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner, context);
final List<AggregatorFactory> aggs = Lists.newArrayList(); final List<AggregatorFactory> aggs = Lists.newArrayList();
for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) { for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
aggs.addAll(aggregatorFactory.getRequiredColumns()); aggs.addAll(aggregatorFactory.getRequiredColumns());
@ -145,13 +156,22 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
final GroupByQuery outerQuery = new GroupByQuery.Builder(query) final GroupByQuery outerQuery = new GroupByQuery.Builder(query)
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec())) .setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
.build(); .build();
IncrementalIndex index = makeIncrementalIndex(innerQuery, subqueryResult);
final IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter( return new ResourceClosingSequence<>(
makeIncrementalIndex(innerQuery, subqueryResult) outerQuery.applyLimit(
engine.process(
outerQuery,
new IncrementalIndexStorageAdapter(
index
)
)
),
index
); );
return outerQuery.applyLimit(engine.process(outerQuery, adapter));
} else { } else {
return query.applyLimit(postAggregate(query, makeIncrementalIndex(query, runner.run(query)))); final IncrementalIndex index = makeIncrementalIndex(query, runner.run(query, context));
return new ResourceClosingSequence<>(query.applyLimit(postAggregate(query, index)), index);
} }
} }
@ -180,13 +200,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
final GroupByQueryConfig config = configSupplier.get(); final GroupByQueryConfig config = configSupplier.get();
Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query, query,
config config,
bufferPool
); );
return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
} }
@Override @Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences) public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{ {
View File
@ -32,7 +32,9 @@ import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.guice.annotations.Global;
import io.druid.query.AbstractPrioritizedCallable; import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.ConcatQueryRunner; import io.druid.query.ConcatQueryRunner;
import io.druid.query.GroupByParallelQueryRunner; import io.druid.query.GroupByParallelQueryRunner;
@ -46,7 +48,9 @@ import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter; import io.druid.segment.StorageAdapter;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -57,25 +61,27 @@ import java.util.concurrent.TimeoutException;
*/ */
public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery> public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
{ {
private static final Logger log = new Logger(GroupByQueryRunnerFactory.class);
private final GroupByQueryEngine engine; private final GroupByQueryEngine engine;
private final QueryWatcher queryWatcher; private final QueryWatcher queryWatcher;
private final Supplier<GroupByQueryConfig> config; private final Supplier<GroupByQueryConfig> config;
private final GroupByQueryQueryToolChest toolChest; private final GroupByQueryQueryToolChest toolChest;
private final StupidPool<ByteBuffer> computationBufferPool;
private static final Logger log = new Logger(GroupByQueryRunnerFactory.class);
@Inject @Inject
public GroupByQueryRunnerFactory( public GroupByQueryRunnerFactory(
GroupByQueryEngine engine, GroupByQueryEngine engine,
QueryWatcher queryWatcher, QueryWatcher queryWatcher,
Supplier<GroupByQueryConfig> config, Supplier<GroupByQueryConfig> config,
GroupByQueryQueryToolChest toolChest GroupByQueryQueryToolChest toolChest,
@Global StupidPool<ByteBuffer> computationBufferPool
) )
{ {
this.engine = engine; this.engine = engine;
this.queryWatcher = queryWatcher; this.queryWatcher = queryWatcher;
this.config = config; this.config = config;
this.toolChest = toolChest; this.toolChest = toolChest;
this.computationBufferPool = computationBufferPool;
} }
@Override @Override
@ -102,13 +108,14 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
return new QueryRunner<Row>() return new QueryRunner<Row>()
{ {
@Override @Override
public Sequence<Row> run(final Query<Row> query) public Sequence<Row> run(final Query<Row> query, final Map<String, Object> context)
{ {
final GroupByQuery queryParam = (GroupByQuery) query; final GroupByQuery queryParam = (GroupByQuery) query;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper
.createIndexAccumulatorPair( .createIndexAccumulatorPair(
queryParam, queryParam,
config.get() config.get(),
computationBufferPool
); );
final Pair<List, Accumulator<List, Row>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); final Pair<List, Accumulator<List, Row>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final int priority = query.getContextPriority(0); final int priority = query.getContextPriority(0);
@ -121,13 +128,14 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
public Void call() throws Exception public Void call() throws Exception
{ {
if (bySegment) { if (bySegment) {
input.run(queryParam) input.run(queryParam, context)
.accumulate( .accumulate(
bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.lhs,
bySegmentAccumulatorPair.rhs bySegmentAccumulatorPair.rhs
); );
} else { } else {
input.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); input.run(query, context)
.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
} }
return null; return null;
@ -172,7 +180,8 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
) )
); );
} else { } else {
return new GroupByParallelQueryRunner(queryExecutor, config, queryWatcher, queryRunners);
return new GroupByParallelQueryRunner(queryExecutor, config, queryWatcher, computationBufferPool, queryRunners);
} }
} }
@ -194,7 +203,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
} }
@Override @Override
public Sequence<Row> run(Query<Row> input) public Sequence<Row> run(Query<Row> input, Map<String, Object> context)
{ {
if (!(input instanceof GroupByQuery)) { if (!(input instanceof GroupByQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class); throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class);
View File
@ -84,7 +84,7 @@ public class SegmentAnalyzer
columns.put(columnName, analysis); columns.put(columnName, analysis);
} }
columns.put("__time", lengthBasedAnalysis(index.getTimeColumn(), NUM_BYTES_IN_TIMESTAMP)); columns.put(Column.TIME_COLUMN_NAME, lengthBasedAnalysis(index.getColumn(Column.TIME_COLUMN_NAME), NUM_BYTES_IN_TIMESTAMP));
return columns; return columns;
} }
View File
@ -77,7 +77,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
return new QueryRunner<SegmentAnalysis>() return new QueryRunner<SegmentAnalysis>()
{ {
@Override @Override
public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ) public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> context)
{ {
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ; SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
@ -136,7 +136,10 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
return new QueryRunner<SegmentAnalysis>() return new QueryRunner<SegmentAnalysis>()
{ {
@Override @Override
public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query) public Sequence<SegmentAnalysis> run(
final Query<SegmentAnalysis> query,
final Map<String, Object> context
)
{ {
final int priority = query.getContextPriority(0); final int priority = query.getContextPriority(0);
ListenableFuture<Sequence<SegmentAnalysis>> future = queryExecutor.submit( ListenableFuture<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
@ -145,7 +148,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
@Override @Override
public Sequence<SegmentAnalysis> call() throws Exception public Sequence<SegmentAnalysis> call() throws Exception
{ {
return input.run(query); return input.run(query, context);
} }
} }
); );
View File
@ -278,7 +278,10 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
} }
@Override @Override
public Sequence<Result<SearchResultValue>> run(Query<Result<SearchResultValue>> input) public Sequence<Result<SearchResultValue>> run(
Query<Result<SearchResultValue>> input,
Map<String, Object> context
)
{ {
if (!(input instanceof SearchQuery)) { if (!(input instanceof SearchQuery)) {
throw new ISE("Can only handle [%s], got [%s]", SearchQuery.class, input.getClass()); throw new ISE("Can only handle [%s], got [%s]", SearchQuery.class, input.getClass());
@ -286,13 +289,13 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
final SearchQuery query = (SearchQuery) input; final SearchQuery query = (SearchQuery) input;
if (query.getLimit() < config.getMaxSearchLimit()) { if (query.getLimit() < config.getMaxSearchLimit()) {
return runner.run(query); return runner.run(query, context);
} }
final boolean isBySegment = query.getContextBySegment(false); final boolean isBySegment = query.getContextBySegment(false);
return Sequences.map( return Sequences.map(
runner.run(query.withLimit(config.getMaxSearchLimit())), runner.run(query.withLimit(config.getMaxSearchLimit()), context),
new Function<Result<SearchResultValue>, Result<SearchResultValue>>() new Function<Result<SearchResultValue>, Result<SearchResultValue>>()
{ {
@Override @Override
View File
@ -68,7 +68,10 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
} }
@Override @Override
public Sequence<Result<SearchResultValue>> run(final Query<Result<SearchResultValue>> input) public Sequence<Result<SearchResultValue>> run(
final Query<Result<SearchResultValue>> input,
Map<String, Object> context
)
{ {
if (!(input instanceof SearchQuery)) { if (!(input instanceof SearchQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), SearchQuery.class); throw new ISE("Got a [%s] which isn't a %s", input.getClass(), SearchQuery.class);
View File
@@ -28,10 +28,11 @@ import io.druid.query.QueryRunnerHelper;
 import io.druid.query.Result;
 import io.druid.segment.Cursor;
 import io.druid.segment.DimensionSelector;
+import io.druid.segment.LongColumnSelector;
 import io.druid.segment.ObjectColumnSelector;
 import io.druid.segment.Segment;
 import io.druid.segment.StorageAdapter;
-import io.druid.segment.TimestampColumnSelector;
+import io.druid.segment.column.Column;
 import io.druid.segment.data.IndexedInts;
 import io.druid.segment.filter.Filters;
 import org.joda.time.DateTime;
@@ -83,7 +84,7 @@ public class SelectQueryEngine
                 .getThreshold()
         );

-        final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
+        final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);

         final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
         for (String dim : dims) {
@@ -110,7 +111,7 @@ public class SelectQueryEngine
         int offset = 0;
         while (!cursor.isDone() && offset < query.getPagingSpec().getThreshold()) {
           final Map<String, Object> theEvent = Maps.newLinkedHashMap();
-          theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.getTimestamp()));
+          theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.get()));

           for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
             final String dim = dimSelector.getKey();
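To illustrate the selector change above, a small sketch (an assumed helper, not from this commit) of reading the current row's timestamp through the generic long-column selector that replaces the removed TimestampColumnSelector:

import io.druid.segment.Cursor;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.column.Column;
import org.joda.time.DateTime;

// Hypothetical helper mirroring the hunk above: the time column is now read via
// makeLongColumnSelector(Column.TIME_COLUMN_NAME) and get().
public class TimeColumnSketch
{
  public static DateTime currentRowTime(Cursor cursor)
  {
    final LongColumnSelector timeSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
    return new DateTime(timeSelector.get());
  }
}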

View File

@@ -31,6 +31,8 @@ import io.druid.query.QueryWatcher;
 import io.druid.query.Result;
 import io.druid.segment.Segment;

+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;

 /**
@@ -88,7 +90,10 @@ public class SelectQueryRunnerFactory
   }

   @Override
-  public Sequence<Result<SelectResultValue>> run(Query<Result<SelectResultValue>> input)
+  public Sequence<Result<SelectResultValue>> run(
+      Query<Result<SelectResultValue>> input,
+      Map<String, Object> context
+  )
   {
     if (!(input instanceof SelectQuery)) {
       throw new ISE("Got a [%s] which isn't a %s", input.getClass(), SelectQuery.class);

View File

@@ -20,14 +20,21 @@
 package io.druid.query.spec;

 import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
 import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
 import com.metamx.common.guava.Yielder;
 import com.metamx.common.guava.YieldingAccumulator;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
+import io.druid.query.RetryQueryRunner;
+import io.druid.query.SegmentDescriptor;
+import io.druid.segment.SegmentMissingException;

 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;

 /**
@@ -35,11 +42,11 @@ import java.util.concurrent.Callable;
 public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
 {
   private final QueryRunner<T> base;
-  private final QuerySegmentSpec specificSpec;
+  private final SpecificSegmentSpec specificSpec;

   public SpecificSegmentQueryRunner(
       QueryRunner<T> base,
-      QuerySegmentSpec specificSpec
+      SpecificSegmentSpec specificSpec
   )
   {
     this.base = base;
@@ -47,7 +54,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
   }

   @Override
-  public Sequence<T> run(final Query<T> input)
+  public Sequence<T> run(final Query<T> input, final Map<String, Object> context)
   {
     final Query<T> query = input.withQuerySegmentSpec(specificSpec);
@@ -55,14 +62,29 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
     final String currThreadName = currThread.getName();
     final String newName = String.format("%s_%s_%s", query.getType(), query.getDataSource(), query.getIntervals());

-    final Sequence<T> baseSequence = doNamed(currThread, currThreadName, newName, new Callable<Sequence<T>>()
-    {
-      @Override
-      public Sequence<T> call() throws Exception
-      {
-        return base.run(query);
-      }
-    });
+    final Sequence<T> baseSequence = doNamed(
+        currThread, currThreadName, newName, new Callable<Sequence<T>>()
+        {
+          @Override
+          public Sequence<T> call() throws Exception
+          {
+            Sequence<T> returningSeq;
+            try {
+              returningSeq = base.run(query, context);
+            }
+            catch (SegmentMissingException e) {
+              List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
+              if (missingSegments == null) {
+                missingSegments = Lists.newArrayList();
+                context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, missingSegments);
+              }
+              missingSegments.add(specificSpec.getDescriptor());
+              returningSeq = Sequences.empty();
+            }
+            return returningSeq;
+          }
+        }
+    );

     return new Sequence<T>()
     {
@@ -82,7 +104,10 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
       }

       @Override
-      public <OutType> Yielder<OutType> toYielder(final OutType initValue, final YieldingAccumulator<OutType, T> accumulator)
+      public <OutType> Yielder<OutType> toYielder(
+          final OutType initValue,
+          final YieldingAccumulator<OutType, T> accumulator
+      )
       {
         return doItNamed(
             new Callable<Yielder<OutType>>()
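A rough sketch of how a caller could consume the missing-segment information this runner now records in the shared context. The helper below is illustrative only; it assumes the caller created the context map itself and drains the sequence before inspecting it.

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.RetryQueryRunner;
import io.druid.query.SegmentDescriptor;

import java.util.List;
import java.util.Map;

// Hypothetical helper: run a query, drain the results, then report any segments
// that were flagged as missing under RetryQueryRunner.MISSING_SEGMENTS_KEY.
public class MissingSegmentsSketch
{
  public static <T> List<SegmentDescriptor> runAndCollectMissing(QueryRunner<T> runner, Query<T> query)
  {
    final Map<String, Object> context = Maps.newHashMap();
    // Drain the sequence so the wrapped runners actually execute.
    Sequences.toList(runner.run(query, context), Lists.<T>newArrayList());
    @SuppressWarnings("unchecked")
    final List<SegmentDescriptor> missing =
        (List<SegmentDescriptor>) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
    return missing == null ? Lists.<SegmentDescriptor>newArrayList() : missing;
  }
}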

View File

@@ -52,6 +52,8 @@ public class SpecificSegmentSpec implements QuerySegmentSpec
     return walker.getQueryRunnerForSegments(query, Arrays.asList(descriptor));
   }

+  public SegmentDescriptor getDescriptor() { return descriptor; }
+
   @Override
   public boolean equals(Object o)
   {

View File

@@ -44,6 +44,7 @@ import org.joda.time.DateTime;

 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;

 /**
  */
@@ -94,13 +95,13 @@ public class TimeBoundaryQueryQueryToolChest
     {
       @Override
       protected Sequence<Result<TimeBoundaryResultValue>> doRun(
-          QueryRunner<Result<TimeBoundaryResultValue>> baseRunner, Query<Result<TimeBoundaryResultValue>> input
+          QueryRunner<Result<TimeBoundaryResultValue>> baseRunner, Query<Result<TimeBoundaryResultValue>> input, Map<String, Object> context
       )
       {
         TimeBoundaryQuery query = (TimeBoundaryQuery) input;
         return Sequences.simple(
             query.mergeResults(
-                Sequences.toList(baseRunner.run(query), Lists.<Result<TimeBoundaryResultValue>>newArrayList())
+                Sequences.toList(baseRunner.run(query, context), Lists.<Result<TimeBoundaryResultValue>>newArrayList())
             )
         );
       }

View File

@@ -35,6 +35,8 @@ import io.druid.segment.StorageAdapter;
 import org.joda.time.DateTime;

 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;

 /**
@@ -83,7 +85,10 @@ public class TimeBoundaryQueryRunnerFactory
   }

   @Override
-  public Sequence<Result<TimeBoundaryResultValue>> run(Query<Result<TimeBoundaryResultValue>> input)
+  public Sequence<Result<TimeBoundaryResultValue>> run(
+      Query<Result<TimeBoundaryResultValue>> input,
+      Map<String, Object> context
+  )
   {
     if (!(input instanceof TimeBoundaryQuery)) {
       throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeBoundaryQuery.class);

View File

@@ -20,13 +20,13 @@
 package io.druid.query.timeseries;

 import com.google.common.base.Function;
-import com.metamx.common.ISE;
 import com.metamx.common.guava.Sequence;
 import io.druid.query.QueryRunnerHelper;
 import io.druid.query.Result;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.Cursor;
+import io.druid.segment.SegmentMissingException;
 import io.druid.segment.StorageAdapter;
 import io.druid.segment.filter.Filters;
@@ -39,7 +39,7 @@ public class TimeseriesQueryEngine
   public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery query, final StorageAdapter adapter)
   {
     if (adapter == null) {
-      throw new ISE(
+      throw new SegmentMissingException(
           "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
       );
     }

View File

@@ -32,6 +32,8 @@ import io.druid.query.Result;
 import io.druid.segment.Segment;
 import io.druid.segment.StorageAdapter;

+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;

 /**
@@ -89,7 +91,10 @@ public class TimeseriesQueryRunnerFactory
   }

   @Override
-  public Sequence<Result<TimeseriesResultValue>> run(Query<Result<TimeseriesResultValue>> input)
+  public Sequence<Result<TimeseriesResultValue>> run(
+      Query<Result<TimeseriesResultValue>> input,
+      Map<String, Object> context
+  )
   {
     if (!(input instanceof TimeseriesQuery)) {
       throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeseriesQuery.class);

View File

@@ -22,7 +22,6 @@ package io.druid.query.topn;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
-import com.metamx.common.ISE;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.common.logger.Logger;
@@ -33,6 +32,7 @@ import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.filter.Filter;
 import io.druid.segment.Capabilities;
 import io.druid.segment.Cursor;
+import io.druid.segment.SegmentMissingException;
 import io.druid.segment.StorageAdapter;
 import io.druid.segment.filter.Filters;
 import org.joda.time.Interval;
@@ -56,7 +56,7 @@ public class TopNQueryEngine
   public Sequence<Result<TopNResultValue>> query(final TopNQuery query, final StorageAdapter adapter)
   {
     if (adapter == null) {
-      throw new ISE(
+      throw new SegmentMissingException(
           "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
       );
     }

View File

@@ -409,7 +409,10 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
       }

       @Override
-      public Sequence<Result<TopNResultValue>> run(Query<Result<TopNResultValue>> input)
+      public Sequence<Result<TopNResultValue>> run(
+          Query<Result<TopNResultValue>> input,
+          Map<String, Object> context
+      )
       {
         if (!(input instanceof TopNQuery)) {
           throw new ISE("Can only handle [%s], got [%s]", TopNQuery.class, input.getClass());
@@ -417,13 +420,13 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
         final TopNQuery query = (TopNQuery) input;
         if (query.getThreshold() > minTopNThreshold) {
-          return runner.run(query);
+          return runner.run(query, context);
         }

         final boolean isBySegment = query.getContextBySegment(false);

         return Sequences.map(
-            runner.run(query.withThreshold(minTopNThreshold)),
+            runner.run(query.withThreshold(minTopNThreshold), context),
            new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
            {
              @Override

View File

@@ -34,6 +34,8 @@ import io.druid.query.Result;
 import io.druid.segment.Segment;

 import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;

 /**
@@ -63,7 +65,10 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
     return new QueryRunner<Result<TopNResultValue>>()
     {
       @Override
-      public Sequence<Result<TopNResultValue>> run(Query<Result<TopNResultValue>> input)
+      public Sequence<Result<TopNResultValue>> run(
+          Query<Result<TopNResultValue>> input,
+          Map<String, Object> context
+      )
       {
         if (!(input instanceof TopNQuery)) {
           throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TopNQuery.class);

View File

@@ -0,0 +1,61 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.segment;
+
+/**
+ */
+public class BaseProgressIndicator implements ProgressIndicator
+{
+  @Override
+  public void progress()
+  {
+    // do nothing
+  }
+
+  @Override
+  public void start()
+  {
+    // do nothing
+  }
+
+  @Override
+  public void stop()
+  {
+    // do nothing
+  }
+
+  @Override
+  public void startSection(String section)
+  {
+    // do nothing
+  }
+
+  @Override
+  public void progressSection(String section, String message)
+  {
+    // do nothing
+  }
+
+  @Override
+  public void stopSection(String section)
+  {
+    // do nothing
+  }
+}
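A short usage sketch (illustrative, not part of this commit): BaseProgressIndicator gives callers a no-op default wherever a ProgressIndicator is required. The section names below are invented for the example.

import io.druid.segment.BaseProgressIndicator;
import io.druid.segment.ProgressIndicator;

// Hypothetical usage: pass a no-op indicator when no progress reporting is wanted.
public class ProgressSketch
{
  public static void main(String[] args)
  {
    final ProgressIndicator progress = new BaseProgressIndicator();
    progress.start();
    progress.startSection("merge");                 // invented section name
    progress.progressSection("merge", "walking rows"); // all calls are no-ops
    progress.stopSection("merge");
    progress.stop();
  }
}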

View File

@@ -23,6 +23,5 @@ package io.druid.segment; import io.druid.segment.column.Column;
  */
 public interface ColumnSelector
 {
-  public Column getTimeColumn();
   public Column getColumn(String columnName);
 }

View File

@@ -91,7 +91,7 @@ public class ColumnSelectorBitmapIndexSelector implements BitmapIndexSelector
   {
     GenericColumn column = null;
     try {
-      column = index.getTimeColumn().getGenericColumn();
+      column = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
       return column.length();
     }
     finally {

View File

@@ -24,8 +24,8 @@ package io.druid.segment;
  */
 public interface ColumnSelectorFactory
 {
-  public TimestampColumnSelector makeTimestampColumnSelector();
   public DimensionSelector makeDimensionSelector(String dimensionName);
   public FloatColumnSelector makeFloatColumnSelector(String columnName);
+  public LongColumnSelector makeLongColumnSelector(String columnName);
   public ObjectColumnSelector makeObjectColumnSelector(String columnName);
 }

View File

@@ -21,6 +21,7 @@ package io.druid.segment;
 import com.google.common.io.Files;
 import io.druid.segment.data.CompressedFloatsSupplierSerializer;
+import io.druid.segment.data.CompressedObjectStrategy;
 import io.druid.segment.data.IOPeon;

 import java.io.File;
@@ -51,7 +52,8 @@ public class FloatMetricColumnSerializer implements MetricColumnSerializer
   public void open() throws IOException
   {
     writer = CompressedFloatsSupplierSerializer.create(
-        ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER
+        ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER,
+        CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
     );

     writer.open();

View File

@@ -68,6 +68,6 @@ public class IncrementalIndexSegment implements Segment
   @Override
   public void close() throws IOException
   {
-    // do nothing
+    index.close();
   }
 }

Some files were not shown because too many files have changed in this diff.