Introduce lists of existing columns in the output fields of select queries (#2491)

* Introduce lists of existing columns in the output fields of select queries

* Rebase onto master

* Address the review comment; add test code for select query caching

* Change the cache key prefix in SelectQueryQueryToolChest to 0x16
jaehong choi 2016-08-26 01:07:53 +09:00 committed by Nishant
parent d624037698
commit 2e0f253c32
8 changed files with 223 additions and 15 deletions
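For orientation before the file-by-file diff: a SelectResultValue now carries the sets of dimension and metric columns that actually exist in the scanned data, next to the paging identifiers and events, and the cache key prefix in SelectQueryQueryToolChest moves from 0x13 to 0x16 so results cached under the old value layout are not reused. Below is a minimal sketch of the extended constructor, assuming the classes from this patch are on the classpath; the segment id, column names, and the wrapper class are illustrative only, not taken from the patch.

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.query.select.EventHolder;
import io.druid.query.select.SelectResultValue;
import org.joda.time.DateTime;

import java.util.List;
import java.util.Map;
import java.util.Set;

public class SelectResultValueSketch
{
  public static void main(String[] args)
  {
    // Paging identifiers, as before this change.
    Map<String, Integer> pagingIdentifiers = ImmutableMap.of("some_segment_id", 2);

    // New in this change: the dimension and metric columns present in the scanned segments.
    Set<String> dimensions = Sets.newHashSet("market", "quality");
    Set<String> metrics = Sets.newHashSet("index");

    List<EventHolder> events = Lists.newArrayList(
        new EventHolder(
            "some_segment_id",
            0,
            ImmutableMap.<String, Object>of(
                EventHolder.timestampKey, new DateTime("2013-01-01T00"),
                "market", "spot"
            )
        )
    );

    // The result value now advertises the column sets alongside paging identifiers and events.
    SelectResultValue value = new SelectResultValue(pagingIdentifiers, dimensions, metrics, events);
    System.out.println(value.getDimensions() + " / " + value.getMetrics());
  }
}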

SelectBinaryFn.java

@@ -19,10 +19,14 @@
package io.druid.query.select;
import com.google.common.collect.Sets;
import com.metamx.common.guava.nary.BinaryFn;
import io.druid.granularity.AllGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import java.util.Set;
import org.joda.time.DateTime;
import java.util.List;
@@ -77,6 +81,9 @@ public class SelectBinaryFn
SelectResultValueBuilder builder = new SelectResultValueBuilder.MergeBuilder(timestamp, pagingSpec, descending);
builder.addDimensions(mergeColumns(arg1.getValue().getDimensions(), arg2.getValue().getDimensions()));
builder.addMetrics(mergeColumns(arg1.getValue().getMetrics(), arg2.getValue().getMetrics()));
for (EventHolder event : arg1Val) {
builder.addEntry(event);
}
@@ -87,4 +94,21 @@ public class SelectBinaryFn
return builder.build();
}
private Set<String> mergeColumns(final Set<String> arg1, final Set<String> arg2)
{
if (arg1.isEmpty()) {
return arg2;
}
if (arg2.isEmpty()) {
return arg1;
}
if (arg1.equals(arg2)) {
return arg1;
}
return Sets.union(arg1, arg2);
}
}
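To make the merge semantics above concrete, here is a standalone sketch (not part of the commit): identical or empty inputs are returned as-is to avoid allocating a new set, otherwise the merged result advertises the union of both column sets. The mergeColumns method is copied from the private method above so the snippet runs on its own; the column names are illustrative.

import com.google.common.collect.Sets;
import java.util.Set;

public class MergeColumnsSketch
{
  // Copy of SelectBinaryFn.mergeColumns from the diff above, for demonstration.
  static Set<String> mergeColumns(final Set<String> arg1, final Set<String> arg2)
  {
    if (arg1.isEmpty()) {
      return arg2;
    }
    if (arg2.isEmpty()) {
      return arg1;
    }
    if (arg1.equals(arg2)) {
      return arg1;
    }
    return Sets.union(arg1, arg2);
  }

  public static void main(String[] args)
  {
    Set<String> left = Sets.newHashSet("market", "quality");
    Set<String> right = Sets.newHashSet("quality", "placement");
    // Prints the union of both column sets, e.g. [market, quality, placement] (hash-set order may vary).
    System.out.println(mergeColumns(left, right));
  }
}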

SelectQueryEngine.java

@@ -108,12 +108,14 @@ public class SelectQueryEngine
for (DimensionSpec dim : dims) {
final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim);
dimSelectors.put(dim.getOutputName(), dimSelector);
builder.addDimension(dim.getOutputName());
}
final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
metSelectors.put(metric, metricSelector);
builder.addMetric(metric);
}
final PagingOffset offset = query.getPagingOffset(segmentId);

SelectQueryQueryToolChest.java

@@ -66,7 +66,7 @@ import java.util.TreeMap;
*/
public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery>
{
private static final byte SELECT_QUERY = 0x13;
private static final byte SELECT_QUERY = 0x16;
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>()
{
@@ -220,6 +220,8 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery>
return Arrays.asList(
input.getTimestamp().getMillis(),
input.getValue().getPagingIdentifiers(),
input.getValue().getDimensions(),
input.getValue().getMetrics(),
input.getValue().getEvents()
);
}
@@ -249,6 +251,16 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery>
{
}
),
(Set<String>) jsonMapper.convertValue(
resultIter.next(), new TypeReference<Set<String>>()
{
}
),
(Set<String>) jsonMapper.convertValue(
resultIter.next(), new TypeReference<Set<String>>()
{
}
),
(List<EventHolder>) jsonMapper.convertValue(
resultIter.next(), new TypeReference<List<EventHolder>>()
{

SelectResultValue.java

@@ -25,21 +25,28 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
public class SelectResultValue implements Iterable<EventHolder>
{
private final Map<String, Integer> pagingIdentifiers;
private final Set<String> dimensions;
private final Set<String> metrics;
private final List<EventHolder> events;
@JsonCreator
public SelectResultValue(
@JsonProperty("pagingIdentifiers") Map<String, Integer> pagingIdentifiers,
@JsonProperty("dimensions") Set<String> dimensions,
@JsonProperty("metrics") Set<String> metrics,
@JsonProperty("events") List<EventHolder> events
)
{
this.pagingIdentifiers = pagingIdentifiers;
this.dimensions = dimensions;
this.metrics = metrics;
this.events = events;
}
@@ -49,6 +56,18 @@ public class SelectResultValue implements Iterable<EventHolder>
return pagingIdentifiers;
}
@JsonProperty
public Set<String> getDimensions()
{
return dimensions;
}
@JsonProperty
public Set<String> getMetrics()
{
return metrics;
}
@JsonProperty
public List<EventHolder> getEvents()
{
@@ -76,6 +95,15 @@ public class SelectResultValue implements Iterable<EventHolder>
if (events != null ? !events.equals(that.events) : that.events != null) {
return false;
}
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
return false;
}
if (metrics != null ? !metrics.equals(that.metrics) : that.metrics != null) {
return false;
}
if (pagingIdentifiers != null
? !pagingIdentifiers.equals(that.pagingIdentifiers)
: that.pagingIdentifiers != null) {
@@ -89,6 +117,8 @@ public class SelectResultValue implements Iterable<EventHolder>
public int hashCode()
{
int result = pagingIdentifiers != null ? pagingIdentifiers.hashCode() : 0;
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
result = 31 * result + (events != null ? events.hashCode() : 0);
return result;
}
@@ -98,6 +128,8 @@ public class SelectResultValue implements Iterable<EventHolder>
{
return "SelectResultValue{" +
"pagingIdentifiers=" + pagingIdentifiers +
", dimensions=" + dimensions +
", metrics=" + metrics +
", events=" + events +
'}';
}

SelectResultValueBuilder.java

@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import com.metamx.common.guava.Comparators;
import io.druid.query.Result;
@@ -32,6 +33,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
/**
*/
@@ -59,6 +61,8 @@ public class SelectResultValueBuilder
protected final DateTime timestamp;
protected final PagingSpec pagingSpec;
protected final boolean descending;
protected Set<String> dimensions;
protected Set<String> metrics;
protected final Queue<EventHolder> pQueue;
protected final Map<String, Integer> pagingIdentifiers;
@@ -68,6 +72,8 @@ public class SelectResultValueBuilder
this.timestamp = timestamp;
this.pagingSpec = pagingSpec;
this.descending = descending;
this.dimensions = Sets.newHashSet();
this.metrics = Sets.newHashSet();
this.pagingIdentifiers = Maps.newLinkedHashMap();
this.pQueue = instantiatePQueue();
}
@@ -81,12 +87,32 @@ public class SelectResultValueBuilder
{
pagingIdentifiers.put(segmentId, lastOffset);
}
public void addDimension(String dimension)
{
dimensions.add(dimension);
}
public void addDimensions(Set<String> dimensions)
{
this.dimensions.addAll(dimensions);
}
public void addMetric(String metric)
{
metrics.add(metric);
}
public void addMetrics(Set<String> metrics)
{
this.metrics.addAll(metrics);
}
public Result<SelectResultValue> build()
{
return new Result<SelectResultValue>(
timestamp,
new SelectResultValue(pagingIdentifiers, getEventHolders())
new SelectResultValue(pagingIdentifiers, dimensions, metrics, getEventHolders())
);
}
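A hedged usage sketch of the extended builder (not part of the commit): per-segment code such as SelectQueryEngine registers each selected column via addDimension and addMetric, and the built result then reports those columns through getDimensions and getMetrics. The timestamp, paging spec, segment id, and column names below are illustrative only.

import com.google.common.collect.ImmutableMap;
import io.druid.query.Result;
import io.druid.query.select.EventHolder;
import io.druid.query.select.PagingSpec;
import io.druid.query.select.SelectResultValue;
import io.druid.query.select.SelectResultValueBuilder;
import org.joda.time.DateTime;

public class SelectResultValueBuilderSketch
{
  public static void main(String[] args)
  {
    SelectResultValueBuilder builder = new SelectResultValueBuilder(
        new DateTime("2013-01-01"),
        new PagingSpec(null, 5),
        false
    );

    // Registered once per selected dimension/metric while scanning a segment.
    builder.addDimension("market");
    builder.addMetric("index");

    builder.addEntry(
        new EventHolder(
            "some_segment_id",
            0,
            ImmutableMap.<String, Object>of(
                EventHolder.timestampKey, new DateTime("2013-01-01T00"),
                "market", "spot"
            )
        )
    );

    Result<SelectResultValue> result = builder.build();
    // The built value now carries the registered columns.
    System.out.println(result.getValue().getDimensions() + " / " + result.getValue().getMetrics());
  }
}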

SelectBinaryFnTest.java

@@ -20,7 +20,9 @@
package io.druid.query.select;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import io.druid.granularity.QueryGranularities;
import io.druid.query.Result;
@@ -32,6 +34,7 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
/**
*/
@@ -50,6 +53,8 @@ public class SelectBinaryFnTest
new DateTime("2013-01-01"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Sets.newHashSet("first", "fourth"),
Sets.newHashSet("sixth"),
Arrays.asList(
new EventHolder(
segmentId1,
@@ -90,6 +95,8 @@ public class SelectBinaryFnTest
new DateTime("2013-01-01"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Sets.newHashSet("second", "third"),
Sets.newHashSet("fifth"),
Arrays.asList(
new EventHolder(
segmentId2,
@@ -203,6 +210,61 @@ public class SelectBinaryFnTest
verifyEvents(exEvents, acEvents);
}
@Test
public void testColumnMerge() throws Exception
{
SelectBinaryFn binaryFn = new SelectBinaryFn(QueryGranularities.ALL, new PagingSpec(null, 5), false);
Result<SelectResultValue> res1 = new Result<>(
new DateTime("2013-01-01"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Sets.newHashSet("first", "second", "fourth"),
Sets.newHashSet("eight", "nineth"),
Lists.<EventHolder>newArrayList(
new EventHolder(
segmentId1,
0,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T00"), "dim", "first"
)
))
)
);
Result<SelectResultValue> res2 = new Result<>(
new DateTime("2013-01-01"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Sets.newHashSet("third", "second", "fifth"),
Sets.newHashSet("seventh"),
Lists.<EventHolder>newArrayList(
new EventHolder(
segmentId2,
0,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T00"),
"dim",
"second"
)
))
)
);
Result<SelectResultValue> merged = binaryFn.apply(res1, res2);
Set<String> exDimensions = Sets.newHashSet("first", "second", "fourth", "third", "fifth");
Set<String> exMetrics = Sets.newHashSet("eight", "nineth", "seventh");
Set<String> acDimensions = merged.getValue().getDimensions();
Set<String> acMetrics = merged.getValue().getMetrics();
Assert.assertEquals(exDimensions, acDimensions);
Assert.assertEquals(exMetrics, acMetrics);
}
private void verifyIters(Iterator iter1, Iterator iter2)
{
while (iter1.hasNext()) {

SelectQueryRunnerTest.java

@@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.ObjectArrays;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequences;
import io.druid.jackson.DefaultObjectMapper;
@@ -56,6 +57,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
@@ -155,10 +157,12 @@ public class SelectQueryRunnerTest
PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
List<Result<SelectResultValue>> expectedResults = toExpected(
toEvents(new String[]{EventHolder.timestampKey + ":TIME"}, V_0112_0114),
Lists.newArrayList("market", "quality", "placement", "placementish", "partial_null_column", "null_column"),
Lists.<String>newArrayList("index", "quality_uniques"),
offset.startOffset(),
offset.threshold()
);
verify(expectedResults, results);
verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column"));
}
@Test
@@ -242,6 +246,8 @@ public class SelectQueryRunnerTest
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2),
Sets.newHashSet("mar", "qual", "place"),
Sets.newHashSet("index", "quality_uniques"),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
@@ -286,6 +292,8 @@ public class SelectQueryRunnerTest
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, -3),
Sets.newHashSet("mar", "qual", "place"),
Sets.newHashSet("index", "quality_uniques"),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
@@ -356,6 +364,8 @@ public class SelectQueryRunnerTest
},
V_0112_0114
),
Lists.newArrayList("market"),
Lists.<String>newArrayList("index"),
offset.startOffset(),
offset.threshold()
);
@@ -387,6 +397,8 @@ public class SelectQueryRunnerTest
},
V_0112_0114
),
Lists.newArrayList("quality"),
Lists.<String>newArrayList("index"),
offset.startOffset(),
offset.threshold()
);
@@ -450,6 +462,8 @@ public class SelectQueryRunnerTest
PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
List<Result<SelectResultValue>> expectedResults = toExpected(
events,
Lists.newArrayList("quality"),
Lists.<String>newArrayList("index"),
offset.startOffset(),
offset.threshold()
);
@@ -504,6 +518,8 @@ public class SelectQueryRunnerTest
PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
List<Result<SelectResultValue>> expectedResults = toExpected(
events,
Lists.newArrayList(QueryRunnerTestHelper.qualityDimension),
Lists.<String>newArrayList(QueryRunnerTestHelper.indexMetric),
offset.startOffset(),
offset.threshold()
);
@@ -537,12 +553,14 @@ public class SelectQueryRunnerTest
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Sets.newHashSet("market", "quality", "placement", "placementish", "partial_null_column", "null_column"),
Sets.newHashSet("index", "quality_uniques"),
Lists.<EventHolder>newArrayList()
)
)
);
verify(expectedResults, results);
verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column"));
}
@Test
@@ -571,6 +589,8 @@ public class SelectQueryRunnerTest
PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
List<Result<SelectResultValue>> expectedResults = toExpected(
events,
Lists.newArrayList("foo"),
Lists.<String>newArrayList("foo2"),
offset.startOffset(),
offset.threshold()
);
@@ -626,6 +646,8 @@ public class SelectQueryRunnerTest
private List<Result<SelectResultValue>> toExpected(
List<List<Map<String, Object>>> targets,
List<String> dimensions,
List<String> metrics,
final int offset,
final int threshold
)
@@ -653,7 +675,11 @@ public class SelectQueryRunnerTest
expected.add(
new Result(
new DateTime(group.get(0).get(EventHolder.timestampKey)),
new SelectResultValue(ImmutableMap.of(QueryRunnerTestHelper.segmentId, lastOffset), holders)
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, lastOffset),
Sets.<String>newHashSet(dimensions),
Sets.<String>newHashSet(metrics),
holders)
)
);
}
@@ -678,6 +704,9 @@ public class SelectQueryRunnerTest
Assert.assertEquals(entry.getValue(), actual.getValue().getPagingIdentifiers().get(entry.getKey()));
}
Assert.assertEquals(expected.getValue().getDimensions(), actual.getValue().getDimensions());
Assert.assertEquals(expected.getValue().getMetrics(), actual.getValue().getMetrics());
Iterator<EventHolder> expectedEvts = expected.getValue().getEvents().iterator();
Iterator<EventHolder> actualEvts = actual.getValue().getEvents().iterator();
@@ -708,4 +737,19 @@ public class SelectQueryRunnerTest
throw new ISE("This iterator should be exhausted!");
}
}
private static Iterable<Result<SelectResultValue>> populateNullColumnAtLastForQueryableIndexCase(Iterable<Result<SelectResultValue>> results, String columnName)
{
// A queryable index does not report the all-null column once the index has been loaded.
for (Result<SelectResultValue> value : results) {
Set<String> dimensions = value.getValue().getDimensions();
if (dimensions.contains(columnName)) {
break;
}
dimensions.add(columnName);
}
return results;
}
}

CachingClusteredClientTest.java

@@ -33,6 +33,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
@@ -154,6 +155,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -1155,6 +1157,9 @@ public class CachingClusteredClientTest
@Test
public void testSelectCaching() throws Exception
{
final Set<String> dimensions = Sets.<String>newHashSet("a");
final Set<String> metrics = Sets.<String>newHashSet("rows");
Druids.SelectQueryBuilder builder = Druids.newSelectQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
@@ -1169,14 +1174,13 @@
client,
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeSelectResults(new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1)),
makeSelectResults(dimensions, metrics, new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1)),
new Interval("2011-01-02/2011-01-03"),
makeSelectResults(new DateTime("2011-01-02"), ImmutableMap.of("a", "c", "rows", 5)),
makeSelectResults(dimensions, metrics, new DateTime("2011-01-02"), ImmutableMap.of("a", "c", "rows", 5)),
new Interval("2011-01-05/2011-01-10"),
makeSelectResults(
new DateTime("2011-01-05"), ImmutableMap.of("a", "d", "rows", 5),
makeSelectResults(dimensions, metrics, new DateTime("2011-01-05"), ImmutableMap.of("a", "d", "rows", 5),
new DateTime("2011-01-06"), ImmutableMap.of("a", "e", "rows", 6),
new DateTime("2011-01-07"), ImmutableMap.of("a", "f", "rows", 7),
new DateTime("2011-01-08"), ImmutableMap.of("a", "g", "rows", 8),
@@ -1184,8 +1188,7 @@
),
new Interval("2011-01-05/2011-01-10"),
makeSelectResults(
new DateTime("2011-01-05T01"), ImmutableMap.of("a", "d", "rows", 5),
makeSelectResults(dimensions, metrics, new DateTime("2011-01-05T01"), ImmutableMap.of("a", "d", "rows", 5),
new DateTime("2011-01-06T01"), ImmutableMap.of("a", "e", "rows", 6),
new DateTime("2011-01-07T01"), ImmutableMap.of("a", "f", "rows", 7),
new DateTime("2011-01-08T01"), ImmutableMap.of("a", "g", "rows", 8),
@@ -1202,8 +1205,7 @@
);
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(
makeSelectResults(
new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1),
makeSelectResults(dimensions, metrics, new DateTime("2011-01-01"), ImmutableMap.of("a", "b", "rows", 1),
new DateTime("2011-01-02"), ImmutableMap.of("a", "c", "rows", 5),
new DateTime("2011-01-05"), ImmutableMap.of("a", "d", "rows", 5),
new DateTime("2011-01-05T01"), ImmutableMap.of("a", "d", "rows", 5),
@@ -2426,7 +2428,7 @@
return retVal;
}
private Iterable<Result<SelectResultValue>> makeSelectResults(Object... objects)
private Iterable<Result<SelectResultValue>> makeSelectResults(Set<String> dimensions, Set<String> metrics, Object... objects)
{
List<Result<SelectResultValue>> retVal = Lists.newArrayList();
int index = 0;
@@ -2434,11 +2436,15 @@
DateTime timestamp = (DateTime) objects[index++];
List<EventHolder> values = Lists.newArrayList();
while (index < objects.length && !(objects[index] instanceof DateTime)) {
values.add(new EventHolder(null, 0, (Map) objects[index++]));
}
retVal.add(new Result<>(timestamp, new SelectResultValue(null, values)));
retVal.add(new Result<>(
timestamp,
new SelectResultValue(null, dimensions, metrics, values)
));
}
return retVal;
}