Fix concurrency defects, remove unnecessary volatiles (#3701)

Roman Leventov 2016-11-22 18:42:28 -06:00 committed by Slim
parent 7d37f675ba
commit c070b4a816
16 changed files with 100 additions and 136 deletions
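Most of the edits below follow one rule: volatile only guarantees visibility for fields that are genuinely read and written by different threads without other synchronization; fields confined to a single thread, or only touched while holding a lock, can be plain. A minimal sketch with hypothetical names (not code from this patch) of when the keyword is and is not needed:

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class VolatileUsage
{
  // Written by one thread and polled by another with no lock: volatile is doing real work here.
  private volatile boolean stopRequested = false;

  void requestStop()
  {
    stopRequested = true;
  }

  boolean isStopRequested()
  {
    return stopRequested;
  }

  // An iterator is driven by a single thread, so its cursor can be a plain int
  // (the ImmutableNode and RTree hunks below make essentially the same change).
  static <T> Iterator<T> iterate(final List<T> list)
  {
    return new Iterator<T>()
    {
      private int position = 0;

      @Override
      public boolean hasNext()
      {
        return position < list.size();
      }

      @Override
      public T next()
      {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return list.get(position++);
      }
    };
  }
}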

View File

@@ -38,7 +38,7 @@ public class LifecycleScope implements Scope
private final Lifecycle.Stage stage;
private Lifecycle lifecycle;
private List<Object> instances = Lists.newLinkedList();
private final List<Object> instances = Lists.newLinkedList();
public LifecycleScope(Lifecycle.Stage stage)
{
@@ -47,8 +47,8 @@ public class LifecycleScope implements Scope
public void setLifecycle(Lifecycle lifecycle)
{
this.lifecycle = lifecycle;
synchronized (instances) {
this.lifecycle = lifecycle;
for (Object instance : instances) {
lifecycle.addManagedInstance(instance);
}
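The hunk above moves the lifecycle assignment inside synchronized (instances), so the write to the field and the iteration over the registered instances are ordered by the same lock that other methods use when adding to the list. A simplified sketch of that pattern, with hypothetical names rather than the actual Druid classes:

import java.util.ArrayList;
import java.util.List;

class DeferredRegistry
{
  private final List<Object> pending = new ArrayList<>();
  private Manager manager; // guarded by the `pending` monitor

  interface Manager
  {
    void manage(Object instance);
  }

  void setManager(Manager manager)
  {
    synchronized (pending) {
      this.manager = manager;           // assignment now happens inside the lock
      for (Object instance : pending) {
        manager.manage(instance);       // hand over everything registered before the manager arrived
      }
      pending.clear();
    }
  }

  void register(Object instance)
  {
    synchronized (pending) {
      if (manager == null) {
        pending.add(instance);          // manager not set yet; defer
      } else {
        manager.manage(instance);
      }
    }
  }
}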

View File

@@ -169,7 +169,7 @@ public class ImmutableNode
{
return new Iterator<ImmutableNode>()
{
private volatile int count = 0;
private int count = 0;
@Override
public boolean hasNext()

View File

@@ -39,7 +39,7 @@ public class RTree
private final SplitStrategy splitStrategy;
private final BitmapFactory bitmapFactory;
private Node root;
private volatile int size;
private int size;
public RTree(BitmapFactory bitmapFactory)
{

View File

@@ -20,7 +20,6 @@
package io.druid.common.guava;
import com.google.common.collect.Ordering;
import io.druid.java.util.common.guava.Accumulator;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Yielder;
@@ -29,7 +28,6 @@ import io.druid.java.util.common.guava.YieldingAccumulator;
import io.druid.java.util.common.guava.nary.BinaryFn;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
@@ -41,7 +39,7 @@ public class CombiningSequence<T> implements Sequence<T>
BinaryFn<T, T, T> mergeFn
)
{
return new CombiningSequence<T>(baseSequence, ordering, mergeFn);
return new CombiningSequence<>(baseSequence, ordering, mergeFn);
}
private final Sequence<T> baseSequence;
@@ -62,16 +60,19 @@ public class CombiningSequence<T> implements Sequence<T>
@Override
public <OutType> OutType accumulate(OutType initValue, final Accumulator<OutType, T> accumulator)
{
final AtomicReference<OutType> retVal = new AtomicReference<OutType>(initValue);
final CombiningAccumulator<OutType> combiningAccumulator = new CombiningAccumulator<OutType>(retVal, accumulator);
final CombiningAccumulator<OutType> combiningAccumulator = new CombiningAccumulator<>(initValue, accumulator);
T lastValue = baseSequence.accumulate(null, combiningAccumulator);
return combiningAccumulator.accumulatedSomething() ? accumulator.accumulate(retVal.get(), lastValue) : initValue;
if (combiningAccumulator.accumulatedSomething()) {
return accumulator.accumulate(combiningAccumulator.retVal, lastValue);
} else {
return initValue;
}
}
@Override
public <OutType> Yielder<OutType> toYielder(OutType initValue, final YieldingAccumulator<OutType, T> accumulator)
{
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator = new CombiningYieldingAccumulator<OutType, T>(
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator = new CombiningYieldingAccumulator<>(
ordering, mergeFn, accumulator
);
@@ -81,7 +82,7 @@ public class CombiningSequence<T> implements Sequence<T>
return makeYielder(baseYielder, combiningAccumulator, false);
}
public <OutType> Yielder<OutType> makeYielder(
private <OutType> Yielder<OutType> makeYielder(
final Yielder<T> yielder,
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator,
boolean finalValue
@@ -152,11 +153,11 @@ public class CombiningSequence<T> implements Sequence<T>
private final BinaryFn<T, T, T> mergeFn;
private final YieldingAccumulator<OutType, T> accumulator;
private volatile OutType retVal;
private volatile T lastMergedVal;
private volatile boolean accumulatedSomething = false;
private OutType retVal;
private T lastMergedVal;
private boolean accumulatedSomething = false;
public CombiningYieldingAccumulator(
CombiningYieldingAccumulator(
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn,
YieldingAccumulator<OutType, T> accumulator
@@ -219,12 +220,12 @@ public class CombiningSequence<T> implements Sequence<T>
return t;
}
public void accumulateLastValue()
void accumulateLastValue()
{
retVal = accumulator.accumulate(retVal, lastMergedVal);
}
public boolean accumulatedSomething()
boolean accumulatedSomething()
{
return accumulatedSomething;
}
@@ -232,18 +233,18 @@ public class CombiningSequence<T> implements Sequence<T>
private class CombiningAccumulator<OutType> implements Accumulator<T, T>
{
private final AtomicReference<OutType> retVal;
private OutType retVal;
private final Accumulator<OutType, T> accumulator;
private volatile boolean accumulatedSomething = false;
public CombiningAccumulator(AtomicReference<OutType> retVal, Accumulator<OutType, T> accumulator)
CombiningAccumulator(OutType retVal, Accumulator<OutType, T> accumulator)
{
this.retVal = retVal;
this.accumulator = accumulator;
}
public boolean accumulatedSomething()
boolean accumulatedSomething()
{
return accumulatedSomething;
}
@@ -263,7 +264,7 @@ public class CombiningSequence<T> implements Sequence<T>
return mergeFn.apply(prevValue, t);
}
retVal.set(accumulator.accumulate(retVal.get(), prevValue));
retVal = accumulator.accumulate(retVal, prevValue);
return t;
}
}
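In the hunks above, the accumulator created in accumulate() is used within that single call and the patch treats it as confined to the calling thread, so the AtomicReference box and the volatile fields were overhead rather than protection; the patch also narrows members that are implementation details to package-private. A stripped-down sketch of the same idea, using hypothetical types:

import java.util.List;
import java.util.function.BinaryOperator;

class SingleThreadedAccumulator<T>
{
  private T result;                      // plain field: confined to the accumulating thread
  private boolean accumulatedSomething;  // likewise, no volatile or AtomicReference needed

  T fold(List<T> values, BinaryOperator<T> combine)
  {
    for (T value : values) {
      accumulatedSomething = true;
      result = (result == null) ? value : combine.apply(result, value);
    }
    return accumulatedSomething ? result : null;
  }
}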

View File

@@ -213,8 +213,8 @@ public class HadoopDruidIndexerConfig
return retVal;
}
private volatile HadoopIngestionSpec schema;
private volatile PathSpec pathSpec;
private HadoopIngestionSpec schema;
private PathSpec pathSpec;
private final Map<Long, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
private final Map<Long, Map<ShardSpec, HadoopyShardSpec>> hadoopShardSpecLookup = Maps.newHashMap();
private final QueryGranularity rollupGran;

View File

@@ -31,8 +31,8 @@ public class RetryPolicy
private final long maxNumRetries;
private final Duration maxRetryDelay;
private volatile Duration currRetryDelay;
private volatile int retryCount;
private Duration currRetryDelay;
private int retryCount;
public RetryPolicy(RetryPolicyConfig config)
{

View File

@@ -67,6 +67,7 @@ import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.concurrent.GuardedBy;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -548,7 +549,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
if (terminated) {
log.info("Finished stopping in %,dms.", elapsed);
} else {
final Set<String> stillRunning = ImmutableSet.copyOf(tasks.keySet());
final Set<String> stillRunning;
synchronized (tasks) {
stillRunning = ImmutableSet.copyOf(tasks.keySet());
}
log.makeAlert("Failed to stop forked tasks")
.addData("stillRunning", stillRunning)
@@ -670,6 +674,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
// Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that
// occur while saving.
@GuardedBy("tasks")
private void saveRunningTasks()
{
final File restoreFile = getRestoreFile();
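Two patterns appear above: copying a lock-guarded map's keys while holding the lock instead of reading the live keySet() outside it, and documenting the lock requirement with @GuardedBy so callers know they must hold the tasks monitor. A compact sketch with hypothetical names, not the actual ForkingTaskRunner:

import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.annotation.concurrent.GuardedBy;

class TaskRegistry
{
  private final Map<String, Object> tasks = new HashMap<>(); // all access synchronized on `tasks`

  Set<String> snapshotTaskIds()
  {
    synchronized (tasks) {
      // Copy inside the lock: iterating the live keySet() outside it could race with writers.
      return ImmutableSet.copyOf(tasks.keySet());
    }
  }

  @GuardedBy("tasks") // callers must already hold the `tasks` monitor
  private void saveRunningTasks()
  {
    // ... persist tasks.keySet() somewhere; reading the map here is safe because
    // the caller holds the lock ...
  }
}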

View File

@@ -1,70 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.common.guava;
/**
* @deprecated this class uses an expensive volatile counter internally, but it is not thread-safe. It will be removed
* in the future.
*/
@Deprecated
public class LimitedYieldingAccumulator<OutType, T> extends YieldingAccumulator<OutType, T>
{
private final int limit;
private final YieldingAccumulator<OutType, T> delegate;
private volatile int count = 0;
public LimitedYieldingAccumulator(
YieldingAccumulator<OutType, T> delegate, int limit
)
{
this.limit = limit;
this.delegate = delegate;
}
@Override
public void yield()
{
delegate.yield();
}
@Override
public boolean yielded()
{
return delegate.yielded();
}
@Override
public void reset()
{
delegate.reset();
}
@Override
public OutType accumulate(OutType accumulated, T in)
{
if (count < limit) {
count++;
return delegate.accumulate(accumulated, in);
}
return accumulated;
}
}

View File

@@ -40,7 +40,7 @@ public class JavaScriptAggregator implements Aggregator
private final ObjectColumnSelector[] selectorList;
private final ScriptAggregator script;
private volatile double current;
private double current;
public JavaScriptAggregator(List<ObjectColumnSelector> selectorList, ScriptAggregator script)
{

View File

@@ -1328,7 +1328,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
{
private final int rowCount;
private final boolean descending;
private volatile int currentOffset;
private int currentOffset;
NoFilterOffset(int currentOffset, int rowCount, boolean descending)
{

View File

@@ -147,13 +147,17 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
public String getMinValue()
{
synchronized (lock) {
return minValue;
}
}
public String getMaxValue()
{
synchronized (lock) {
return maxValue;
}
}
public SortedDimensionDictionary sort()
{

View File

@@ -502,11 +502,6 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return TYPE_MAP.get(singleVal.getClass());
}
public Map<String, DimensionDesc> getDimensionDescs()
{
return dimensionDescs;
}
public Map<String, ColumnCapabilitiesImpl> getColumnCapabilities()
{
return columnCapabilities;

View File

@@ -53,7 +53,7 @@ import java.util.Map;
public class IngestSegmentFirehose implements Firehose
{
private volatile Yielder<InputRow> rowYielder;
private Yielder<InputRow> rowYielder;
public IngestSegmentFirehose(
final List<WindowedStorageAdapter> adapters,

View File

@@ -23,16 +23,31 @@ import io.druid.common.utils.JodaUtils;
import org.joda.time.DateTime;
import org.joda.time.Period;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public class MessageTimeRejectionPolicyFactory implements RejectionPolicyFactory
{
@Override
public RejectionPolicy create(final Period windowPeriod)
{
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
return new MessageTimeRejectionPolicy(windowMillis, windowPeriod);
}
return new RejectionPolicy()
private static class MessageTimeRejectionPolicy implements RejectionPolicy
{
private volatile long maxTimestamp = JodaUtils.MIN_INSTANT;
private static final AtomicLongFieldUpdater<MessageTimeRejectionPolicy> maxTimestampUpdater =
AtomicLongFieldUpdater.newUpdater(MessageTimeRejectionPolicy.class, "maxTimestamp");
private final long windowMillis;
private final Period windowPeriod;
private volatile long maxTimestamp;
public MessageTimeRejectionPolicy(long windowMillis, Period windowPeriod)
{
this.windowMillis = windowMillis;
this.windowPeriod = windowPeriod;
this.maxTimestamp = JodaUtils.MIN_INSTANT;
}
@Override
public DateTime getCurrMaxTime()
@@ -43,17 +58,31 @@ public class MessageTimeRejectionPolicyFactory implements RejectionPolicyFactory
@Override
public boolean accept(long timestamp)
{
maxTimestamp = Math.max(maxTimestamp, timestamp);
long maxTimestamp = this.maxTimestamp;
if (timestamp > maxTimestamp) {
maxTimestamp = tryUpdateMaxTimestamp(timestamp);
}
return timestamp >= (maxTimestamp - windowMillis);
}
private long tryUpdateMaxTimestamp(long timestamp)
{
long currentMaxTimestamp;
do {
currentMaxTimestamp = maxTimestamp;
if (timestamp <= currentMaxTimestamp) {
return currentMaxTimestamp;
}
} while (!maxTimestampUpdater.compareAndSet(this, currentMaxTimestamp, timestamp));
return timestamp;
}
@Override
public String toString()
{
return String.format("messageTime-%s", windowPeriod);
}
};
}
}
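The old maxTimestamp = Math.max(maxTimestamp, timestamp) was a racy read-modify-write on a volatile: two threads could each read a stale maximum and one update could be lost. The replacement retries with compareAndSet via an AtomicLongFieldUpdater until it either publishes the new maximum or observes that another thread already published a larger one. A condensed sketch of that monotonic-max pattern (an AtomicLong would work as well; the field updater keeps the field a plain long and avoids one extra object per policy instance):

import java.util.concurrent.atomic.AtomicLongFieldUpdater;

class MonotonicMax
{
  private static final AtomicLongFieldUpdater<MonotonicMax> MAX_UPDATER =
      AtomicLongFieldUpdater.newUpdater(MonotonicMax.class, "max");

  private volatile long max = Long.MIN_VALUE; // must stay volatile for the field updater

  /** Raises the recorded maximum to {@code candidate} if it is larger; returns the resulting maximum. */
  long offer(long candidate)
  {
    long current;
    do {
      current = max;
      if (candidate <= current) {
        return current;            // another thread already published an equal or larger value
      }
    } while (!MAX_UPDATER.compareAndSet(this, current, candidate));
    return candidate;              // we won the race; candidate is now the maximum
  }
}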

View File

@@ -30,7 +30,7 @@ public class BalancerSegmentHolder
private final DataSegment segment;
// This is a pretty fugly hard coding of the maximum lifetime
private volatile int lifetime = 15;
private int lifetime = 15;
public BalancerSegmentHolder(
ImmutableDruidServer fromServer,

View File

@@ -22,7 +22,6 @@ package io.druid.server.log;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.lifecycle.LifecycleStart;
@@ -49,8 +48,8 @@ public class FileRequestLogger implements RequestLogger
private final Object lock = new Object();
private volatile DateTime currentDay;
private volatile OutputStreamWriter fileWriter;
private DateTime currentDay;
private OutputStreamWriter fileWriter;
public FileRequestLogger(ObjectMapper objectMapper, ScheduledExecutorService exec, File baseDir)
{
@@ -67,12 +66,14 @@ public class FileRequestLogger implements RequestLogger
MutableDateTime mutableDateTime = new DateTime().toMutableDateTime();
mutableDateTime.setMillisOfDay(0);
synchronized (lock) {
currentDay = mutableDateTime.toDateTime();
fileWriter = new OutputStreamWriter(
new FileOutputStream(new File(baseDir, currentDay.toString("yyyy-MM-dd'.log'")), true),
Charsets.UTF_8
);
}
long nextDay = currentDay.plusDays(1).getMillis();
Duration delay = new Duration(nextDay - new DateTime().getMillis());
@@ -85,10 +86,9 @@ public class FileRequestLogger implements RequestLogger
@Override
public ScheduledExecutors.Signal call()
{
currentDay = currentDay.plusDays(1);
try {
synchronized (lock) {
currentDay = currentDay.plusDays(1);
CloseQuietly.close(fileWriter);
fileWriter = new OutputStreamWriter(
new FileOutputStream(new File(baseDir, currentDay.toString()), true),