diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java
new file mode 100644
index 00000000000..0f44b15018c
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StringDimensionIndexerProcessBenchmark.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.indexing;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.segment.StringDimensionIndexer;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 10)
+@Measurement(iterations = 10)
+public class StringDimensionIndexerProcessBenchmark
+{
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  String[] inputData;
+  StringDimensionIndexer emptyIndexer;
+  StringDimensionIndexer fullIndexer;
+  int[] readOrder;
+
+  @Setup(Level.Trial)
+  public void setup()
+  {
+    // maxNumbers : inputData ratio of 1000:50000 is 1:50; in other words, we should expect each element to be
+    // 'added' 50x: the first time it is new, the next 49 times it is an existing entry; also 5% of values will
+    // be null
+
+    final int maxNumbers = 1000;
+    final int nullNumbers = 50; // 5%
+    final int validNumbers = (maxNumbers + 1) - nullNumbers;
+
+    // set up dummy input data, and load it into the indexer
+    inputData = new String[50000];
+    for (int i = 0; i < inputData.length; i++) {
+      int next = ThreadLocalRandom.current().nextInt(maxNumbers);
+      inputData[i] = (next < nullNumbers) ? null : ("abcd-" + next + "-efgh");
+    }
+
+    fullIndexer = new StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true, false, false);
+    for (String data : inputData) {
+      fullIndexer.processRowValsToUnsortedEncodedKeyComponent(data, true);
+    }
+
+    // set up a random read order
+    readOrder = new int[inputData.length];
+    for (int i = 0; i < readOrder.length; i++) {
+      readOrder[i] = ThreadLocalRandom.current().nextInt(validNumbers);
+    }
+  }
+
+  @Setup(Level.Iteration)
+  public void setupEmptyIndexer()
+  {
+    emptyIndexer = new StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true, false, false);
+  }
+
+  @Setup(Level.Iteration)
+  public void shuffleReadOrder()
+  {
+    ArrayList<Integer> asList = new ArrayList<>(readOrder.length);
+    for (int i : readOrder) {
+      asList.add(i);
+    }
+
+    Collections.shuffle(asList);
+
+    for (int i = 0; i < readOrder.length; i++) {
+      readOrder[i] = asList.get(i);
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void soloWriter()
+  {
+    // load ALL input data into an empty index; duplicates will be present / should be ignored
+    for (String data : inputData) {
+      emptyIndexer.processRowValsToUnsortedEncodedKeyComponent(data, true);
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  @Group("soloReader")
+  public void soloReader(Blackhole blackhole)
+  {
+    // read ALL elements from a fully loaded index
+    for (int i : readOrder) {
+      Object result = fullIndexer.convertUnsortedEncodedKeyComponentToActualList(new int[]{i});
+      blackhole.consume(result);
+    }
+  }
+
+  // the parallel read/write test should simulate what happens when we are (1) ingesting data (aka writing to the
+  // dictionary) and also (2) running a query (aka reading from the dictionary)
+  // the read side should continuously read
+  // the write side should continuously write, but we also need to throw in some random new values too,
+  // since our dataset exhausts the 'new write' path quickly; so 1-in-50 elements should be new
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  @Group("parallelReadWrite")
+  public void parallelWriter()
+  {
+    int count = 0;
+    // write ALL input data to the fully loaded index; duplicates will be present / should be ignored
+    for (String data : inputData) {
+      fullIndexer.processRowValsToUnsortedEncodedKeyComponent(data, true);
+      count++;
+      if (count == 50) {
+        int next = ThreadLocalRandom.current().nextInt(10000);
+        fullIndexer.processRowValsToUnsortedEncodedKeyComponent("xxx-" + next + "yz", true);
+        count = 0;
+      }
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  @Group("parallelReadWrite")
+  @GroupThreads(3)
+  public void parallelReader(Blackhole blackhole)
+  {
+    for (int i : readOrder) {
+      Object result = fullIndexer.convertUnsortedEncodedKeyComponentToActualList(new int[]{i});
+      blackhole.consume(result);
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java b/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
index 0bb25f8c78f..754934975c4 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
@@ -27,7 +27,7 @@ import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.StampedLock;
 
 /**
  * Buildable dictionary for some comparable type. Values are unsorted, or rather sorted in the order which they are
@@ -51,41 +51,51 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
   private final Object2IntMap<T> valueToId = new Object2IntOpenHashMap<>();
   private final List<T> idToValue = new ArrayList<>();
-  private final ReentrantReadWriteLock lock;
+  private final StampedLock lock;
 
   public DimensionDictionary(Class<T> cls)
   {
     this.cls = cls;
-    this.lock = new ReentrantReadWriteLock();
+    this.lock = new StampedLock();
     valueToId.defaultReturnValue(ABSENT_VALUE_ID);
   }
 
   public int getId(@Nullable T value)
   {
-    lock.readLock().lock();
+    if (value == null) {
+      return idForNull;
+    }
+
+    long stamp = lock.readLock();
     try {
-      if (value == null) {
-        return idForNull;
-      }
       return valueToId.getInt(value);
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 
   @Nullable
   public T getValue(int id)
   {
-    lock.readLock().lock();
+    if (id == idForNull) {
+      return null;
+    }
+
+    // optimistic read
+    long stamp = lock.tryOptimisticRead();
+    T output = idToValue.get(id);
+    if (lock.validate(stamp)) {
+      return output;
+    }
+
+    // classic lock
+    stamp = lock.readLock();
     try {
-      if (id == idForNull) {
-        return null;
-      }
       return idToValue.get(id);
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 
@@ -93,27 +103,36 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
   {
     T[] values = (T[]) Array.newInstance(cls, ids.length);
 
-    lock.readLock().lock();
+    long stamp = lock.readLock();
     try {
       for (int i = 0; i < ids.length; i++) {
-        values[i] = (ids[i] == idForNull) ? null : idToValue.get(ids[i]);
+        values[i] = idToValue.get(ids[i]);
       }
       return values;
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 
   public int size()
   {
-    lock.readLock().lock();
+    // using idToValue rather than valueToId because valueToId doesn't account for the null value, if it is present
+
+    // optimistic read
+    long stamp = lock.tryOptimisticRead();
+    int size = idToValue.size();
+    if (lock.validate(stamp)) {
+      return size;
+    }
+
+    // classic lock
+    stamp = lock.readLock();
     try {
-      // using idToValue rather than valueToId because the valueToId doesn't account null value, if it is present.
       return idToValue.size();
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 
@@ -133,56 +152,104 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
 
   public int add(@Nullable T originalValue)
   {
-    lock.writeLock().lock();
-    try {
-      if (originalValue == null) {
-        if (idForNull == ABSENT_VALUE_ID) {
-          idForNull = idToValue.size();
-          idToValue.add(null);
+    if (originalValue == null) {
+      return addNull();
+    }
+
+    long stamp = lock.tryReadLock();
+    if (stamp != 0) {
+      try {
+        int existing = valueToId.getInt(originalValue);
+        if (existing >= 0) {
+          return existing;
         }
-        return idForNull;
       }
-      int prev = valueToId.getInt(originalValue);
+      finally {
+        lock.unlockRead(stamp);
+      }
+    }
+
+    long extraSize = 0;
+    if (computeOnHeapSize()) {
+      // Add size of new dim value and 2 references (valueToId and idToValue)
+      extraSize = estimateSizeOfValue(originalValue) + 2L * Long.BYTES;
+    }
+
+    stamp = lock.writeLock();
+    try {
+      final int index = idToValue.size();
+      final int prev = valueToId.putIfAbsent(originalValue, index);
       if (prev >= 0) {
         return prev;
       }
-      final int index = idToValue.size();
-      valueToId.put(originalValue, index);
-      idToValue.add(originalValue);
-      if (computeOnHeapSize()) {
-        // Add size of new dim value and 2 references (valueToId and idToValue)
-        sizeInBytes.addAndGet(estimateSizeOfValue(originalValue) + 2L * Long.BYTES);
-      }
+      idToValue.add(originalValue);
+      sizeInBytes.addAndGet(extraSize);
       minValue = minValue == null || minValue.compareTo(originalValue) > 0 ? originalValue : minValue;
       maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ? originalValue : maxValue;
       return index;
     }
     finally {
-      lock.writeLock().unlock();
+      lock.unlockWrite(stamp);
+    }
+  }
+
+  private int addNull()
+  {
+    if (idForNull != ABSENT_VALUE_ID) {
+      return idForNull;
+    }
+
+    long stamp = lock.writeLock();
+    try {
+      // check again, in case it was changed by another thread
+      if (idForNull == ABSENT_VALUE_ID) {
+        idForNull = idToValue.size();
+        idToValue.add(null);
+      }
+      return idForNull;
+    }
+    finally {
+      lock.unlockWrite(stamp);
     }
   }
 
   public T getMinValue()
   {
-    lock.readLock().lock();
+    // optimistic read
+    long stamp = lock.tryOptimisticRead();
+    T output = minValue;
+    if (lock.validate(stamp)) {
+      return output;
+    }
+
+    // classic lock
+    stamp = lock.readLock();
     try {
       return minValue;
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 
   public T getMaxValue()
   {
-    lock.readLock().lock();
+    // optimistic read
+    long stamp = lock.tryOptimisticRead();
+    T output = maxValue;
+    if (lock.validate(stamp)) {
+      return output;
+    }
+
+    // classic lock
+    stamp = lock.readLock();
     try {
       return maxValue;
    }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
 
@@ -193,12 +260,12 @@ public abstract class DimensionDictionary<T extends Comparable<T>>
 
   public SortedDimensionDictionary<T> sort()
   {
-    lock.readLock().lock();
+    long stamp = lock.readLock();
     try {
       return new SortedDimensionDictionary<>(idToValue, idToValue.size());
     }
     finally {
-      lock.readLock().unlock();
+      lock.unlockRead(stamp);
     }
   }
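Note: the read paths above all follow the canonical StampedLock optimistic-read recipe: read without acquiring the lock, then validate the stamp and fall back to a full read lock if a write invalidated it. A minimal, self-contained sketch of that pattern (the OptimisticBox class and its fields are illustrative, not part of Druid):

    import java.util.concurrent.locks.StampedLock;

    public class OptimisticBox
    {
      private final StampedLock lock = new StampedLock();
      private long value;

      public void set(long newValue)
      {
        long stamp = lock.writeLock();
        try {
          value = newValue;
        }
        finally {
          lock.unlockWrite(stamp);
        }
      }

      public long get()
      {
        // optimistic read: no lock is acquired; validate() returns false if a
        // write lock was held at any point since tryOptimisticRead()
        long stamp = lock.tryOptimisticRead();
        long current = value;
        if (lock.validate(stamp)) {
          return current;
        }

        // classic lock: a write raced with us, so re-read under the read lock
        stamp = lock.readLock();
        try {
          return value;
        }
        finally {
          lock.unlockRead(stamp);
        }
      }
    }

One caveat worth keeping in mind: validate()-plus-fallback protects callers from acting on an inconsistent value, but the optimistic read itself still executes while a writer may be mutating the structure, so the pattern is safest when that read, like the single-field reads in getMinValue()/getMaxValue()/size() or an ArrayList.get of an already-committed index in getValue(int), cannot fail outright.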