diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml
index 86ecc759553..d129375e3b9 100644
--- a/cassandra-storage/pom.xml
+++ b/cassandra-storage/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-       <version>0.6.122-SNAPSHOT</version>
+       <version>0.6.129-SNAPSHOT</version>
diff --git a/common/pom.xml b/common/pom.xml
index 4e828108927..7dc9cfa1e38 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-       <version>0.6.122-SNAPSHOT</version>
+       <version>0.6.129-SNAPSHOT</version>
diff --git a/common/src/main/java/io/druid/collections/OrderedMergeSequence.java b/common/src/main/java/io/druid/collections/OrderedMergeSequence.java
index 1b076c4c299..cfc29682d0d 100644
--- a/common/src/main/java/io/druid/collections/OrderedMergeSequence.java
+++ b/common/src/main/java/io/druid/collections/OrderedMergeSequence.java
@@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Ordering;
import com.google.common.io.Closeables;
import com.metamx.common.guava.Accumulator;
+import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.Yielders;
@@ -70,7 +71,7 @@ public class OrderedMergeSequence<T> implements Sequence<T>
return yielder.get();
}
finally {
- Closeables.closeQuietly(yielder);
+ CloseQuietly.close(yielder);
}
}
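
Note on this migration: Guava 17.0 (bumped later in this diff) drops the `Closeables.closeQuietly(Closeable)` overload, hence the switch to java-util's `CloseQuietly.close`. A minimal sketch of what such a helper looks like, offered as an assumption for illustration rather than the actual `com.metamx.common.guava.CloseQuietly` source:

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical sketch of a close-quietly helper; the real
// com.metamx.common.guava.CloseQuietly may differ in detail.
public class CloseQuietlySketch
{
  public static void close(Closeable closeable)
  {
    if (closeable == null) {
      return; // tolerate null, like the removed Guava overload did
    }
    try {
      closeable.close();
    }
    catch (IOException e) {
      // deliberately swallowed: callers invoke this from finally blocks,
      // where a close failure must not mask the original exception
    }
  }
}
```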
diff --git a/common/src/main/java/io/druid/common/utils/SerializerUtils.java b/common/src/main/java/io/druid/common/utils/SerializerUtils.java
index 28cdf7629a3..b5dc650de5c 100644
--- a/common/src/main/java/io/druid/common/utils/SerializerUtils.java
+++ b/common/src/main/java/io/druid/common/utils/SerializerUtils.java
@@ -46,14 +46,9 @@ public class SerializerUtils
 public void writeString(OutputSupplier<? extends OutputStream> supplier, String name) throws IOException
{
- OutputStream out = null;
- try {
- out = supplier.getOutput();
+ try (OutputStream out = supplier.getOutput()) {
writeString(out, name);
}
- finally {
- Closeables.closeQuietly(out);
- }
}
public void writeString(WritableByteChannel out, String name) throws IOException
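
The rewrite above replaces the null-initialized stream plus `finally { closeQuietly }` pattern with try-with-resources. A hedged sketch of the difference (class and method names here are hypothetical):

```java
import com.google.common.io.OutputSupplier;
import java.io.IOException;
import java.io.OutputStream;

class TryWithResourcesSketch
{
  void write(OutputSupplier<? extends OutputStream> supplier, byte[] bytes) throws IOException
  {
    // try-with-resources always closes `out`; if both the body and close()
    // throw, the close() exception is attached as suppressed instead of being
    // silently dropped, which is what closeQuietly in a finally block did.
    try (OutputStream out = supplier.getOutput()) {
      out.write(bytes);
    }
  }
}
```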
diff --git a/docs/content/Indexing-Service-Config.md b/docs/content/Indexing-Service-Config.md
index 0cd9e9978ea..122a6623504 100644
--- a/docs/content/Indexing-Service-Config.md
+++ b/docs/content/Indexing-Service-Config.md
@@ -69,10 +69,9 @@ A sample worker setup spec is shown below:
"keyName":"keyName"
},
"userData":{
- "classType":"galaxy",
- "env":"druid",
- "version":"druid_version",
- "type":"sample_cluster/worker"
+ "impl":"string",
+ "data":"version=:VERSION:",
+ "versionReplacementString":":VERSION:"
}
}
```
@@ -81,8 +80,8 @@ Issuing a GET request at the same URL will return the current worker setup spec
|Property|Description|Default|
|--------|-----------|-------|
-|`minVersion`|The coordinator only assigns tasks to workers with a version greater than the minVersion. If this is not specified, the minVersion will be the same as the coordinator version.|none|
+|`minVersion`|The coordinator only assigns tasks to workers with a version greater than the minVersion. If this is not specified, the minVersion defaults to `druid.indexer.runner.minWorkerVersion`.|none|
|`minNumWorkers`|The minimum number of workers that can be in the cluster at any given time.|0|
|`maxNumWorkers`|The maximum number of workers that can be in the cluster at any given time.|0|
-|`nodeData`|A JSON object that contains metadata about new nodes to create.|none|
-|`userData`|A JSON object that contains metadata about how the node should register itself on startup. This data is sent with node creation requests.|none|
+|`nodeData`|A JSON object that describes how to launch new nodes. Currently, only EC2 is supported.|none; required|
+|`userData`|A JSON object that describes how to configure new nodes. Currently, only EC2 is supported. If you have set `druid.indexer.autoscale.workerVersion`, this must have a `versionReplacementString`. Otherwise, a `versionReplacementString` is not necessary.|none; optional|
diff --git a/docs/content/Peons.md b/docs/content/Peons.md
index b5d16e1df2a..87b99fad362 100644
--- a/docs/content/Peons.md
+++ b/docs/content/Peons.md
@@ -22,6 +22,7 @@ Additional peon configs include:
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|/tmp/persistent/tasks|
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000|
+|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0|
|`druid.indexer.task.chathandler.type`|Choices are "noop" and "announce". Certain tasks will use service discovery to announce an HTTP endpoint that events can be posted to.|noop|
If the peon is running in remote mode, there must be an overlord up and running. Running peons in remote mode requires the following configurations:
diff --git a/examples/pom.xml b/examples/pom.xml
index 5b9cbc47847..5789834bac3 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-       <version>0.6.122-SNAPSHOT</version>
+       <version>0.6.129-SNAPSHOT</version>
diff --git a/hdfs-storage/pom.xml b/hdfs-storage/pom.xml
index 46b6a44ce6d..7d8308c04b7 100644
--- a/hdfs-storage/pom.xml
+++ b/hdfs-storage/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-       <version>0.6.122-SNAPSHOT</version>
+       <version>0.6.129-SNAPSHOT</version>
diff --git a/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java b/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java
index 6af99afcbcd..6b6f002f774 100644
--- a/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java
+++ b/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java
@@ -21,6 +21,7 @@ package io.druid.storage.hdfs;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
+import com.metamx.common.guava.CloseQuietly;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
@@ -52,22 +53,17 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller
final FileSystem fs = checkPathAndGetFilesystem(path);
- FSDataInputStream in = null;
- try {
- if (path.getName().endsWith(".zip")) {
- in = fs.open(path);
- CompressionUtils.unzip(in, dir);
- in.close();
+ if (path.getName().endsWith(".zip")) {
+ try {
+ try (FSDataInputStream in = fs.open(path)) {
+ CompressionUtils.unzip(in, dir);
+ }
}
- else {
- throw new SegmentLoadingException("Unknown file type[%s]", path);
+ catch (IOException e) {
+ throw new SegmentLoadingException(e, "Some IOException");
}
- }
- catch (IOException e) {
- throw new SegmentLoadingException(e, "Some IOException");
- }
- finally {
- Closeables.closeQuietly(in);
+ } else {
+ throw new SegmentLoadingException("Unknown file type[%s]", path);
}
}
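
The nested try above is deliberate: the inner try-with-resources guarantees the stream is closed, while the outer catch converts any `IOException` (whether raised by `unzip` or by `close()` itself) into a `SegmentLoadingException`. A condensed sketch of the shape, with types as imported in this file:

```java
// Condensed sketch; identifiers mirror the hunk above.
void unzipSegment(FileSystem fs, Path path, File dir) throws SegmentLoadingException
{
  try {
    try (FSDataInputStream in = fs.open(path)) { // inner: guarantees close()
      CompressionUtils.unzip(in, dir);
    }
  }
  catch (IOException e) {
    // outer: close() failures are IOExceptions too, so they are translated
    // into the domain exception rather than escaping raw
    throw new SegmentLoadingException(e, "Some IOException");
  }
}
```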
@@ -85,7 +81,8 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller
}
}
- private Path getPath(DataSegment segment) {
+ private Path getPath(DataSegment segment)
+ {
return new Path(String.valueOf(segment.getLoadSpec().get("path")));
}
diff --git a/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
index 2bb36abb2a0..239cfc01d04 100644
--- a/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ b/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -26,6 +26,7 @@ import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.OutputSupplier;
import com.google.inject.Inject;
+import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
@@ -78,17 +79,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
fs.mkdirs(outFile.getParent());
log.info("Compressing files from[%s] to [%s]", inDir, outFile);
- FSDataOutputStream out = null;
+
long size;
- try {
- out = fs.create(outFile);
-
+ try (FSDataOutputStream out = fs.create(outFile)) {
size = CompressionUtils.zip(inDir, out);
-
- out.close();
- }
- finally {
- Closeables.closeQuietly(out);
}
return createDescriptorFile(
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 6decbae2e15..7d2a9c8de06 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-       <version>0.6.122-SNAPSHOT</version>
+       <version>0.6.129-SNAPSHOT</version>
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index e06b63871e3..6dd97b920ba 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -31,6 +31,7 @@ import com.google.common.io.Closeables;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
+import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
@@ -425,7 +426,7 @@ public class IndexGeneratorJob implements Jobby
if (caughtException == null) {
Closeables.close(out, false);
} else {
- Closeables.closeQuietly(out);
+ CloseQuietly.close(out);
throw Throwables.propagate(caughtException);
}
}
@@ -605,7 +606,7 @@ public class IndexGeneratorJob implements Jobby
}
}
finally {
- Closeables.closeQuietly(in);
+ CloseQuietly.close(in);
}
out.closeEntry();
context.progress();
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 9db9badd919..aa45a4e2b81 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-       <version>0.6.122-SNAPSHOT</version>
+       <version>0.6.129-SNAPSHOT</version>
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java
index 8651a65e70d..1671fb84bd3 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java
@@ -21,11 +21,17 @@ package io.druid.indexing.common.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
import java.io.File;
+import java.util.List;
public class TaskConfig
{
+ public static List<String> DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of(
+ "org.apache.hadoop:hadoop-client:2.3.0"
+ );
+
@JsonProperty
private final String baseDir;
@@ -38,40 +44,57 @@ public class TaskConfig
@JsonProperty
private final int defaultRowFlushBoundary;
+ @JsonProperty
+ private final List<String> defaultHadoopCoordinates;
+
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@JsonProperty("baseTaskDir") String baseTaskDir,
@JsonProperty("hadoopWorkingPath") String hadoopWorkingPath,
- @JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary
+ @JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary,
+ @JsonProperty("defaultHadoopCoordinates") List defaultHadoopCoordinates
)
{
this.baseDir = baseDir == null ? "/tmp" : baseDir;
this.baseTaskDir = new File(defaultDir(baseTaskDir, "persistent/task"));
this.hadoopWorkingPath = defaultDir(hadoopWorkingPath, "druid-indexing");
this.defaultRowFlushBoundary = defaultRowFlushBoundary == null ? 500000 : defaultRowFlushBoundary;
+ this.defaultHadoopCoordinates = defaultHadoopCoordinates == null
+ ? DEFAULT_DEFAULT_HADOOP_COORDINATES
+ : defaultHadoopCoordinates;
}
+ @JsonProperty
public String getBaseDir()
{
return baseDir;
}
+ @JsonProperty
public File getBaseTaskDir()
{
return baseTaskDir;
}
+ @JsonProperty
public String getHadoopWorkingPath()
{
return hadoopWorkingPath;
}
+ @JsonProperty
public int getDefaultRowFlushBoundary()
{
return defaultRowFlushBoundary;
}
+ @JsonProperty
+ public List<String> getDefaultHadoopCoordinates()
+ {
+ return defaultHadoopCoordinates;
+ }
+
private String defaultDir(String configParameter, final String defaultVal)
{
if (configParameter == null) {
@@ -80,4 +103,4 @@ public class TaskConfig
return configParameter;
}
-}
\ No newline at end of file
+}
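
With the new property, specs that omit `defaultHadoopCoordinates` keep working because the constructor substitutes `DEFAULT_DEFAULT_HADOOP_COORDINATES`. A usage sketch of the defaulting, with values mirroring the constructor above:

```java
// Sketch: every nullable argument falls back to a documented default.
TaskConfig config = new TaskConfig(
    "/tmp", // baseDir
    null,   // baseTaskDir -> <baseDir>/persistent/task
    null,   // hadoopWorkingPath -> <baseDir>/druid-indexing
    null,   // defaultRowFlushBoundary -> 500000
    null    // defaultHadoopCoordinates -> ["org.apache.hadoop:hadoop-client:2.3.0"]
);
// config.getDefaultHadoopCoordinates() returns the built-in hadoop-client list.
```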
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 4f017b96ce6..ce6324e32e8 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -27,6 +27,7 @@ import com.google.api.client.util.Lists;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
@@ -65,8 +66,6 @@ public class HadoopIndexTask extends AbstractTask
extensionsConfig = Initialization.makeStartupInjector().getInstance(ExtensionsConfig.class);
}
- public static String DEFAULT_HADOOP_COORDINATES = "org.apache.hadoop:hadoop-client:2.3.0";
-
private static String getTheDataSource(HadoopIngestionSpec spec, HadoopIngestionSpec config)
{
if (spec != null) {
@@ -115,9 +114,14 @@ public class HadoopIndexTask extends AbstractTask
Preconditions.checkArgument(this.spec.getTuningConfig().getWorkingPath() == null, "workingPath must be absent");
Preconditions.checkArgument(this.spec.getIOConfig().getMetadataUpdateSpec() == null, "updaterJobSpec must be absent");
- this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.asList(
- hadoopCoordinates == null ? DEFAULT_HADOOP_COORDINATES : hadoopCoordinates
- ) : hadoopDependencyCoordinates;
+ if (hadoopDependencyCoordinates != null) {
+ this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
+ } else if (hadoopCoordinates != null) {
+ this.hadoopDependencyCoordinates = ImmutableList.of(hadoopCoordinates);
+ } else {
+ // Will be defaulted to something at runtime, based on taskConfig.
+ this.hadoopDependencyCoordinates = null;
+ }
}
@Override
@@ -158,6 +162,10 @@ public class HadoopIndexTask extends AbstractTask
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
+ final List<String> finalHadoopDependencyCoordinates = hadoopDependencyCoordinates != null
+ ? hadoopDependencyCoordinates
+ : toolbox.getConfig().getDefaultHadoopCoordinates();
+
final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
 final List<URL> extensionURLs = Lists.newArrayList();
@@ -174,7 +182,7 @@ public class HadoopIndexTask extends AbstractTask
 final List<URL> driverURLs = Lists.newArrayList();
driverURLs.addAll(nonHadoopURLs);
// put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
- for (String hadoopDependencyCoordinate : hadoopDependencyCoordinates) {
+ for (String hadoopDependencyCoordinate : finalHadoopDependencyCoordinates) {
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
aetherClient, hadoopDependencyCoordinate
);
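
Net effect of the constructor and `run()` changes above: coordinate resolution becomes a three-step fallback, with the last step deferred to runtime so the overlord's `TaskConfig` supplies the default. A sketch of the precedence (the helper name is hypothetical):

```java
// Precedence sketch: explicit list > legacy single coordinate > task config.
List<String> resolveCoordinates(
    List<String> hadoopDependencyCoordinates, // from the task spec
    String hadoopCoordinates,                 // legacy single-coordinate field
    TaskConfig taskConfig                     // per-overlord runtime default
)
{
  if (hadoopDependencyCoordinates != null) {
    return hadoopDependencyCoordinates;
  } else if (hadoopCoordinates != null) {
    return ImmutableList.of(hadoopCoordinates);
  } else {
    return taskConfig.getDefaultHadoopCoordinates();
  }
}
```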
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index e0fc20d1216..84ef51a9c4f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -24,9 +24,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
-import com.google.common.io.Closeables;
import com.metamx.common.Granularity;
-import com.metamx.common.exception.FormattedException;
+import com.metamx.common.guava.CloseQuietly;
+import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@@ -44,8 +44,8 @@ import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.segment.indexing.DataSchema;
-import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentConfig;
@@ -353,7 +353,7 @@ public class RealtimeIndexTask extends AbstractTask
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
}
- catch (FormattedException e) {
+ catch (ParseException e) {
log.warn(e, "unparseable line");
fireDepartment.getMetrics().incrementUnparseable();
}
@@ -375,7 +375,7 @@ public class RealtimeIndexTask extends AbstractTask
log.makeAlert(e, "Failed to finish realtime task").emit();
}
finally {
- Closeables.closeQuietly(firehose);
+ CloseQuietly.close(firehose);
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
}
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
index a20d2a42dbc..c546fbcce06 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
@@ -585,7 +585,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
// on a worker - this avoids overflowing a worker with tasks
- Stopwatch timeoutStopwatch = new Stopwatch();
+ Stopwatch timeoutStopwatch = Stopwatch.createUnstarted();
timeoutStopwatch.start();
synchronized (statusLock) {
while (!isWorkerRunningTask(theWorker, task.getId())) {
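
This tracks the Guava 17.0 upgrade elsewhere in the diff: the public `Stopwatch` constructor is gone, and the static factories are the replacement. A small sketch:

```java
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;

class StopwatchSketch
{
  long timedWait() throws InterruptedException
  {
    // Guava 17 removed `new Stopwatch()`; createUnstarted()/createStarted()
    // are the supported factories.
    Stopwatch timeout = Stopwatch.createUnstarted();
    timeout.start();   // Stopwatch.createStarted() collapses these two lines
    Thread.sleep(100); // stand-in for the ZK status wait loop above
    return timeout.elapsed(TimeUnit.MILLISECONDS);
  }
}
```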
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategy.java
index 9081e7323d1..421530f520d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategy.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategy.java
@@ -29,19 +29,14 @@ import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
-import io.druid.guice.annotations.Json;
import io.druid.indexing.overlord.setup.EC2NodeData;
-import io.druid.indexing.overlord.setup.GalaxyUserData;
import io.druid.indexing.overlord.setup.WorkerSetupData;
-import org.apache.commons.codec.binary.Base64;
-import javax.annotation.Nullable;
import java.util.List;
/**
@@ -50,20 +45,17 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
{
private static final EmittingLogger log = new EmittingLogger(EC2AutoScalingStrategy.class);
- private final ObjectMapper jsonMapper;
private final AmazonEC2 amazonEC2Client;
private final SimpleResourceManagementConfig config;
 private final Supplier<WorkerSetupData> workerSetupDataRef;
@Inject
public EC2AutoScalingStrategy(
- @Json ObjectMapper jsonMapper,
AmazonEC2 amazonEC2Client,
SimpleResourceManagementConfig config,
 Supplier<WorkerSetupData> workerSetupDataRef
)
{
- this.jsonMapper = jsonMapper;
this.amazonEC2Client = amazonEC2Client;
this.config = config;
this.workerSetupDataRef = workerSetupDataRef;
@@ -73,15 +65,21 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
public AutoScalingData provision()
{
try {
- WorkerSetupData setupData = workerSetupDataRef.get();
- EC2NodeData workerConfig = setupData.getNodeData();
+ final WorkerSetupData setupData = workerSetupDataRef.get();
+ final EC2NodeData workerConfig = setupData.getNodeData();
+ final String userDataBase64;
- GalaxyUserData userData = setupData.getUserData();
- if (config.getWorkerVersion() != null) {
- userData = userData.withVersion(config.getWorkerVersion());
+ if (setupData.getUserData() == null) {
+ userDataBase64 = null;
+ } else {
+ if (config.getWorkerVersion() == null) {
+ userDataBase64 = setupData.getUserData().getUserDataBase64();
+ } else {
+ userDataBase64 = setupData.getUserData().withVersion(config.getWorkerVersion()).getUserDataBase64();
+ }
}
- RunInstancesResult result = amazonEC2Client.runInstances(
+ final RunInstancesResult result = amazonEC2Client.runInstances(
new RunInstancesRequest(
workerConfig.getAmiId(),
workerConfig.getMinInstances(),
@@ -91,16 +89,10 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
.withSecurityGroupIds(workerConfig.getSecurityGroupIds())
.withPlacement(new Placement(setupData.getAvailabilityZone()))
.withKeyName(workerConfig.getKeyName())
- .withUserData(
- Base64.encodeBase64String(
- jsonMapper.writeValueAsBytes(
- userData
- )
- )
- )
+ .withUserData(userDataBase64)
);
- List<String> instanceIds = Lists.transform(
+ final List<String> instanceIds = Lists.transform(
 result.getReservation().getInstances(),
 new Function<Instance, String>()
{
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EC2UserData.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EC2UserData.java
new file mode 100644
index 00000000000..3568bf45b7b
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EC2UserData.java
@@ -0,0 +1,42 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.overlord.setup;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * Represents any user data that may be needed to launch EC2 instances.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "impl", defaultImpl = GalaxyEC2UserData.class)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "galaxy", value = GalaxyEC2UserData.class),
+ @JsonSubTypes.Type(name = "string", value = StringEC2UserData.class)
+})
+public interface EC2UserData
+{
+ /**
+ * Return a copy of this instance with a different worker version. If no changes are needed (possibly because the
+ * user data does not depend on the worker version) then it is OK to return "this".
+ */
+ public EC2UserData withVersion(String version);
+
+ public String getUserDataBase64();
+}
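
The `@JsonTypeInfo`/`@JsonSubTypes` pair keys deserialization on the `impl` field, and `defaultImpl = GalaxyEC2UserData.class` keeps pre-existing setup specs (which carry no `impl` key) deserializing as galaxy data. A hedged sketch of the resulting serde behavior:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

class UserDataSerdeSketch
{
  // `mapper` is assumed to be wired like Druid's injected ObjectMapper
  // (GalaxyEC2UserData needs an injectable mapper; StringEC2UserData does not).
  EC2UserData read(ObjectMapper mapper) throws IOException
  {
    // "impl":"string" selects StringEC2UserData; omitting "impl" falls back
    // to defaultImpl, i.e. the old galaxy-style user data.
    return mapper.readValue(
        "{\"impl\":\"string\",\"data\":\"version=:V:\",\"versionReplacementString\":\":V:\"}",
        EC2UserData.class
    );
  }
}
```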
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/GalaxyUserData.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/GalaxyEC2UserData.java
similarity index 63%
rename from indexing-service/src/main/java/io/druid/indexing/overlord/setup/GalaxyUserData.java
rename to indexing-service/src/main/java/io/druid/indexing/overlord/setup/GalaxyEC2UserData.java
index ee721e9ac8c..bfc822dd52c 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/GalaxyUserData.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/GalaxyEC2UserData.java
@@ -19,24 +19,32 @@
package io.druid.indexing.overlord.setup;
+import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.repackaged.com.google.common.base.Throwables;
+import io.druid.guice.annotations.Json;
+import org.apache.commons.codec.binary.Base64;
/**
*/
-public class GalaxyUserData
+public class GalaxyEC2UserData implements EC2UserData
{
- public final String env;
- public final String version;
- public final String type;
+ private final ObjectMapper jsonMapper;
+ private final String env;
+ private final String version;
+ private final String type;
@JsonCreator
- public GalaxyUserData(
+ public GalaxyEC2UserData(
+ @JacksonInject @Json ObjectMapper jsonMapper,
@JsonProperty("env") String env,
@JsonProperty("version") String version,
@JsonProperty("type") String type
)
{
+ this.jsonMapper = jsonMapper;
this.env = env;
this.version = version;
this.type = type;
@@ -60,9 +68,21 @@ public class GalaxyUserData
return type;
}
- public GalaxyUserData withVersion(String ver)
+ @Override
+ public GalaxyEC2UserData withVersion(String ver)
{
- return new GalaxyUserData(env, ver, type);
+ return new GalaxyEC2UserData(jsonMapper, env, ver, type);
+ }
+
+ @Override
+ public String getUserDataBase64()
+ {
+ try {
+ return Base64.encodeBase64String(jsonMapper.writeValueAsBytes(this));
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/StringEC2UserData.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/StringEC2UserData.java
new file mode 100644
index 00000000000..50f61fc1f67
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/StringEC2UserData.java
@@ -0,0 +1,90 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.overlord.setup;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.api.client.util.Charsets;
+import org.apache.commons.codec.binary.Base64;
+
+public class StringEC2UserData implements EC2UserData
+{
+ private final String data;
+ private final String versionReplacementString;
+ private final String version;
+
+ @JsonCreator
+ public StringEC2UserData(
+ @JsonProperty("data") String data,
+ @JsonProperty("versionReplacementString") String versionReplacementString,
+ @JsonProperty("version") String version
+ )
+ {
+ this.data = data;
+ this.versionReplacementString = versionReplacementString;
+ this.version = version;
+ }
+
+ @JsonProperty
+ public String getData()
+ {
+ return data;
+ }
+
+ @JsonProperty
+ public String getVersionReplacementString()
+ {
+ return versionReplacementString;
+ }
+
+ @JsonProperty
+ public String getVersion()
+ {
+ return version;
+ }
+
+ @Override
+ public StringEC2UserData withVersion(final String _version)
+ {
+ return new StringEC2UserData(data, versionReplacementString, _version);
+ }
+
+ @Override
+ public String getUserDataBase64()
+ {
+ final String finalData;
+ if (versionReplacementString != null && version != null) {
+ finalData = data.replace(versionReplacementString, version);
+ } else {
+ finalData = data;
+ }
+ return Base64.encodeBase64String(finalData.getBytes(Charsets.UTF_8));
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StringEC2UserData{" +
+ "data='" + data + '\'' +
+ ", versionReplacementString='" + versionReplacementString + '\'' +
+ ", version='" + version + '\'' +
+ '}';
+ }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java
index 47bb145eefe..e792f347aed 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java
@@ -33,7 +33,7 @@ public class WorkerSetupData
private final int maxNumWorkers;
private final String availabilityZone;
private final EC2NodeData nodeData;
- private final GalaxyUserData userData;
+ private final EC2UserData userData;
@JsonCreator
public WorkerSetupData(
@@ -42,7 +42,7 @@ public class WorkerSetupData
@JsonProperty("maxNumWorkers") int maxNumWorkers,
@JsonProperty("availabilityZone") String availabilityZone,
@JsonProperty("nodeData") EC2NodeData nodeData,
- @JsonProperty("userData") GalaxyUserData userData
+ @JsonProperty("userData") EC2UserData userData
)
{
this.minVersion = minVersion;
@@ -84,7 +84,7 @@ public class WorkerSetupData
}
@JsonProperty
- public GalaxyUserData getUserData()
+ public EC2UserData getUserData()
{
return userData;
}
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java
index 2ec6c474e6c..8ea9e596d83 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java
@@ -19,19 +19,51 @@
package io.druid.indexing.common;
+import com.fasterxml.jackson.databind.BeanProperty;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import com.metamx.common.ISE;
+import io.druid.guice.ServerModule;
+import io.druid.jackson.DefaultObjectMapper;
+import java.util.List;
import java.util.concurrent.TimeUnit;
/**
*/
public class TestUtils
{
+ public static final ObjectMapper MAPPER = new DefaultObjectMapper();
+
+ static {
+ final List<? extends Module> list = new ServerModule().getJacksonModules();
+ for (Module module : list) {
+ MAPPER.registerModule(module);
+ }
+ MAPPER.setInjectableValues(
+ new InjectableValues()
+ {
+ @Override
+ public Object findInjectableValue(
+ Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
+ )
+ {
+ if (valueId.equals("com.fasterxml.jackson.databind.ObjectMapper")) {
+ return TestUtils.MAPPER;
+ }
+ throw new ISE("No Injectable value found");
+ }
+ }
+ );
+ }
+
public static boolean conditionValid(IndexingServiceCondition condition)
{
try {
- Stopwatch stopwatch = new Stopwatch();
+ Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
while (!condition.isValid()) {
Thread.sleep(100);
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index 30325c9d398..370952edfda 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -135,7 +135,7 @@ public class TaskLifecycleTest
mdc = newMockMDC();
tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tl, mdc, newMockEmitter()));
tb = new TaskToolboxFactory(
- new TaskConfig(tmp.toString(), null, null, 50000),
+ new TaskConfig(tmp.toString(), null, null, 50000, null),
tac,
newMockEmitter(),
new DataSegmentPusher()
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/WorkerSetupDataTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/WorkerSetupDataTest.java
new file mode 100644
index 00000000000..39b35dd951a
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/WorkerSetupDataTest.java
@@ -0,0 +1,62 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.overlord;
+
+import com.google.common.base.Charsets;
+import io.druid.indexing.common.TestUtils;
+import io.druid.indexing.overlord.setup.EC2UserData;
+import io.druid.indexing.overlord.setup.GalaxyEC2UserData;
+import io.druid.indexing.overlord.setup.StringEC2UserData;
+import org.apache.commons.codec.binary.Base64;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class WorkerSetupDataTest
+{
+ @Test
+ public void testGalaxyEC2UserDataSerde() throws IOException
+ {
+ final String json = "{\"env\":\"druid\",\"version\":null,\"type\":\"typical\"}";
+ final GalaxyEC2UserData userData = (GalaxyEC2UserData) TestUtils.MAPPER.readValue(json, EC2UserData.class);
+ Assert.assertEquals("druid", userData.getEnv());
+ Assert.assertEquals("typical", userData.getType());
+ Assert.assertNull(userData.getVersion());
+ Assert.assertEquals("1234", userData.withVersion("1234").getVersion());
+ }
+
+ @Test
+ public void testStringEC2UserDataSerde() throws IOException
+ {
+ final String json = "{\"impl\":\"string\",\"data\":\"hey :ver:\",\"versionReplacementString\":\":ver:\",\"version\":\"1234\"}";
+ final StringEC2UserData userData = (StringEC2UserData) TestUtils.MAPPER.readValue(json, EC2UserData.class);
+ Assert.assertEquals("hey :ver:", userData.getData());
+ Assert.assertEquals("1234", userData.getVersion());
+ Assert.assertEquals(
+ Base64.encodeBase64String("hey 1234".getBytes(Charsets.UTF_8)),
+ userData.getUserDataBase64()
+ );
+ Assert.assertEquals(
+ Base64.encodeBase64String("hey xyz".getBytes(Charsets.UTF_8)),
+ userData.withVersion("xyz").getUserDataBase64()
+ );
+ }
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java
index 1fd3510b45a..1ccacc66df4 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java
@@ -30,7 +30,7 @@ import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.collect.Lists;
import io.druid.common.guava.DSuppliers;
import io.druid.indexing.overlord.setup.EC2NodeData;
-import io.druid.indexing.overlord.setup.GalaxyUserData;
+import io.druid.indexing.overlord.setup.GalaxyEC2UserData;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.jackson.DefaultObjectMapper;
import org.easymock.EasyMock;
@@ -75,7 +75,6 @@ public class EC2AutoScalingStrategyTest
.withPrivateIpAddress(IP);
strategy = new EC2AutoScalingStrategy(
- new DefaultObjectMapper(),
amazonEC2Client,
new SimpleResourceManagementConfig().setWorkerPort(8080).setWorkerVersion(""),
DSuppliers.of(workerSetupData)
@@ -101,7 +100,7 @@ public class EC2AutoScalingStrategyTest
1,
"",
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.newArrayList(), "foo"),
- new GalaxyUserData("env", "version", "type")
+ new GalaxyEC2UserData(new DefaultObjectMapper(), "env", "version", "type")
)
);
diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
index 8d4bf32b870..bad6fae4939 100644
--- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -121,7 +121,7 @@ public class WorkerTaskMonitorTest
workerCuratorCoordinator,
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
- new TaskConfig(tmp.toString(), null, null, 0),
+ new TaskConfig(tmp.toString(), null, null, 0, null),
null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
new OmniSegmentLoader(
ImmutableMap.of(
diff --git a/kafka-eight/pom.xml b/kafka-eight/pom.xml
index 5840e57e491..05393555a35 100644
--- a/kafka-eight/pom.xml
+++ b/kafka-eight/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-       <version>0.6.122-SNAPSHOT</version>
+       <version>0.6.129-SNAPSHOT</version>
diff --git a/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java
index ab7987f93ee..a97f18ae1b7 100644
--- a/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java
+++ b/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
-import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
@@ -115,7 +114,7 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
       @Override
-      public InputRow nextRow() throws FormattedException
+      public InputRow nextRow()
diff --git a/kafka-seven/pom.xml b/kafka-seven/pom.xml
--- a/kafka-seven/pom.xml
+++ b/kafka-seven/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-       <version>0.6.122-SNAPSHOT</version>
+       <version>0.6.129-SNAPSHOT</version>
diff --git a/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java b/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java
index 9448626723d..671c2246b60 100644
--- a/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java
+++ b/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
-import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
@@ -123,7 +122,7 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
       @Override
-      public InputRow nextRow() throws FormattedException
+      public InputRow nextRow()
diff --git a/pom.xml b/pom.xml
--- a/pom.xml
+++ b/pom.xml
-
+
    <modelVersion>4.0.0</modelVersion>
    <groupId>io.druid</groupId>
    <artifactId>druid</artifactId>
    <packaging>pom</packaging>
-   <version>0.6.122-SNAPSHOT</version>
+   <version>0.6.129-SNAPSHOT</version>
    <name>druid</name>
    <description>druid</description>
    <connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
    <developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
    <url>http://www.github.com/metamx/druid</url>
-   <tag>druid-0.6.107-SNAPSHOT</tag>
+   <tag>druid-0.6.117-SNAPSHOT</tag>
@@ -40,9 +39,9 @@
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-       <metamx.java-util.version>0.25.6</metamx.java-util.version>
-       <apache.curator.version>2.4.0</apache.curator.version>
-       <druid.api.version>0.2.3</druid.api.version>
+       <metamx.java-util.version>0.26.5</metamx.java-util.version>
+       <apache.curator.version>2.5.0</apache.curator.version>
+       <druid.api.version>0.2.4</druid.api.version>
@@ -199,22 +198,22 @@
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
-               <version>14.0.1</version>
+               <version>17.0</version>
            </dependency>
            <dependency>
                <groupId>com.google.inject</groupId>
                <artifactId>guice</artifactId>
-               <version>4.0-beta4</version>
+               <version>4.0-beta</version>
            </dependency>
            <dependency>
                <groupId>com.google.inject.extensions</groupId>
                <artifactId>guice-servlet</artifactId>
-               <version>4.0-beta4</version>
+               <version>4.0-beta</version>
            </dependency>
            <dependency>
                <groupId>com.google.inject.extensions</groupId>
                <artifactId>guice-multibindings</artifactId>
-               <version>4.0-beta4</version>
+               <version>4.0-beta</version>
            </dependency>
            <dependency>
                <groupId>com.ibm.icu</groupId>
@@ -562,15 +561,7 @@
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-release-plugin</artifactId>
-                   <version>2.4.2</version>
-                   <dependencies>
-                       <dependency>
-                           <groupId>org.apache.maven.scm</groupId>
-                           <artifactId>maven-scm-provider-gitexe</artifactId>
-                           <version>1.8.1</version>
-                       </dependency>
-                   </dependencies>
+                   <version>2.5</version>
diff --git a/processing/pom.xml b/processing/pom.xml
index f23eb94be46..755d2f553f7 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -28,7 +28,7 @@
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-       <version>0.6.122-SNAPSHOT</version>
+       <version>0.6.129-SNAPSHOT</version>
diff --git a/processing/src/main/java/io/druid/data/input/ProtoBufInputRowParser.java b/processing/src/main/java/io/druid/data/input/ProtoBufInputRowParser.java
index 63620ffd498..df8bdb5cef8 100644
--- a/processing/src/main/java/io/druid/data/input/ProtoBufInputRowParser.java
+++ b/processing/src/main/java/io/druid/data/input/ProtoBufInputRowParser.java
@@ -28,10 +28,8 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
-import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.DimensionsSpec;
-import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.ParseSpec;
@@ -94,7 +92,7 @@ public class ProtoBufInputRowParser implements ByteBufferInputRowParser
}
@Override
- public InputRow parse(ByteBuffer input) throws FormattedException
+ public InputRow parse(ByteBuffer input)
{
// We should really create a ProtoBufBasedInputRow that does not need an intermediate map but accesses
// the DynamicMessage directly...
diff --git a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java
index 6184221a1db..a0cc2b87f73 100644
--- a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java
+++ b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java
@@ -132,17 +132,17 @@ public class DruidDefaultSerializersModule extends SimpleModule
public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException
{
- jgen.writeStartArray();
try {
+ jgen.writeStartArray();
while (!yielder.isDone()) {
final Object o = yielder.get();
jgen.writeObject(o);
yielder = yielder.next(null);
}
+ jgen.writeEndArray();
} finally {
yielder.close();
}
- jgen.writeEndArray();
}
}
);
diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java
index a537c7d48f5..12c0c3ec731 100644
--- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java
@@ -28,6 +28,7 @@ import com.metamx.common.guava.Yielders;
import com.metamx.common.guava.YieldingAccumulator;
import org.joda.time.DateTime;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -55,16 +56,11 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
   public Sequence<T> run(final Query<T> query, Map<String, Object> context)
   {
     if (query.getContextBySegment(false)) {
+
       final Sequence<T> baseSequence = base.run(query, context);
-      return new Sequence<T>()
-      {
-        @Override
-        public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
-        {
-          List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
-
-          return accumulator.accumulate(
-              initValue,
+      final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
+      return Sequences.simple(
+          Arrays.asList(
               (T) new Result<BySegmentResultValueClass<T>>(
                   timestamp,
                   new BySegmentResultValueClass<T>(
@@ -73,29 +69,8 @@ public class BySegmentQueryRunner implements QueryRunner
query.getIntervals().get(0)
)
)
- );
- }
-
- @Override
-        public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
-        {
-          List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
-
-          final OutType retVal = accumulator.accumulate(
-              initValue,
-              (T) new Result<BySegmentResultValueClass<T>>(
-                  timestamp,
-                  new BySegmentResultValueClass<T>(
- results,
- segmentIdentifier,
- query.getIntervals().get(0)
- )
- )
- );
-
- return Yielders.done(retVal, null);
- }
- };
+ )
+ );
}
return base.run(query, context);
}
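
The hand-rolled `Sequence`, whose `accumulate` and `toYielder` bodies duplicated the same materialize-and-wrap logic, is replaced by one eager `Sequences.toList` plus `Sequences.simple`. By-segment results were fully materialized in both of the old branches as well, so the eager list is not a new cost. A sketch of what `Sequences.simple` provides:

```java
// Sketch: Sequences.simple wraps an Iterable in a Sequence, which is all the
// anonymous class was achieving once the base sequence had been materialized.
List<String> rows = Arrays.asList("a", "b");
Sequence<String> seq = Sequences.simple(rows);
List<String> back = Sequences.toList(seq, Lists.<String>newArrayList()); // ["a", "b"]
```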
diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java
index 084652b322f..932af432ad1 100644
--- a/processing/src/main/java/io/druid/query/Druids.java
+++ b/processing/src/main/java/io/druid/query/Druids.java
@@ -692,12 +692,14 @@ public class Druids
{
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
+ private String bound;
 private Map<String, Object> context;
public TimeBoundaryQueryBuilder()
{
dataSource = null;
querySegmentSpec = null;
+ bound = null;
context = null;
}
@@ -706,6 +708,7 @@ public class Druids
return new TimeBoundaryQuery(
dataSource,
querySegmentSpec,
+ bound,
context
);
}
@@ -715,6 +718,7 @@ public class Druids
return new TimeBoundaryQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
+ .bound(builder.bound)
.context(builder.context);
}
@@ -748,6 +752,12 @@ public class Druids
return this;
}
+ public TimeBoundaryQueryBuilder bound(String b)
+ {
+ bound = b;
+ return this;
+ }
+
 public TimeBoundaryQueryBuilder context(Map<String, Object> c)
{
context = c;
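
With `bound` plumbed through the builder, callers can request a single edge of the time range instead of both. A hedged usage sketch (this assumes the standard `Druids.newTimeBoundaryQueryBuilder()` entry point, and "maxTime" is assumed to match `TimeBoundaryQuery.MAX_TIME`):

```java
// Sketch: build a time-boundary query restricted to the max timestamp only.
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
    .dataSource("wikipedia") // hypothetical datasource name
    .bound("maxTime")        // assumed value of TimeBoundaryQuery.MAX_TIME
    .build();
```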
diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java
index 48db79f1ec3..7a60141cd7b 100644
--- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java
@@ -89,7 +89,6 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
   @Override
   public Sequence<Row> run(final Query<Row> queryParam, final Map<String, Object> context)
   {
-
     final GroupByQuery query = (GroupByQuery) queryParam;
     final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
diff --git a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java
index f104d8db026..736c60f76ab 100644
--- a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java
@@ -19,7 +19,7 @@
package io.druid.query;
-import com.google.common.io.Closeables;
+import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import io.druid.segment.ReferenceCountingSegment;
@@ -54,7 +54,7 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
       return new ResourceClosingSequence<T>(baseSequence, closeable);
}
catch (RuntimeException e) {
- Closeables.closeQuietly(closeable);
+ CloseQuietly.close(closeable);
throw e;
}
}
diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java
index 87077e45b31..ad355c71b6b 100644
--- a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java
@@ -26,9 +26,9 @@ import java.util.List;
/**
* Processing related interface
- *
+ *
* An AggregatorFactory is an object that knows how to generate an Aggregator using a ColumnSelectorFactory.
- *
+ *
* This is useful as an abstraction to allow Aggregator classes to be written in terms of MetricSelector objects
* without making any assumptions about how they are pulling values out of the base data. That is, the data is
* provided to the Aggregator through the MetricSelector object, so whatever creates that object gets to choose how
@@ -37,7 +37,9 @@ import java.util.List;
public interface AggregatorFactory
{
public Aggregator factorize(ColumnSelectorFactory metricFactory);
+
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory);
+
public Comparator getComparator();
/**
@@ -48,6 +50,7 @@ public interface AggregatorFactory
*
* @param lhs The left hand side of the combine
* @param rhs The right hand side of the combine
+ *
* @return an object representing the combination of lhs and rhs, this can be a new object or a mutation of the inputs
*/
public Object combine(Object lhs, Object rhs);
@@ -61,11 +64,19 @@ public interface AggregatorFactory
*/
public AggregatorFactory getCombiningFactory();
+ /**
+ * Gets a list of all columns that this AggregatorFactory will scan
+ *
+ * @return one AggregatorFactory per column that the parent AggregatorFactory scans
+ */
+ public List<AggregatorFactory> getRequiredColumns();
+
/**
* A method that knows how to "deserialize" the object from whatever form it might have been put into
* in order to transfer via JSON.
*
* @param object the object to deserialize
+ *
* @return the deserialized object
*/
public Object deserialize(Object object);
@@ -75,13 +86,17 @@ public interface AggregatorFactory
* intermediate format than their final resultant output.
*
* @param object the object to be finalized
+ *
* @return the finalized value that should be returned for the initial query
*/
public Object finalizeComputation(Object object);
public String getName();
+
 public List<String> requiredFields();
+
public byte[] getCacheKey();
+
public String getTypeName();
/**
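
`getRequiredColumns()` lets callers expand an aggregator into per-column factories. A hedged sketch of how a caller might enumerate the scanned columns (the helper name is hypothetical; `requiredFields()` is the existing interface contract):

```java
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Set;

// Sketch: expand each aggregator into its required columns, then collect
// the underlying field names.
class RequiredColumnsSketch
{
  Set<String> scannedColumns(List<AggregatorFactory> aggregators)
  {
    Set<String> columns = Sets.newHashSet();
    for (AggregatorFactory agg : aggregators) {
      for (AggregatorFactory required : agg.getRequiredColumns()) {
        columns.addAll(required.requiredFields());
      }
    }
    return columns;
  }
}
```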
diff --git a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java
index e47999e8719..9549015f687 100644
--- a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Longs;
import io.druid.segment.ColumnSelectorFactory;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -76,6 +77,12 @@ public class CountAggregatorFactory implements AggregatorFactory
return new LongSumAggregatorFactory(name, name);
}
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ return Arrays.asList(new CountAggregatorFactory(name));
+ }
+
@Override
public Object deserialize(Object object)
{
@@ -136,12 +143,18 @@ public class CountAggregatorFactory implements AggregatorFactory
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
CountAggregatorFactory that = (CountAggregatorFactory) o;
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
+ if (name != null ? !name.equals(that.name) : that.name != null) {
+ return false;
+ }
return true;
}
diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java
index c7f3eba75f4..083a16589dc 100644
--- a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java
@@ -85,6 +85,12 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
return new DoubleSumAggregatorFactory(name, name);
}
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ return Arrays.asList(new DoubleSumAggregatorFactory(fieldName, fieldName));
+ }
+
@Override
public Object deserialize(Object object)
{
@@ -158,13 +164,21 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
DoubleSumAggregatorFactory that = (DoubleSumAggregatorFactory) o;
- if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
+ if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
+ return false;
+ }
+ if (name != null ? !name.equals(that.name) : that.name != null) {
+ return false;
+ }
return true;
}
diff --git a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java
index 060d40d2798..422d8279a2e 100644
--- a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java
@@ -56,7 +56,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory
this.name = name;
this.fieldName = fieldName;
- this.breaksList = (breaksList == null) ? Lists.<Float>newArrayList() :breaksList;
+ this.breaksList = (breaksList == null) ? Lists.<Float>newArrayList() : breaksList;
this.breaks = new float[this.breaksList.size()];
for (int i = 0; i < this.breaksList.size(); ++i) {
this.breaks[i] = this.breaksList.get(i);
@@ -100,6 +100,12 @@ public class HistogramAggregatorFactory implements AggregatorFactory
return new HistogramAggregatorFactory(name, name, breaksList);
}
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ return Arrays.asList(new HistogramAggregatorFactory(fieldName, fieldName, breaksList));
+ }
+
@Override
public Object deserialize(Object object)
{
@@ -183,15 +189,27 @@ public class HistogramAggregatorFactory implements AggregatorFactory
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
HistogramAggregatorFactory that = (HistogramAggregatorFactory) o;
- if (!Arrays.equals(breaks, that.breaks)) return false;
- if (breaksList != null ? !breaksList.equals(that.breaksList) : that.breaksList != null) return false;
- if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
+ if (!Arrays.equals(breaks, that.breaks)) {
+ return false;
+ }
+ if (breaksList != null ? !breaksList.equals(that.breaksList) : that.breaksList != null) {
+ return false;
+ }
+ if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
+ return false;
+ }
+ if (name != null ? !name.equals(that.name) : that.name != null) {
+ return false;
+ }
return true;
}
diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java
index 0c1e4d2ad9d..fc5603f2fe9 100644
--- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java
@@ -140,6 +140,22 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
return new JavaScriptAggregatorFactory(name, Lists.newArrayList(name), fnCombine, fnReset, fnCombine);
}
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ return Lists.transform(
+ fieldNames,
+ new com.google.common.base.Function<String, AggregatorFactory>()
+ {
+ @Override
+ public AggregatorFactory apply(String input)
+ {
+ return new JavaScriptAggregatorFactory(input, fieldNames, fnAggregate, fnReset, fnCombine);
+ }
+ }
+ );
+ }
+
@Override
public Object deserialize(Object object)
{
diff --git a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java
index 50ef5130756..963d9458beb 100644
--- a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java
@@ -31,11 +31,11 @@ import java.util.Comparator;
import java.util.List;
/**
-*/
+ */
public class LongSumAggregatorFactory implements AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x1;
-
+
private final String fieldName;
private final String name;
@@ -85,6 +85,12 @@ public class LongSumAggregatorFactory implements AggregatorFactory
return new LongSumAggregatorFactory(name, name);
}
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ return Arrays.asList(new LongSumAggregatorFactory(fieldName, fieldName));
+ }
+
@Override
public Object deserialize(Object object)
{
@@ -154,13 +160,21 @@ public class LongSumAggregatorFactory implements AggregatorFactory
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
LongSumAggregatorFactory that = (LongSumAggregatorFactory) o;
- if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
+ if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
+ return false;
+ }
+ if (name != null ? !name.equals(that.name) : that.name != null) {
+ return false;
+ }
return true;
}
diff --git a/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java
index b731c4319e7..56e6de5f56e 100644
--- a/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java
@@ -82,6 +82,12 @@ public class MaxAggregatorFactory implements AggregatorFactory
return new MaxAggregatorFactory(name, name);
}
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ return Arrays.<AggregatorFactory>asList(new MaxAggregatorFactory(fieldName, fieldName));
+ }
+
@Override
public Object deserialize(Object object)
{
diff --git a/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java
index d3956c94b52..e7256d0ccb9 100644
--- a/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java
@@ -82,6 +82,12 @@ public class MinAggregatorFactory implements AggregatorFactory
return new MinAggregatorFactory(name, name);
}
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ return Arrays.<AggregatorFactory>asList(new MinAggregatorFactory(fieldName, fieldName));
+ }
+
@Override
public Object deserialize(Object object)
{
diff --git a/processing/src/main/java/io/druid/query/aggregation/ToLowerCaseAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/ToLowerCaseAggregatorFactory.java
index 6c559ba8ec6..a884bb9be4d 100644
--- a/processing/src/main/java/io/druid/query/aggregation/ToLowerCaseAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/ToLowerCaseAggregatorFactory.java
@@ -65,6 +65,12 @@ public class ToLowerCaseAggregatorFactory implements AggregatorFactory
return baseAggregatorFactory.getCombiningFactory();
}
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ return baseAggregatorFactory.getRequiredColumns();
+ }
+
@Override
public Object deserialize(Object object)
{
diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java
index 85a07fd0715..afd893afc3f 100644
--- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java
+++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java
@@ -54,7 +54,7 @@ public class CardinalityAggregator implements Aggregator
// nothing to add to hasher if size == 0, only handle size == 1 and size != 0 cases.
if (size == 1) {
final String value = selector.lookupName(row.get(0));
- hasher.putString(value != null ? value : NULL_STRING);
+ hasher.putUnencodedChars(value != null ? value : NULL_STRING);
} else if (size != 0) {
final String[] values = new String[size];
for (int i = 0; i < size; ++i) {
@@ -67,7 +67,7 @@ public class CardinalityAggregator implements Aggregator
if (i != 0) {
hasher.putChar(SEPARATOR);
}
- hasher.putString(values[i]);
+ hasher.putUnencodedChars(values[i]);
}
}
}
@@ -79,7 +79,7 @@ public class CardinalityAggregator implements Aggregator
for (final DimensionSelector selector : selectors) {
for (final Integer index : selector.getRow()) {
final String value = selector.lookupName(index);
- collector.add(hashFn.hashString(value == null ? NULL_STRING : value).asBytes());
+ collector.add(hashFn.hashUnencodedChars(value == null ? NULL_STRING : value).asBytes());
}
}
}
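
These call sites switch from `putString`/`hashString` to `putUnencodedChars`/`hashUnencodedChars`, which appears to track Guava 15's rename of the un-encoded `CharSequence` overloads; the produced hash values are unchanged. A small sketch of the equivalence:

```java
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import java.util.Arrays;

public class HashRenameSketch
{
  public static void main(String[] args)
  {
    HashFunction fn = Hashing.murmur3_128();
    // Both forms hash the raw chars without encoding them first.
    byte[] oneShot = fn.hashUnencodedChars("druid").asBytes();
    byte[] streamed = fn.newHasher().putUnencodedChars("druid").hash().asBytes();
    System.out.println(Arrays.equals(oneShot, streamed)); // true
  }
}
```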
diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
index 2978dba9d9f..10443c828cd 100644
--- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
@@ -32,12 +32,14 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.Aggregators;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import org.apache.commons.codec.binary.Base64;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -142,7 +144,23 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
@Override
public AggregatorFactory getCombiningFactory()
{
- return new CardinalityAggregatorFactory(name, fieldNames, byRow);
+ return new HyperUniquesAggregatorFactory(name, name);
+ }
+
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ return Lists.transform(
+ fieldNames,
+ new Function<String, AggregatorFactory>()
+ {
+ @Override
+ public AggregatorFactory apply(String input)
+ {
+ return new CardinalityAggregatorFactory(input, fieldNames, byRow);
+ }
+ }
+ );
}
@Override
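
`getCombiningFactory()` now hands back a `HyperUniquesAggregatorFactory` over the aggregator's own output column, presumably because the per-segment intermediate for cardinality is a `HyperLogLogCollector`, so merging partial results is exactly the hyperUnique fold. A sketch of the relationship (field names illustrative):

```java
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;

import java.util.Arrays;

public class CombiningFactorySketch
{
  public static void main(String[] args)
  {
    AggregatorFactory card =
        new CardinalityAggregatorFactory("card", Arrays.asList("dim1", "dim2"), true);
    // Per the change above: a HyperUniquesAggregatorFactory("card", "card"),
    // which folds the HyperLogLogCollector intermediates stored under "card".
    AggregatorFactory combining = card.getCombiningFactory();
    System.out.println(combining);
  }
}
```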
diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java
index fd4e36fb781..e4597c71a0b 100644
--- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java
@@ -73,12 +73,13 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
return Aggregators.noopAggregator();
}
- if (HyperLogLogCollector.class.isAssignableFrom(selector.classOfObject())) {
+ final Class classOfObject = selector.classOfObject();
+ if (classOfObject.equals(Object.class) || HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
return new HyperUniquesAggregator(name, selector);
}
throw new IAE(
- "Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, selector.classOfObject()
+ "Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, classOfObject
);
}
@@ -91,12 +92,13 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
return Aggregators.noopBufferAggregator();
}
- if (HyperLogLogCollector.class.isAssignableFrom(selector.classOfObject())) {
+ final Class classOfObject = selector.classOfObject();
+ if (classOfObject.equals(Object.class) || HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
return new HyperUniquesBufferAggregator(selector);
}
throw new IAE(
- "Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, selector.classOfObject()
+ "Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, classOfObject
);
}
@@ -131,6 +133,12 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
return new HyperUniquesAggregatorFactory(name, name);
}
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ return Arrays.<AggregatorFactory>asList(new HyperUniquesAggregatorFactory(fieldName, fieldName));
+ }
+
@Override
public Object deserialize(Object object)
{
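
The relaxed check above also accepts selectors whose declared class is `Object`, which happens when rows are re-aggregated through an in-memory incremental index (as the groupBy subquery path below does) and the concrete value type is not statically known. The guard, extracted as a sketch:

```java
import com.metamx.common.IAE;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;

public class HyperUniqueTypeCheckSketch
{
  // Mirrors the check above: trust Object-typed selectors to hold
  // HyperLogLogCollector values at runtime; reject anything else.
  static void checkCompatible(Class classOfObject, String fieldName)
  {
    if (classOfObject.equals(Object.class)
        || HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
      return;
    }
    throw new IAE(
        "Incompatible type for metric[%s], expected a HyperUnique, got a %s",
        fieldName, classOfObject
    );
  }
}
```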
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
index 0a23b5704b7..98ac83e32cd 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -72,7 +72,7 @@ public class GroupByQuery extends BaseQuery<Row>
private final List<AggregatorFactory> aggregatorSpecs;
private final List<PostAggregator> postAggregatorSpecs;
- private final Function<Sequence<Row>, Sequence<Row>> orderByLimitFn;
+ private final Function<Sequence<Row>, Sequence<Row>> limitFn;
@JsonCreator
public GroupByQuery(
@@ -85,8 +85,9 @@ public class GroupByQuery extends BaseQuery<Row>
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
@JsonProperty("having") HavingSpec havingSpec,
@JsonProperty("limitSpec") LimitSpec limitSpec,
- @JsonProperty("orderBy") LimitSpec orderBySpec,
- @JsonProperty("context") Map<String, Object> context
+ @JsonProperty("context") Map<String, Object> context,
+ // Backwards compatible
+ @JsonProperty("orderBy") LimitSpec orderBySpec
)
{
super(dataSource, querySegmentSpec, context);
@@ -129,7 +130,7 @@ public class GroupByQuery extends BaseQuery<Row>
);
}
- orderByLimitFn = postProcFn;
+ limitFn = postProcFn;
}
/**
@@ -146,7 +147,7 @@ public class GroupByQuery extends BaseQuery<Row>
List<PostAggregator> postAggregatorSpecs,
HavingSpec havingSpec,
LimitSpec orderBySpec,
- Function<Sequence<Row>, Sequence<Row>> orderByLimitFn,
+ Function<Sequence<Row>, Sequence<Row>> limitFn,
Map<String, Object> context
)
{
@@ -159,7 +160,7 @@ public class GroupByQuery extends BaseQuery<Row>
this.postAggregatorSpecs = postAggregatorSpecs;
this.havingSpec = havingSpec;
this.limitSpec = orderBySpec;
- this.orderByLimitFn = orderByLimitFn;
+ this.limitFn = limitFn;
}
@JsonProperty("filter")
@@ -199,7 +200,7 @@ public class GroupByQuery extends BaseQuery<Row>
}
@JsonProperty
- public LimitSpec getOrderBy()
+ public LimitSpec getLimitSpec()
{
return limitSpec;
}
@@ -218,7 +219,7 @@ public class GroupByQuery extends BaseQuery<Row>
public Sequence<Row> applyLimit(Sequence<Row> results)
{
- return orderByLimitFn.apply(results);
+ return limitFn.apply(results);
}
@Override
@@ -234,7 +235,7 @@ public class GroupByQuery extends BaseQuery<Row>
postAggregatorSpecs,
havingSpec,
limitSpec,
- orderByLimitFn,
+ limitFn,
computeOverridenContext(contextOverride)
);
}
@@ -252,7 +253,7 @@ public class GroupByQuery extends BaseQuery<Row>
postAggregatorSpecs,
havingSpec,
limitSpec,
- orderByLimitFn,
+ limitFn,
getContext()
);
}
@@ -270,7 +271,7 @@ public class GroupByQuery extends BaseQuery<Row>
postAggregatorSpecs,
havingSpec,
limitSpec,
- orderByLimitFn,
+ limitFn,
getContext()
);
}
@@ -292,11 +293,25 @@ public class GroupByQuery extends BaseQuery<Row>
private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
private int limit = Integer.MAX_VALUE;
- private Builder()
+ public Builder()
{
}
- private Builder(Builder builder)
+ public Builder(GroupByQuery query)
+ {
+ dataSource = query.getDataSource();
+ querySegmentSpec = query.getQuerySegmentSpec();
+ limitSpec = query.getLimitSpec();
+ dimFilter = query.getDimFilter();
+ granularity = query.getGranularity();
+ dimensions = query.getDimensions();
+ aggregatorSpecs = query.getAggregatorSpecs();
+ postAggregatorSpecs = query.getPostAggregatorSpecs();
+ havingSpec = query.getHavingSpec();
+ context = query.getContext();
+ }
+
+ public Builder(Builder builder)
{
dataSource = builder.dataSource;
querySegmentSpec = builder.querySegmentSpec;
@@ -490,7 +505,11 @@ public class GroupByQuery extends BaseQuery<Row>
{
final LimitSpec theLimitSpec;
if (limitSpec == null) {
- theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+ if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) {
+ theLimitSpec = new NoopLimitSpec();
+ } else {
+ theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+ }
} else {
theLimitSpec = limitSpec;
}
@@ -504,9 +523,9 @@ public class GroupByQuery extends BaseQuery<Row>
aggregatorSpecs,
postAggregatorSpecs,
havingSpec,
- null,
theLimitSpec,
- context
+ context,
+ null
);
}
}
@@ -515,36 +534,57 @@ public class GroupByQuery extends BaseQuery<Row>
public String toString()
{
return "GroupByQuery{" +
- "limitSpec=" + limitSpec +
- ", dimFilter=" + dimFilter +
- ", granularity=" + granularity +
- ", dimensions=" + dimensions +
- ", aggregatorSpecs=" + aggregatorSpecs +
- ", postAggregatorSpecs=" + postAggregatorSpecs +
- ", orderByLimitFn=" + orderByLimitFn +
- '}';
+ "limitSpec=" + limitSpec +
+ ", dimFilter=" + dimFilter +
+ ", granularity=" + granularity +
+ ", dimensions=" + dimensions +
+ ", aggregatorSpecs=" + aggregatorSpecs +
+ ", postAggregatorSpecs=" + postAggregatorSpecs +
+ ", limitFn=" + limitFn +
+ '}';
}
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
GroupByQuery that = (GroupByQuery) o;
- if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null)
+ if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null) {
return false;
- if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
- if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false;
- if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
- if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) return false;
- if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) return false;
- if (orderByLimitFn != null ? !orderByLimitFn.equals(that.orderByLimitFn) : that.orderByLimitFn != null)
+ }
+ if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) {
return false;
- if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null)
+ }
+ if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
return false;
+ }
+ if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) {
+ return false;
+ }
+ if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) {
+ return false;
+ }
+ if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) {
+ return false;
+ }
+ if (limitFn != null ? !limitFn.equals(that.limitFn) : that.limitFn != null) {
+ return false;
+ }
+ if (postAggregatorSpecs != null
+ ? !postAggregatorSpecs.equals(that.postAggregatorSpecs)
+ : that.postAggregatorSpecs != null) {
+ return false;
+ }
return true;
}
@@ -560,7 +600,7 @@ public class GroupByQuery extends BaseQuery<Row>
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
- result = 31 * result + (orderByLimitFn != null ? orderByLimitFn.hashCode() : 0);
+ result = 31 * result + (limitFn != null ? limitFn.hashCode() : 0);
return result;
}
}
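
The `Builder` is now public and gains a copy constructor from an existing query; this is what lets the toolchest below derive inner and outer queries from the user's query. A usage sketch under the same assumptions (the setters shown are the ones used later in this patch):

```java
import com.google.common.collect.Lists;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.groupby.GroupByQuery;

import java.util.List;

public class BuilderCopySketch
{
  // Derive a query that keeps everything from "query" but swaps the
  // aggregators and drops the post-aggregators.
  static GroupByQuery withAggregators(GroupByQuery query, List<AggregatorFactory> aggs)
  {
    return new GroupByQuery.Builder(query)
        .setAggregatorSpecs(aggs)
        .setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList())
        .build();
  }
}
```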
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java
index d9e52014f86..8599c6bdd16 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java
@@ -25,12 +25,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
+import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
@@ -123,7 +123,7 @@ public class GroupByQueryEngine
@Override
public void cleanup(RowIterator iterFromMake)
{
- Closeables.closeQuietly(iterFromMake);
+ CloseQuietly.close(iterFromMake);
}
}
);
@@ -135,7 +135,7 @@ public class GroupByQueryEngine
@Override
public void close() throws IOException
{
- Closeables.closeQuietly(bufferHolder);
+ CloseQuietly.close(bufferHolder);
}
}
)
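
These cleanup paths swap Guava's `Closeables.closeQuietly` for metamx's `CloseQuietly.close`, apparently because newer Guava drops the `Closeable` overload of `closeQuietly`. A sketch of an equivalent helper, assuming only the JDK (the real one lives in com.metamx.common.guava):

```java
import java.io.Closeable;
import java.io.IOException;

public class CloseQuietlySketch
{
  // Close a resource on a cleanup path, surfacing but not propagating
  // IOExceptions.
  public static void close(Closeable closeable)
  {
    if (closeable == null) {
      return;
    }
    try {
      closeable.close();
    }
    catch (IOException e) {
      e.printStackTrace(); // deliberately swallowed; "quiet" close
    }
  }
}
```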
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
index 00298f18ba0..b7183b12354 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
@@ -24,12 +24,14 @@ import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
+import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
import javax.annotation.Nullable;
import java.util.List;
@@ -53,7 +55,7 @@ public class GroupByQueryHelper
new Function<AggregatorFactory, AggregatorFactory>()
{
@Override
- public AggregatorFactory apply(@Nullable AggregatorFactory input)
+ public AggregatorFactory apply(AggregatorFactory input)
{
return input.getCombiningFactory();
}
@@ -64,7 +66,7 @@ public class GroupByQueryHelper
new Function<DimensionSpec, String>()
{
@Override
- public String apply(@Nullable DimensionSpec input)
+ public String apply(DimensionSpec input)
{
return input.getOutputName();
}
@@ -83,14 +85,14 @@ public class GroupByQueryHelper
@Override
public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
{
- if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
+ if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions), false) > config.getMaxResults()) {
throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
}
return accumulated;
}
};
- return new Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>>(index, accumulator);
+ return new Pair<>(index, accumulator);
}
}
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
index d6ce385ed21..ae0224313fe 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -24,6 +24,7 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.Pair;
@@ -44,6 +45,7 @@ import io.druid.query.QueryToolChest;
import io.druid.query.SubqueryQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
+import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.Interval;
@@ -60,7 +62,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
- private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
+ private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(
+ GROUP_BY_MERGE_KEY,
+ "false"
+ );
private final Supplier configSupplier;
private GroupByQueryEngine engine; // For running the outer query around a subquery
@@ -82,7 +87,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
public Sequence<Row> run(Query<Row> input, Map<String, Object> context)
{
- if (Boolean.valueOf((String) input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
+ if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner, context);
} else {
return runner.run(input, context);
@@ -93,33 +98,45 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner, Map<String, Object> context)
{
-
- Sequence<Row> result;
-
// If there's a subquery, merge subquery results and then apply the aggregator
- DataSource dataSource = query.getDataSource();
+ final DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) {
GroupByQuery subquery;
try {
subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery();
- } catch (ClassCastException e) {
+ }
+ catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
}
- Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner, context);
- IncrementalIndexStorageAdapter adapter
- = new IncrementalIndexStorageAdapter(makeIncrementalIndex(subquery, subqueryResult));
- result = engine.process(query, adapter);
+ final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner, context);
+ final List<AggregatorFactory> aggs = Lists.newArrayList();
+ for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
+ aggs.addAll(aggregatorFactory.getRequiredColumns());
+ }
+
+ // We need the inner incremental index to have all the columns required by the outer query
+ final GroupByQuery innerQuery = new GroupByQuery.Builder(query)
+ .setAggregatorSpecs(aggs)
+ .setInterval(subquery.getIntervals())
+ .setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList())
+ .build();
+
+ final GroupByQuery outerQuery = new GroupByQuery.Builder(query)
+ .setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
+ .build();
+
+ final IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(
+ makeIncrementalIndex(innerQuery, subqueryResult)
+ );
+ return outerQuery.applyLimit(engine.process(outerQuery, adapter));
} else {
- result = runner.run(query, context);
+ return query.applyLimit(postAggregate(query, makeIncrementalIndex(query, runner.run(query, context))));
}
-
- return postAggregate(query, makeIncrementalIndex(query, result));
}
-
private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
{
- Sequence<Row> sequence = Sequences.map(
+ return Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>()
{
@@ -129,13 +146,12 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows)
@@ -153,7 +169,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
- return new ConcatSequence<Row>(seqOfSequences);
+ return new ConcatSequence<>(seqOfSequences);
}
@Override
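
End to end, the rework above changes how a nested groupBy executes: the subquery runs first, its rows are loaded into an incremental index that materializes every column the outer aggregators require (via `getRequiredColumns()`), and the merged limit spec is applied on the outer pass. A query of roughly this shape exercises that path (datasource, dimension, and metric names are placeholders):

```json
{
  "queryType": "groupBy",
  "dataSource": {
    "type": "query",
    "query": {
      "queryType": "groupBy",
      "dataSource": "sample_datasource",
      "granularity": "day",
      "dimensions": ["dim1"],
      "aggregations": [{"type": "longSum", "name": "inner_sum", "fieldName": "metric1"}],
      "intervals": ["2014-01-01/2014-02-01"]
    }
  },
  "granularity": "all",
  "dimensions": ["dim1"],
  "aggregations": [{"type": "longSum", "name": "outer_sum", "fieldName": "inner_sum"}],
  "intervals": ["2014-01-01/2014-02-01"]
}
```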
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
index eda54ea0dc3..3d78e112cb5 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
@@ -87,12 +87,17 @@ public class DefaultLimitSpec implements LimitSpec
if (limit == Integer.MAX_VALUE) {
return new SortingFn(ordering);
- }
- else {
+ } else {
return new TopNFunction(ordering, limit);
}
}
+ @Override
+ public LimitSpec merge(LimitSpec other)
+ {
+ return this;
+ }
+
private Ordering<Row> makeComparator(
List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
)
@@ -200,12 +205,18 @@ public class DefaultLimitSpec implements LimitSpec
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
LimitingFn that = (LimitingFn) o;
- if (limit != that.limit) return false;
+ if (limit != that.limit) {
+ return false;
+ }
return true;
}
@@ -232,12 +243,18 @@ public class DefaultLimitSpec implements LimitSpec
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
SortingFn sortingFn = (SortingFn) o;
- if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) return false;
+ if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) {
+ return false;
+ }
return true;
}
@@ -273,13 +290,21 @@ public class DefaultLimitSpec implements LimitSpec
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
TopNFunction that = (TopNFunction) o;
- if (limit != that.limit) return false;
- if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) return false;
+ if (limit != that.limit) {
+ return false;
+ }
+ if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) {
+ return false;
+ }
return true;
}
@@ -296,13 +321,21 @@ public class DefaultLimitSpec implements LimitSpec
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
DefaultLimitSpec that = (DefaultLimitSpec) o;
- if (limit != that.limit) return false;
- if (columns != null ? !columns.equals(that.columns) : that.columns != null) return false;
+ if (limit != that.limit) {
+ return false;
+ }
+ if (columns != null ? !columns.equals(that.columns) : that.columns != null) {
+ return false;
+ }
return true;
}
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java
index 0d07f1f91c9..fa50d62016c 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java
@@ -38,5 +38,11 @@ import java.util.List;
})
public interface LimitSpec
{
- public Function<Sequence<Row>, Sequence<Row>> build(List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs);
+ public Function<Sequence<Row>, Sequence<Row>> build(
+ List<DimensionSpec> dimensions,
+ List<AggregatorFactory> aggs,
+ List<PostAggregator> postAggs
+ );
+
+ public LimitSpec merge(LimitSpec other);
}
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java
index d975e24a65f..e71038d4918 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java
@@ -41,6 +41,12 @@ public class NoopLimitSpec implements LimitSpec
return Functions.identity();
}
+ @Override
+ public LimitSpec merge(LimitSpec other)
+ {
+ return other;
+ }
+
@Override
public String toString()
{
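
The new `merge()` contract lets the toolchest combine the outer and subquery limit specs: a `DefaultLimitSpec` keeps itself, while a `NoopLimitSpec` defers to the other side, so `outer.merge(inner)` prefers the outer limit whenever one exists. A sketch (constructor arguments illustrative):

```java
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.LimitSpec;
import io.druid.query.groupby.orderby.NoopLimitSpec;

public class LimitSpecMergeSketch
{
  public static void main(String[] args)
  {
    LimitSpec noop = new NoopLimitSpec();
    LimitSpec limited = new DefaultLimitSpec(null, 10);
    // Noop defers to the other spec; Default keeps itself.
    System.out.println(noop.merge(limited));  // the DefaultLimitSpec
    System.out.println(limited.merge(noop));  // still the DefaultLimitSpec
  }
}
```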
diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java
index 6f3e70b9851..db9fa913bc9 100644
--- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java
+++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java
@@ -21,6 +21,7 @@ package io.druid.query.timeboundary;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -48,12 +49,16 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
);
public static final String MAX_TIME = "maxTime";
public static final String MIN_TIME = "minTime";
+
private static final byte CACHE_TYPE_ID = 0x0;
+ private final String bound;
+
@JsonCreator
public TimeBoundaryQuery(
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
+ @JsonProperty("bound") String bound,
@JsonProperty("context") Map context
)
{
@@ -63,6 +68,8 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
: querySegmentSpec,
context
);
+
+ this.bound = bound == null ? "" : bound;
}
@Override
@@ -77,12 +84,19 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
return Query.TIME_BOUNDARY;
}
+ @JsonProperty
+ public String getBound()
+ {
+ return bound;
+ }
+
@Override
public TimeBoundaryQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new TimeBoundaryQuery(
getDataSource(),
getQuerySegmentSpec(),
+ bound,
computeOverridenContext(contextOverrides)
);
}
@@ -93,6 +107,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
return new TimeBoundaryQuery(
getDataSource(),
spec,
+ bound,
getContext()
);
}
@@ -103,40 +118,33 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
return new TimeBoundaryQuery(
dataSource,
getQuerySegmentSpec(),
+ bound,
getContext()
);
}
public byte[] getCacheKey()
{
- return ByteBuffer.allocate(1)
+ final byte[] boundBytes = bound.getBytes(Charsets.UTF_8);
+ return ByteBuffer.allocate(1 + boundBytes.length)
.put(CACHE_TYPE_ID)
+ .put(boundBytes)
.array();
}
- @Override
- public String toString()
- {
- return "TimeBoundaryQuery{" +
- "dataSource='" + getDataSource() + '\'' +
- ", querySegmentSpec=" + getQuerySegmentSpec() +
- ", duration=" + getDuration() +
- '}';
- }
-
public Iterable<Result<TimeBoundaryResultValue>> buildResult(DateTime timestamp, DateTime min, DateTime max)
{
List<Result<TimeBoundaryResultValue>> results = Lists.newArrayList();
Map<String, Object> result = Maps.newHashMap();
if (min != null) {
- result.put(TimeBoundaryQuery.MIN_TIME, min);
+ result.put(MIN_TIME, min);
}
if (max != null) {
- result.put(TimeBoundaryQuery.MAX_TIME, max);
+ result.put(MAX_TIME, max);
}
if (!result.isEmpty()) {
- results.add(new Result<TimeBoundaryResultValue>(timestamp, new TimeBoundaryResultValue(result)));
+ results.add(new Result<>(timestamp, new TimeBoundaryResultValue(result)));
}
return results;
@@ -154,25 +162,74 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
TimeBoundaryResultValue val = result.getValue();
DateTime currMinTime = val.getMinTime();
- if (currMinTime.isBefore(min)) {
+ if (currMinTime != null && currMinTime.isBefore(min)) {
min = currMinTime;
}
DateTime currMaxTime = val.getMaxTime();
- if (currMaxTime.isAfter(max)) {
+ if (currMaxTime != null && currMaxTime.isAfter(max)) {
max = currMaxTime;
}
}
- return Arrays.asList(
- new Result<TimeBoundaryResultValue>(
- min,
- new TimeBoundaryResultValue(
- ImmutableMap.of(
- TimeBoundaryQuery.MIN_TIME, min,
- TimeBoundaryQuery.MAX_TIME, max
- )
- )
- )
- );
+ final DateTime ts;
+ final DateTime minTime;
+ final DateTime maxTime;
+
+ if (bound.equalsIgnoreCase(MIN_TIME)) {
+ ts = min;
+ minTime = min;
+ maxTime = null;
+ } else if (bound.equalsIgnoreCase(MAX_TIME)) {
+ ts = max;
+ minTime = null;
+ maxTime = max;
+ } else {
+ ts = min;
+ minTime = min;
+ maxTime = max;
+ }
+
+ return buildResult(ts, minTime, maxTime);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TimeBoundaryQuery{" +
+ "dataSource='" + getDataSource() + '\'' +
+ ", querySegmentSpec=" + getQuerySegmentSpec() +
+ ", duration=" + getDuration() +
+ ", bound" + bound +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+
+ TimeBoundaryQuery that = (TimeBoundaryQuery) o;
+
+ if (!bound.equals(that.bound)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = super.hashCode();
+ result = 31 * result + bound.hashCode();
+ return result;
}
}
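
The new `bound` property lets callers ask for only one endpoint, and the merge logic above then fills in just `minTime` or `maxTime`. A usage sketch built directly on the constructor this patch introduces (datasource name is a placeholder):

```java
import io.druid.query.TableDataSource;
import io.druid.query.timeboundary.TimeBoundaryQuery;

public class BoundSketch
{
  public static void main(String[] args)
  {
    // Constructor per this patch: (dataSource, querySegmentSpec, bound, context).
    // A null segment spec falls back to the default interval; an empty bound
    // means "both endpoints", as before.
    TimeBoundaryQuery maxOnly = new TimeBoundaryQuery(
        new TableDataSource("sample_datasource"), null, TimeBoundaryQuery.MAX_TIME, null
    );
    System.out.println(maxOnly.getBound()); // "maxTime"
  }
}
```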
diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
index 5700aa68185..eb779ae6a7d 100644
--- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -68,8 +68,8 @@ public class TimeBoundaryQueryQueryToolChest
return segments;
}
- final T first = segments.get(0);
- final T second = segments.get(segments.size() - 1);
+ final T min = segments.get(0);
+ final T max = segments.get(segments.size() - 1);
return Lists.newArrayList(
Iterables.filter(
@@ -79,8 +79,8 @@ public class TimeBoundaryQueryQueryToolChest
@Override
public boolean apply(T input)
{
- return input.getInterval().overlaps(first.getInterval()) || input.getInterval()
- .overlaps(second.getInterval());
+ return (min != null && input.getInterval().overlaps(min.getInterval())) ||
+ (max != null && input.getInterval().overlaps(max.getInterval()));
}
}
)
@@ -112,7 +112,7 @@ public class TimeBoundaryQueryQueryToolChest
@Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
{
- return new OrderedMergeSequence<Result<TimeBoundaryResultValue>>(getOrdering(), seqOfSequences);
+ return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
@@ -147,9 +147,9 @@ public class TimeBoundaryQueryQueryToolChest
public byte[] computeCacheKey(TimeBoundaryQuery query)
{
return ByteBuffer.allocate(2)
- .put(TIMEBOUNDARY_QUERY)
- .put(query.getCacheKey())
- .array();
+ .put(TIMEBOUNDARY_QUERY)
+ .put(query.getCacheKey())
+ .array();
}
@Override
@@ -178,11 +178,11 @@ public class TimeBoundaryQueryQueryToolChest
{
@Override
@SuppressWarnings("unchecked")
- public Result<TimeBoundaryResultValue> apply(@Nullable Object input)
+ public Result<TimeBoundaryResultValue> apply(Object input)
{
List