remove unnecessary lock in ForegroundCachePopulator leading to a lot of contention (#8116)

* remove unnecessary lock in ForegroundCachePopulator leading to a lot of contention

* mutableboolean, javadocs, document some cache configs that were missing

* more doc stuff

* adjustments

* remove background documentation
Clint Wylie, 2019-07-23 10:57:59 -07:00 (committed by Jihoon Son)
parent 04a180a5fb
commit 83514958db
6 changed files with 77 additions and 45 deletions

docs/content/configuration/index.md

@@ -1176,6 +1176,7 @@ You can optionally configure caching to be enabled on the peons by setting cachi
 |`druid.realtime.cache.useCache`|true, false|Enable the cache on the realtime.|false|
 |`druid.realtime.cache.populateCache`|true, false|Populate the cache on the realtime.|false|
 |`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
+|`druid.realtime.cache.maxEntrySize`|positive integer|Maximum cache entry size in bytes.|`1_000_000`|
 
 See [cache configuration](#cache-configuration) for how to configure cache settings.
@@ -1319,6 +1320,7 @@ You can optionally only configure caching to be enabled on the Historical by set
 |`druid.historical.cache.useCache`|true, false|Enable the cache on the Historical.|false|
 |`druid.historical.cache.populateCache`|true, false|Populate the cache on the Historical.|false|
 |`druid.historical.cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
+|`druid.historical.cache.maxEntrySize`|positive integer|Maximum cache entry size in bytes.|`1_000_000`|
 
 See [cache configuration](#cache-configuration) for how to configure cache settings.
@@ -1452,6 +1454,7 @@ You can optionally only configure caching to be enabled on the Broker by setting
 |`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of query response that can be cached.|`Integer.MAX_VALUE`|
 |`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
 |`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the Historicals|`Integer.MAX_VALUE`|
+|`druid.broker.cache.maxEntrySize`|positive integer|Maximum cache entry size in bytes.|`1_000_000`|
 
 See [cache configuration](#cache-configuration) for how to configure cache settings.

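For reference, enabling segment caching with the new entry-size cap might look like the following runtime.properties sketch for a Historical (values are illustrative, not tuning advice):

    druid.historical.cache.useCache=true
    druid.historical.cache.populateCache=true
    # entries larger than ~1 MB are skipped and counted as "oversized" by the populator
    druid.historical.cache.maxEntrySize=1000000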
QueryRunnerFactory.java

@@ -25,42 +25,44 @@ import org.apache.druid.segment.Segment;
 import java.util.concurrent.ExecutorService;
 
 /**
- * An interface that defines the nitty gritty implementation detauls of a Query on a Segment
+ * An interface that defines the nitty gritty implementation details of a Query on a Segment
  */
 @ExtensionPoint
 public interface QueryRunnerFactory<T, QueryType extends Query<T>>
 {
   /**
-   * Given a specific segment, this method will create a QueryRunner.
+   * Given a specific segment, this method will create a {@link QueryRunner}.
    *
-   * The QueryRunner, when asked, will generate a Sequence of results based on the given segment. This
-   * is the meat of the query processing and is where the results are actually generated. Everything else
-   * is just merging and reduction logic.
+   * The {@link QueryRunner}, when asked, will generate a {@link org.apache.druid.java.util.common.guava.Sequence} of
+   * results based on the given segment. This is the meat of the {@link Query} processing and is where the results are
+   * actually generated. Everything else is just merging and reduction logic.
    *
-   * @param segment The segment to process
-   * @return A QueryRunner that, when asked, will generate a Sequence of results based on the given segment
+   * @param segment The segment to process
+   * @return A {@link QueryRunner} that, when asked, will generate a
+   *         {@link org.apache.druid.java.util.common.guava.Sequence} of results based on the given segment
    */
   QueryRunner<T> createRunner(Segment segment);
 
   /**
    * Runners generated with createRunner() and combined into an Iterable in (time,shardId) order are passed
-   * along to this method with an ExecutorService. The method should then return a QueryRunner that, when
-   * asked, will use the ExecutorService to run the base QueryRunners in some fashion.
+   * along to this method with an {@link ExecutorService}. The method should then return a {@link QueryRunner} that,
+   * when asked, will use the {@link ExecutorService} to run the base QueryRunners in some fashion.
    *
-   * The vast majority of the time, this should be implemented with
+   * The vast majority of the time, this should be implemented with {@link ChainedExecutionQueryRunner}:
    *
    *     return new ChainedExecutionQueryRunner<>(queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners);
    *
    * Which will allow for parallel execution up to the maximum number of processing threads allowed.
    *
-   * @param queryExecutor ExecutorService to be used for parallel processing
-   * @param queryRunners  Individual QueryRunner objects that produce some results
-   * @return a QueryRunner that, when asked, will use the ExecutorService to run the base QueryRunners
+   * @param queryExecutor {@link ExecutorService} to be used for parallel processing
+   * @param queryRunners  Individual {@link QueryRunner} objects that produce some results
+   * @return a {@link QueryRunner} that, when asked, will use the {@link ExecutorService} to run the base
+   *         {@link QueryRunner} collection.
    */
   QueryRunner<T> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<T>> queryRunners);
 
   /**
-   * Provides access to the toolchest for this specific query type.
+   * Provides access to the {@link QueryToolChest} for this specific {@link Query} type.
    *
    * @return an instance of the toolchest for this specific query type.
    */

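As the updated javadoc suggests, most factories implement mergeRunners by delegating to ChainedExecutionQueryRunner. A minimal sketch of that pattern, mirroring the call shown in the javadoc (MyRow is a hypothetical result type, toolChest and queryWatcher are assumed fields of the factory, and the exact constructor arguments can vary by Druid version):

    @Override
    public QueryRunner<MyRow> mergeRunners(
        ExecutorService queryExecutor,
        Iterable<QueryRunner<MyRow>> queryRunners
    )
    {
      // Run the per-segment runners on the processing pool, up to its maximum
      // number of threads, and merge their result sequences.
      return new ChainedExecutionQueryRunner<>(queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners);
    }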
BackgroundCachePopulator.java

@@ -38,6 +38,10 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 
+/**
+ * {@link CachePopulator} implementation that uses an {@link ExecutorService} thread pool to populate a cache in the
+ * background. Used if config "druid.*.cache.numBackgroundThreads" is greater than 0.
+ */
 public class BackgroundCachePopulator implements CachePopulator
 {
   private static final Logger log = new Logger(BackgroundCachePopulator.class);

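The class body is not shown in this hunk, but the idea is the standard hand-off pattern: the thread consuming the Sequence keeps returning rows while serialization and the cache write are submitted to the pool. An illustrative sketch, not Druid's actual code (serializeToBytes is a hypothetical helper):

    // Illustrative only: the consuming thread never blocks on the cache write.
    ExecutorService backgroundPool = Executors.newFixedThreadPool(numBackgroundThreads);

    backgroundPool.submit(() -> {
      try {
        cache.put(cacheKey, serializeToBytes(results)); // hypothetical serialization helper
      }
      catch (Exception e) {
        log.warn(e, "Unable to write to cache"); // mirrors the foreground error handling
      }
    });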
CachePopulator.java

@@ -23,6 +23,25 @@ import org.apache.druid.java.util.common.guava.Sequence;
 
 import java.util.function.Function;
 
+/**
+ * Abstraction of a mechanism for populating a {@link Cache} by wrapping a {@link Sequence} and providing a function
+ * to extract the values from it. At runtime, the {@link CachePopulator} implementation is used as a singleton and
+ * injected where needed to share between all cacheables, which requires the {@link Cache} itself to be thread-safe.
+ *
+ * Consumers of the {@link Sequence} will either be a processing thread (in the case of a Historical or task) or
+ * an HTTP thread (in the case of a Broker). For additional details, see:
+ *
+ * historicals: {@link org.apache.druid.server.coordination.ServerManager} and
+ * {@link org.apache.druid.client.CachingQueryRunner}
+ *
+ * realtime tasks: {@link org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker} and
+ * {@link org.apache.druid.client.CachingQueryRunner}
+ *
+ * brokers: {@link org.apache.druid.server.ClientQuerySegmentWalker} and
+ * {@link org.apache.druid.client.CachingClusteredClient}
+ */
 public interface CachePopulator
 {
   <T, CacheType> Sequence<T> wrap(

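The hunk cuts off mid-signature, but the shape of the contract is visible: wrap() takes the result Sequence plus a function extracting the cacheable value, and returns a Sequence that populates the cache as a side effect of being consumed. A hedged usage sketch (MyRow and toCacheableValue are hypothetical, and the trailing cache and cacheKey parameters are assumptions inferred from how ForegroundCachePopulator uses them below):

    // Consuming cachingSequence yields the same rows and, as a side effect,
    // writes the extracted values to the cache under cacheKey.
    Sequence<MyRow> cachingSequence = cachePopulator.wrap(
        resultSequence,
        row -> toCacheableValue(row), // hypothetical Function<T, CacheType>
        cache,                        // the shared, thread-safe Cache (assumption)
        cacheKey                      // key for this segment's entry (assumption)
    );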
CachePopulatorStats.java

@@ -22,6 +22,11 @@ package org.apache.druid.client.cache;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * Thread-safe collector of {@link CachePopulator} statistics, utilized by {@link CacheMonitor} to emit cache
+ * metrics. Like the {@link CachePopulator}, this is used as a singleton.
+ *
+ * For more details, see {@link org.apache.druid.guice.DruidProcessingModule#getCachePopulator}, which supplies
+ * either {@link ForegroundCachePopulator} or {@link BackgroundCachePopulator}, as configured.
+ */
 public class CachePopulatorStats
 {

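Because the populator is a shared singleton, these counters are hit from many query threads at once, so they must stay safe even with the ForegroundCachePopulator lock removed below. A minimal sketch of the AtomicLong pattern (the increment method names are taken from calls visible in the ForegroundCachePopulator hunk; the real class may differ):

    import java.util.concurrent.atomic.AtomicLong;

    // Lock-free, thread-safe counters: each increment is a single atomic operation.
    public class CacheStatsSketch
    {
      private final AtomicLong okCount = new AtomicLong();
      private final AtomicLong errorCount = new AtomicLong();
      private final AtomicLong oversizedCount = new AtomicLong();

      public void incrementOk()
      {
        okCount.incrementAndGet();
      }

      public void incrementError()
      {
        errorCount.incrementAndGet();
      }

      public void incrementOversized()
      {
        oversizedCount.incrementAndGet();
      }
    }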
ForegroundCachePopulator.java

@@ -22,6 +22,7 @@ package org.apache.druid.client.cache;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.SequenceWrapper;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -29,14 +30,16 @@ import org.apache.druid.java.util.common.logger.Logger;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
+/**
+ * {@link CachePopulator} implementation that populates a cache on the same thread that is processing the
+ * {@link Sequence}. Used if config "druid.*.cache.numBackgroundThreads" is 0 (the default).
+ */
 public class ForegroundCachePopulator implements CachePopulator
 {
   private static final Logger log = new Logger(ForegroundCachePopulator.class);
 
-  private final Object lock = new Object();
   private final ObjectMapper objectMapper;
   private final CachePopulatorStats cachePopulatorStats;
   private final long maxEntrySize;
@@ -61,7 +64,7 @@ public class ForegroundCachePopulator implements CachePopulator
   )
   {
     final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-    final AtomicBoolean tooBig = new AtomicBoolean(false);
+    final MutableBoolean tooBig = new MutableBoolean(false);
     final JsonGenerator jsonGenerator;
 
     try {
@@ -75,21 +78,19 @@ public class ForegroundCachePopulator implements CachePopulator
         Sequences.map(
             sequence,
             input -> {
-              if (!tooBig.get()) {
-                synchronized (lock) {
-                  try {
-                    jsonGenerator.writeObject(cacheFn.apply(input));
+              if (!tooBig.isTrue()) {
+                try {
+                  jsonGenerator.writeObject(cacheFn.apply(input));
 
-                    // Not flushing jsonGenerator before checking this, but should be ok since Jackson buffers are
-                    // typically just a few KB, and we don't want to waste cycles flushing.
-                    if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
-                      tooBig.set(true);
-                    }
-                  }
-                  catch (IOException e) {
-                    throw new RuntimeException(e);
-                  }
-                }
+                  // Not flushing jsonGenerator before checking this, but should be ok since Jackson buffers are
+                  // typically just a few KB, and we don't want to waste cycles flushing.
+                  if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
+                    tooBig.setValue(true);
+                  }
+                }
+                catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
               }
               return input;
@@ -100,24 +101,22 @@ public class ForegroundCachePopulator implements CachePopulator
           @Override
           public void after(final boolean isDone, final Throwable thrown) throws Exception
           {
-            synchronized (lock) {
-              jsonGenerator.close();
+            jsonGenerator.close();
 
-              if (isDone) {
-                // Check tooBig, then check maxEntrySize one more time, after closing/flushing jsonGenerator.
-                if (tooBig.get() || (maxEntrySize > 0 && bytes.size() > maxEntrySize)) {
-                  cachePopulatorStats.incrementOversized();
-                  return;
-                }
+            if (isDone) {
+              // Check tooBig, then check maxEntrySize one more time, after closing/flushing jsonGenerator.
+              if (tooBig.isTrue() || (maxEntrySize > 0 && bytes.size() > maxEntrySize)) {
+                cachePopulatorStats.incrementOversized();
+                return;
+              }
 
-                try {
-                  cache.put(cacheKey, bytes.toByteArray());
-                  cachePopulatorStats.incrementOk();
-                }
-                catch (Exception e) {
-                  log.warn(e, "Unable to write to cache");
-                  cachePopulatorStats.incrementError();
-                }
-              }
-            }
+              try {
+                cache.put(cacheKey, bytes.toByteArray());
+                cachePopulatorStats.incrementOk();
+              }
+              catch (Exception e) {
+                log.warn(e, "Unable to write to cache");
+                cachePopulatorStats.incrementError();
+              }
+            }
           }
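The reasoning behind the change: each call to wrap() creates its own bytes buffer, jsonGenerator, and tooBig flag, and a given Sequence is consumed by one thread at a time (a processing thread on Historicals and tasks, an HTTP thread on Brokers, per the CachePopulator javadoc above). The monitor, however, was a field of the singleton populator, so it serialized cache writes across unrelated queries on the per-row hot path while guarding state that was never actually contended. A plain MutableBoolean suffices; a minimal self-contained sketch of the single-threaded pattern:

    import org.apache.commons.lang.mutable.MutableBoolean;

    // Per-wrap state owned by a single thread needs neither a monitor nor CAS.
    public class SingleThreadFlagDemo
    {
      public static void main(String[] args)
      {
        final MutableBoolean tooBig = new MutableBoolean(false);
        int written = 0;

        for (int i = 0; i < 1_000; i++) {
          if (!tooBig.isTrue()) {      // plain read, no volatile semantics needed
            written++;
            if (written > 500) {
              tooBig.setValue(true);   // plain write, safe: this thread owns the flag
            }
          }
        }
        System.out.println("wrote " + written + " entries, tooBig=" + tooBig.isTrue());
      }
    }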