diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml
index 57150629f14..c8fec800e3e 100644
--- a/cassandra-storage/pom.xml
+++ b/cassandra-storage/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
diff --git a/common/pom.xml b/common/pom.xml
index b4bbaf0f4f6..03e902df7c5 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
diff --git a/common/src/main/java/io/druid/guice/DruidSecondaryModule.java b/common/src/main/java/io/druid/guice/DruidSecondaryModule.java
index fbafb29d42b..d768a60a7c1 100644
--- a/common/src/main/java/io/druid/guice/DruidSecondaryModule.java
+++ b/common/src/main/java/io/druid/guice/DruidSecondaryModule.java
@@ -71,7 +71,8 @@ public class DruidSecondaryModule implements Module
binder.install(new DruidGuiceExtensions());
binder.bind(Properties.class).toInstance(properties);
binder.bind(ConfigurationObjectFactory.class).toInstance(factory);
- binder.bind(ObjectMapper.class).to(Key.get(ObjectMapper.class, Json.class));
+ // make ObjectMapper an eager singleton so Jackson gets set up with Guice injection for JsonConfigurator
+ binder.bind(ObjectMapper.class).to(Key.get(ObjectMapper.class, Json.class)).asEagerSingleton();
binder.bind(Validator.class).toInstance(validator);
binder.bind(JsonConfigurator.class).toInstance(jsonConfigurator);
}
diff --git a/examples/bin/run_example_server.sh b/examples/bin/run_example_server.sh
index 3d63e7cb0c3..461f937beb0 100755
--- a/examples/bin/run_example_server.sh
+++ b/examples/bin/run_example_server.sh
@@ -58,6 +58,7 @@ DRUID_CP=${EXAMPLE_LOC}
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/../config/realtime
#For the kit
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/lib/*
+DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/_global
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/realtime
echo "Running command:"
diff --git a/examples/config/_global/global.runtime.properties b/examples/config/_global/global.runtime.properties
new file mode 100644
index 00000000000..beb3dba77c2
--- /dev/null
+++ b/examples/config/_global/global.runtime.properties
@@ -0,0 +1,23 @@
+# Extensions
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.7.0","io.druid.extensions:druid-kafka-seven:0.7.0","io.druid.extensions:druid-rabbitmq:0.7.0", "io.druid.extensions:druid-s3-extensions:0.7.0"]
+
+# Zookeeper
+druid.zk.service.host=localhost
+
+# Metadata Storage
+druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
+druid.db.connector.user=druid
+druid.db.connector.password=diurd
+
+# Deep storage
+druid.storage.type=local
+druid.storage.storageDirectory=/tmp/druid/localStorage
+
+# Indexing service discovery
+druid.selectors.indexing.serviceName=overlord
+
+# Monitoring (disabled for examples)
+# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"]
+
+# Metrics logging (disabled for examples)
+druid.emitter=noop
diff --git a/examples/config/broker/runtime.properties b/examples/config/broker/runtime.properties
index 8afae982654..23d1170343b 100644
--- a/examples/config/broker/runtime.properties
+++ b/examples/config/broker/runtime.properties
@@ -2,8 +2,6 @@ druid.host=localhost
druid.service=broker
druid.port=8080
-druid.zk.service.host=localhost
-
-# Change these to make Druid faster
+# Bump these up only for faster nested groupBy
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
diff --git a/examples/config/coordinator/runtime.properties b/examples/config/coordinator/runtime.properties
index 3d68fec772e..c9f16857af4 100644
--- a/examples/config/coordinator/runtime.properties
+++ b/examples/config/coordinator/runtime.properties
@@ -2,10 +2,4 @@ druid.host=localhost
druid.service=coordinator
druid.port=8082
-druid.zk.service.host=localhost
-
-druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
-druid.db.connector.user=druid
-druid.db.connector.password=diurd
-
druid.coordinator.startDelay=PT70s
\ No newline at end of file
diff --git a/examples/config/historical/runtime.properties b/examples/config/historical/runtime.properties
index e21651760b0..544e5c860e1 100644
--- a/examples/config/historical/runtime.properties
+++ b/examples/config/historical/runtime.properties
@@ -2,9 +2,7 @@ druid.host=localhost
druid.service=historical
druid.port=8081
-druid.zk.service.host=localhost
-
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.147"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.7.0"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@@ -16,4 +14,4 @@ druid.server.maxSize=10000000000
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
-druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
\ No newline at end of file
+druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
diff --git a/examples/config/overlord/runtime.properties b/examples/config/overlord/runtime.properties
index f727c0e012b..3eebd15a9a9 100644
--- a/examples/config/overlord/runtime.properties
+++ b/examples/config/overlord/runtime.properties
@@ -1,18 +1,8 @@
druid.host=localhost
-druid.port=8087
+druid.port=8080
druid.service=overlord
-druid.zk.service.host=localhost
-
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.147"]
-
-druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-druid.db.connector.user=druid
-druid.db.connector.password=diurd
-
-druid.selectors.indexing.serviceName=overlord
druid.indexer.queue.startDelay=PT0M
druid.indexer.runner.javaOpts="-server -Xmx256m"
-druid.indexer.runner.startPort=8088
druid.indexer.fork.property.druid.processing.numThreads=1
-druid.indexer.fork.property.druid.computation.buffer.size=100000000
\ No newline at end of file
+druid.indexer.fork.property.druid.computation.buffer.size=100000000
diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties
index e20d1b56b8f..ccde5b5bb3b 100644
--- a/examples/config/realtime/runtime.properties
+++ b/examples/config/realtime/runtime.properties
@@ -2,19 +2,11 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
-druid.zk.service.host=localhost
-
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.147","io.druid.extensions:druid-kafka-seven:0.6.147","io.druid.extensions:druid-rabbitmq:0.6.147"]
-
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
-# These configs are only required for real hand off
-# druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
-# druid.db.connector.user=druid
-# druid.db.connector.password=diurd
-
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
-druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
+# Enable real monitoring
+# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor","io.druid.segment.realtime.RealtimeMetricsMonitor"]
diff --git a/examples/pom.xml b/examples/pom.xml
index 33515fffe61..896b772738e 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
diff --git a/hdfs-storage/pom.xml b/hdfs-storage/pom.xml
index 7cdaea48728..ac2c29b7405 100644
--- a/hdfs-storage/pom.xml
+++ b/hdfs-storage/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
diff --git a/histogram/pom.xml b/histogram/pom.xml
index f0e11ea375e..a520de6725b 100644
--- a/histogram/pom.xml
+++ b/histogram/pom.xml
@@ -27,7 +27,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java
index dbd566f2693..c150d3810ce 100644
--- a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java
+++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramBufferAggregator.java
@@ -87,6 +87,13 @@ public class ApproximateHistogramBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getFloat()");
}
+
+ @Override
+ public long getLong(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getLong()");
+ }
+
@Override
public void close()
{
diff --git a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java
index 4190ae50a40..cbf3e115500 100644
--- a/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java
+++ b/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingBufferAggregator.java
@@ -91,6 +91,12 @@ public class ApproximateHistogramFoldingBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getFloat()");
}
+ @Override
+ public long getLong(ByteBuffer buf, int position)
+ {
+ throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getLong()");
+ }
+
@Override
public void close()
{
diff --git a/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java
index 1c7b23b3a3f..48814642f5b 100644
--- a/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java
+++ b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java
@@ -86,7 +86,8 @@ public class ApproximateHistogramGroupByQueryTest
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
- new GroupByQueryQueryToolChest(configSupplier, mapper, engine)
+ new GroupByQueryQueryToolChest(configSupplier, mapper, engine, pool),
+ pool
);
GroupByQueryConfig singleThreadedConfig = new GroupByQueryConfig()
@@ -106,7 +107,8 @@ public class ApproximateHistogramGroupByQueryTest
singleThreadEngine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
singleThreadedConfigSupplier,
- new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine)
+ new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool),
+ pool
);
diff --git a/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java
index 25f58c064bf..bba68f5339e 100644
--- a/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java
+++ b/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java
@@ -48,6 +48,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -240,7 +241,8 @@ public class ApproximateHistogramTopNQueryTest
)
)
);
+ HashMap context = new HashMap();
- TestHelper.assertExpectedResults(expectedResults, runner.run(query));
+ TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
}
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 24373fc733f..f23382b6de9 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
index 335c8c1858b..8c6eaf8d528 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -48,7 +48,6 @@ import io.druid.guice.annotations.Self;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.initialization.Initialization;
-import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
@@ -394,6 +393,11 @@ public class HadoopDruidIndexerConfig
}
}
+ public boolean isPersistInHeap()
+ {
+ return schema.getTuningConfig().isPersistInHeap();
+ }
+
/******************************************
Path helper logic
******************************************/
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java
index 64bc1267146..4f41cc52df8 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java
@@ -57,7 +57,11 @@ public class HadoopDruidIndexerJob implements Jobby
List<Jobby> jobs = Lists.newArrayList();
JobHelper.ensurePaths(config);
- indexJob = new IndexGeneratorJob(config);
+ if (config.isPersistInHeap()) {
+ indexJob = new IndexGeneratorJob(config);
+ } else {
+ indexJob = new LegacyIndexGeneratorJob(config);
+ }
jobs.add(indexJob);
if (dbUpdaterJob != null) {
@@ -66,15 +70,17 @@ public class HadoopDruidIndexerJob implements Jobby
log.info("No updaterJobSpec set, not uploading to database");
}
- jobs.add(new Jobby()
- {
- @Override
- public boolean run()
- {
- publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
- return true;
- }
- });
+ jobs.add(
+ new Jobby()
+ {
+ @Override
+ public boolean run()
+ {
+ publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
+ return true;
+ }
+ }
+ );
JobHelper.runJobs(jobs, config);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java
index bb560e27656..fec163e4164 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java
@@ -159,13 +159,15 @@ public class HadoopIngestionSpec extends IngestionSpec
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java
private final Map<String, String> jobProperties;
private final boolean combineText;
+ private final boolean persistInHeap;
+ private final boolean ingestOffheap;
@JsonCreator
public HadoopTuningConfig(
@@ -81,7 +85,9 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
- final @JsonProperty("combineText") boolean combineText
+ final @JsonProperty("combineText") boolean combineText,
+ final @JsonProperty("persistInHeap") boolean persistInHeap,
+ final @JsonProperty("ingestOffheap") boolean ingestOffheap
)
{
this.workingPath = workingPath == null ? null : workingPath;
@@ -97,6 +103,8 @@ public class HadoopTuningConfig implements TuningConfig
? ImmutableMap.<String, String>of()
: ImmutableMap.copyOf(jobProperties));
this.combineText = combineText;
+ this.persistInHeap = persistInHeap;
+ this.ingestOffheap = ingestOffheap;
}
@JsonProperty
@@ -165,6 +173,17 @@ public class HadoopTuningConfig implements TuningConfig
return combineText;
}
+ @JsonProperty
+ public boolean isPersistInHeap()
+ {
+ return persistInHeap;
+ }
+
+ @JsonProperty
+ public boolean isIngestOffheap(){
+ return ingestOffheap;
+ }
+
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@@ -178,7 +197,9 @@ public class HadoopTuningConfig implements TuningConfig
overwriteFiles,
ignoreInvalidRows,
jobProperties,
- combineText
+ combineText,
+ persistInHeap,
+ ingestOffheap
);
}
@@ -195,7 +216,9 @@ public class HadoopTuningConfig implements TuningConfig
overwriteFiles,
ignoreInvalidRows,
jobProperties,
- combineText
+ combineText,
+ persistInHeap,
+ ingestOffheap
);
}
@@ -212,7 +235,9 @@ public class HadoopTuningConfig implements TuningConfig
overwriteFiles,
ignoreInvalidRows,
jobProperties,
- combineText
+ combineText,
+ persistInHeap,
+ ingestOffheap
);
}
}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index e562f7f45d6..df3bdcaecce 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -35,13 +35,17 @@ import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexMaker;
+import io.druid.segment.LoggingProgressIndicator;
+import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
@@ -86,20 +90,9 @@ import java.util.zip.ZipOutputStream;
public class IndexGeneratorJob implements Jobby
{
private static final Logger log = new Logger(IndexGeneratorJob.class);
- private final HadoopDruidIndexerConfig config;
- private IndexGeneratorStats jobStats;
-
- public IndexGeneratorJob(
- HadoopDruidIndexerConfig config
- )
- {
- this.config = config;
- this.jobStats = new IndexGeneratorStats();
- }
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
{
-
final Configuration conf = new Configuration();
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
@@ -130,6 +123,22 @@ public class IndexGeneratorJob implements Jobby
return publishedSegments;
}
+ private final HadoopDruidIndexerConfig config;
+ private IndexGeneratorStats jobStats;
+
+ public IndexGeneratorJob(
+ HadoopDruidIndexerConfig config
+ )
+ {
+ this.config = config;
+ this.jobStats = new IndexGeneratorStats();
+ }
+
+ protected void setReducerClass(final Job job)
+ {
+ job.setReducerClass(IndexGeneratorReducer.class);
+ }
+
public IndexGeneratorStats getJobStats()
{
return jobStats;
@@ -161,7 +170,7 @@ public class IndexGeneratorJob implements Jobby
job.setNumReduceTasks(Iterables.size(config.getAllBuckets().get()));
job.setPartitionerClass(IndexGeneratorPartitioner.class);
- job.setReducerClass(IndexGeneratorReducer.class);
+ setReducerClass(job);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
@@ -190,7 +199,6 @@ public class IndexGeneratorJob implements Jobby
}
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper
-
{
@Override
protected void innerMap(
@@ -256,6 +264,42 @@ public class IndexGeneratorJob implements Jobby
private List<String> metricNames = Lists.newArrayList();
private StringInputRowParser parser;
+ protected ProgressIndicator makeProgressIndicator(final Context context)
+ {
+ return new LoggingProgressIndicator("IndexGeneratorJob")
+ {
+ @Override
+ public void progress()
+ {
+ context.progress();
+ }
+ };
+ }
+
+ protected File persist(
+ final IncrementalIndex index,
+ final Interval interval,
+ final File file,
+ final ProgressIndicator progressIndicator
+ ) throws IOException
+ {
+ return IndexMaker.persist(
+ index, interval, file, progressIndicator
+ );
+ }
+
+ protected File mergeQueryableIndex(
+ final List<QueryableIndex> indexes,
+ final AggregatorFactory[] aggs,
+ final File file,
+ ProgressIndicator progressIndicator
+ ) throws IOException
+ {
+ return IndexMaker.mergeQueryableIndex(
+ indexes, aggs, file, progressIndicator
+ );
+ }
+
@Override
protected void setup(Context context)
throws IOException, InterruptedException
@@ -282,113 +326,84 @@ public class IndexGeneratorJob implements Jobby
final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
IncrementalIndex index = makeIncrementalIndex(bucket, aggs);
+ try {
+ File baseFlushFile = File.createTempFile("base", "flush");
+ baseFlushFile.delete();
+ baseFlushFile.mkdirs();
- File baseFlushFile = File.createTempFile("base", "flush");
- baseFlushFile.delete();
- baseFlushFile.mkdirs();
+ Set<File> toMerge = Sets.newTreeSet();
+ int indexCount = 0;
+ int lineCount = 0;
+ int runningTotalLineCount = 0;
+ long startTime = System.currentTimeMillis();
- Set<File> toMerge = Sets.newTreeSet();
- int indexCount = 0;
- int lineCount = 0;
- int runningTotalLineCount = 0;
- long startTime = System.currentTimeMillis();
-
- Set<String> allDimensionNames = Sets.newHashSet();
-
- for (final Text value : values) {
- context.progress();
- final InputRow inputRow = index.getSpatialDimensionRowFormatter().formatRow(parser.parse(value.toString()));
- allDimensionNames.addAll(inputRow.getDimensions());
-
- int numRows = index.add(inputRow);
- ++lineCount;
-
- if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
- log.info(
- "%,d lines to %,d rows in %,d millis",
- lineCount - runningTotalLineCount,
- numRows,
- System.currentTimeMillis() - startTime
- );
- runningTotalLineCount = lineCount;
-
- final File file = new File(baseFlushFile, String.format("index%,05d", indexCount));
- toMerge.add(file);
+ Set<String> allDimensionNames = Sets.newHashSet();
+ final ProgressIndicator progressIndicator = makeProgressIndicator(context);
+ for (final Text value : values) {
context.progress();
- IndexMerger.persist(
- index, interval, file, new IndexMerger.ProgressIndicator()
- {
- @Override
- public void progress()
- {
- context.progress();
- }
- }
- );
- index = makeIncrementalIndex(bucket, aggs);
+ final InputRow inputRow = index.formatRow(parser.parse(value.toString()));
+ allDimensionNames.addAll(inputRow.getDimensions());
- startTime = System.currentTimeMillis();
- ++indexCount;
- }
- }
+ int numRows = index.add(inputRow);
+ ++lineCount;
- log.info("%,d lines completed.", lineCount);
+ if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
+ log.info(
+ "%,d lines to %,d rows in %,d millis",
+ lineCount - runningTotalLineCount,
+ numRows,
+ System.currentTimeMillis() - startTime
+ );
+ runningTotalLineCount = lineCount;
- List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
- final File mergedBase;
+ final File file = new File(baseFlushFile, String.format("index%,05d", indexCount));
+ toMerge.add(file);
- if (toMerge.size() == 0) {
- if (index.isEmpty()) {
- throw new IAE("If you try to persist empty indexes you are going to have a bad time");
- }
-
- mergedBase = new File(baseFlushFile, "merged");
- IndexMerger.persist(
- index, interval, mergedBase, new IndexMerger.ProgressIndicator()
- {
- @Override
- public void progress()
- {
context.progress();
+ persist(index, interval, file, progressIndicator);
+ // close this index and make a new one
+ index.close();
+ index = makeIncrementalIndex(bucket, aggs);
+
+ startTime = System.currentTimeMillis();
+ ++indexCount;
}
}
- );
- } else {
- if (!index.isEmpty()) {
- final File finalFile = new File(baseFlushFile, "final");
- IndexMerger.persist(
- index, interval, finalFile, new IndexMerger.ProgressIndicator()
- {
- @Override
- public void progress()
- {
- context.progress();
- }
- }
- );
- toMerge.add(finalFile);
- }
+ log.info("%,d lines completed.", lineCount);
+
+ List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
+ final File mergedBase;
+
+ if (toMerge.size() == 0) {
+ if (index.isEmpty()) {
+ throw new IAE("If you try to persist empty indexes you are going to have a bad time");
+ }
+
+ mergedBase = new File(baseFlushFile, "merged");
+ persist(index, interval, mergedBase, progressIndicator);
+ } else {
+ if (!index.isEmpty()) {
+ final File finalFile = new File(baseFlushFile, "final");
+ persist(index, interval, finalFile, progressIndicator);
+ toMerge.add(finalFile);
+ }
+
+ for (File file : toMerge) {
+ indexes.add(IndexIO.loadIndex(file));
+ }
+ mergedBase = mergeQueryableIndex(
+ indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator
+ );
+ }
+ serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));
for (File file : toMerge) {
- indexes.add(IndexIO.loadIndex(file));
+ FileUtils.deleteDirectory(file);
}
- mergedBase = IndexMerger.mergeQueryableIndex(
- indexes, aggs, new File(baseFlushFile, "merged"), new IndexMerger.ProgressIndicator()
- {
- @Override
- public void progress()
- {
- context.progress();
- }
- }
- );
}
-
- serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));
-
- for (File file : toMerge) {
- FileUtils.deleteDirectory(file);
+ finally {
+ index.close();
}
}
@@ -616,14 +631,29 @@ public class IndexGeneratorJob implements Jobby
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs)
{
- return new IncrementalIndex(
- new IncrementalIndexSchema.Builder()
- .withMinTimestamp(theBucket.time.getMillis())
- .withSpatialDimensions(config.getSchema().getDataSchema().getParser())
- .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
- .withMetrics(aggs)
- .build()
- );
+ int aggsSize = 0;
+ for (AggregatorFactory agg : aggs) {
+ aggsSize += agg.getMaxIntermediateSize();
+ }
+ final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
+ int bufferSize = aggsSize * tuningConfig.getRowFlushBoundary();
+ final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
+ .withMinTimestamp(theBucket.time.getMillis())
+ .withDimensionsSpec(config.getSchema().getDataSchema().getParser())
+ .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
+ .withMetrics(aggs)
+ .build();
+ if (tuningConfig.isIngestOffheap()) {
+ return new OffheapIncrementalIndex(
+ indexSchema,
+ new OffheapBufferPool(bufferSize)
+ );
+ } else {
+ return new IncrementalIndex(
+ indexSchema,
+ new OffheapBufferPool(bufferSize)
+ );
+ }
}
private void createNewZipEntry(ZipOutputStream out, String name) throws IOException
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java
new file mode 100644
index 00000000000..876492ba8f2
--- /dev/null
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java
@@ -0,0 +1,86 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexer;
+
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.segment.BaseProgressIndicator;
+import io.druid.segment.IndexMerger;
+import io.druid.segment.ProgressIndicator;
+import io.druid.segment.QueryableIndex;
+import io.druid.segment.incremental.IncrementalIndex;
+import org.apache.hadoop.mapreduce.Job;
+import org.joda.time.Interval;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ */
+public class LegacyIndexGeneratorJob extends IndexGeneratorJob
+{
+ public LegacyIndexGeneratorJob(
+ HadoopDruidIndexerConfig config
+ )
+ {
+ super(config);
+ }
+
+ @Override
+ protected void setReducerClass(Job job)
+ {
+ job.setReducerClass(LegacyIndexGeneratorReducer.class);
+ }
+
+ public static class LegacyIndexGeneratorReducer extends IndexGeneratorJob.IndexGeneratorReducer
+ {
+ @Override
+ protected ProgressIndicator makeProgressIndicator(final Context context)
+ {
+ return new BaseProgressIndicator()
+ {
+ @Override
+ public void progress()
+ {
+ context.progress();
+ }
+ };
+ }
+
+ @Override
+ protected File persist(
+ IncrementalIndex index, Interval interval, File file, ProgressIndicator progressIndicator
+ ) throws IOException
+ {
+ return IndexMerger.persist(index, interval, file, progressIndicator);
+ }
+
+ @Override
+ protected File mergeQueryableIndex(
+ List<QueryableIndex> indexes,
+ AggregatorFactory[] aggs,
+ File file,
+ ProgressIndicator progressIndicator
+ ) throws IOException
+ {
+ return IndexMerger.mergeQueryableIndex(indexes, aggs, file, progressIndicator);
+ }
+ }
+}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/rollup/DataRollupSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/rollup/DataRollupSpec.java
index 86d04f9276f..3f0fd29736e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/rollup/DataRollupSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/rollup/DataRollupSpec.java
@@ -20,6 +20,7 @@
package io.druid.indexer.rollup;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.api.client.util.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
@@ -30,32 +31,17 @@ import java.util.List;
*
* Adjust to JsonCreator and final fields when resolved.
*/
+@Deprecated
public class DataRollupSpec
{
@JsonProperty
- public List<AggregatorFactory> aggs;
+ public List<AggregatorFactory> aggs = Lists.newArrayList();
@JsonProperty
public QueryGranularity rollupGranularity = QueryGranularity.NONE;
- @JsonProperty
- public int rowFlushBoundary = 500000;
-
- public DataRollupSpec() {}
-
- public DataRollupSpec(List<AggregatorFactory> aggs, QueryGranularity rollupGranularity)
- {
- this.aggs = aggs;
- this.rollupGranularity = rollupGranularity;
- }
-
public List<AggregatorFactory> getAggs()
{
return aggs;
}
-
- public QueryGranularity getRollupGranularity()
- {
- return rollupGranularity;
- }
}
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
index 1fe6068f2ad..9149a127fda 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -152,6 +152,10 @@ public class HadoopDruidIndexerConfigTest
for (int i = 0; i < partitionCount; i++) {
specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, partitionCount, new DefaultObjectMapper()), i));
}
+ // Backwards compatibility
+ DataRollupSpec rollupSpec = new DataRollupSpec();
+ rollupSpec.rollupGranularity = QueryGranularity.MINUTE;
+
HadoopIngestionSpec spec = new HadoopIngestionSpec(
null, null, null,
"foo",
@@ -172,7 +176,7 @@ public class HadoopDruidIndexerConfigTest
true,
ImmutableMap.of(new DateTime("2010-01-01T01:00:00"), specs),
false,
- new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.MINUTE),
+ rollupSpec,
null,
false,
ImmutableMap.of("foo", "bar"),
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 653a2f6c033..81260ba3295 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
index 11f6bb2264d..0f7859f72a6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
@@ -35,7 +35,7 @@ import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexMaker;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.indexing.DataSchema;
@@ -166,7 +166,7 @@ public class YeOldePlumberSchool implements PlumberSchool
}
fileToUpload = new File(tmpSegmentDir, "merged");
- IndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
+ IndexMaker.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
}
// Map merged segment so we can extract dimensions
@@ -211,8 +211,7 @@ public class YeOldePlumberSchool implements PlumberSchool
log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist);
try {
-
- IndexMerger.persist(
+ IndexMaker.persist(
indexToPersist.getIndex(),
dirToPersist
);
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java
index 51fb358ac8c..5b6bd37db16 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java
@@ -27,7 +27,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexMaker;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.QueryableIndexIndexableAdapter;
import io.druid.segment.Rowboat;
@@ -106,7 +106,7 @@ public class AppendTask extends MergeTaskBase
);
}
- return IndexMerger.append(adapters, outDir);
+ return IndexMaker.append(adapters, outDir);
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java
deleted file mode 100644
index 39d443d1330..00000000000
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.indexing.common.task;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.metamx.common.logger.Logger;
-import io.druid.granularity.QueryGranularity;
-import io.druid.indexing.common.TaskLock;
-import io.druid.indexing.common.TaskStatus;
-import io.druid.indexing.common.TaskToolbox;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.segment.IndexMerger;
-import io.druid.segment.IndexableAdapter;
-import io.druid.segment.incremental.IncrementalIndex;
-import io.druid.segment.incremental.IncrementalIndexAdapter;
-import io.druid.timeline.DataSegment;
-import io.druid.timeline.partition.NoneShardSpec;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-
-import java.io.File;
-
-public class DeleteTask extends AbstractFixedIntervalTask
-{
- private static final Logger log = new Logger(DeleteTask.class);
-
- @JsonCreator
- public DeleteTask(
- @JsonProperty("id") String id,
- @JsonProperty("dataSource") String dataSource,
- @JsonProperty("interval") Interval interval
- )
- {
- super(
- id != null ? id : String.format(
- "delete_%s_%s_%s_%s",
- dataSource,
- interval.getStart(),
- interval.getEnd(),
- new DateTime().toString()
- ),
- dataSource,
- Preconditions.checkNotNull(interval, "interval")
- );
- }
-
- @Override
- public String getType()
- {
- return "delete";
- }
-
- @Override
- public TaskStatus run(TaskToolbox toolbox) throws Exception
- {
- // Strategy: Create an empty segment covering the interval to be deleted
- final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
- final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
- final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty);
-
- // Create DataSegment
- final DataSegment segment =
- DataSegment.builder()
- .dataSource(this.getDataSource())
- .interval(getInterval())
- .version(myLock.getVersion())
- .shardSpec(new NoneShardSpec())
- .build();
-
- final File outDir = new File(toolbox.getTaskWorkDir(), segment.getIdentifier());
- final File fileToUpload = IndexMerger.merge(Lists.newArrayList(emptyAdapter), new AggregatorFactory[0], outDir);
-
- // Upload the segment
- final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, segment);
-
- log.info(
- "Uploaded tombstone segment for[%s] interval[%s] with version[%s]",
- segment.getDataSource(),
- segment.getInterval(),
- segment.getVersion()
- );
-
- toolbox.pushSegments(ImmutableList.of(uploadedSegment));
-
- return TaskStatus.success(getId());
- }
-}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index f04736a66e3..d1495e6b6d2 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -147,7 +147,7 @@ public class IndexTask extends AbstractFixedIntervalTask
granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity)
),
new IndexIOConfig(firehoseFactory),
- new IndexTuningConfig(targetPartitionSize, rowFlushBoundary, null)
+ new IndexTuningConfig(targetPartitionSize, 0, null)
);
}
this.jsonMapper = jsonMapper;
@@ -401,7 +401,11 @@ public class IndexTask extends AbstractFixedIntervalTask
version,
wrappedDataSegmentPusher,
tmpDir
- ).findPlumber(schema, new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec), metrics);
+ ).findPlumber(
+ schema,
+ new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null),
+ metrics
+ );
// rowFlushBoundary for this job
final int myRowFlushBoundary = rowFlushBoundary > 0
@@ -557,7 +561,7 @@ public class IndexTask extends AbstractFixedIntervalTask
@JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary,
@JsonProperty("numShards") @Nullable Integer numShards
- )
+ )
{
this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java
index 9bca4a3eee5..d856d1505bc 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java
@@ -28,7 +28,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexMaker;
import io.druid.segment.QueryableIndex;
import io.druid.timeline.DataSegment;
@@ -60,7 +60,7 @@ public class MergeTask extends MergeTaskBase
public File merge(final Map<DataSegment, File> segments, final File outDir)
throws Exception
{
- return IndexMerger.mergeQueryableIndex(
+ return IndexMaker.mergeQueryableIndex(
Lists.transform(
ImmutableList.copyOf(segments.values()),
new Function<File, QueryableIndex>()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 231febf66df..906e8e7a901 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -143,7 +143,9 @@ public class RealtimeIndexTask extends AbstractTask
null,
rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
maxPendingPersists,
- spec.getShardSpec()
+ spec.getShardSpec(),
+ false,
+ false
),
null, null, null, null
);
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index e5db129128c..a4c093179b3 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -43,7 +43,6 @@ import io.druid.query.QueryRunner;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "append", value = AppendTask.class),
@JsonSubTypes.Type(name = "merge", value = MergeTask.class),
- @JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "move", value = MoveTask.class),
@JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 5744fa7f006..f0dacac6fa8 100644
--- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -51,10 +51,11 @@ import io.druid.query.select.EventHolder;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IndexIO;
+import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
-import io.druid.segment.TimestampColumnSelector;
+import io.druid.segment.column.Column;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import io.druid.segment.loading.SegmentLoadingException;
@@ -250,7 +251,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory
public Sequence<InputRow> apply(final Cursor cursor)
{
- final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
+ final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
@@ -287,7 +288,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
- final long timestamp = timestampColumnSelector.getTimestamp();
+ final long timestamp = timestampColumnSelector.get();
theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
index 8d89d834785..8807debcb3d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
@@ -49,7 +49,7 @@ public class ForkingTaskRunnerConfig
@JsonProperty
@Min(1024)
@Max(65535)
- private int startPort = 8081;
+ private int startPort = 8100;
@JsonProperty
@NotNull
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index f3a6e9e1d1f..ed4769644d1 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -243,51 +243,6 @@ public class TaskSerdeTest
);
}
- @Test
- public void testDeleteTaskSerde() throws Exception
- {
- final DeleteTask task = new DeleteTask(
- null,
- "foo",
- new Interval("2010-01-01/P1D")
- );
-
- final String json = jsonMapper.writeValueAsString(task);
-
- Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
- final DeleteTask task2 = (DeleteTask) jsonMapper.readValue(json, Task.class);
-
- Assert.assertEquals("foo", task.getDataSource());
- Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
-
- Assert.assertEquals(task.getId(), task2.getId());
- Assert.assertEquals(task.getGroupId(), task2.getGroupId());
- Assert.assertEquals(task.getDataSource(), task2.getDataSource());
- Assert.assertEquals(task.getInterval(), task2.getInterval());
- }
-
- @Test
- public void testDeleteTaskFromJson() throws Exception
- {
- final DeleteTask task = (DeleteTask) jsonMapper.readValue(
- "{\"type\":\"delete\",\"dataSource\":\"foo\",\"interval\":\"2010-01-01/P1D\"}",
- Task.class
- );
- final String json = jsonMapper.writeValueAsString(task);
-
- Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
- final DeleteTask task2 = (DeleteTask) jsonMapper.readValue(json, Task.class);
-
- Assert.assertNotNull(task.getId());
- Assert.assertEquals("foo", task.getDataSource());
- Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
-
- Assert.assertEquals(task.getId(), task2.getId());
- Assert.assertEquals(task.getGroupId(), task2.getGroupId());
- Assert.assertEquals(task.getDataSource(), task2.getDataSource());
- Assert.assertEquals(task.getInterval(), task2.getInterval());
- }
-
@Test
public void testAppendTaskSerde() throws Exception
{
@@ -413,7 +368,7 @@ public class TaskSerdeTest
true,
null,
false,
- new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
+ null,
null,
false,
ImmutableMap.of("foo", "bar"),
diff --git a/kafka-eight/pom.xml b/kafka-eight/pom.xml
index 79e28c016ec..e0c5fb43ebe 100644
--- a/kafka-eight/pom.xml
+++ b/kafka-eight/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
diff --git a/kafka-seven/pom.xml b/kafka-seven/pom.xml
index 21acf89e6a4..a5e6ef4b806 100644
--- a/kafka-seven/pom.xml
+++ b/kafka-seven/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
diff --git a/pom.xml b/pom.xml
index e5baf549052..1ca0ace022d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,19 +18,20 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
-
+
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
- <tag>druid-0.6.159-SNAPSHOT</tag>
+ <tag>druid-0.7.0-SNAPSHOT</tag>
@@ -41,7 +42,7 @@
UTF-8
0.26.9
2.6.0
- 0.2.15
+ 0.2.16
@@ -74,7 +75,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
- <version>0.2.11</version>
+ <version>0.2.12</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
@@ -389,7 +390,7 @@
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
- <version>1.1.2</version>
+ <version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
@@ -434,6 +435,11 @@
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.mapdb</groupId>
+ <artifactId>mapdb</artifactId>
+ <version>1.0.6</version>
+ </dependency>
diff --git a/processing/pom.xml b/processing/pom.xml
index 0b562f2a2fa..2f60f53d2bc 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -18,7 +18,8 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
-
+
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
@@ -28,7 +29,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
@@ -82,6 +83,14 @@
<dependency>
<groupId>com.davekoelle</groupId>
<artifactId>alphanum</artifactId>
</dependency>
+ <dependency>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mapdb</groupId>
+ <artifactId>mapdb</artifactId>
+ </dependency>
@@ -95,10 +104,10 @@
easymock
test
-
- com.google.caliper
- caliper
-
+
+ com.google.caliper
+ caliper
+
diff --git a/processing/src/main/java/io/druid/guice/GuiceInjectors.java b/processing/src/main/java/io/druid/guice/GuiceInjectors.java
index d979938d5d7..2628e2feee4 100644
--- a/processing/src/main/java/io/druid/guice/GuiceInjectors.java
+++ b/processing/src/main/java/io/druid/guice/GuiceInjectors.java
@@ -26,6 +26,7 @@ import com.google.inject.Injector;
import com.google.inject.Module;
import io.druid.jackson.JacksonModule;
+import java.util.Arrays;
import java.util.List;
/**
@@ -37,7 +38,7 @@ public class GuiceInjectors
return Guice.createInjector(
new DruidGuiceExtensions(),
new JacksonModule(),
- new PropertiesModule("runtime.properties"),
+ new PropertiesModule(Arrays.asList("global.runtime.properties", "runtime.properties")),
new ConfigModule(),
new Module()
{
@@ -56,7 +57,7 @@ public class GuiceInjectors
List<Module> theModules = Lists.newArrayList();
theModules.add(new DruidGuiceExtensions());
theModules.add(new JacksonModule());
- theModules.add(new PropertiesModule("runtime.properties"));
+ theModules.add(new PropertiesModule(Arrays.asList("global.runtime.properties", "runtime.properties")));
theModules.add(new ConfigModule());
theModules.add(
new Module()
diff --git a/processing/src/main/java/io/druid/guice/PropertiesModule.java b/processing/src/main/java/io/druid/guice/PropertiesModule.java
index 2314134894b..cc766576450 100644
--- a/processing/src/main/java/io/druid/guice/PropertiesModule.java
+++ b/processing/src/main/java/io/druid/guice/PropertiesModule.java
@@ -17,10 +17,11 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
-package io.druid.guice;
+package io.druid.guice;;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
+import com.google.common.io.Closeables;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.guava.CloseQuietly;
@@ -33,7 +34,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.Reader;
+import java.util.List;
import java.util.Properties;
/**
@@ -42,11 +43,11 @@ public class PropertiesModule implements Module
{
private static final Logger log = new Logger(PropertiesModule.class);
- private final String propertiesFile;
+ private final List<String> propertiesFiles;
- public PropertiesModule(String propertiesFile)
+ public PropertiesModule(List<String> propertiesFiles)
{
- this.propertiesFile = propertiesFile;
+ this.propertiesFiles = propertiesFiles;
}
@Override
@@ -58,30 +59,32 @@ public class PropertiesModule implements Module
Properties props = new Properties(fileProps);
props.putAll(systemProps);
- InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile);
- try {
- if (stream == null) {
- File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile));
- if (workingDirectoryFile.exists()) {
- stream = new BufferedInputStream(new FileInputStream(workingDirectoryFile));
+ for (String propertiesFile : propertiesFiles) {
+ InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile);
+ try {
+ if (stream == null) {
+ File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile));
+ if (workingDirectoryFile.exists()) {
+ stream = new BufferedInputStream(new FileInputStream(workingDirectoryFile));
+ }
}
- }
- if (stream != null) {
- log.info("Loading properties from %s", propertiesFile);
- try(Reader reader = new InputStreamReader(stream, Charsets.UTF_8)) {
- fileProps.load(reader);
- }
- catch (IOException e) {
- throw Throwables.propagate(e);
+ if (stream != null) {
+ log.info("Loading properties from %s", propertiesFile);
+ try {
+ fileProps.load(new InputStreamReader(stream, Charsets.UTF_8));
+ }
+ catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
}
}
- }
- catch (FileNotFoundException e) {
- log.wtf(e, "This can only happen if the .exists() call lied. That's f'd up.");
- }
- finally {
- CloseQuietly.close(stream);
+ catch (FileNotFoundException e) {
+ log.wtf(e, "This can only happen if the .exists() call lied. That's f'd up.");
+ }
+ finally {
+ CloseQuietly.close(stream);
+ }
}
binder.bind(Properties.class).toInstance(props);
diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java
index a648f484c78..e6191fee6b0 100644
--- a/processing/src/main/java/io/druid/query/BaseQuery.java
+++ b/processing/src/main/java/io/druid/query/BaseQuery.java
@@ -70,14 +70,14 @@ public abstract class BaseQuery implements Query
}
@Override
- public Sequence<T> run(QuerySegmentWalker walker)
+ public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context)
{
- return run(querySegmentSpec.lookup(this, walker));
+ return run(querySegmentSpec.lookup(this, walker), context);
}
- public Sequence<T> run(QueryRunner<T> runner)
+ public Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context)
{
- return runner.run(this);
+ return runner.run(this, context);
}
@Override
diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java
index 02bb5232e18..c1cf7886bfc 100644
--- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java
@@ -26,6 +26,7 @@ import org.joda.time.DateTime;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
/**
*/
@@ -48,11 +49,11 @@ public class BySegmentQueryRunner implements QueryRunner
@Override
@SuppressWarnings("unchecked")
- public Sequence<T> run(final Query<T> query)
+ public Sequence<T> run(final Query<T> query, Map<String, Object> context)
{
if (query.getContextBySegment(false)) {
- final Sequence<T> baseSequence = base.run(query);
+ final Sequence<T> baseSequence = base.run(query, context);
final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
return Sequences.simple(
Arrays.asList(
@@ -67,7 +68,6 @@ public class BySegmentQueryRunner implements QueryRunner
)
);
}
-
- return base.run(query);
+ return base.run(query, context);
}
}
diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java
index 13ca4dd75df..5f9651f5222 100644
--- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java
@@ -21,6 +21,8 @@ package io.druid.query;
import com.metamx.common.guava.Sequence;
+import java.util.Map;
+
/**
*/
public abstract class BySegmentSkippingQueryRunner implements QueryRunner
@@ -35,14 +37,14 @@ public abstract class BySegmentSkippingQueryRunner implements QueryRunner
}
@Override
- public Sequence<T> run(Query<T> query)
+ public Sequence<T> run(Query<T> query, Map<String, Object> context)
{
if (query.getContextBySegment(false)) {
- return baseRunner.run(query);
+ return baseRunner.run(query, context);
}
- return doRun(baseRunner, query);
+ return doRun(baseRunner, query, context);
}
- protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query);
+ protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context);
}
diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java
index 6b474799de0..e5edfd3d4cf 100644
--- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java
@@ -39,6 +39,7 @@ import com.metamx.common.logger.Logger;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -93,7 +94,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner
}
@Override
- public Sequence<T> run(final Query<T> query)
+ public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
final int priority = query.getContextPriority(0);
@@ -124,7 +125,11 @@ public class ChainedExecutionQueryRunner implements QueryRunner
public Iterable<T> call() throws Exception
{
try {
- Sequence<T> result = input.run(query);
+ if (input == null) {
+ throw new ISE("Input is null?! How is this possible?!");
+ }
+
+ Sequence<T> result = input.run(query, context);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}
diff --git a/processing/src/main/java/io/druid/query/ConcatQueryRunner.java b/processing/src/main/java/io/druid/query/ConcatQueryRunner.java
index 9735d0f5a94..74c4a6481f5 100644
--- a/processing/src/main/java/io/druid/query/ConcatQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/ConcatQueryRunner.java
@@ -23,6 +23,9 @@ import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
+import java.util.List;
+import java.util.Map;
+
/**
*/
public class ConcatQueryRunner<T> implements QueryRunner<T>
@@ -36,7 +39,7 @@ public class ConcatQueryRunner implements QueryRunner
}
@Override
- public Sequence<T> run(final Query<T> query)
+ public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
return Sequences.concat(
Sequences.map(
@@ -46,7 +49,7 @@ public class ConcatQueryRunner implements QueryRunner
@Override
public Sequence<T> apply(final QueryRunner<T> input)
{
- return input.run(query);
+ return input.run(query, context);
}
}
)
diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java
index 80c0f2d351b..10bff0a65e2 100644
--- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java
@@ -28,6 +28,10 @@ import com.metamx.common.guava.Sequences;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.MetricManipulatorFns;
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Map;
+
/**
*/
public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
@@ -45,7 +49,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner
}
@Override
- public Sequence<T> run(final Query<T> query)
+ public Sequence<T> run(final Query<T> query, Map<String, Object> context)
{
final boolean isBySegment = query.getContextBySegment(false);
final boolean shouldFinalize = query.getContextFinalize(true);
@@ -98,7 +102,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner
return Sequences.map(
- baseRunner.run(queryToRun),
+ baseRunner.run(queryToRun, context),
finalizerFn
);
diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java
index 7e97ce03099..5997518dddd 100644
--- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java
@@ -32,16 +32,20 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
+import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
+import io.druid.collections.StupidPool;
import io.druid.data.input.Row;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.segment.incremental.IncrementalIndex;
+import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -56,11 +60,13 @@ public class GroupByParallelQueryRunner implements QueryRunner
private final ListeningExecutorService exec;
private final Supplier<GroupByQueryConfig> configSupplier;
private final QueryWatcher queryWatcher;
+ private final StupidPool<ByteBuffer> bufferPool;
public GroupByParallelQueryRunner(
ExecutorService exec,
Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
+ StupidPool<ByteBuffer> bufferPool,
Iterable<QueryRunner<T>> queryables
)
{
@@ -68,15 +74,17 @@ public class GroupByParallelQueryRunner implements QueryRunner
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier;
+ this.bufferPool = bufferPool;
}
@Override
- public Sequence<T> run(final Query<T> queryParam)
+ public Sequence<T> run(final Query<T> queryParam, final Map<String, Object> context)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
- configSupplier.get()
+ configSupplier.get(),
+ bufferPool
);
final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final boolean bySegment = query.getContextBySegment(false);
@@ -103,10 +111,11 @@ public class GroupByParallelQueryRunner implements QueryRunner
{
try {
if (bySegment) {
- input.run(queryParam)
+ input.run(queryParam, context)
.accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
} else {
- input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
+ input.run(queryParam, context)
+ .accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}
return null;
@@ -140,17 +149,21 @@ public class GroupByParallelQueryRunner implements QueryRunner
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true);
+ indexAccumulatorPair.lhs.close();
throw new QueryInterruptedException("Query interrupted");
}
catch (CancellationException e) {
+ indexAccumulatorPair.lhs.close();
throw new QueryInterruptedException("Query cancelled");
}
catch (TimeoutException e) {
+ indexAccumulatorPair.lhs.close();
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");
}
catch (ExecutionException e) {
+ indexAccumulatorPair.lhs.close();
throw Throwables.propagate(e.getCause());
}
@@ -158,18 +171,20 @@ public class GroupByParallelQueryRunner implements QueryRunner
return Sequences.simple(bySegmentAccumulatorPair.lhs);
}
- return Sequences.simple(
- Iterables.transform(
- indexAccumulatorPair.lhs.iterableWithPostAggregations(null),
- new Function<Row, T>()
- {
- @Override
- public T apply(Row input)
- {
- return (T) input;
- }
- }
- )
+ return new ResourceClosingSequence<T>(
+ Sequences.simple(
+ Iterables.transform(
+ indexAccumulatorPair.lhs.iterableWithPostAggregations(null),
+ new Function<Row, T>()
+ {
+ @Override
+ public T apply(Row input)
+ {
+ return (T) input;
+ }
+ }
+ )
+ ), indexAccumulatorPair.lhs
);
}
}
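The bufferPool and ResourceClosingSequence changes above exist because the IncrementalIndex built by the accumulator now holds an off-heap buffer taken from the StupidPool, so it has to be closed both on failure (the new close() calls in the catch blocks) and once the result sequence has been consumed. A reduced, hypothetical sketch of the close-on-consumption pattern (the ClosingSequenceSketch class, wrap helper and Integer payload are illustrative):

import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import java.io.Closeable;
import java.util.Arrays;

public class ClosingSequenceSketch
{
  // The Closeable is released when the returned sequence has been fully consumed;
  // error paths must still close it explicitly, as the runner above does.
  public static Sequence<Integer> wrap(final Closeable resource)
  {
    final Sequence<Integer> base = Sequences.simple(Arrays.asList(1, 2, 3));
    return new ResourceClosingSequence<Integer>(base, resource);
  }
}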
diff --git a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java
index df4c8596217..557420aa377 100644
--- a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
/**
*/
@@ -48,10 +49,10 @@ public class IntervalChunkingQueryRunner implements QueryRunner
}
@Override
- public Sequence<T> run(final Query<T> query)
+ public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
if (period.getMillis() == 0) {
- return baseRunner.run(query);
+ return baseRunner.run(query, context);
}
return Sequences.concat(
@@ -74,7 +75,8 @@ public class IntervalChunkingQueryRunner implements QueryRunner
public Sequence<T> apply(Interval singleInterval)
{
return baseRunner.run(
- query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval)))
+ query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))),
+ context
);
}
}
diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java
index 0d1d39910fb..5a8005185a7 100644
--- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java
@@ -28,6 +28,8 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
/**
*/
@@ -82,7 +84,7 @@ public class MetricsEmittingQueryRunner implements QueryRunner
}
@Override
- public Sequence<T> run(final Query<T> query)
+ public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
final ServiceMetricEvent.Builder builder = builderFn.apply(query);
String queryId = query.getId();
@@ -100,7 +102,7 @@ public class MetricsEmittingQueryRunner implements QueryRunner
long startTime = System.currentTimeMillis();
try {
- retVal = queryRunner.run(query).accumulate(outType, accumulator);
+ retVal = queryRunner.run(query, context).accumulate(outType, accumulator);
}
catch (RuntimeException e) {
builder.setUser10("failed");
@@ -130,7 +132,7 @@ public class MetricsEmittingQueryRunner implements QueryRunner
long startTime = System.currentTimeMillis();
try {
- retVal = queryRunner.run(query).toYielder(initValue, accumulator);
+ retVal = queryRunner.run(query, context).toYielder(initValue, accumulator);
}
catch (RuntimeException e) {
builder.setUser10("failed");
diff --git a/processing/src/main/java/io/druid/query/NoopQueryRunner.java b/processing/src/main/java/io/druid/query/NoopQueryRunner.java
index 355ee4f7a20..d2f3863ab62 100644
--- a/processing/src/main/java/io/druid/query/NoopQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/NoopQueryRunner.java
@@ -22,12 +22,15 @@ package io.druid.query;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
+import java.util.List;
+import java.util.Map;
+
/**
*/
public class NoopQueryRunner<T> implements QueryRunner<T>
{
@Override
- public Sequence<T> run(Query<T> query)
+ public Sequence<T> run(Query<T> query, Map<String, Object> context)
{
return Sequences.empty();
}
diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java
index 8c34be7ec28..c255fc46d60 100644
--- a/processing/src/main/java/io/druid/query/Query.java
+++ b/processing/src/main/java/io/druid/query/Query.java
@@ -62,9 +62,9 @@ public interface Query
public String getType();
- public Sequence<T> run(QuerySegmentWalker walker);
+ public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context);
- public Sequence<T> run(QueryRunner<T> runner);
+ public Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context);
public List<Interval> getIntervals();
diff --git a/processing/src/main/java/io/druid/query/QueryRunner.java b/processing/src/main/java/io/druid/query/QueryRunner.java
index 62c44ad5163..d7a3f8af36f 100644
--- a/processing/src/main/java/io/druid/query/QueryRunner.java
+++ b/processing/src/main/java/io/druid/query/QueryRunner.java
@@ -21,9 +21,11 @@ package io.druid.query;
import com.metamx.common.guava.Sequence;
+import java.util.Map;
+
/**
*/
public interface QueryRunner<T>
{
- public Sequence<T> run(Query<T> query);
-}
+ public Sequence<T> run(Query<T> query, Map<String, Object> context);
+}
\ No newline at end of file
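For reference, the reworked interface threads a single shared, mutable response-context map through every runner in the stack. A hypothetical wrapper against the new signature (the ContextTaggingQueryRunner class and the "lastQueryId" key are illustrative and not part of this patch):

import com.metamx.common.guava.Sequence;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import java.util.Map;

public class ContextTaggingQueryRunner<T> implements QueryRunner<T>
{
  private final QueryRunner<T> baseRunner;

  public ContextTaggingQueryRunner(QueryRunner<T> baseRunner)
  {
    this.baseRunner = baseRunner;
  }

  @Override
  public Sequence<T> run(Query<T> query, Map<String, Object> context)
  {
    // Anything put here is visible to every other runner sharing the same map,
    // which is how RetryQueryRunner (added below) learns about missing segments.
    context.put("lastQueryId", query.getId());
    return baseRunner.run(query, context);
  }
}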
diff --git a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java
index e801e5516ac..736c60f76ab 100644
--- a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java
@@ -25,6 +25,8 @@ import com.metamx.common.guava.Sequence;
import io.druid.segment.ReferenceCountingSegment;
import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
/**
*/
@@ -43,11 +45,11 @@ public class ReferenceCountingSegmentQueryRunner implements QueryRunner
}
@Override
- public Sequence<T> run(final Query<T> query)
+ public Sequence<T> run(final Query<T> query, Map<String, Object> context)
{
final Closeable closeable = adapter.increment();
try {
- final Sequence<T> baseSequence = factory.createRunner(adapter).run(query);
+ final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, context);
return new ResourceClosingSequence<T>(baseSequence, closeable);
}
diff --git a/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java b/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java
index 94b670444e8..f5378cfc4ed 100644
--- a/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java
@@ -24,6 +24,9 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import io.druid.common.guava.CombiningSequence;
+import java.util.List;
+import java.util.Map;
+
/**
*/
public abstract class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRunner<T>
@@ -36,9 +39,9 @@ public abstract class ResultMergeQueryRunner extends BySegmentSkippingQueryRu
}
@Override
- public Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query)
+ public Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context)
{
- return CombiningSequence.create(baseRunner.run(query), makeOrdering(query), createMergeFn(query));
+ return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query));
}
protected abstract Ordering<T> makeOrdering(Query<T> query);
diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunner.java b/processing/src/main/java/io/druid/query/RetryQueryRunner.java
new file mode 100644
index 00000000000..9f3921bd87d
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/RetryQueryRunner.java
@@ -0,0 +1,113 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Yielder;
+import com.metamx.common.guava.YieldingAccumulator;
+import com.metamx.common.guava.YieldingSequenceBase;
+import io.druid.query.spec.MultipleSpecificSegmentSpec;
+import io.druid.segment.SegmentMissingException;
+
+import java.util.List;
+import java.util.Map;
+
+public class RetryQueryRunner<T> implements QueryRunner<T>
+{
+ public static String MISSING_SEGMENTS_KEY = "missingSegments";
+
+ private final QueryRunner<T> baseRunner;
+ private final RetryQueryRunnerConfig config;
+ private final ObjectMapper jsonMapper;
+
+ public RetryQueryRunner(
+ QueryRunner<T> baseRunner,
+ RetryQueryRunnerConfig config,
+ ObjectMapper jsonMapper
+ )
+ {
+ this.baseRunner = baseRunner;
+ this.config = config;
+ this.jsonMapper = jsonMapper;
+ }
+
+ @Override
+ public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
+ {
+ final Sequence<T> returningSeq = baseRunner.run(query, context);
+
+ return new YieldingSequenceBase<T>()
+ {
+ @Override
+ public <OutType> Yielder<OutType> toYielder(
+ OutType initValue, YieldingAccumulator<OutType, T> accumulator
+ )
+ {
+ Yielder<OutType> yielder = returningSeq.toYielder(initValue, accumulator);
+
+ final List<SegmentDescriptor> missingSegments = getMissingSegments(context);
+
+ if (missingSegments.isEmpty()) {
+ return yielder;
+ }
+
+ for (int i = 0; i < config.numTries(); i++) {
+ context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList());
+ final Query<T> retryQuery = query.withQuerySegmentSpec(
+ new MultipleSpecificSegmentSpec(
+ missingSegments
+ )
+ );
+ yielder = baseRunner.run(retryQuery, context).toYielder(initValue, accumulator);
+ if (getMissingSegments(context).isEmpty()) {
+ break;
+ }
+ }
+
+ final List<SegmentDescriptor> finalMissingSegs = getMissingSegments(context);
+ if (!config.returnPartialResults() && !finalMissingSegs.isEmpty()) {
+ throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs);
+ }
+
+ return yielder;
+ }
+ };
+ }
+
+ private List<SegmentDescriptor> getMissingSegments(final Map<String, Object> context)
+ {
+ final Object maybeMissingSegments = context.get(MISSING_SEGMENTS_KEY);
+ if (maybeMissingSegments == null) {
+ return Lists.newArrayList();
+ }
+
+ return jsonMapper.convertValue(
+ maybeMissingSegments,
+ new TypeReference<List<SegmentDescriptor>>()
+ {
+ }
+ );
+ }
+}
+
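RetryQueryRunner only re-issues segments that some runner further down the stack has reported into the shared context under MISSING_SEGMENTS_KEY; in this patch that reporting lives in the segment-level runners. A hypothetical sketch of the reporting side (the MissingSegmentReportingRunner class is illustrative and not part of this patch):

import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.RetryQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.SegmentMissingException;
import java.util.List;
import java.util.Map;

public class MissingSegmentReportingRunner<T> implements QueryRunner<T>
{
  private final QueryRunner<T> baseRunner;
  private final SegmentDescriptor descriptor;

  public MissingSegmentReportingRunner(QueryRunner<T> baseRunner, SegmentDescriptor descriptor)
  {
    this.baseRunner = baseRunner;
    this.descriptor = descriptor;
  }

  @Override
  @SuppressWarnings("unchecked")
  public Sequence<T> run(Query<T> query, Map<String, Object> context)
  {
    try {
      return baseRunner.run(query, context);
    }
    catch (SegmentMissingException e) {
      // Append rather than overwrite: several segments may go missing in one pass.
      List<SegmentDescriptor> missing = (List<SegmentDescriptor>) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
      if (missing == null) {
        missing = Lists.newArrayList();
        context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, missing);
      }
      missing.add(descriptor);
      return Sequences.empty();
    }
  }
}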
diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunnerConfig.java b/processing/src/main/java/io/druid/query/RetryQueryRunnerConfig.java
new file mode 100644
index 00000000000..2b8bb730b68
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/RetryQueryRunnerConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class RetryQueryRunnerConfig
+{
+ @JsonProperty
+ private int numTries = 0;
+ @JsonProperty
+ private boolean returnPartialResults = false;
+
+ public int numTries() { return numTries; }
+ public boolean returnPartialResults() { return returnPartialResults; }
+}
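RetryQueryRunnerConfig is bound through Jackson, so the knob names come straight from the @JsonProperty fields above. A hypothetical, self-contained binding sketch that constructs the config by hand with an ObjectMapper rather than through Druid's JsonConfigurator wiring (the RetryConfigExample class is illustrative):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;

public class RetryConfigExample
{
  public static void main(String[] args)
  {
    ObjectMapper mapper = new ObjectMapper();
    RetryQueryRunnerConfig config = mapper.convertValue(
        ImmutableMap.of("numTries", 2, "returnPartialResults", true),
        RetryQueryRunnerConfig.class
    );
    // With numTries=2, missing segments are re-queried at most twice; with
    // returnPartialResults=true, whatever was found is returned afterwards
    // instead of throwing SegmentMissingException.
    System.out.println(config.numTries() + " " + config.returnPartialResults());
  }
}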
diff --git a/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java
index 8e13d9219e9..d16a660e25a 100644
--- a/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java
@@ -23,6 +23,9 @@ package io.druid.query;
import com.metamx.common.guava.Sequence;
+import java.util.List;
+import java.util.Map;
+
/**
* If there's a subquery, run it instead of the outer query
*/
@@ -36,13 +39,13 @@ public class SubqueryQueryRunner implements QueryRunner
}
@Override
- public Sequence<T> run(final Query<T> query)
+ public Sequence<T> run(final Query<T> query, Map<String, Object> context)
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) {
- return run((Query<T>) ((QueryDataSource) dataSource).getQuery());
+ return run((Query<T>) ((QueryDataSource) dataSource).getQuery(), context);
} else {
- return baseRunner.run(query);
+ return baseRunner.run(query, context);
}
}
}
diff --git a/processing/src/main/java/io/druid/query/TimewarpOperator.java b/processing/src/main/java/io/druid/query/TimewarpOperator.java
index c59908172d3..2f26f6890f1 100644
--- a/processing/src/main/java/io/druid/query/TimewarpOperator.java
+++ b/processing/src/main/java/io/druid/query/TimewarpOperator.java
@@ -33,6 +33,8 @@ import org.joda.time.Interval;
import org.joda.time.Period;
import java.util.Arrays;
+import java.util.Map;
+
/**
* TimewarpOperator is an example post-processing operator that maps current time
@@ -79,7 +81,7 @@ public class TimewarpOperator implements PostProcessingOperator
return new QueryRunner<T>()
{
@Override
- public Sequence<T> run(final Query<T> query)
+ public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
final long offset = computeOffset(now);
@@ -90,7 +92,8 @@ public class TimewarpOperator implements PostProcessingOperator
);
return Sequences.map(
baseRunner.run(
- query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval)))
+ query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))),
+ context
),
new Function<T, T>()
{
diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java
index f14bb180f62..2426bda9310 100644
--- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java
@@ -26,6 +26,9 @@ import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
+import java.util.List;
+import java.util.Map;
+
public class UnionQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
@@ -41,7 +44,7 @@ public class UnionQueryRunner implements QueryRunner
}
@Override
- public Sequence<T> run(final Query<T> query)
+ public Sequence<T> run(final Query<T> query, final Map