diff --git a/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java b/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java index a80b6f3470c..cb29e665351 100644 --- a/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java +++ b/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java @@ -19,10 +19,14 @@ package io.druid.query.select; +import com.google.common.collect.Sets; import com.metamx.common.guava.nary.BinaryFn; import io.druid.granularity.AllGranularity; import io.druid.granularity.QueryGranularity; import io.druid.query.Result; + +import java.util.Set; + import org.joda.time.DateTime; import java.util.List; @@ -77,6 +81,9 @@ public class SelectBinaryFn SelectResultValueBuilder builder = new SelectResultValueBuilder.MergeBuilder(timestamp, pagingSpec, descending); + builder.addDimensions(mergeColumns(arg1.getValue().getDimensions(), arg2.getValue().getDimensions())); + builder.addMetrics(mergeColumns(arg1.getValue().getMetrics(), arg2.getValue().getMetrics())); + for (EventHolder event : arg1Val) { builder.addEntry(event); } @@ -87,4 +94,21 @@ public class SelectBinaryFn return builder.build(); } + + private Set mergeColumns(final Set arg1, final Set arg2) + { + if (arg1.isEmpty()) { + return arg2; + } + + if (arg2.isEmpty()) { + return arg1; + } + + if (arg1.equals(arg2)) { + return arg1; + } + + return Sets.union(arg1, arg2); + } } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java index eaa37ccd99b..3d3802df777 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java @@ -108,12 +108,14 @@ public class SelectQueryEngine for (DimensionSpec dim : dims) { final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim); dimSelectors.put(dim.getOutputName(), dimSelector); + builder.addDimension(dim.getOutputName()); } final Map metSelectors = Maps.newHashMap(); for (String metric : metrics) { final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); metSelectors.put(metric, metricSelector); + builder.addMetric(metric); } final PagingOffset offset = query.getPagingOffset(segmentId); diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 3bf1bd4ad02..5329205ec21 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -66,7 +66,7 @@ import java.util.TreeMap; */ public class SelectQueryQueryToolChest extends QueryToolChest, SelectQuery> { - private static final byte SELECT_QUERY = 0x13; + private static final byte SELECT_QUERY = 0x16; private static final TypeReference OBJECT_TYPE_REFERENCE = new TypeReference() { @@ -220,6 +220,8 @@ public class SelectQueryQueryToolChest extends QueryToolChest) jsonMapper.convertValue( + resultIter.next(), new TypeReference>() + { + } + ), + (Set) jsonMapper.convertValue( + resultIter.next(), new TypeReference>() + { + } + ), (List) jsonMapper.convertValue( resultIter.next(), new TypeReference>() { diff --git a/processing/src/main/java/io/druid/query/select/SelectResultValue.java b/processing/src/main/java/io/druid/query/select/SelectResultValue.java index b48f72de1f9..a4eaf66d7ac 100644 --- a/processing/src/main/java/io/druid/query/select/SelectResultValue.java +++ b/processing/src/main/java/io/druid/query/select/SelectResultValue.java @@ -25,21 +25,28 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; /** */ public class SelectResultValue implements Iterable { private final Map pagingIdentifiers; + private final Set dimensions; + private final Set metrics; private final List events; @JsonCreator public SelectResultValue( @JsonProperty("pagingIdentifiers") Map pagingIdentifiers, + @JsonProperty("dimensions") Set dimensions, + @JsonProperty("metrics") Set metrics, @JsonProperty("events") List events ) { this.pagingIdentifiers = pagingIdentifiers; + this.dimensions = dimensions; + this.metrics = metrics; this.events = events; } @@ -49,6 +56,18 @@ public class SelectResultValue implements Iterable return pagingIdentifiers; } + @JsonProperty + public Set getDimensions() + { + return dimensions; + } + + @JsonProperty + public Set getMetrics() + { + return metrics; + } + @JsonProperty public List getEvents() { @@ -76,6 +95,15 @@ public class SelectResultValue implements Iterable if (events != null ? !events.equals(that.events) : that.events != null) { return false; } + + if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) { + return false; + } + + if (metrics != null ? !metrics.equals(that.metrics) : that.metrics != null) { + return false; + } + if (pagingIdentifiers != null ? !pagingIdentifiers.equals(that.pagingIdentifiers) : that.pagingIdentifiers != null) { @@ -89,6 +117,8 @@ public class SelectResultValue implements Iterable public int hashCode() { int result = pagingIdentifiers != null ? pagingIdentifiers.hashCode() : 0; + result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); + result = 31 * result + (metrics != null ? metrics.hashCode() : 0); result = 31 * result + (events != null ? events.hashCode() : 0); return result; } @@ -98,6 +128,8 @@ public class SelectResultValue implements Iterable { return "SelectResultValue{" + "pagingIdentifiers=" + pagingIdentifiers + + ", dimensions=" + dimensions + + ", metrics=" + metrics + ", events=" + events + '}'; } diff --git a/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java b/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java index fba023caafa..05fa4c952bf 100644 --- a/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java +++ b/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Queues; +import com.google.common.collect.Sets; import com.google.common.primitives.Longs; import com.metamx.common.guava.Comparators; import io.druid.query.Result; @@ -32,6 +33,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; /** */ @@ -59,6 +61,8 @@ public class SelectResultValueBuilder protected final DateTime timestamp; protected final PagingSpec pagingSpec; protected final boolean descending; + protected Set dimensions; + protected Set metrics; protected final Queue pQueue; protected final Map pagingIdentifiers; @@ -68,6 +72,8 @@ public class SelectResultValueBuilder this.timestamp = timestamp; this.pagingSpec = pagingSpec; this.descending = descending; + this.dimensions = Sets.newHashSet(); + this.metrics = Sets.newHashSet(); this.pagingIdentifiers = Maps.newLinkedHashMap(); this.pQueue = instantiatePQueue(); } @@ -81,12 +87,32 @@ public class SelectResultValueBuilder { pagingIdentifiers.put(segmentId, lastOffset); } + + public void addDimension(String dimension) + { + dimensions.add(dimension); + } + + public void addDimensions(Set dimensions) + { + this.dimensions.addAll(dimensions); + } + + public void addMetric(String metric) + { + metrics.add(metric); + } + + public void addMetrics(Set metrics) + { + this.metrics.addAll(metrics); + } public Result build() { return new Result( timestamp, - new SelectResultValue(pagingIdentifiers, getEventHolders()) + new SelectResultValue(pagingIdentifiers, dimensions, metrics, getEventHolders()) ); } diff --git a/processing/src/test/java/io/druid/query/select/SelectBinaryFnTest.java b/processing/src/test/java/io/druid/query/select/SelectBinaryFnTest.java index 78fed650123..8b7e80a30d8 100644 --- a/processing/src/test/java/io/druid/query/select/SelectBinaryFnTest.java +++ b/processing/src/test/java/io/druid/query/select/SelectBinaryFnTest.java @@ -20,7 +20,9 @@ package io.druid.query.select; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.metamx.common.ISE; import io.druid.granularity.QueryGranularities; import io.druid.query.Result; @@ -32,6 +34,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import java.util.Set; /** */ @@ -50,6 +53,8 @@ public class SelectBinaryFnTest new DateTime("2013-01-01"), new SelectResultValue( ImmutableMap.of(), + Sets.newHashSet("first", "fourth"), + Sets.newHashSet("sixth"), Arrays.asList( new EventHolder( segmentId1, @@ -90,6 +95,8 @@ public class SelectBinaryFnTest new DateTime("2013-01-01"), new SelectResultValue( ImmutableMap.of(), + Sets.newHashSet("second", "third"), + Sets.newHashSet("fifth"), Arrays.asList( new EventHolder( segmentId2, @@ -203,6 +210,61 @@ public class SelectBinaryFnTest verifyEvents(exEvents, acEvents); } + @Test + public void testColumnMerge() throws Exception + { + SelectBinaryFn binaryFn = new SelectBinaryFn(QueryGranularities.ALL, new PagingSpec(null, 5), false); + + Result res1 = new Result<>( + new DateTime("2013-01-01"), + new SelectResultValue( + ImmutableMap.of(), + Sets.newHashSet("first", "second", "fourth"), + Sets.newHashSet("eight", "nineth"), + Lists.newArrayList( + new EventHolder( + segmentId1, + 0, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T00"), "dim", "first" + ) + )) + ) + ); + + Result res2 = new Result<>( + new DateTime("2013-01-01"), + new SelectResultValue( + ImmutableMap.of(), + Sets.newHashSet("third", "second", "fifth"), + Sets.newHashSet("seventh"), + Lists.newArrayList( + new EventHolder( + segmentId2, + 0, + ImmutableMap.of( + EventHolder.timestampKey, + new DateTime("2013-01-01T00"), + "dim", + "second" + ) + )) + ) + ); + + Result merged = binaryFn.apply(res1, res2); + + Set exDimensions = Sets.newHashSet("first", "second", "fourth", "third", "fifth"); + Set exMetrics = Sets.newHashSet("eight", "nineth", "seventh"); + + Set acDimensions = merged.getValue().getDimensions(); + Set acMetrics = merged.getValue().getMetrics(); + + Assert.assertEquals(exDimensions, acDimensions); + Assert.assertEquals(exMetrics, acMetrics); + } + private void verifyIters(Iterator iter1, Iterator iter2) { while (iter1.hasNext()) { diff --git a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java index 20ae203b9af..c7b59ef9c61 100644 --- a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.ObjectArrays; +import com.google.common.collect.Sets; import com.metamx.common.ISE; import com.metamx.common.guava.Sequences; import io.druid.jackson.DefaultObjectMapper; @@ -56,6 +57,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; /** */ @@ -155,10 +157,12 @@ public class SelectQueryRunnerTest PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId); List> expectedResults = toExpected( toEvents(new String[]{EventHolder.timestampKey + ":TIME"}, V_0112_0114), + Lists.newArrayList("market", "quality", "placement", "placementish", "partial_null_column", "null_column"), + Lists.newArrayList("index", "quality_uniques"), offset.startOffset(), offset.threshold() ); - verify(expectedResults, results); + verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column")); } @Test @@ -242,6 +246,8 @@ public class SelectQueryRunnerTest new DateTime("2011-01-12T00:00:00.000Z"), new SelectResultValue( ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2), + Sets.newHashSet("mar", "qual", "place"), + Sets.newHashSet("index", "quality_uniques"), Arrays.asList( new EventHolder( QueryRunnerTestHelper.segmentId, @@ -286,6 +292,8 @@ public class SelectQueryRunnerTest new DateTime("2011-01-12T00:00:00.000Z"), new SelectResultValue( ImmutableMap.of(QueryRunnerTestHelper.segmentId, -3), + Sets.newHashSet("mar", "qual", "place"), + Sets.newHashSet("index", "quality_uniques"), Arrays.asList( new EventHolder( QueryRunnerTestHelper.segmentId, @@ -356,6 +364,8 @@ public class SelectQueryRunnerTest }, V_0112_0114 ), + Lists.newArrayList("market"), + Lists.newArrayList("index"), offset.startOffset(), offset.threshold() ); @@ -387,6 +397,8 @@ public class SelectQueryRunnerTest }, V_0112_0114 ), + Lists.newArrayList("quality"), + Lists.newArrayList("index"), offset.startOffset(), offset.threshold() ); @@ -450,6 +462,8 @@ public class SelectQueryRunnerTest PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId); List> expectedResults = toExpected( events, + Lists.newArrayList("quality"), + Lists.newArrayList("index"), offset.startOffset(), offset.threshold() ); @@ -504,6 +518,8 @@ public class SelectQueryRunnerTest PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId); List> expectedResults = toExpected( events, + Lists.newArrayList(QueryRunnerTestHelper.qualityDimension), + Lists.newArrayList(QueryRunnerTestHelper.indexMetric), offset.startOffset(), offset.threshold() ); @@ -537,12 +553,14 @@ public class SelectQueryRunnerTest new DateTime("2011-01-12T00:00:00.000Z"), new SelectResultValue( ImmutableMap.of(), + Sets.newHashSet("market", "quality", "placement", "placementish", "partial_null_column", "null_column"), + Sets.newHashSet("index", "quality_uniques"), Lists.newArrayList() ) ) ); - verify(expectedResults, results); + verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column")); } @Test @@ -571,6 +589,8 @@ public class SelectQueryRunnerTest PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId); List> expectedResults = toExpected( events, + Lists.newArrayList("foo"), + Lists.newArrayList("foo2"), offset.startOffset(), offset.threshold() ); @@ -626,6 +646,8 @@ public class SelectQueryRunnerTest private List> toExpected( List>> targets, + List dimensions, + List metrics, final int offset, final int threshold ) @@ -653,7 +675,11 @@ public class SelectQueryRunnerTest expected.add( new Result( new DateTime(group.get(0).get(EventHolder.timestampKey)), - new SelectResultValue(ImmutableMap.of(QueryRunnerTestHelper.segmentId, lastOffset), holders) + new SelectResultValue( + ImmutableMap.of(QueryRunnerTestHelper.segmentId, lastOffset), + Sets.newHashSet(dimensions), + Sets.newHashSet(metrics), + holders) ) ); } @@ -678,6 +704,9 @@ public class SelectQueryRunnerTest Assert.assertEquals(entry.getValue(), actual.getValue().getPagingIdentifiers().get(entry.getKey())); } + Assert.assertEquals(expected.getValue().getDimensions(), actual.getValue().getDimensions()); + Assert.assertEquals(expected.getValue().getMetrics(), actual.getValue().getMetrics()); + Iterator expectedEvts = expected.getValue().getEvents().iterator(); Iterator actualEvts = actual.getValue().getEvents().iterator(); @@ -708,4 +737,19 @@ public class SelectQueryRunnerTest throw new ISE("This iterator should be exhausted!"); } } + + private static Iterable> populateNullColumnAtLastForQueryableIndexCase(Iterable> results, String columnName) + { + // A Queryable index does not have the null column when it has loaded a index. + for (Result value : results) { + Set dimensions = value.getValue().getDimensions(); + if (dimensions.contains(columnName)) { + break; + } + dimensions.add(columnName); + } + + return results; + } + } diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index c30fc07bbf3..24ea25feb6c 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -33,6 +33,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import com.google.common.util.concurrent.ForwardingListeningExecutorService; @@ -154,6 +155,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedDeque; @@ -1155,6 +1157,9 @@ public class CachingClusteredClientTest @Test public void testSelectCaching() throws Exception { + final Set dimensions = Sets.newHashSet("a"); + final Set metrics = Sets.newHashSet("rows"); + Druids.SelectQueryBuilder builder = Druids.newSelectQueryBuilder() .dataSource(DATA_SOURCE) .intervals(SEG_SPEC) @@ -1169,14 +1174,13 @@ public class CachingClusteredClientTest client, builder.build(), new Interval("2011-01-01/2011-01-02"), - makeSelectResults(new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1)), + makeSelectResults(dimensions, metrics, new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1)), new Interval("2011-01-02/2011-01-03"), - makeSelectResults(new DateTime("2011-01-02"), ImmutableMap.of("a", "c", "rows", 5)), + makeSelectResults(dimensions, metrics, new DateTime("2011-01-02"), ImmutableMap.of("a", "c", "rows", 5)), new Interval("2011-01-05/2011-01-10"), - makeSelectResults( - new DateTime("2011-01-05"), ImmutableMap.of("a", "d", "rows", 5), + makeSelectResults(dimensions, metrics, new DateTime("2011-01-05"), ImmutableMap.of("a", "d", "rows", 5), new DateTime("2011-01-06"), ImmutableMap.of("a", "e", "rows", 6), new DateTime("2011-01-07"), ImmutableMap.of("a", "f", "rows", 7), new DateTime("2011-01-08"), ImmutableMap.of("a", "g", "rows", 8), @@ -1184,8 +1188,7 @@ public class CachingClusteredClientTest ), new Interval("2011-01-05/2011-01-10"), - makeSelectResults( - new DateTime("2011-01-05T01"), ImmutableMap.of("a", "d", "rows", 5), + makeSelectResults(dimensions, metrics, new DateTime("2011-01-05T01"), ImmutableMap.of("a", "d", "rows", 5), new DateTime("2011-01-06T01"), ImmutableMap.of("a", "e", "rows", 6), new DateTime("2011-01-07T01"), ImmutableMap.of("a", "f", "rows", 7), new DateTime("2011-01-08T01"), ImmutableMap.of("a", "g", "rows", 8), @@ -1202,8 +1205,7 @@ public class CachingClusteredClientTest ); HashMap context = new HashMap(); TestHelper.assertExpectedResults( - makeSelectResults( - new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1), + makeSelectResults(dimensions, metrics, new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1), new DateTime("2011-01-02"), ImmutableMap.of("a", "c", "rows", 5), new DateTime("2011-01-05"), ImmutableMap.of("a", "d", "rows", 5), new DateTime("2011-01-05T01"), ImmutableMap.of("a", "d", "rows", 5), @@ -2426,7 +2428,7 @@ public class CachingClusteredClientTest return retVal; } - private Iterable> makeSelectResults(Object... objects) + private Iterable> makeSelectResults(Set dimensions, Set metrics, Object... objects) { List> retVal = Lists.newArrayList(); int index = 0; @@ -2434,11 +2436,15 @@ public class CachingClusteredClientTest DateTime timestamp = (DateTime) objects[index++]; List values = Lists.newArrayList(); + while (index < objects.length && !(objects[index] instanceof DateTime)) { values.add(new EventHolder(null, 0, (Map) objects[index++])); } - retVal.add(new Result<>(timestamp, new SelectResultValue(null, values))); + retVal.add(new Result<>( + timestamp, + new SelectResultValue(null, dimensions, metrics, values) + )); } return retVal; }