Dimension dictionary reduce locking (#13710)

* perf: introduce benchmark for StringDimensionIndexer

jdk11 -- Benchmark                                                       Mode  Cnt      Score     Error  Units
StringDimensionIndexerProcessBenchmark.parallelReadWrite                 avgt   10  30471.552 ±  456.716  us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader  avgt   10  18069.863 ±  327.923  us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter  avgt   10  67676.617 ± 2351.311  us/op
StringDimensionIndexerProcessBenchmark.soloReader                        avgt   10   1048.079 ±    1.120  us/op
StringDimensionIndexerProcessBenchmark.soloWriter                        avgt   10   4629.769 ±   29.353  us/op

* perf: switch DimensionDictionary to StampedLock

jdk11 - Benchmark                                                        Mode  Cnt      Score      Error  Units
StringDimensionIndexerProcessBenchmark.parallelReadWrite                 avgt   10  37958.372 ± 1685.206  us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader  avgt   10  31192.232 ± 2755.365  us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter  avgt   10  58256.791 ± 1998.220  us/op
StringDimensionIndexerProcessBenchmark.soloReader                        avgt   10   1079.440 ±    1.753  us/op
StringDimensionIndexerProcessBenchmark.soloWriter                        avgt   10   4585.690 ±   13.225  us/op

* perf: use optimistic locking in DimensionDictionary

jdk11 - Benchmark                                                        Mode  Cnt      Score     Error  Units
StringDimensionIndexerProcessBenchmark.parallelReadWrite                 avgt   10   6212.366 ± 162.684  us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader  avgt   10   1807.235 ± 109.339  us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter  avgt   10  19427.759 ± 611.692  us/op
StringDimensionIndexerProcessBenchmark.soloReader                        avgt   10    194.370 ±   1.050  us/op
StringDimensionIndexerProcessBenchmark.soloWriter                        avgt   10   2871.423 ±  14.426  us/op

* perf: refactor DimensionDictionary null handling to need fewer locks

jdk11 - Benchmark                                                        Mode  Cnt      Score      Error  Units
StringDimensionIndexerProcessBenchmark.parallelReadWrite                 avgt   10   6591.619 ±  470.497  us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader  avgt   10   1387.338 ±  144.587  us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter  avgt   10  22204.462 ± 1620.806  us/op
StringDimensionIndexerProcessBenchmark.soloReader                        avgt   10    204.911 ±    0.459  us/op
StringDimensionIndexerProcessBenchmark.soloWriter                        avgt   10   2935.376 ±   12.639  us/op

* perf: refactor DimensionDictionary add handling to do a little less work

jdk11 - Benchmark                                                        Mode  Cnt      Score    Error  Units
StringDimensionIndexerProcessBenchmark.parallelReadWrite                 avgt   10   2914.859 ± 22.519  us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelReader  avgt   10    508.010 ± 14.675  us/op
StringDimensionIndexerProcessBenchmark.parallelReadWrite:parallelWriter  avgt   10  10135.408 ± 82.745  us/op
StringDimensionIndexerProcessBenchmark.soloReader                        avgt   10    205.415 ±  0.158  us/op
StringDimensionIndexerProcessBenchmark.soloWriter                        avgt   10   3098.743 ± 23.603  us/op
This commit is contained in:
Jason Koch 2023-02-01 02:59:12 -08:00 committed by GitHub
parent ec1e6ac840
commit 7a3bd89a85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 278 additions and 41 deletions

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.indexing;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.segment.StringDimensionIndexer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
 * JMH benchmark exercising the write path
 * ({@code processRowValsToUnsortedEncodedKeyComponent}) and the read path
 * ({@code convertUnsortedEncodedKeyComponentToActualList}) of {@link StringDimensionIndexer},
 * both in isolation and concurrently (one writer thread alongside three reader threads).
 *
 * All benchmarks report average time per invocation in microseconds; one invocation processes
 * the whole {@link #inputData} / {@link #readOrder} array.
 */
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 10)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class StringDimensionIndexerProcessBenchmark
{
  static {
    NullHandling.initializeForTests();
  }

  /** In {@link #parallelWriter()}, one extra randomly generated value is written after this many input rows. */
  private static final int NOVEL_VALUE_INTERVAL = 50;

  /** Exclusive upper bound of the random number embedded in the extra values of {@link #parallelWriter()}. */
  private static final int NOVEL_VALUE_BOUND = 10000;

  /** Rows fed to the indexers; generated once per trial, contains nulls and many duplicates. */
  String[] inputData;

  /** Indexer recreated before every iteration; target of {@link #soloWriter()}. */
  StringDimensionIndexer emptyIndexer;

  /** Indexer pre-loaded with all of {@link #inputData}; read by the readers, written by {@link #parallelWriter()}. */
  StringDimensionIndexer fullIndexer;

  /** Encoded ids to look up, in the order the readers request them; reshuffled before every iteration. */
  int[] readOrder;

  /**
   * Builds the input rows, pre-loads {@link #fullIndexer} with them and draws the initial read order.
   */
  @Setup(Level.Trial)
  public void setup()
  {
    // 50000 rows drawn from 1000 possible numbers (a 50:1 ratio): on average every distinct value is
    // written 50 times, i.e. once as a new entry and 49 times as an already-known entry.
    // Draws below nullNumbers become null rows, so roughly 5% of the rows are null.
    final int maxNumbers = 1000;
    final int nullNumbers = 50; // 5%
    // Upper bound (exclusive) for the ids the readers ask for.
    // presumably: one id per distinct non-null value plus a single id shared by all nulls -- TODO confirm
    final int validNumbers = (maxNumbers + 1) - nullNumbers;

    final ThreadLocalRandom rng = ThreadLocalRandom.current();

    // generate the dummy input rows
    final String[] rows = new String[50000];
    for (int row = 0; row < rows.length; row++) {
      final int drawn = rng.nextInt(maxNumbers);
      if (drawn < nullNumbers) {
        rows[row] = null;
      } else {
        rows[row] = "abcd-" + drawn + "-efgh";
      }
    }
    inputData = rows;

    // load every row into the indexer the readers will query
    final StringDimensionIndexer loaded = newIndexer();
    for (String row : rows) {
      loaded.processRowValsToUnsortedEncodedKeyComponent(row, true);
    }
    fullIndexer = loaded;

    // draw a random sequence of ids to read
    final int[] order = new int[rows.length];
    for (int k = 0; k < order.length; k++) {
      order[k] = rng.nextInt(validNumbers);
    }
    readOrder = order;
  }

  /**
   * Replaces {@link #emptyIndexer} with a fresh instance before each iteration.
   */
  @Setup(Level.Iteration)
  public void setupEmptyIndexer()
  {
    emptyIndexer = newIndexer();
  }

  /**
   * Randomly permutes {@link #readOrder} in place before each iteration.
   */
  @Setup(Level.Iteration)
  public void shuffleReadOrder()
  {
    final int total = readOrder.length;
    final ArrayList<Integer> boxed = new ArrayList<>(total);
    for (int k = 0; k < total; k++) {
      boxed.add(readOrder[k]);
    }
    Collections.shuffle(boxed);
    int slot = 0;
    for (Integer id : boxed) {
      readOrder[slot++] = id;
    }
  }

  /**
   * Single-threaded write: pushes every input row into {@link #emptyIndexer}.
   *
   * NOTE(review): {@link #emptyIndexer} is only recreated per iteration, and JMH calls a benchmark
   * method repeatedly within one iteration, so only the first call of an iteration starts from an
   * empty indexer; later calls re-add values that are already present.
   */
  @Benchmark
  public void soloWriter()
  {
    // duplicates are part of the input on purpose
    for (String row : inputData) {
      emptyIndexer.processRowValsToUnsortedEncodedKeyComponent(row, true);
    }
  }

  /**
   * Single-threaded read: looks up every id of {@link #readOrder} in the pre-loaded indexer.
   */
  @Benchmark
  @Group("soloReader")
  public void soloReader(Blackhole blackhole)
  {
    for (int id : readOrder) {
      Object decoded = fullIndexer.convertUnsortedEncodedKeyComponentToActualList(new int[]{id});
      blackhole.consume(decoded);
    }
  }

  /**
   * Writer half of the "parallelReadWrite" group, which simulates ingestion (dictionary writes)
   * running at the same time as queries (dictionary reads).
   *
   * Re-writes every input row into the already loaded {@link #fullIndexer}. Because those values
   * are already known after setup, one extra randomly generated value is written after every
   * {@link #NOVEL_VALUE_INTERVAL} rows so that the "new value" path keeps being exercised.
   */
  @Benchmark
  @Group("parallelReadWrite")
  public void parallelWriter()
  {
    int rowsSinceNovel = 0;
    for (String row : inputData) {
      fullIndexer.processRowValsToUnsortedEncodedKeyComponent(row, true);
      if (++rowsSinceNovel == NOVEL_VALUE_INTERVAL) {
        final int suffix = ThreadLocalRandom.current().nextInt(NOVEL_VALUE_BOUND);
        fullIndexer.processRowValsToUnsortedEncodedKeyComponent("xxx-" + suffix + "yz", true);
        rowsSinceNovel = 0;
      }
    }
  }

  /**
   * Reader half of the "parallelReadWrite" group; run by three threads, each of which looks up
   * every id of {@link #readOrder} in {@link #fullIndexer} while the writer is active.
   */
  @Benchmark
  @Group("parallelReadWrite")
  @GroupThreads(3)
  public void parallelReader(Blackhole blackhole)
  {
    for (int id : readOrder) {
      Object decoded = fullIndexer.convertUnsortedEncodedKeyComponentToActualList(new int[]{id});
      blackhole.consume(decoded);
    }
  }

  /**
   * Creates an indexer with the configuration shared by every benchmark in this class.
   */
  private static StringDimensionIndexer newIndexer()
  {
    return new StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true, false, false);
  }
}

View File

@ -27,7 +27,7 @@ import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.StampedLock;
/**
* Buildable dictionary for some comparable type. Values are unsorted, or rather sorted in the order which they are
@ -51,41 +51,51 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
private final Object2IntMap<T> valueToId = new Object2IntOpenHashMap<>();
private final List<T> idToValue = new ArrayList<>();
private final ReentrantReadWriteLock lock;
private final StampedLock lock;
public DimensionDictionary(Class<T> cls)
{
this.cls = cls;
this.lock = new ReentrantReadWriteLock();
this.lock = new StampedLock();
valueToId.defaultReturnValue(ABSENT_VALUE_ID);
}
public int getId(@Nullable T value)
{
lock.readLock().lock();
if (value == null) {
return idForNull;
}
long stamp = lock.readLock();
try {
if (value == null) {
return idForNull;
}
return valueToId.getInt(value);
}
finally {
lock.readLock().unlock();
lock.unlockRead(stamp);
}
}
@Nullable
public T getValue(int id)
{
lock.readLock().lock();
if (id == idForNull) {
return null;
}
// optimistic read
long stamp = lock.tryOptimisticRead();
T output = idToValue.get(id);
if (lock.validate(stamp)) {
return output;
}
// classic lock
stamp = lock.readLock();
try {
if (id == idForNull) {
return null;
}
return idToValue.get(id);
}
finally {
lock.readLock().unlock();
lock.unlockRead(stamp);
}
}
@ -93,27 +103,36 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
{
T[] values = (T[]) Array.newInstance(cls, ids.length);
lock.readLock().lock();
long stamp = lock.readLock();
try {
for (int i = 0; i < ids.length; i++) {
values[i] = (ids[i] == idForNull) ? null : idToValue.get(ids[i]);
values[i] = idToValue.get(ids[i]);
}
return values;
}
finally {
lock.readLock().unlock();
lock.unlockRead(stamp);
}
}
public int size()
{
lock.readLock().lock();
// using idToValue rather than valueToId because the valueToId doesn't account null value, if it is present.
// optimistic read
long stamp = lock.tryOptimisticRead();
int size = idToValue.size();
if (lock.validate(stamp)) {
return size;
}
// classic lock
stamp = lock.readLock();
try {
// using idToValue rather than valueToId because the valueToId doesn't account null value, if it is present.
return idToValue.size();
}
finally {
lock.readLock().unlock();
lock.unlockRead(stamp);
}
}
@ -133,56 +152,104 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
public int add(@Nullable T originalValue)
{
lock.writeLock().lock();
try {
if (originalValue == null) {
if (idForNull == ABSENT_VALUE_ID) {
idForNull = idToValue.size();
idToValue.add(null);
if (originalValue == null) {
return addNull();
}
long stamp = lock.tryReadLock();
if (stamp != 0) {
try {
int existing = valueToId.getInt(originalValue);
if (existing >= 0) {
return existing;
}
return idForNull;
}
int prev = valueToId.getInt(originalValue);
finally {
lock.unlockRead(stamp);
}
}
long extraSize = 0;
if (computeOnHeapSize()) {
// Add size of new dim value and 2 references (valueToId and idToValue)
extraSize = estimateSizeOfValue(originalValue) + 2L * Long.BYTES;
}
stamp = lock.writeLock();
try {
final int index = idToValue.size();
final int prev = valueToId.putIfAbsent(originalValue, index);
if (prev >= 0) {
return prev;
}
final int index = idToValue.size();
valueToId.put(originalValue, index);
idToValue.add(originalValue);
if (computeOnHeapSize()) {
// Add size of new dim value and 2 references (valueToId and idToValue)
sizeInBytes.addAndGet(estimateSizeOfValue(originalValue) + 2L * Long.BYTES);
}
idToValue.add(originalValue);
sizeInBytes.addAndGet(extraSize);
minValue = minValue == null || minValue.compareTo(originalValue) > 0 ? originalValue : minValue;
maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ? originalValue : maxValue;
return index;
}
finally {
lock.writeLock().unlock();
lock.unlockWrite(stamp);
}
}
/**
 * Returns the id assigned to the null value, assigning one first if null has not been added yet.
 *
 * The id is looked at once without the lock; only when it is still absent is the write lock taken,
 * and the id is then re-checked under the lock because another thread may have assigned it in
 * between. A newly assigned id is the current size of {@code idToValue}, and a null entry is
 * appended to {@code idToValue} at that position.
 *
 * NOTE(review): the unlocked read relies on {@code idForNull} being safely visible across threads
 * (e.g. declared volatile); its declaration is not visible here -- confirm.
 *
 * @return the id that represents null in this dictionary
 */
private int addNull()
{
  if (idForNull == ABSENT_VALUE_ID) {
    final long writeStamp = lock.writeLock();
    try {
      if (idForNull != ABSENT_VALUE_ID) {
        // another thread assigned the id while this one was waiting for the lock
        return idForNull;
      }
      idForNull = idToValue.size();
      idToValue.add(null);
      return idForNull;
    }
    finally {
      lock.unlockWrite(writeStamp);
    }
  }
  // already assigned: no locking needed
  return idForNull;
}
public T getMinValue()
{
lock.readLock().lock();
// optimistic read
long stamp = lock.tryOptimisticRead();
T output = minValue;
if (lock.validate(stamp)) {
return output;
}
// classic lock
stamp = lock.readLock();
try {
return minValue;
}
finally {
lock.readLock().unlock();
lock.unlockRead(stamp);
}
}
public T getMaxValue()
{
lock.readLock().lock();
// optimistic read
long stamp = lock.tryOptimisticRead();
T output = maxValue;
if (lock.validate(stamp)) {
return output;
}
// classic lock
stamp = lock.readLock();
try {
return maxValue;
}
finally {
lock.readLock().unlock();
lock.unlockRead(stamp);
}
}
@ -193,12 +260,12 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
public SortedDimensionDictionary<T> sort()
{
lock.readLock().lock();
long stamp = lock.readLock();
try {
return new SortedDimensionDictionary<>(idToValue, idToValue.size());
}
finally {
lock.readLock().unlock();
lock.unlockRead(stamp);
}
}