Speed up String first/last aggregators when folding isn't needed. (#9181)

* Speed up String first/last aggregators when folding isn't needed.

Examines the value column, and disables fold checking via a needsFoldCheck
flag if that column can't possibly contain SerializableLongStringPairs. This
is helpful because it avoids calling getObject on the value selector when
unnecessary; say, because the time selector didn't yield an earlier or later
value.

* PR comments.

* Move fastLooseChop to StringUtils.
This commit is contained in:
Gian Merlino 2020-01-16 21:02:02 -08:00 committed by Clint Wylie
parent 486c0fd149
commit 448da78765
13 changed files with 321 additions and 85 deletions

View File

@ -550,8 +550,8 @@ public class StringUtils
* Returns the string truncated to maxBytes.
* If given string input is shorter than maxBytes, then it remains the same.
*
* @param s The input string to possibly be truncated
* @param maxBytes The max bytes that string input will be truncated to
* @param s The input string to possibly be truncated
* @param maxBytes The max bytes that string input will be truncated to
*
* @return the string after truncated to maxBytes
*/
@ -568,4 +568,17 @@ public class StringUtils
}
}
/**
* Shorten "s" to "maxBytes" chars. Fast and loose because these are *chars* not *bytes*. Use
* {@link #chop(String, int)} for slower, but accurate chopping.
*/
@Nullable
public static String fastLooseChop(@Nullable final String s, final int maxBytes)
{
if (s == null || s.length() <= maxBytes) {
return s;
} else {
return s.substring(0, maxBytes);
}
}
}

View File

@ -246,4 +246,32 @@ public class StringUtilsTest
Assert.assertEquals(s5, null);
}
@Test
public void testChop()
{
Assert.assertEquals("foo", StringUtils.chop("foo", 5));
Assert.assertEquals("fo", StringUtils.chop("foo", 2));
Assert.assertEquals("", StringUtils.chop("foo", 0));
Assert.assertEquals("smile 🙂 for", StringUtils.chop("smile 🙂 for the camera", 14));
Assert.assertEquals("smile 🙂", StringUtils.chop("smile 🙂 for the camera", 10));
Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 9));
Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 8));
Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 7));
Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 6));
Assert.assertEquals("smile", StringUtils.chop("smile 🙂 for the camera", 5));
}
@Test
public void testFastLooseChop()
{
Assert.assertEquals("foo", StringUtils.fastLooseChop("foo", 5));
Assert.assertEquals("fo", StringUtils.fastLooseChop("foo", 2));
Assert.assertEquals("", StringUtils.fastLooseChop("foo", 0));
Assert.assertEquals("smile 🙂 for", StringUtils.fastLooseChop("smile 🙂 for the camera", 12));
Assert.assertEquals("smile 🙂 ", StringUtils.fastLooseChop("smile 🙂 for the camera", 9));
Assert.assertEquals("smile 🙂", StringUtils.fastLooseChop("smile 🙂 for the camera", 8));
Assert.assertEquals("smile \uD83D", StringUtils.fastLooseChop("smile 🙂 for the camera", 7));
Assert.assertEquals("smile ", StringUtils.fastLooseChop("smile 🙂 for the camera", 6));
Assert.assertEquals("smile", StringUtils.fastLooseChop("smile 🙂 for the camera", 5));
}
}

View File

@ -25,28 +25,29 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import javax.annotation.Nullable;
import org.apache.druid.segment.DimensionHandlerUtils;
public class StringFirstAggregator implements Aggregator
{
@Nullable
private final BaseLongColumnValueSelector timeSelector;
private final BaseObjectColumnValueSelector valueSelector;
private final BaseObjectColumnValueSelector<?> valueSelector;
private final int maxStringBytes;
private final boolean needsFoldCheck;
protected long firstTime;
protected String firstValue;
public StringFirstAggregator(
@Nullable BaseLongColumnValueSelector timeSelector,
BaseObjectColumnValueSelector valueSelector,
int maxStringBytes
BaseLongColumnValueSelector timeSelector,
BaseObjectColumnValueSelector<?> valueSelector,
int maxStringBytes,
boolean needsFoldCheck
)
{
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;
this.maxStringBytes = maxStringBytes;
this.needsFoldCheck = needsFoldCheck;
firstTime = DateTimes.MAX.getMillis();
firstValue = null;
@ -55,17 +56,28 @@ public class StringFirstAggregator implements Aggregator
@Override
public void aggregate()
{
final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors(
timeSelector,
valueSelector
);
if (needsFoldCheck) {
// Less efficient code path when folding is a possibility (we must read the value selector first just in case
// it's a foldable object).
final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors(
timeSelector,
valueSelector
);
if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) {
firstTime = inPair.lhs;
firstValue = inPair.rhs;
if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) {
firstTime = inPair.lhs;
firstValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes);
}
} else {
final long time = timeSelector.getLong();
if (firstValue.length() > maxStringBytes) {
firstValue = firstValue.substring(0, maxStringBytes);
if (time < firstTime) {
final String value = DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
if (value != null) {
firstTime = time;
firstValue = StringUtils.fastLooseChop(value, maxStringBytes);
}
}
}
}

View File

@ -32,6 +32,7 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnHolder;
@ -118,20 +119,24 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final BaseObjectColumnValueSelector<?> valueSelector = metricFactory.makeColumnValueSelector(fieldName);
return new StringFirstAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
metricFactory.makeColumnValueSelector(fieldName),
maxStringBytes
valueSelector,
maxStringBytes,
StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName))
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final BaseObjectColumnValueSelector<?> valueSelector = metricFactory.makeColumnValueSelector(fieldName);
return new StringFirstBufferAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
metricFactory.makeColumnValueSelector(fieldName),
maxStringBytes
valueSelector,
maxStringBytes,
StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName))
);
}

View File

@ -25,6 +25,7 @@ import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
import java.nio.ByteBuffer;
@ -36,18 +37,21 @@ public class StringFirstBufferAggregator implements BufferAggregator
);
private final BaseLongColumnValueSelector timeSelector;
private final BaseObjectColumnValueSelector valueSelector;
private final BaseObjectColumnValueSelector<?> valueSelector;
private final int maxStringBytes;
private final boolean needsFoldCheck;
public StringFirstBufferAggregator(
BaseLongColumnValueSelector timeSelector,
BaseObjectColumnValueSelector valueSelector,
int maxStringBytes
BaseObjectColumnValueSelector<?> valueSelector,
int maxStringBytes,
boolean needsFoldCheck
)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
this.maxStringBytes = maxStringBytes;
this.needsFoldCheck = needsFoldCheck;
}
@Override
@ -59,20 +63,40 @@ public class StringFirstBufferAggregator implements BufferAggregator
@Override
public void aggregate(ByteBuffer buf, int position)
{
final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors(
timeSelector,
valueSelector
);
if (needsFoldCheck) {
// Less efficient code path when folding is a possibility (we must read the value selector first just in case
// it's a foldable object).
final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors(
timeSelector,
valueSelector
);
if (inPair != null && inPair.rhs != null) {
if (inPair != null && inPair.rhs != null) {
final long firstTime = buf.getLong(position);
if (inPair.lhs < firstTime) {
StringFirstLastUtils.writePair(
buf,
position,
new SerializablePairLongString(inPair.lhs, inPair.rhs),
maxStringBytes
);
}
}
} else {
final long time = timeSelector.getLong();
final long firstTime = buf.getLong(position);
if (inPair.lhs < firstTime) {
StringFirstLastUtils.writePair(
buf,
position,
new SerializablePairLongString(inPair.lhs, inPair.rhs),
maxStringBytes
);
if (time < firstTime) {
final String value = DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
if (value != null) {
StringFirstLastUtils.writePair(
buf,
position,
new SerializablePairLongString(time, value),
maxStringBytes
);
}
}
}
}

View File

@ -24,6 +24,9 @@ import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@ -32,10 +35,34 @@ public class StringFirstLastUtils
{
private static final int NULL_VALUE = -1;
/**
* Returns whether a given value selector *might* contain SerializablePairLongString objects.
*/
public static boolean selectorNeedsFoldCheck(
final BaseObjectColumnValueSelector<?> valueSelector,
@Nullable final ColumnCapabilities valueSelectorCapabilities
)
{
if (valueSelectorCapabilities != null && valueSelectorCapabilities.getType() != ValueType.COMPLEX) {
// Known, non-complex type.
return false;
}
if (valueSelector instanceof NilColumnValueSelector) {
// Nil column, definitely no SerializablePairLongStrings.
return false;
}
// Check if the selector class could possibly be a SerializablePairLongString (either a superclass or subclass).
final Class<?> clazz = valueSelector.classOfObject();
return clazz.isAssignableFrom(SerializablePairLongString.class)
|| SerializablePairLongString.class.isAssignableFrom(clazz);
}
@Nullable
public static SerializablePairLongString readPairFromSelectors(
final BaseLongColumnValueSelector timeSelector,
final BaseObjectColumnValueSelector valueSelector
final BaseObjectColumnValueSelector<?> valueSelector
)
{
final long time;

View File

@ -26,25 +26,29 @@ import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.first.StringFirstLastUtils;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
public class StringLastAggregator implements Aggregator
{
private final BaseLongColumnValueSelector timeSelector;
private final BaseObjectColumnValueSelector valueSelector;
private final BaseObjectColumnValueSelector<?> valueSelector;
private final int maxStringBytes;
private final boolean needsFoldCheck;
protected long lastTime;
protected String lastValue;
public StringLastAggregator(
BaseLongColumnValueSelector timeSelector,
BaseObjectColumnValueSelector valueSelector,
int maxStringBytes
final BaseLongColumnValueSelector timeSelector,
final BaseObjectColumnValueSelector<?> valueSelector,
final int maxStringBytes,
final boolean needsFoldCheck
)
{
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;
this.maxStringBytes = maxStringBytes;
this.needsFoldCheck = needsFoldCheck;
lastTime = DateTimes.MIN.getMillis();
lastValue = null;
@ -53,22 +57,28 @@ public class StringLastAggregator implements Aggregator
@Override
public void aggregate()
{
final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors(
timeSelector,
valueSelector
);
if (needsFoldCheck) {
// Less efficient code path when folding is a possibility (we must read the value selector first just in case
// it's a foldable object).
final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors(
timeSelector,
valueSelector
);
if (inPair == null) {
// Don't aggregate nulls.
return;
}
if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) {
lastTime = inPair.lhs;
lastValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes);
}
} else {
final long time = timeSelector.getLong();
if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) {
lastTime = inPair.lhs;
lastValue = inPair.rhs;
if (time >= lastTime) {
final String value = DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
if (lastValue.length() > maxStringBytes) {
lastValue = lastValue.substring(0, maxStringBytes);
if (value != null) {
lastTime = time;
lastValue = StringUtils.fastLooseChop(value, maxStringBytes);
}
}
}
}

View File

@ -32,7 +32,9 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.StringFirstLastUtils;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnHolder;
@ -74,20 +76,24 @@ public class StringLastAggregatorFactory extends AggregatorFactory
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final BaseObjectColumnValueSelector<?> valueSelector = metricFactory.makeColumnValueSelector(fieldName);
return new StringLastAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
metricFactory.makeColumnValueSelector(fieldName),
maxStringBytes
valueSelector,
maxStringBytes,
StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName))
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final BaseObjectColumnValueSelector<?> valueSelector = metricFactory.makeColumnValueSelector(fieldName);
return new StringLastBufferAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
metricFactory.makeColumnValueSelector(fieldName),
maxStringBytes
valueSelector,
maxStringBytes,
StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName))
);
}

View File

@ -26,6 +26,7 @@ import org.apache.druid.query.aggregation.first.StringFirstLastUtils;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.DimensionHandlerUtils;
import java.nio.ByteBuffer;
@ -37,18 +38,21 @@ public class StringLastBufferAggregator implements BufferAggregator
);
private final BaseLongColumnValueSelector timeSelector;
private final BaseObjectColumnValueSelector valueSelector;
private final BaseObjectColumnValueSelector<?> valueSelector;
private final int maxStringBytes;
private final boolean needsFoldCheck;
public StringLastBufferAggregator(
BaseLongColumnValueSelector timeSelector,
BaseObjectColumnValueSelector valueSelector,
int maxStringBytes
BaseObjectColumnValueSelector<?> valueSelector,
int maxStringBytes,
boolean needsFoldCheck
)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
this.maxStringBytes = maxStringBytes;
this.needsFoldCheck = needsFoldCheck;
}
@Override
@ -60,20 +64,40 @@ public class StringLastBufferAggregator implements BufferAggregator
@Override
public void aggregate(ByteBuffer buf, int position)
{
final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors(
timeSelector,
valueSelector
);
if (needsFoldCheck) {
// Less efficient code path when folding is a possibility (we must read the value selector first just in case
// it's a foldable object).
final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors(
timeSelector,
valueSelector
);
if (inPair != null && inPair.rhs != null) {
if (inPair != null && inPair.rhs != null) {
final long lastTime = buf.getLong(position);
if (inPair.lhs >= lastTime) {
StringFirstLastUtils.writePair(
buf,
position,
new SerializablePairLongString(inPair.lhs, inPair.rhs),
maxStringBytes
);
}
}
} else {
final long time = timeSelector.getLong();
final long lastTime = buf.getLong(position);
if (inPair.lhs >= lastTime) {
StringFirstLastUtils.writePair(
buf,
position,
new SerializablePairLongString(inPair.lhs, inPair.rhs),
maxStringBytes
);
if (time >= lastTime) {
final String value = DimensionHandlerUtils.convertObjectToString(valueSelector.getObject());
if (value != null) {
StringFirstLastUtils.writePair(
buf,
position,
new SerializablePairLongString(time, value),
maxStringBytes
);
}
}
}
}

View File

@ -28,7 +28,9 @@ import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
@ -68,6 +70,9 @@ public class StringFirstAggregationTest
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly"))
.andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING));
EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null);
EasyMock.replay(colSelectorFactory);
}
@ -133,8 +138,7 @@ public class StringFirstAggregationTest
@Test
public void testStringFirstCombiningBufferAggregator()
{
BufferAggregator agg = combiningAggFactory.factorizeBuffered(
colSelectorFactory);
BufferAggregator agg = combiningAggFactory.factorizeBuffered(colSelectorFactory);
ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);

View File

@ -46,7 +46,6 @@ public class StringFirstBufferAggregatorTest
@Test
public void testBufferAggregate()
{
final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L};
final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
Integer maxStringBytes = 1024;
@ -61,7 +60,8 @@ public class StringFirstBufferAggregatorTest
StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
longColumnSelector,
objectColumnSelector,
maxStringBytes
maxStringBytes,
false
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
@ -78,7 +78,43 @@ public class StringFirstBufferAggregatorTest
Assert.assertEquals("expected last string value", strings[0], sp.rhs);
Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[0]), new Long(sp.lhs));
}
@Test
public void testBufferAggregateWithFoldCheck()
{
final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L};
final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
Integer maxStringBytes = 1024;
TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
"billy", "billy", maxStringBytes
);
StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
longColumnSelector,
objectColumnSelector,
maxStringBytes,
true
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
int position = 0;
agg.init(buf, position);
//noinspection ForLoopReplaceableByForEach
for (int i = 0; i < timestamps.length; i++) {
aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
}
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
Assert.assertEquals("expected last string value", strings[0], sp.rhs);
Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[0]), new Long(sp.lhs));
}
@Test
@ -99,7 +135,8 @@ public class StringFirstBufferAggregatorTest
StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
longColumnSelector,
objectColumnSelector,
maxStringBytes
maxStringBytes,
false
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
@ -137,7 +174,8 @@ public class StringFirstBufferAggregatorTest
StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
longColumnSelector,
objectColumnSelector,
maxStringBytes
maxStringBytes,
false
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());

View File

@ -28,7 +28,9 @@ import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.TestLongColumnSelector;
import org.apache.druid.query.aggregation.TestObjectColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
@ -68,6 +70,9 @@ public class StringLastAggregationTest
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly"))
.andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING));
EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null);
EasyMock.replay(colSelectorFactory);
}

View File

@ -61,7 +61,8 @@ public class StringLastBufferAggregatorTest
StringLastBufferAggregator agg = new StringLastBufferAggregator(
longColumnSelector,
objectColumnSelector,
maxStringBytes
maxStringBytes,
false
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
@ -76,11 +77,48 @@ public class StringLastBufferAggregatorTest
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
Assert.assertEquals("expectec last string value", "DDDD", sp.rhs);
Assert.assertEquals("expected last string value", "DDDD", sp.rhs);
Assert.assertEquals("last string timestamp is the biggest", new Long(1526725900L), new Long(sp.lhs));
}
@Test
public void testBufferAggregateWithFoldCheck()
{
final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L};
final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
Integer maxStringBytes = 1024;
TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
"billy", "billy", maxStringBytes
);
StringLastBufferAggregator agg = new StringLastBufferAggregator(
longColumnSelector,
objectColumnSelector,
maxStringBytes,
true
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
int position = 0;
agg.init(buf, position);
//noinspection ForLoopReplaceableByForEach
for (int i = 0; i < timestamps.length; i++) {
aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
}
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
Assert.assertEquals("expected last string value", "DDDD", sp.rhs);
Assert.assertEquals("last string timestamp is the biggest", new Long(1526725900L), new Long(sp.lhs));
}
@Test
public void testNullBufferAggregate()
{
@ -99,7 +137,8 @@ public class StringLastBufferAggregatorTest
StringLastBufferAggregator agg = new StringLastBufferAggregator(
longColumnSelector,
objectColumnSelector,
maxStringBytes
maxStringBytes,
false
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
@ -114,7 +153,7 @@ public class StringLastBufferAggregatorTest
SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
Assert.assertEquals("expectec last string value", strings[2], sp.rhs);
Assert.assertEquals("expected last string value", strings[2], sp.rhs);
Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[2]), new Long(sp.lhs));
}
@ -137,7 +176,8 @@ public class StringLastBufferAggregatorTest
StringLastBufferAggregator agg = new StringLastBufferAggregator(
longColumnSelector,
objectColumnSelector,
maxStringBytes
maxStringBytes,
false
);
ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());