From c070b4a816c7c0a5fbdd755a7c801eede01a7b03 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Tue, 22 Nov 2016 18:42:28 -0600 Subject: [PATCH] Fix concurrency defects, remove unnecessary volatiles (#3701) --- .../java/io/druid/guice/LifecycleScope.java | 4 +- .../collections/spatial/ImmutableNode.java | 2 +- .../io/druid/collections/spatial/RTree.java | 2 +- .../druid/common/guava/CombiningSequence.java | 37 +++++----- .../indexer/HadoopDruidIndexerConfig.java | 4 +- .../io/druid/indexing/common/RetryPolicy.java | 4 +- .../indexing/overlord/ForkingTaskRunner.java | 7 +- .../guava/LimitedYieldingAccumulator.java | 70 ------------------- .../aggregation/JavaScriptAggregator.java | 2 +- .../segment/QueryableIndexStorageAdapter.java | 2 +- .../druid/segment/StringDimensionIndexer.java | 8 ++- .../segment/incremental/IncrementalIndex.java | 5 -- .../firehose/IngestSegmentFirehose.java | 2 +- .../MessageTimeRejectionPolicyFactory.java | 65 ++++++++++++----- .../coordinator/BalancerSegmentHolder.java | 2 +- .../druid/server/log/FileRequestLogger.java | 20 +++--- 16 files changed, 100 insertions(+), 136 deletions(-) delete mode 100644 java-util/src/main/java/io/druid/java/util/common/guava/LimitedYieldingAccumulator.java diff --git a/api/src/main/java/io/druid/guice/LifecycleScope.java b/api/src/main/java/io/druid/guice/LifecycleScope.java index 5e83a706464..0d2b3481acc 100644 --- a/api/src/main/java/io/druid/guice/LifecycleScope.java +++ b/api/src/main/java/io/druid/guice/LifecycleScope.java @@ -38,7 +38,7 @@ public class LifecycleScope implements Scope private final Lifecycle.Stage stage; private Lifecycle lifecycle; - private List instances = Lists.newLinkedList(); + private final List instances = Lists.newLinkedList(); public LifecycleScope(Lifecycle.Stage stage) { @@ -47,8 +47,8 @@ public class LifecycleScope implements Scope public void setLifecycle(Lifecycle lifecycle) { - this.lifecycle = lifecycle; synchronized (instances) { + this.lifecycle = lifecycle; for (Object instance : instances) { lifecycle.addManagedInstance(instance); } diff --git a/bytebuffer-collections/src/main/java/io/druid/collections/spatial/ImmutableNode.java b/bytebuffer-collections/src/main/java/io/druid/collections/spatial/ImmutableNode.java index 6f5cf938fff..e29e2002ad7 100755 --- a/bytebuffer-collections/src/main/java/io/druid/collections/spatial/ImmutableNode.java +++ b/bytebuffer-collections/src/main/java/io/druid/collections/spatial/ImmutableNode.java @@ -169,7 +169,7 @@ public class ImmutableNode { return new Iterator() { - private volatile int count = 0; + private int count = 0; @Override public boolean hasNext() diff --git a/bytebuffer-collections/src/main/java/io/druid/collections/spatial/RTree.java b/bytebuffer-collections/src/main/java/io/druid/collections/spatial/RTree.java index 68398aa69be..74ad9426802 100755 --- a/bytebuffer-collections/src/main/java/io/druid/collections/spatial/RTree.java +++ b/bytebuffer-collections/src/main/java/io/druid/collections/spatial/RTree.java @@ -39,7 +39,7 @@ public class RTree private final SplitStrategy splitStrategy; private final BitmapFactory bitmapFactory; private Node root; - private volatile int size; + private int size; public RTree(BitmapFactory bitmapFactory) { diff --git a/common/src/main/java/io/druid/common/guava/CombiningSequence.java b/common/src/main/java/io/druid/common/guava/CombiningSequence.java index 3695f104d31..e3ccc40453a 100644 --- a/common/src/main/java/io/druid/common/guava/CombiningSequence.java +++ b/common/src/main/java/io/druid/common/guava/CombiningSequence.java @@ -20,7 +20,6 @@ package io.druid.common.guava; import com.google.common.collect.Ordering; - import io.druid.java.util.common.guava.Accumulator; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Yielder; @@ -29,7 +28,6 @@ import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.java.util.common.guava.nary.BinaryFn; import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; /** */ @@ -41,7 +39,7 @@ public class CombiningSequence implements Sequence BinaryFn mergeFn ) { - return new CombiningSequence(baseSequence, ordering, mergeFn); + return new CombiningSequence<>(baseSequence, ordering, mergeFn); } private final Sequence baseSequence; @@ -62,16 +60,19 @@ public class CombiningSequence implements Sequence @Override public OutType accumulate(OutType initValue, final Accumulator accumulator) { - final AtomicReference retVal = new AtomicReference(initValue); - final CombiningAccumulator combiningAccumulator = new CombiningAccumulator(retVal, accumulator); + final CombiningAccumulator combiningAccumulator = new CombiningAccumulator<>(initValue, accumulator); T lastValue = baseSequence.accumulate(null, combiningAccumulator); - return combiningAccumulator.accumulatedSomething() ? accumulator.accumulate(retVal.get(), lastValue) : initValue; + if (combiningAccumulator.accumulatedSomething()) { + return accumulator.accumulate(combiningAccumulator.retVal, lastValue); + } else { + return initValue; + } } @Override public Yielder toYielder(OutType initValue, final YieldingAccumulator accumulator) { - final CombiningYieldingAccumulator combiningAccumulator = new CombiningYieldingAccumulator( + final CombiningYieldingAccumulator combiningAccumulator = new CombiningYieldingAccumulator<>( ordering, mergeFn, accumulator ); @@ -81,7 +82,7 @@ public class CombiningSequence implements Sequence return makeYielder(baseYielder, combiningAccumulator, false); } - public Yielder makeYielder( + private Yielder makeYielder( final Yielder yielder, final CombiningYieldingAccumulator combiningAccumulator, boolean finalValue @@ -152,11 +153,11 @@ public class CombiningSequence implements Sequence private final BinaryFn mergeFn; private final YieldingAccumulator accumulator; - private volatile OutType retVal; - private volatile T lastMergedVal; - private volatile boolean accumulatedSomething = false; + private OutType retVal; + private T lastMergedVal; + private boolean accumulatedSomething = false; - public CombiningYieldingAccumulator( + CombiningYieldingAccumulator( Ordering ordering, BinaryFn mergeFn, YieldingAccumulator accumulator @@ -219,12 +220,12 @@ public class CombiningSequence implements Sequence return t; } - public void accumulateLastValue() + void accumulateLastValue() { retVal = accumulator.accumulate(retVal, lastMergedVal); } - public boolean accumulatedSomething() + boolean accumulatedSomething() { return accumulatedSomething; } @@ -232,18 +233,18 @@ public class CombiningSequence implements Sequence private class CombiningAccumulator implements Accumulator { - private final AtomicReference retVal; + private OutType retVal; private final Accumulator accumulator; private volatile boolean accumulatedSomething = false; - public CombiningAccumulator(AtomicReference retVal, Accumulator accumulator) + CombiningAccumulator(OutType retVal, Accumulator accumulator) { this.retVal = retVal; this.accumulator = accumulator; } - public boolean accumulatedSomething() + boolean accumulatedSomething() { return accumulatedSomething; } @@ -263,7 +264,7 @@ public class CombiningSequence implements Sequence return mergeFn.apply(prevValue, t); } - retVal.set(accumulator.accumulate(retVal.get(), prevValue)); + retVal = accumulator.accumulate(retVal, prevValue); return t; } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 3b194d166e3..0d6ccf500fb 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -213,8 +213,8 @@ public class HadoopDruidIndexerConfig return retVal; } - private volatile HadoopIngestionSpec schema; - private volatile PathSpec pathSpec; + private HadoopIngestionSpec schema; + private PathSpec pathSpec; private final Map shardSpecLookups = Maps.newHashMap(); private final Map> hadoopShardSpecLookup = Maps.newHashMap(); private final QueryGranularity rollupGran; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/RetryPolicy.java b/indexing-service/src/main/java/io/druid/indexing/common/RetryPolicy.java index 86c32daf6ff..f336035700c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/RetryPolicy.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/RetryPolicy.java @@ -31,8 +31,8 @@ public class RetryPolicy private final long maxNumRetries; private final Duration maxRetryDelay; - private volatile Duration currRetryDelay; - private volatile int retryCount; + private Duration currRetryDelay; + private int retryCount; public RetryPolicy(RetryPolicyConfig config) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index c7ad0194bd7..32ff8309a4f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -67,6 +67,7 @@ import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.concurrent.GuardedBy; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -548,7 +549,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer if (terminated) { log.info("Finished stopping in %,dms.", elapsed); } else { - final Set stillRunning = ImmutableSet.copyOf(tasks.keySet()); + final Set stillRunning; + synchronized (tasks) { + stillRunning = ImmutableSet.copyOf(tasks.keySet()); + } log.makeAlert("Failed to stop forked tasks") .addData("stillRunning", stillRunning) @@ -670,6 +674,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer // Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that // occur while saving. + @GuardedBy("tasks") private void saveRunningTasks() { final File restoreFile = getRestoreFile(); diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/LimitedYieldingAccumulator.java b/java-util/src/main/java/io/druid/java/util/common/guava/LimitedYieldingAccumulator.java deleted file mode 100644 index 72ff3b31fa2..00000000000 --- a/java-util/src/main/java/io/druid/java/util/common/guava/LimitedYieldingAccumulator.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.guava; - -/** - * @deprecated this class uses expensive volatile counter inside, but it is not thread-safe. It is going to be removed - * in the future. - */ - -@Deprecated -public class LimitedYieldingAccumulator extends YieldingAccumulator -{ - private final int limit; - private final YieldingAccumulator delegate; - - private volatile int count = 0; - - public LimitedYieldingAccumulator( - YieldingAccumulator delegate, int limit - ) - { - this.limit = limit; - this.delegate = delegate; - } - - @Override - public void yield() - { - delegate.yield(); - } - - @Override - public boolean yielded() - { - return delegate.yielded(); - } - - @Override - public void reset() - { - delegate.reset(); - } - - @Override - public OutType accumulate(OutType accumulated, T in) - { - if (count < limit) { - count++; - return delegate.accumulate(accumulated, in); - } - return accumulated; - } -} diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregator.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregator.java index 94ea621f4f9..5adb021363d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregator.java @@ -40,7 +40,7 @@ public class JavaScriptAggregator implements Aggregator private final ObjectColumnSelector[] selectorList; private final ScriptAggregator script; - private volatile double current; + private double current; public JavaScriptAggregator(List selectorList, ScriptAggregator script) { diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index 38dbe383c61..9815620375b 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -1328,7 +1328,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter { private final int rowCount; private final boolean descending; - private volatile int currentOffset; + private int currentOffset; NoFilterOffset(int currentOffset, int rowCount, boolean descending) { diff --git a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java index 5c94ed307fc..9cc84def028 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java @@ -147,12 +147,16 @@ public class StringDimensionIndexer implements DimensionIndexer implements Iterable, return TYPE_MAP.get(singleVal.getClass()); } - public Map getDimensionDescs() - { - return dimensionDescs; - } - public Map getColumnCapabilities() { return columnCapabilities; diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java index 0e9cf1f13da..cf923330b1c 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java @@ -53,7 +53,7 @@ import java.util.Map; public class IngestSegmentFirehose implements Firehose { - private volatile Yielder rowYielder; + private Yielder rowYielder; public IngestSegmentFirehose( final List adapters, diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/MessageTimeRejectionPolicyFactory.java b/server/src/main/java/io/druid/segment/realtime/plumber/MessageTimeRejectionPolicyFactory.java index ccde6bf6ffc..1f516db2c20 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/MessageTimeRejectionPolicyFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/MessageTimeRejectionPolicyFactory.java @@ -23,37 +23,66 @@ import io.druid.common.utils.JodaUtils; import org.joda.time.DateTime; import org.joda.time.Period; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + public class MessageTimeRejectionPolicyFactory implements RejectionPolicyFactory { @Override public RejectionPolicy create(final Period windowPeriod) { final long windowMillis = windowPeriod.toStandardDuration().getMillis(); + return new MessageTimeRejectionPolicy(windowMillis, windowPeriod); + } - return new RejectionPolicy() + private static class MessageTimeRejectionPolicy implements RejectionPolicy + { + private static final AtomicLongFieldUpdater maxTimestampUpdater = + AtomicLongFieldUpdater.newUpdater(MessageTimeRejectionPolicy.class, "maxTimestamp"); + private final long windowMillis; + private final Period windowPeriod; + private volatile long maxTimestamp; + + public MessageTimeRejectionPolicy(long windowMillis, Period windowPeriod) { - private volatile long maxTimestamp = JodaUtils.MIN_INSTANT; + this.windowMillis = windowMillis; + this.windowPeriod = windowPeriod; + this.maxTimestamp = JodaUtils.MIN_INSTANT; + } - @Override - public DateTime getCurrMaxTime() - { - return new DateTime(maxTimestamp); + @Override + public DateTime getCurrMaxTime() + { + return new DateTime(maxTimestamp); + } + + @Override + public boolean accept(long timestamp) + { + long maxTimestamp = this.maxTimestamp; + if (timestamp > maxTimestamp) { + maxTimestamp = tryUpdateMaxTimestamp(timestamp); } - @Override - public boolean accept(long timestamp) - { - maxTimestamp = Math.max(maxTimestamp, timestamp); + return timestamp >= (maxTimestamp - windowMillis); + } - return timestamp >= (maxTimestamp - windowMillis); - } + private long tryUpdateMaxTimestamp(long timestamp) + { + long currentMaxTimestamp; + do { + currentMaxTimestamp = maxTimestamp; + if (timestamp <= currentMaxTimestamp) { + return currentMaxTimestamp; + } + } while (!maxTimestampUpdater.compareAndSet(this, currentMaxTimestamp, timestamp)); + return timestamp; + } - @Override - public String toString() - { - return String.format("messageTime-%s", windowPeriod); - } - }; + @Override + public String toString() + { + return String.format("messageTime-%s", windowPeriod); + } } } diff --git a/server/src/main/java/io/druid/server/coordinator/BalancerSegmentHolder.java b/server/src/main/java/io/druid/server/coordinator/BalancerSegmentHolder.java index 050ca617a2b..5b5d828e3f6 100644 --- a/server/src/main/java/io/druid/server/coordinator/BalancerSegmentHolder.java +++ b/server/src/main/java/io/druid/server/coordinator/BalancerSegmentHolder.java @@ -30,7 +30,7 @@ public class BalancerSegmentHolder private final DataSegment segment; // This is a pretty fugly hard coding of the maximum lifetime - private volatile int lifetime = 15; + private int lifetime = 15; public BalancerSegmentHolder( ImmutableDruidServer fromServer, diff --git a/server/src/main/java/io/druid/server/log/FileRequestLogger.java b/server/src/main/java/io/druid/server/log/FileRequestLogger.java index 995a1a798cd..67780c77cbf 100644 --- a/server/src/main/java/io/druid/server/log/FileRequestLogger.java +++ b/server/src/main/java/io/druid/server/log/FileRequestLogger.java @@ -22,7 +22,6 @@ package io.druid.server.log; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Charsets; import com.google.common.base.Throwables; - import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.lifecycle.LifecycleStart; @@ -49,8 +48,8 @@ public class FileRequestLogger implements RequestLogger private final Object lock = new Object(); - private volatile DateTime currentDay; - private volatile OutputStreamWriter fileWriter; + private DateTime currentDay; + private OutputStreamWriter fileWriter; public FileRequestLogger(ObjectMapper objectMapper, ScheduledExecutorService exec, File baseDir) { @@ -67,12 +66,14 @@ public class FileRequestLogger implements RequestLogger MutableDateTime mutableDateTime = new DateTime().toMutableDateTime(); mutableDateTime.setMillisOfDay(0); - currentDay = mutableDateTime.toDateTime(); + synchronized (lock) { + currentDay = mutableDateTime.toDateTime(); - fileWriter = new OutputStreamWriter( - new FileOutputStream(new File(baseDir, currentDay.toString("yyyy-MM-dd'.log'")), true), - Charsets.UTF_8 - ); + fileWriter = new OutputStreamWriter( + new FileOutputStream(new File(baseDir, currentDay.toString("yyyy-MM-dd'.log'")), true), + Charsets.UTF_8 + ); + } long nextDay = currentDay.plusDays(1).getMillis(); Duration delay = new Duration(nextDay - new DateTime().getMillis()); @@ -85,10 +86,9 @@ public class FileRequestLogger implements RequestLogger @Override public ScheduledExecutors.Signal call() { - currentDay = currentDay.plusDays(1); - try { synchronized (lock) { + currentDay = currentDay.plusDays(1); CloseQuietly.close(fileWriter); fileWriter = new OutputStreamWriter( new FileOutputStream(new File(baseDir, currentDay.toString()), true),