From 448da787654ef6ebcab7bb5cba340b931c454320 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 16 Jan 2020 21:02:02 -0800 Subject: [PATCH] Speed up String first/last aggregators when folding isn't needed. (#9181) * Speed up String first/last aggregators when folding isn't needed. Examines the value column, and disables fold checking via a needsFoldCheck flag if that column can't possibly contain SerializablePairLongStrings. This is helpful because it avoids calling getObject on the value selector when unnecessary; say, because the time selector didn't yield an earlier or later value. * PR comments. * Move fastLooseChop to StringUtils. --- .../druid/java/util/common/StringUtils.java | 17 +++++- .../java/util/common/StringUtilsTest.java | 28 ++++++++++ .../first/StringFirstAggregator.java | 44 +++++++++------ .../first/StringFirstAggregatorFactory.java | 13 +++-- .../first/StringFirstBufferAggregator.java | 54 +++++++++++++------ .../first/StringFirstLastUtils.java | 29 +++++++++- .../last/StringLastAggregator.java | 44 +++++++++------ .../last/StringLastAggregatorFactory.java | 14 +++-- .../last/StringLastBufferAggregator.java | 54 +++++++++++++------ .../first/StringFirstAggregationTest.java | 8 ++- .../StringFirstBufferAggregatorTest.java | 46 ++++++++++++++-- .../last/StringLastAggregationTest.java | 5 ++ .../last/StringLastBufferAggregatorTest.java | 50 +++++++++++++++-- 13 files changed, 321 insertions(+), 85 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java index 381a9d81970..33a4e3c74d5 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -550,8 +550,8 @@ public class StringUtils * Returns the string truncated to maxBytes. * If given string input is shorter than maxBytes, then it remains the same. 
* - * @param s The input string to possibly be truncated - * @param maxBytes The max bytes that string input will be truncated to + * @param s The input string to possibly be truncated + * @param maxBytes The max bytes that string input will be truncated to * * @return the string after truncated to maxBytes */ @@ -568,4 +568,17 @@ public class StringUtils } } + /** + * Shorten "s" to "maxBytes" chars. Fast and loose because these are *chars* not *bytes*. Use + * {@link #chop(String, int)} for slower, but accurate chopping. + */ + @Nullable + public static String fastLooseChop(@Nullable final String s, final int maxBytes) + { + if (s == null || s.length() <= maxBytes) { + return s; + } else { + return s.substring(0, maxBytes); + } + } } diff --git a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java index e9f5f214b08..1d4cb6dbade 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java @@ -246,4 +246,32 @@ public class StringUtilsTest Assert.assertEquals(s5, null); } + @Test + public void testChop() + { + Assert.assertEquals("foo", StringUtils.chop("foo", 5)); + Assert.assertEquals("fo", StringUtils.chop("foo", 2)); + Assert.assertEquals("", StringUtils.chop("foo", 0)); + Assert.assertEquals("smile 🙂 for", StringUtils.chop("smile 🙂 for the camera", 14)); + Assert.assertEquals("smile 🙂", StringUtils.chop("smile 🙂 for the camera", 10)); + Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 9)); + Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 8)); + Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 7)); + Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 6)); + Assert.assertEquals("smile", StringUtils.chop("smile 🙂 for the camera", 5)); + } + + @Test + public void 
testFastLooseChop() + { + Assert.assertEquals("foo", StringUtils.fastLooseChop("foo", 5)); + Assert.assertEquals("fo", StringUtils.fastLooseChop("foo", 2)); + Assert.assertEquals("", StringUtils.fastLooseChop("foo", 0)); + Assert.assertEquals("smile 🙂 for", StringUtils.fastLooseChop("smile 🙂 for the camera", 12)); + Assert.assertEquals("smile 🙂 ", StringUtils.fastLooseChop("smile 🙂 for the camera", 9)); + Assert.assertEquals("smile 🙂", StringUtils.fastLooseChop("smile 🙂 for the camera", 8)); + Assert.assertEquals("smile \uD83D", StringUtils.fastLooseChop("smile 🙂 for the camera", 7)); + Assert.assertEquals("smile ", StringUtils.fastLooseChop("smile 🙂 for the camera", 6)); + Assert.assertEquals("smile", StringUtils.fastLooseChop("smile 🙂 for the camera", 5)); + } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java index 34a81d29a3d..2d5ee990ed1 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java @@ -25,28 +25,29 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; - -import javax.annotation.Nullable; +import org.apache.druid.segment.DimensionHandlerUtils; public class StringFirstAggregator implements Aggregator { - @Nullable private final BaseLongColumnValueSelector timeSelector; - private final BaseObjectColumnValueSelector valueSelector; + private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; + private final boolean needsFoldCheck; protected long firstTime; protected String firstValue; public StringFirstAggregator( - @Nullable 
BaseLongColumnValueSelector timeSelector, - BaseObjectColumnValueSelector valueSelector, - int maxStringBytes + BaseLongColumnValueSelector timeSelector, + BaseObjectColumnValueSelector valueSelector, + int maxStringBytes, + boolean needsFoldCheck ) { this.valueSelector = valueSelector; this.timeSelector = timeSelector; this.maxStringBytes = maxStringBytes; + this.needsFoldCheck = needsFoldCheck; firstTime = DateTimes.MAX.getMillis(); firstValue = null; @@ -55,17 +56,28 @@ public class StringFirstAggregator implements Aggregator @Override public void aggregate() { - final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( - timeSelector, - valueSelector - ); + if (needsFoldCheck) { + // Less efficient code path when folding is a possibility (we must read the value selector first just in case + // it's a foldable object). + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); - if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) { - firstTime = inPair.lhs; - firstValue = inPair.rhs; + if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) { + firstTime = inPair.lhs; + firstValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes); + } + } else { + final long time = timeSelector.getLong(); - if (firstValue.length() > maxStringBytes) { - firstValue = firstValue.substring(0, maxStringBytes); + if (time < firstTime) { + final String value = DimensionHandlerUtils.convertObjectToString(valueSelector.getObject()); + + if (value != null) { + firstTime = time; + firstValue = StringUtils.fastLooseChop(value, maxStringBytes); + } } } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java index 1b30bf716fe..6ad8558ef0b 100644 --- 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java @@ -32,6 +32,7 @@ import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnHolder; @@ -118,20 +119,24 @@ public class StringFirstAggregatorFactory extends AggregatorFactory @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { + final BaseObjectColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); return new StringFirstAggregator( metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - metricFactory.makeColumnValueSelector(fieldName), - maxStringBytes + valueSelector, + maxStringBytes, + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) ); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { + final BaseObjectColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); return new StringFirstBufferAggregator( metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - metricFactory.makeColumnValueSelector(fieldName), - maxStringBytes + valueSelector, + maxStringBytes, + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) ); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java index 5a4c00686e2..b7d5ac89b06 
100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java @@ -25,6 +25,7 @@ import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.DimensionHandlerUtils; import java.nio.ByteBuffer; @@ -36,18 +37,21 @@ public class StringFirstBufferAggregator implements BufferAggregator ); private final BaseLongColumnValueSelector timeSelector; - private final BaseObjectColumnValueSelector valueSelector; + private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; + private final boolean needsFoldCheck; public StringFirstBufferAggregator( BaseLongColumnValueSelector timeSelector, - BaseObjectColumnValueSelector valueSelector, - int maxStringBytes + BaseObjectColumnValueSelector valueSelector, + int maxStringBytes, + boolean needsFoldCheck ) { this.timeSelector = timeSelector; this.valueSelector = valueSelector; this.maxStringBytes = maxStringBytes; + this.needsFoldCheck = needsFoldCheck; } @Override @@ -59,20 +63,40 @@ public class StringFirstBufferAggregator implements BufferAggregator @Override public void aggregate(ByteBuffer buf, int position) { - final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( - timeSelector, - valueSelector - ); + if (needsFoldCheck) { + // Less efficient code path when folding is a possibility (we must read the value selector first just in case + // it's a foldable object). 
+ final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); - if (inPair != null && inPair.rhs != null) { + if (inPair != null && inPair.rhs != null) { + final long firstTime = buf.getLong(position); + if (inPair.lhs < firstTime) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(inPair.lhs, inPair.rhs), + maxStringBytes + ); + } + } + } else { + final long time = timeSelector.getLong(); final long firstTime = buf.getLong(position); - if (inPair.lhs < firstTime) { - StringFirstLastUtils.writePair( - buf, - position, - new SerializablePairLongString(inPair.lhs, inPair.rhs), - maxStringBytes - ); + + if (time < firstTime) { + final String value = DimensionHandlerUtils.convertObjectToString(valueSelector.getObject()); + + if (value != null) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(time, value), + maxStringBytes + ); + } } } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java index 3f28a18596a..b26877e776c 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -24,6 +24,9 @@ import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -32,10 +35,34 @@ public class StringFirstLastUtils { private static final int 
NULL_VALUE = -1; + /** + * Returns whether a given value selector *might* contain SerializablePairLongString objects. + */ + public static boolean selectorNeedsFoldCheck( + final BaseObjectColumnValueSelector valueSelector, + @Nullable final ColumnCapabilities valueSelectorCapabilities + ) + { + if (valueSelectorCapabilities != null && valueSelectorCapabilities.getType() != ValueType.COMPLEX) { + // Known, non-complex type. + return false; + } + + if (valueSelector instanceof NilColumnValueSelector) { + // Nil column, definitely no SerializablePairLongStrings. + return false; + } + + // Check if the selector class could possibly be a SerializablePairLongString (either a superclass or subclass). + final Class clazz = valueSelector.classOfObject(); + return clazz.isAssignableFrom(SerializablePairLongString.class) + || SerializablePairLongString.class.isAssignableFrom(clazz); + } + @Nullable public static SerializablePairLongString readPairFromSelectors( final BaseLongColumnValueSelector timeSelector, - final BaseObjectColumnValueSelector valueSelector + final BaseObjectColumnValueSelector valueSelector ) { final long time; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java index 47bb6d2fe05..ea37ff42040 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java @@ -26,25 +26,29 @@ import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.DimensionHandlerUtils; public class StringLastAggregator implements Aggregator { private final 
BaseLongColumnValueSelector timeSelector; - private final BaseObjectColumnValueSelector valueSelector; + private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; + private final boolean needsFoldCheck; protected long lastTime; protected String lastValue; public StringLastAggregator( - BaseLongColumnValueSelector timeSelector, - BaseObjectColumnValueSelector valueSelector, - int maxStringBytes + final BaseLongColumnValueSelector timeSelector, + final BaseObjectColumnValueSelector valueSelector, + final int maxStringBytes, + final boolean needsFoldCheck ) { this.valueSelector = valueSelector; this.timeSelector = timeSelector; this.maxStringBytes = maxStringBytes; + this.needsFoldCheck = needsFoldCheck; lastTime = DateTimes.MIN.getMillis(); lastValue = null; @@ -53,22 +57,28 @@ public class StringLastAggregator implements Aggregator @Override public void aggregate() { - final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( - timeSelector, - valueSelector - ); + if (needsFoldCheck) { + // Less efficient code path when folding is a possibility (we must read the value selector first just in case + // it's a foldable object). + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); - if (inPair == null) { - // Don't aggregate nulls. 
- return; - } + if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) { + lastTime = inPair.lhs; + lastValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes); + } + } else { + final long time = timeSelector.getLong(); - if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) { - lastTime = inPair.lhs; - lastValue = inPair.rhs; + if (time >= lastTime) { + final String value = DimensionHandlerUtils.convertObjectToString(valueSelector.getObject()); - if (lastValue.length() > maxStringBytes) { - lastValue = lastValue.substring(0, maxStringBytes); + if (value != null) { + lastTime = time; + lastValue = StringUtils.fastLooseChop(value, maxStringBytes); + } } } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java index 9277d0529dd..9a3264f1fba 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java @@ -32,7 +32,9 @@ import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnHolder; @@ -74,20 +76,24 @@ public class StringLastAggregatorFactory extends AggregatorFactory @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { + final BaseObjectColumnValueSelector valueSelector = 
metricFactory.makeColumnValueSelector(fieldName); return new StringLastAggregator( metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - metricFactory.makeColumnValueSelector(fieldName), - maxStringBytes + valueSelector, + maxStringBytes, + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) ); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { + final BaseObjectColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); return new StringLastBufferAggregator( metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - metricFactory.makeColumnValueSelector(fieldName), - maxStringBytes + valueSelector, + maxStringBytes, + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) ); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java index 30ea4289939..09e3276d3d5 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java @@ -26,6 +26,7 @@ import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.DimensionHandlerUtils; import java.nio.ByteBuffer; @@ -37,18 +38,21 @@ public class StringLastBufferAggregator implements BufferAggregator ); private final BaseLongColumnValueSelector timeSelector; - private final BaseObjectColumnValueSelector valueSelector; + private final BaseObjectColumnValueSelector valueSelector; private final 
int maxStringBytes; + private final boolean needsFoldCheck; public StringLastBufferAggregator( BaseLongColumnValueSelector timeSelector, - BaseObjectColumnValueSelector valueSelector, - int maxStringBytes + BaseObjectColumnValueSelector valueSelector, + int maxStringBytes, + boolean needsFoldCheck ) { this.timeSelector = timeSelector; this.valueSelector = valueSelector; this.maxStringBytes = maxStringBytes; + this.needsFoldCheck = needsFoldCheck; } @Override @@ -60,20 +64,40 @@ public class StringLastBufferAggregator implements BufferAggregator @Override public void aggregate(ByteBuffer buf, int position) { - final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( - timeSelector, - valueSelector - ); + if (needsFoldCheck) { + // Less efficient code path when folding is a possibility (we must read the value selector first just in case + // it's a foldable object). + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); - if (inPair != null && inPair.rhs != null) { + if (inPair != null && inPair.rhs != null) { + final long lastTime = buf.getLong(position); + if (inPair.lhs >= lastTime) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(inPair.lhs, inPair.rhs), + maxStringBytes + ); + } + } + } else { + final long time = timeSelector.getLong(); final long lastTime = buf.getLong(position); - if (inPair.lhs >= lastTime) { - StringFirstLastUtils.writePair( - buf, - position, - new SerializablePairLongString(inPair.lhs, inPair.rhs), - maxStringBytes - ); + + if (time >= lastTime) { + final String value = DimensionHandlerUtils.convertObjectToString(valueSelector.getObject()); + + if (value != null) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(time, value), + maxStringBytes + ); + } } } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java 
b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java index 190c3435885..0e450d32c37 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java @@ -28,7 +28,9 @@ import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; @@ -68,6 +70,9 @@ public class StringFirstAggregationTest EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")) + .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING)); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null); EasyMock.replay(colSelectorFactory); } @@ -133,8 +138,7 @@ public class StringFirstAggregationTest @Test public void testStringFirstCombiningBufferAggregator() { - BufferAggregator agg = combiningAggFactory.factorizeBuffered( - colSelectorFactory); + BufferAggregator agg = combiningAggFactory.factorizeBuffered(colSelectorFactory); ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]); agg.init(buffer, 0); diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java index 04700fae42d..3b4ef692bf5 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java @@ -46,7 +46,6 @@ public class StringFirstBufferAggregatorTest @Test public void testBufferAggregate() { - final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L}; final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"}; Integer maxStringBytes = 1024; @@ -61,7 +60,8 @@ public class StringFirstBufferAggregatorTest StringFirstBufferAggregator agg = new StringFirstBufferAggregator( longColumnSelector, objectColumnSelector, - maxStringBytes + maxStringBytes, + false ); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); @@ -78,7 +78,43 @@ public class StringFirstBufferAggregatorTest Assert.assertEquals("expected last string value", strings[0], sp.rhs); Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[0]), new Long(sp.lhs)); + } + @Test + public void testBufferAggregateWithFoldCheck() + { + final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L}; + final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"}; + Integer maxStringBytes = 1024; + + TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps); + TestObjectColumnSelector objectColumnSelector = new TestObjectColumnSelector<>(strings); + + StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory( + "billy", "billy", maxStringBytes + ); + + StringFirstBufferAggregator agg = new StringFirstBufferAggregator( + longColumnSelector, + objectColumnSelector, + maxStringBytes, + true + ); + 
+ ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); + int position = 0; + + agg.init(buf, position); + //noinspection ForLoopReplaceableByForEach + for (int i = 0; i < timestamps.length; i++) { + aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position); + } + + SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); + + + Assert.assertEquals("expected last string value", strings[0], sp.rhs); + Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[0]), new Long(sp.lhs)); } @Test @@ -99,7 +135,8 @@ public class StringFirstBufferAggregatorTest StringFirstBufferAggregator agg = new StringFirstBufferAggregator( longColumnSelector, objectColumnSelector, - maxStringBytes + maxStringBytes, + false ); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); @@ -137,7 +174,8 @@ public class StringFirstBufferAggregatorTest StringFirstBufferAggregator agg = new StringFirstBufferAggregator( longColumnSelector, objectColumnSelector, - maxStringBytes + maxStringBytes, + false ); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java index 2e22f9cfcf7..39f9925606e 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java @@ -28,7 +28,9 @@ import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import 
org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; @@ -68,6 +70,9 @@ public class StringLastAggregationTest EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")) + .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING)); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null); EasyMock.replay(colSelectorFactory); } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java index 18a378855a6..6c350c4cff1 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java @@ -61,7 +61,8 @@ public class StringLastBufferAggregatorTest StringLastBufferAggregator agg = new StringLastBufferAggregator( longColumnSelector, objectColumnSelector, - maxStringBytes + maxStringBytes, + false ); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); @@ -76,11 +77,48 @@ public class StringLastBufferAggregatorTest SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); - Assert.assertEquals("expectec last string value", "DDDD", sp.rhs); + Assert.assertEquals("expected last string value", "DDDD", sp.rhs); Assert.assertEquals("last string timestamp is the biggest", new Long(1526725900L), new Long(sp.lhs)); } + @Test + public void 
testBufferAggregateWithFoldCheck() + { + final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L}; + final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"}; + Integer maxStringBytes = 1024; + + TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps); + TestObjectColumnSelector objectColumnSelector = new TestObjectColumnSelector<>(strings); + + StringLastAggregatorFactory factory = new StringLastAggregatorFactory( + "billy", "billy", maxStringBytes + ); + + StringLastBufferAggregator agg = new StringLastBufferAggregator( + longColumnSelector, + objectColumnSelector, + maxStringBytes, + true + ); + + ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); + int position = 0; + + agg.init(buf, position); + //noinspection ForLoopReplaceableByForEach + for (int i = 0; i < timestamps.length; i++) { + aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position); + } + + SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); + + + Assert.assertEquals("expected last string value", "DDDD", sp.rhs); + Assert.assertEquals("last string timestamp is the biggest", new Long(1526725900L), new Long(sp.lhs)); + } + @Test public void testNullBufferAggregate() { @@ -99,7 +137,8 @@ public class StringLastBufferAggregatorTest StringLastBufferAggregator agg = new StringLastBufferAggregator( longColumnSelector, objectColumnSelector, - maxStringBytes + maxStringBytes, + false ); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); @@ -114,7 +153,7 @@ public class StringLastBufferAggregatorTest SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); - Assert.assertEquals("expectec last string value", strings[2], sp.rhs); + Assert.assertEquals("expected last string value", strings[2], sp.rhs); Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[2]), new Long(sp.lhs)); } @@ -137,7 
+176,8 @@ public class StringLastBufferAggregatorTest StringLastBufferAggregator agg = new StringLastBufferAggregator( longColumnSelector, objectColumnSelector, - maxStringBytes + maxStringBytes, + false ); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());