Order rows during incremental index persist when rollup is disabled. (#6107)

* order rows using the IncrementalIndexRow comparator at persist time when rollup is disabled, improving the effectiveness of dimension compression; resolves #6066
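
For intuition, a minimal standalone sketch (a hypothetical class, not part of this change) of why persist-time ordering helps: sorting rows by their dictionary-encoded dimension values groups identical values into runs, which dictionary and run-length style encodings compress far better than interleaved values.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class PersistOrderSketch
{
  public static void main(String[] args)
  {
    // each int[] is a row of dictionary-encoded dimension values
    List<int[]> rows = new ArrayList<>(Arrays.asList(
        new int[]{0, 1}, new int[]{2, 3}, new int[]{0, 1},
        new int[]{2, 3}, new int[]{0, 1}, new int[]{2, 3}
    ));
    // unsorted, column 0 reads 0,2,0,2,0,2: no runs to exploit
    rows.sort(Comparator.<int[]>comparingInt(r -> r[0]).thenComparingInt(r -> r[1]));
    // sorted, column 0 reads 0,0,0,2,2,2: long runs compress well
    rows.forEach(r -> System.out.println(Arrays.toString(r)));
  }
}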

* fix issues raised in review
Authored by Clint Wylie on 2018-08-06 14:17:48 -07:00, committed by Gian Merlino
parent ef2d6e9118
commit 62677212cc
9 changed files with 226 additions and 46 deletions

View File: BenchmarkSchemas.java

@@ -157,4 +157,54 @@ public class BenchmarkSchemas
);
SCHEMA_MAP.put("simpleFloat", basicSchema);
}
static { // schema with high opportunity for rollup
List<BenchmarkColumnSchema> rolloColumns = ImmutableList.of(
// dims
BenchmarkColumnSchema.makeEnumerated(
"dimEnumerated",
ValueType.STRING,
false,
1,
null,
Arrays.asList("Hello", "World", "Foo", "Bar", "Baz"),
Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
),
BenchmarkColumnSchema.makeEnumerated(
"dimEnumerated2",
ValueType.STRING,
false,
1,
null,
Arrays.asList("Apple", "Orange", "Xylophone", "Corundum", null),
Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
),
BenchmarkColumnSchema.makeZipf("dimZipf", ValueType.STRING, false, 1, null, 1, 100, 2.0),
BenchmarkColumnSchema.makeDiscreteUniform("dimUniform", ValueType.STRING, false, 1, null, 1, 100),
// metrics
BenchmarkColumnSchema.makeZipf("metLongZipf", ValueType.LONG, true, 1, null, 0, 10000, 2.0),
BenchmarkColumnSchema.makeDiscreteUniform("metLongUniform", ValueType.LONG, true, 1, null, 0, 500),
BenchmarkColumnSchema.makeNormal("metFloatNormal", ValueType.FLOAT, true, 1, null, 5000.0, 1.0, true),
BenchmarkColumnSchema.makeZipf("metFloatZipf", ValueType.FLOAT, true, 1, null, 0, 1000, 1.5)
);
List<AggregatorFactory> rolloSchemaIngestAggs = new ArrayList<>();
rolloSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
rolloSchemaIngestAggs.add(new LongSumAggregatorFactory("sumLongSequential", "metLongSequential"));
rolloSchemaIngestAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform"));
rolloSchemaIngestAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "metFloatNormal"));
rolloSchemaIngestAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
rolloSchemaIngestAggs.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
Interval basicSchemaDataInterval = Intervals.utc(0, 1000000);
BenchmarkSchemaInfo rolloSchema = new BenchmarkSchemaInfo(
rolloColumns,
rolloSchemaIngestAggs,
basicSchemaDataInterval,
true
);
SCHEMA_MAP.put("rollo", rolloSchema);
}
}

View File: IndexPersistBenchmark.java

@@ -32,7 +32,6 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.serde.ComplexMetrics;
@@ -64,44 +63,38 @@ import java.util.concurrent.TimeUnit;
@Measurement(iterations = 25)
public class IndexPersistBenchmark
{
@Param({"75000"})
private int rowsPerSegment;
@Param({"basic"})
private String schema;
@Param({"true", "false"})
private boolean rollup;
public static final ObjectMapper JSON_MAPPER;
private static final Logger log = new Logger(IndexPersistBenchmark.class);
private static final int RNG_SEED = 9999;
private IncrementalIndex incIndex;
private ArrayList<InputRow> rows;
private BenchmarkSchemaInfo schemaInfo;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
JSON_MAPPER,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
() -> 0
);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
}
@Param({"75000"})
private int rowsPerSegment;
@Param({"rollo"})
private String schema;
@Param({"true", "false"})
private boolean rollup;
@Param({"none", "moderate", "high"})
private String rollupOpportunity;
private IncrementalIndex incIndex;
private ArrayList<InputRow> rows;
private BenchmarkSchemaInfo schemaInfo;
@Setup
public void setup()
{
@@ -114,11 +107,23 @@ public class IndexPersistBenchmark
rows = new ArrayList<InputRow>();
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
int valuesPerTimestamp = 1;
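// rows generated per distinct timestamp; rows sharing a timestamp (and, given the
// low-cardinality columns, often sharing dimension values) create rollup opportunity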
switch (rollupOpportunity) {
case "moderate":
valuesPerTimestamp = 1000;
break;
case "high":
valuesPerTimestamp = 10000;
break;
}
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
schemaInfo.getDataInterval(),
rowsPerSegment
schemaInfo.getDataInterval().getStartMillis(),
valuesPerTimestamp,
1000.0
);
for (int i = 0; i < rowsPerSegment; i++) {
@@ -128,8 +133,6 @@ public class IndexPersistBenchmark
}
rows.add(row);
}
}
@Setup(Level.Iteration)
@@ -154,9 +157,9 @@ public class IndexPersistBenchmark
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
.withRollup(rollup)
.build()
.withMetrics(schemaInfo.getAggsArray())
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
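
The hunk ends mid-chain above; for context, a hedged sketch of the operation this benchmark measures, using the INDEX_MERGER_V9 and incIndex fields defined earlier. The four-argument persist signature and the temp-directory handling are assumptions about this era of the IndexMerger interface, not lines from this diff.

// persist the incremental index to a segment directory on disk
File tmpDir = com.google.common.io.Files.createTempDir();
File outDir = INDEX_MERGER_V9.persist(incIndex, tmpDir, new IndexSpec(), null);
// outDir now holds the persisted segment; with rollup disabled, rows were
// sorted by the IncrementalIndexRow comparator on the way out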

View File: IncrementalIndex.java

@@ -80,6 +80,7 @@ import javax.annotation.concurrent.GuardedBy;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
@@ -94,6 +95,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
/**
*/
@@ -366,9 +368,24 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
@VisibleForTesting
public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
{
this.incrementalIndexSchema = new IncrementalIndexSchema.Builder()
.withMetrics(metrics)
.build();
return setSimpleTestingIndexSchema(null, metrics);
}
/**
* A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
* other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
* would use it in production settings.
*
* @param rollup whether rollup is enabled; when null, the {@link IncrementalIndexSchema.Builder} default is used
* @param metrics variable array of {@link AggregatorFactory} metrics
*
* @return this
*/
@VisibleForTesting
public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
{
IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
return this;
}
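
The new overload is exercised by the IncrementalIndexTest change later in this commit; a minimal usage sketch (the aggregator name "count" is illustrative):

IncrementalIndex index = new IncrementalIndex.Builder()
    .setSimpleTestingIndexSchema(false, new CountAggregatorFactory("count"))
    .setMaxRowCount(1000000)
    .buildOnheap();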
@@ -1202,6 +1219,12 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
Iterable<IncrementalIndexRow> keySet();
/**
* Get all {@link IncrementalIndexRow}s to persist, ordered with a {@link Comparator} of {@link IncrementalIndexRow}.
*
* @return an {@link Iterable} over all rows in persist order
*/
Iterable<IncrementalIndexRow> persistIterable();
/**
* @return the previous rowIndex associated with the specified key, or
* {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key.
@@ -1289,6 +1312,13 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
return facts.keySet();
}
@Override
public Iterable<IncrementalIndexRow> persistIterable()
{
// with rollup, facts are already pre-sorted so just return keyset
return keySet();
}
@Override
public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
{
@@ -1310,7 +1340,9 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
private final boolean sortFacts;
private final ConcurrentMap<Long, Deque<IncrementalIndexRow>> facts;
public PlainFactsHolder(boolean sortFacts)
private final Comparator<IncrementalIndexRow> incrementalIndexRowComparator;
public PlainFactsHolder(boolean sortFacts, Comparator<IncrementalIndexRow> incrementalIndexRowComparator)
{
this.sortFacts = sortFacts;
if (sortFacts) {
@@ -1318,6 +1350,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
} else {
this.facts = new ConcurrentHashMap<>();
}
this.incrementalIndexRowComparator = incrementalIndexRowComparator;
}
@Override
@@ -1351,10 +1384,10 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
public Iterator<IncrementalIndexRow> iterator(boolean descending)
{
if (descending && sortFacts) {
return concat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
return timeOrderedConcat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
.descendingMap().values(), true).iterator();
}
return concat(facts.values(), false).iterator();
return timeOrderedConcat(facts.values(), false).iterator();
}
@Override
@@ -1363,10 +1396,10 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> subMap =
((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts).subMap(timeStart, timeEnd);
final Map<Long, Deque<IncrementalIndexRow>> rangeMap = descending ? subMap.descendingMap() : subMap;
return concat(rangeMap.values(), descending);
return timeOrderedConcat(rangeMap.values(), descending);
}
private Iterable<IncrementalIndexRow> concat(
private Iterable<IncrementalIndexRow> timeOrderedConcat(
final Iterable<Deque<IncrementalIndexRow>> iterable,
final boolean descending
)
@@ -1379,10 +1412,25 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
);
}
private Stream<IncrementalIndexRow> timeAndDimsOrderedConcat(
final Collection<Deque<IncrementalIndexRow>> rowGroups
)
{
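// flatten the per-timestamp deques, then sort every row with the full
// (time, dims) comparator so persist sees dimension-ordered rows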
return rowGroups.stream()
.flatMap(Collection::stream)
.sorted(incrementalIndexRowComparator);
}
@Override
public Iterable<IncrementalIndexRow> keySet()
{
return concat(facts.values(), false);
return timeOrderedConcat(facts.values(), false);
}
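// note: the returned Iterable is lazy; each iterator() call re-streams and
// re-sorts the current facts, so rows added since the last call are included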
@Override
public Iterable<IncrementalIndexRow> persistIterable()
{
return () -> timeAndDimsOrderedConcat(facts.values()).iterator();
}
@Override

View File: IncrementalIndexAdapter.java

@@ -93,7 +93,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
)
{
int rowNum = 0;
for (IncrementalIndexRow row : index.getFacts().keySet()) {
for (IncrementalIndexRow row : index.getFacts().persistIterable()) {
final Object[] dims = row.getDims();
for (IncrementalIndex.DimensionDesc dimension : dimensions) {

View File: IncrementalIndexRowIterator.java

@@ -50,7 +50,7 @@ class IncrementalIndexRowIterator implements TransformableRowIterator
IncrementalIndexRowIterator(IncrementalIndex<?> incrementalIndex)
{
this.timeAndDimsIterator = incrementalIndex.getFacts().keySet().iterator();
this.timeAndDimsIterator = incrementalIndex.getFacts().persistIterable().iterator();
this.currentRowPointer = makeRowPointer(incrementalIndex, currentRowHolder, currentRowNumCounter);
// markedRowPointer doesn't actually need to be a RowPointer (just a TimeAndDimsPointer), but we create a RowPointer
// in order to reuse the makeRowPointer() method. Passing a dummy RowNumCounter.

View File: OffheapIncrementalIndex.java

@@ -84,7 +84,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
this.bufferPool = bufferPool;
this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
: new PlainFactsHolder(sortFacts);
: new PlainFactsHolder(sortFacts, dimsComparator());
//check that stupid pool gives buffers that can hold at least one row's aggregators
ResourceHolder<ByteBuffer> bb = bufferPool.take();

View File: OnheapIncrementalIndex.java

@@ -79,7 +79,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
this.maxRowCount = maxRowCount;
this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory;
this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
: new PlainFactsHolder(sortFacts);
: new PlainFactsHolder(sortFacts, dimsComparator());
maxBytesPerRowForAggregators = getMaxBytesPerRowForAggregators(incrementalIndexSchema);
}

View File: IncrementalIndexTest.java

@@ -240,7 +240,7 @@ public class IncrementalIndexTest
}
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(aggregatorFactories)
.setSimpleTestingIndexSchema(false, aggregatorFactories)
.setMaxRowCount(1000000)
.buildOnheap();
}

View File: IncrementalIndexAdapterTest.java

@@ -19,6 +19,7 @@
package io.druid.segment.incremental;
import io.druid.java.util.common.StringUtils;
import io.druid.segment.IndexSpec;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.RowIterator;
@@ -32,6 +33,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
public class IncrementalIndexAdapterTest
{
@@ -83,4 +85,81 @@ public class IncrementalIndexAdapterTest
Assert.assertEquals(0, (long) rowNums.get(0));
Assert.assertEquals(1, (long) rowNums.get(1));
}
@Test
public void testGetRowsIterableNoRollup() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createNoRollupIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
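// capture each row's dictionary-encoded dimension values in keySet()
// (insertion) order, before any persist-time sorting is applied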
ArrayList<Integer> dim1Vals = new ArrayList<>();
for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) {
dim1Vals.add(((int[]) row.getDims()[0])[0]);
}
ArrayList<Integer> dim2Vals = new ArrayList<>();
for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) {
dim2Vals.add(((int[]) row.getDims()[1])[0]);
}
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
toPersist1.getInterval(),
toPersist1,
INDEX_SPEC.getBitmapSerdeFactory()
.getBitmapFactory()
);
RowIterator rows = incrementalAdapter.getRows();
List<String> rowStrings = new ArrayList<>();
while (rows.moveToNext()) {
rowStrings.add(rows.getPointer().toString());
}
Function<Integer, String> getExpected = (rowNumber) -> {
if (rowNumber < 3) {
return StringUtils.format(
"RowPointer{indexNum=0, rowNumber=%s, timestamp=%s, dimensions={dim1=1, dim2=2}, metrics={count=1}}",
rowNumber,
timestamp
);
} else {
return StringUtils.format(
"RowPointer{indexNum=0, rowNumber=%s, timestamp=%s, dimensions={dim1=3, dim2=4}, metrics={count=1}}",
rowNumber,
timestamp
);
}
};
// without sorting, output would be
// RowPointer{indexNum=0, rowNumber=0, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
// RowPointer{indexNum=0, rowNumber=1, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
// RowPointer{indexNum=0, rowNumber=2, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
// RowPointer{indexNum=0, rowNumber=3, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
// RowPointer{indexNum=0, rowNumber=4, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
// RowPointer{indexNum=0, rowNumber=5, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
// but with sorting, output should be
// RowPointer{indexNum=0, rowNumber=0, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
// RowPointer{indexNum=0, rowNumber=1, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
// RowPointer{indexNum=0, rowNumber=2, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
// RowPointer{indexNum=0, rowNumber=3, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
// RowPointer{indexNum=0, rowNumber=4, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
// RowPointer{indexNum=0, rowNumber=5, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
Assert.assertEquals(6, rowStrings.size());
for (int i = 0; i < 6; i++) {
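// keySet() preserved per-timestamp insertion order, so the encoded dim values
// alternate: 0 (dims {1, 2}) on even rows, 1 (dims {3, 4}) on odd rows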
if (i % 2 == 0) {
Assert.assertEquals(0, (long) dim1Vals.get(i));
Assert.assertEquals(0, (long) dim2Vals.get(i));
} else {
Assert.assertEquals(1, (long) dim1Vals.get(i));
Assert.assertEquals(1, (long) dim2Vals.get(i));
}
Assert.assertEquals(getExpected.apply(i), rowStrings.get(i));
}
}
}