From 285516bedeafa0be90e5b835ab6e1d37d525b83e Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 17 Oct 2016 11:39:12 -0500 Subject: [PATCH] Workaround non-thread-safe use of HLL aggregators. (#3578) Despite the non-thread-safety of HyperLogLogCollector, it is actually currently used by multiple threads during realtime indexing. HyperUniquesAggregator's "aggregate" and "get" methods can be called simultaneously by OnheapIncrementalIndex, since its "doAggregate" and "getMetricObjectValue" methods are not synchronized. This means that the optimization of HyperLogLogCollector.fold in #3314 (saving and restoring position rather than duplicating the storage buffer of the right-hand side) could cause corruption in the face of concurrent writes. This patch works around the issue by duplicating the storage buffer in "get" before returning a collector. The returned collector still shares data with the original one, but the situation is no worse than before #3314. In the future we may want to consider making a thread safe version of HLLC that avoids these kinds of problems in realtime indexing. But for now I thought it was best to do a small change that restored the old behavior. --- .../aggregation/hyperloglog/HyperLogLogCollector.java | 9 +++++++-- .../aggregation/hyperloglog/HyperUniquesAggregator.java | 3 ++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java index 63a38efb644..84f1cf7a2bb 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperLogLogCollector.java @@ -39,10 +39,15 @@ import java.nio.ByteBuffer; * } * * - * This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that + * This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that * only one thread is ever calling methods on it. * - * If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior + * If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior. + * + * Note that despite the non-thread-safety of this class, it is actually currently used by multiple threads during + * realtime indexing. HyperUniquesAggregator's "aggregate" and "get" methods can be called simultaneously by + * OnheapIncrementalIndex, since its "doAggregate" and "getMetricObjectValue" methods are not synchronized. So, watch + * out for that. */ public abstract class HyperLogLogCollector implements Comparable { diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java index ca1f45fc432..41d7aeb9e29 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregator.java @@ -57,7 +57,8 @@ public class HyperUniquesAggregator implements Aggregator @Override public Object get() { - return collector; + // Workaround for OnheapIncrementalIndex's penchant for calling "aggregate" and "get" simultaneously. + return HyperLogLogCollector.makeCollector(collector.getStorageBuffer().duplicate()); } @Override