groupBy v2: Parallel disk spilling. (#3433)

In ConcurrentGrouper, when it becomes clear that disk spilling is necessary, switch
from hash-based partitioning to thread-based partitioning. This stops processing
threads from blocking each other while spilling is occurring.
This commit is contained in:
Gian Merlino 2016-09-09 15:49:58 -07:00 committed by David Lim
parent 1344e3c3af
commit d108461838
4 changed files with 79 additions and 30 deletions

View File

@ -20,6 +20,7 @@
package io.druid.query.groupby.epinephelinae; package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ColumnSelectorFactory;
@ -28,34 +29,52 @@ import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Grouper based around a set of underlying {@link SpillingGrouper} instances. Thread-safe. * Grouper based around a set of underlying {@link SpillingGrouper} instances. Thread-safe.
* *
* The passed-in buffer is cut up into "concurrencyHint" slices, and each slice is passed to a different underlying * The passed-in buffer is cut up into concurrencyHint slices, and each slice is passed to a different underlying
* grouper. Access to each slice is separately synchronized. * grouper. Access to each slice is separately synchronized. As long as the result set fits in memory, keys are
* partitioned between buffers based on their hash, and multiple threads can write into the same buffer. When
* it becomes clear that the result set does not fit in memory, the table switches to a mode where each thread
* gets its own buffer and its own spill files on disk.
*/ */
public class ConcurrentGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType> public class ConcurrentGrouper<KeyType extends Comparable<KeyType>> implements Grouper<KeyType>
{ {
private final List<Grouper<KeyType>> groupers; private final List<SpillingGrouper<KeyType>> groupers;
private final ThreadLocal<SpillingGrouper<KeyType>> threadLocalGrouper;
private final AtomicInteger threadNumber = new AtomicInteger();
private volatile boolean spilling = false;
private volatile boolean closed = false; private volatile boolean closed = false;
public ConcurrentGrouper( public ConcurrentGrouper(
final ByteBuffer buffer, final ByteBuffer buffer,
final int concurrencyHint, final KeySerdeFactory<KeyType> keySerdeFactory,
final LimitedTemporaryStorage temporaryStorage, final ColumnSelectorFactory columnSelectorFactory,
final ObjectMapper spillMapper, final AggregatorFactory[] aggregatorFactories,
final int bufferGrouperMaxSize, final int bufferGrouperMaxSize,
final float bufferGrouperMaxLoadFactor, final float bufferGrouperMaxLoadFactor,
final int bufferGrouperInitialBuckets, final int bufferGrouperInitialBuckets,
final KeySerdeFactory<KeyType> keySerdeFactory, final LimitedTemporaryStorage temporaryStorage,
final ColumnSelectorFactory columnSelectorFactory, final ObjectMapper spillMapper,
final AggregatorFactory[] aggregatorFactories final int concurrencyHint
) )
{ {
Preconditions.checkArgument(concurrencyHint > 0, "concurrencyHint > 0");
this.groupers = new ArrayList<>(concurrencyHint); this.groupers = new ArrayList<>(concurrencyHint);
this.threadLocalGrouper = new ThreadLocal<SpillingGrouper<KeyType>>()
{
@Override
protected SpillingGrouper<KeyType> initialValue()
{
return groupers.get(threadNumber.getAndIncrement());
}
};
final int sliceSize = (buffer.capacity() / concurrencyHint); final int sliceSize = (buffer.capacity() / concurrencyHint);
for (int i = 0; i < concurrencyHint; i++) { for (int i = 0; i < concurrencyHint; i++) {
final ByteBuffer slice = buffer.duplicate(); final ByteBuffer slice = buffer.duplicate();
slice.position(sliceSize * i); slice.position(sliceSize * i);
@ -66,11 +85,12 @@ public class ConcurrentGrouper<KeyType extends Comparable<KeyType>> implements G
keySerdeFactory, keySerdeFactory,
columnSelectorFactory, columnSelectorFactory,
aggregatorFactories, aggregatorFactories,
temporaryStorage,
spillMapper,
bufferGrouperMaxSize, bufferGrouperMaxSize,
bufferGrouperMaxLoadFactor, bufferGrouperMaxLoadFactor,
bufferGrouperInitialBuckets bufferGrouperInitialBuckets,
temporaryStorage,
spillMapper,
false
) )
); );
} }
@ -83,9 +103,26 @@ public class ConcurrentGrouper<KeyType extends Comparable<KeyType>> implements G
throw new ISE("Grouper is closed"); throw new ISE("Grouper is closed");
} }
final Grouper<KeyType> grouper = groupers.get(grouperNumberForKeyHash(keyHash)); if (!spilling) {
synchronized (grouper) { final SpillingGrouper<KeyType> hashBasedGrouper = groupers.get(grouperNumberForKeyHash(keyHash));
return grouper.aggregate(key, keyHash);
synchronized (hashBasedGrouper) {
if (!spilling) {
if (hashBasedGrouper.aggregate(key, keyHash)) {
return true;
} else {
spilling = true;
}
}
}
}
// At this point we know spilling = true
final SpillingGrouper<KeyType> tlGrouper = threadLocalGrouper.get();
synchronized (tlGrouper) {
tlGrouper.setSpillingAllowed(true);
return tlGrouper.aggregate(key, keyHash);
} }
} }

View File

@ -389,7 +389,7 @@ outer:
public Grouper.KeyComparator comparator() public Grouper.KeyComparator comparator()
{ {
// No sorting, let mergeRunners handle that // No sorting, let mergeRunners handle that
return null; throw new UnsupportedOperationException();
} }
@Override @Override

View File

@ -76,7 +76,7 @@ public class RowBasedGrouperHelper
final AggregatorFactory[] aggregatorFactories final AggregatorFactory[] aggregatorFactories
) )
{ {
// concurrencyHint >= 1 for thread safe groupers, -1 for non-thread-safe // concurrencyHint >= 1 for concurrent groupers, -1 for single-threaded
Preconditions.checkArgument(concurrencyHint >= 1 || concurrencyHint == -1, "invalid concurrencyHint"); Preconditions.checkArgument(concurrencyHint >= 1 || concurrencyHint == -1, "invalid concurrencyHint");
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query); final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
@ -94,24 +94,25 @@ public class RowBasedGrouperHelper
keySerdeFactory, keySerdeFactory,
columnSelectorFactory, columnSelectorFactory,
aggregatorFactories, aggregatorFactories,
temporaryStorage,
spillMapper,
querySpecificConfig.getBufferGrouperMaxSize(), querySpecificConfig.getBufferGrouperMaxSize(),
querySpecificConfig.getBufferGrouperMaxLoadFactor(), querySpecificConfig.getBufferGrouperMaxLoadFactor(),
querySpecificConfig.getBufferGrouperInitialBuckets() querySpecificConfig.getBufferGrouperInitialBuckets(),
temporaryStorage,
spillMapper,
true
); );
} else { } else {
grouper = new ConcurrentGrouper<>( grouper = new ConcurrentGrouper<>(
buffer, buffer,
concurrencyHint, keySerdeFactory,
temporaryStorage, columnSelectorFactory,
spillMapper, aggregatorFactories,
querySpecificConfig.getBufferGrouperMaxSize(), querySpecificConfig.getBufferGrouperMaxSize(),
querySpecificConfig.getBufferGrouperMaxLoadFactor(), querySpecificConfig.getBufferGrouperMaxLoadFactor(),
querySpecificConfig.getBufferGrouperInitialBuckets(), querySpecificConfig.getBufferGrouperInitialBuckets(),
keySerdeFactory, temporaryStorage,
columnSelectorFactory, spillMapper,
aggregatorFactories concurrencyHint
); );
} }

View File

@ -61,16 +61,19 @@ public class SpillingGrouper<KeyType extends Comparable<KeyType>> implements Gro
private final List<File> files = Lists.newArrayList(); private final List<File> files = Lists.newArrayList();
private final List<Closeable> closeables = Lists.newArrayList(); private final List<Closeable> closeables = Lists.newArrayList();
private boolean spillingAllowed = false;
public SpillingGrouper( public SpillingGrouper(
final ByteBuffer buffer, final ByteBuffer buffer,
final KeySerdeFactory<KeyType> keySerdeFactory, final KeySerdeFactory<KeyType> keySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory, final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories, final AggregatorFactory[] aggregatorFactories,
final LimitedTemporaryStorage temporaryStorage,
final ObjectMapper spillMapper,
final int bufferGrouperMaxSize, final int bufferGrouperMaxSize,
final float bufferGrouperMaxLoadFactor, final float bufferGrouperMaxLoadFactor,
final int bufferGrouperInitialBuckets final int bufferGrouperInitialBuckets,
final LimitedTemporaryStorage temporaryStorage,
final ObjectMapper spillMapper,
final boolean spillingAllowed
) )
{ {
this.keySerde = keySerdeFactory.factorize(); this.keySerde = keySerdeFactory.factorize();
@ -86,6 +89,7 @@ public class SpillingGrouper<KeyType extends Comparable<KeyType>> implements Gro
this.aggregatorFactories = aggregatorFactories; this.aggregatorFactories = aggregatorFactories;
this.temporaryStorage = temporaryStorage; this.temporaryStorage = temporaryStorage;
this.spillMapper = spillMapper; this.spillMapper = spillMapper;
this.spillingAllowed = spillingAllowed;
} }
@Override @Override
@ -93,7 +97,7 @@ public class SpillingGrouper<KeyType extends Comparable<KeyType>> implements Gro
{ {
if (grouper.aggregate(key, keyHash)) { if (grouper.aggregate(key, keyHash)) {
return true; return true;
} else { } else if (spillingAllowed) {
// Warning: this can potentially block up a processing thread for a while. // Warning: this can potentially block up a processing thread for a while.
try { try {
spill(); spill();
@ -105,6 +109,8 @@ public class SpillingGrouper<KeyType extends Comparable<KeyType>> implements Gro
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
return grouper.aggregate(key, keyHash); return grouper.aggregate(key, keyHash);
} else {
return false;
} }
} }
@ -128,6 +134,11 @@ public class SpillingGrouper<KeyType extends Comparable<KeyType>> implements Gro
deleteFiles(); deleteFiles();
} }
public void setSpillingAllowed(final boolean spillingAllowed)
{
this.spillingAllowed = spillingAllowed;
}
@Override @Override
public Iterator<Entry<KeyType>> iterator(final boolean sorted) public Iterator<Entry<KeyType>> iterator(final boolean sorted)
{ {