Refactoring to use `CollectionUtils.mapValues` (#8059)

* doc updates and changes to use the CollectionUtils.mapValues utility method

* Add Structural Search patterns to IntelliJ

* refactoring from PR comments

* put -> putIfAbsent

* do single key lookup
Author: Surekha, 2019-07-17 23:02:22 -07:00 (committed by Clint Wylie)
Parent: 2d6d1c17a0
Commit: da16144495
12 changed files with 78 additions and 63 deletions
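As context for the diff below, here is a minimal sketch of the stream-collector pattern being replaced and its CollectionUtils.mapValues equivalent. It mirrors the AtomicLong counter maps touched in TaskQueue, but the class and method names are illustrative, not taken from any file in this PR.

// Hypothetical sketch of the refactoring pattern; only CollectionUtils.mapValues is the real Druid utility.
import org.apache.druid.utils.CollectionUtils;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

class MapValuesRefactorSketch
{
  // Before: values transformed with an inline Collectors.toMap, keys passed through unchanged.
  static Map<String, Long> oldStyle(Map<String, AtomicLong> counters)
  {
    return counters.entrySet()
                   .stream()
                   .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
  }

  // After: the same transformation through the shared utility method.
  static Map<String, Long> newStyle(Map<String, AtomicLong> counters)
  {
    return CollectionUtils.mapValues(counters, AtomicLong::get);
  }
}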

View File

@@ -311,6 +311,28 @@
<constraint name="k" within="" contains="" />
<constraint name="v" within="" contains="" />
</searchConfiguration>
<searchConfiguration name="Use CollectionUtils.mapValues(Map&lt;K,V&gt;, Function&lt;V,V2&gt;)" text="$x$.entrySet().stream().collect(Collectors.toMap($k$ -&gt; $k$.getKey(), $y$))" recursive="true" caseInsensitive="true" type="JAVA">
<constraint name="x" within="" contains="" />
<constraint name="y" within="" contains="" />
<constraint name="k" within="" contains="" />
<constraint name="__context__" target="true" within="" contains="" />
</searchConfiguration>
<searchConfiguration name="Use CollectionUtils.mapValues(Map&lt;k,v&gt;, Function&lt;v,v2&gt;)" text="$x$.entrySet().stream().collect(Collectors.toMap(Entry::getKey, $y$))" recursive="true" caseInsensitive="true" type="JAVA">
<constraint name="x" within="" contains="" />
<constraint name="y" within="" contains="" />
<constraint name="__context__" target="true" within="" contains="" />
</searchConfiguration>
<searchConfiguration name="Use CollectionUtils.mapKeys(Map&lt;K,V&gt;, Function&lt;K,K2&gt;)" text="$x$.entrySet().stream().collect(Collectors.toMap($y$, $v$ -&gt; $v$.getValue()))" recursive="true" caseInsensitive="true" type="JAVA">
<constraint name="x" within="" contains="" />
<constraint name="y" within="" contains="" />
<constraint name="__context__" target="true" within="" contains="" />
<constraint name="v" within="" contains="" />
</searchConfiguration>
<searchConfiguration name="Use CollectionUtils.mapKeys(Map&lt;k,v&gt;, Function&lt;k,k2&gt;)" text="$x$.entrySet().stream().collect(Collectors.toMap($y$, Map.Entry::getValue))" recursive="true" caseInsensitive="true" type="JAVA">
<constraint name="x" within="" contains="" />
<constraint name="y" within="" contains="" />
<constraint name="__context__" target="true" within="" contains="" />
</searchConfiguration>
<searchConfiguration name="Use collections constructors directly" text="Stream.of($x$).collect(Collectors.$m$())" recursive="true" caseInsensitive="true" type="JAVA"> <searchConfiguration name="Use collections constructors directly" text="Stream.of($x$).collect(Collectors.$m$())" recursive="true" caseInsensitive="true" type="JAVA">
<constraint name="__context__" target="true" within="" contains="" /> <constraint name="__context__" target="true" within="" contains="" />
<constraint name="m" within="" contains="" /> <constraint name="m" within="" contains="" />

View File

@@ -21,10 +21,12 @@ package org.apache.druid.utils;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import org.apache.druid.java.util.common.ISE;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
@@ -85,6 +87,8 @@ public final class CollectionUtils
/**
 * Returns a transformed map from the given input map where the value is modified based on the given valueMapper
 * function.
+ * Unlike {@link Maps#transformValues}, this method applies the mapping function eagerly to all key-value pairs
+ * in the source map and returns a new {@link HashMap}, while {@link Maps#transformValues} returns a lazy map view.
 */
public static <K, V, V2> Map<K, V2> mapValues(Map<K, V> map, Function<V, V2> valueMapper)
{
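The eager-versus-lazy distinction added to the javadoc above, sketched with an illustrative counter map (the class and variable names are not from the PR):

import com.google.common.collect.Maps;
import org.apache.druid.utils.CollectionUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class EagerVsLazySketch
{
  static void demo()
  {
    Map<String, AtomicLong> source = new HashMap<>();
    source.put("x", new AtomicLong(1));

    Map<String, Long> eager = CollectionUtils.mapValues(source, AtomicLong::get); // copied into a new HashMap now
    Map<String, Long> lazy = Maps.transformValues(source, AtomicLong::get);       // a view, re-evaluated on each get()

    source.get("x").incrementAndGet();

    // eager.get("x") is still 1; lazy.get("x") now reads 2.
  }
}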
@@ -93,6 +97,25 @@ public final class CollectionUtils
  return result;
}
/**
* Returns a transformed map from the given input map where the key is modified based on the given keyMapper
* function. This method fails if keys collide after applying the given keyMapper function and
* throws an IllegalStateException.
*
* @throws ISE if key collisions occur while applying the specified keyMapper
*/
public static <K, V, K2> Map<K2, V> mapKeys(Map<K, V> map, Function<K, K2> keyMapper)
{
final Map<K2, V> result = Maps.newHashMapWithExpectedSize(map.size());
map.forEach((k, v) -> {
final K2 k2 = keyMapper.apply(k);
if (result.putIfAbsent(k2, v) != null) {
throw new ISE("Conflicting key[%s] calculated via keyMapper for original key[%s]", k2, k);
}
});
return result;
}
private CollectionUtils()
{
}
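A short usage sketch for the new mapKeys method (the maps and key-mapping functions are illustrative): keys are rewritten eagerly, and two original keys mapping to the same new key raise an ISE because of the putIfAbsent check above, rather than silently dropping an entry.

import com.google.common.collect.ImmutableMap;
import org.apache.druid.utils.CollectionUtils;

import java.util.Map;

class MapKeysSketch
{
  static void demo()
  {
    // Keys are rewritten; values are carried over untouched.
    Map<String, Integer> renamed = CollectionUtils.mapKeys(ImmutableMap.of(1, 10, 2, 20), k -> "key-" + k);
    // renamed contains key-1=10 and key-2=20

    // Colliding mapped keys trigger the ISE above.
    CollectionUtils.mapKeys(ImmutableMap.of(1, 10, 2, 20), k -> "same"); // throws ISE
  }
}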

View File

@@ -39,6 +39,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CircularBuffer;
+import org.apache.druid.utils.CollectionUtils;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
@@ -54,7 +55,6 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
/**
 * Kafka indexing task runner supporting incremental segments publishing
@@ -163,12 +163,10 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
}
if (doReset) {
-  sendResetRequestAndWait(resetPartitions.entrySet()
-                                          .stream()
-                                          .collect(Collectors.toMap(x -> StreamPartition.of(
-                                              x.getKey().topic(),
-                                              x.getKey().partition()
-                                          ), Map.Entry::getValue)), taskToolbox);
+  sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, streamPartition -> StreamPartition.of(
+      streamPartition.topic(),
+      streamPartition.partition()
+  )), taskToolbox);
} else {
  log.warn("Retrying in %dms", task.getPollRetryMs());
  pollRetryLock.lockInterruptibly();

View File

@@ -266,6 +266,8 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
@Override
+// suppress use of CollectionUtils.mapValues() since the valueMapper function depends on the map key here
+@SuppressWarnings("SSBasedInspection")
protected Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
{
  return currentOffsets
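The suppression above is needed because CollectionUtils.mapValues only hands the mapper the value; when the replacement value also depends on the key (here, lag is computed per partition), the entrySet().stream() form has to stay. A rough illustration of the distinction; the method names and the latestOffsets lookup are hypothetical, not the actual KafkaSupervisor logic.

import org.apache.druid.utils.CollectionUtils;

import java.util.Map;
import java.util.stream.Collectors;

class KeyDependentMapperSketch
{
  // Fine for CollectionUtils.mapValues: the new value is derived from the old value alone.
  static Map<Integer, Long> doubled(Map<Integer, Long> offsets)
  {
    return CollectionUtils.mapValues(offsets, offset -> offset * 2);
  }

  // Not expressible with mapValues: the new value needs the key (partition id) too,
  // so the stream/Collectors.toMap form is kept and the inspection is suppressed.
  static Map<Integer, Long> lag(Map<Integer, Long> currentOffsets, Map<Integer, Long> latestOffsets)
  {
    return currentOffsets
        .entrySet()
        .stream()
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            e -> latestOffsets.getOrDefault(e.getKey(), e.getValue()) - e.getValue()
        ));
  }
}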

View File

@@ -45,6 +45,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.Collection;
@@ -621,12 +622,7 @@ public class TaskQueue
public Map<String, Long> getSuccessfulTaskCount()
{
-  Map<String, Long> total = totalSuccessfulTaskCount.entrySet()
-                                                    .stream()
-                                                    .collect(Collectors.toMap(
-                                                        Map.Entry::getKey,
-                                                        e -> e.getValue().get()
-                                                    ));
+  Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
  Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
  prevTotalSuccessfulTaskCount = total;
  return delta;
@@ -634,12 +630,7 @@
public Map<String, Long> getFailedTaskCount()
{
-  Map<String, Long> total = totalFailedTaskCount.entrySet()
-                                                .stream()
-                                                .collect(Collectors.toMap(
-                                                    Map.Entry::getKey,
-                                                    e -> e.getValue().get()
-                                                ));
+  Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
  Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
  prevTotalFailedTaskCount = total;
  return delta;

View File

@@ -80,6 +80,7 @@ import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CircularBuffer;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@@ -1274,8 +1275,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
)
    throws IOException
{
-  Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap = outOfRangePartitions
-      .entrySet().stream().collect(Collectors.toMap(x -> x.getKey().getPartitionId(), Map.Entry::getValue));
+  Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap = CollectionUtils.mapKeys(
+      outOfRangePartitions,
+      StreamPartition::getPartitionId
+  );
  boolean result = taskToolbox
      .getTaskActionClient()

View File

@@ -35,13 +35,13 @@ import org.apache.druid.math.expr.Parser;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;
public class ExpressionPostAggregator implements PostAggregator
{
@@ -187,12 +187,7 @@ public class ExpressionPostAggregator implements PostAggregator
      expression,
      ordering,
      macroTable,
-     aggregators.entrySet().stream().collect(
-         Collectors.toMap(
-             entry -> entry.getKey(),
-             entry -> entry.getValue()::finalizeComputation
-         )
-     ),
+     CollectionUtils.mapValues(aggregators, aggregatorFactory -> obj -> aggregatorFactory.finalizeComputation(obj)),
      parsed,
      dependentFields
  );
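Note the curried value mapper in the replacement: mapValues hands it an AggregatorFactory and it must return the finalizing Function itself, where the old collector built that function from the map entry. A type-level sketch; the wrapper class and method are illustrative.

import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.utils.CollectionUtils;

import java.util.Map;
import java.util.function.Function;

class FinalizerSketch
{
  static Map<String, Function<Object, Object>> finalizers(Map<String, AggregatorFactory> aggregators)
  {
    // Each AggregatorFactory value is mapped to the function that finalizes its computation.
    // The inner lambda could equally be written as the method reference aggregatorFactory::finalizeComputation.
    return CollectionUtils.mapValues(aggregators, aggregatorFactory -> obj -> aggregatorFactory.finalizeComputation(obj));
  }
}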

View File

@@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -689,14 +690,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
      )
    )
  ),
- snapshot.entrySet()
-         .stream()
-         .collect(
-             Collectors.toMap(
-                 Entry::getKey,
-                 e -> e.getValue().lastSegmentId
-             )
-         ),
+ CollectionUtils.mapValues(snapshot, segmentsForSequence -> segmentsForSequence.lastSegmentId),
  committer.getMetadata()
);

View File

@@ -32,12 +32,11 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
+import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
/**
 * This class is responsible for managing data sources and their states like timeline, total segment size, and number of
@@ -115,8 +114,7 @@
 */
public Map<String, Long> getDataSourceSizes()
{
-  return dataSources.entrySet().stream()
-                    .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getTotalSegmentSize()));
+  return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize);
}
/**
@@ -127,8 +125,7 @@ public class SegmentManager
 */
public Map<String, Long> getDataSourceCounts()
{
-  return dataSources.entrySet().stream()
-                    .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getNumSegments()));
+  return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getNumSegments);
}
public boolean isSegmentCached(final DataSegment segment)

View File

@@ -36,7 +36,6 @@ import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
-import java.util.stream.Collectors;
/**
 * Contains a representation of the current state of the cluster by tier.
@@ -69,16 +68,13 @@
)
{
  this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes);
-  this.historicals = historicals
-      .entrySet()
-      .stream()
-      .collect(
-          Collectors.toMap(
-              Map.Entry::getKey,
-              (Map.Entry<String, Iterable<ServerHolder>> e) ->
-                  CollectionUtils.newTreeSet(Comparator.reverseOrder(), e.getValue())
-          )
-      );
+  this.historicals = CollectionUtils.mapValues(
+      historicals,
+      holders -> CollectionUtils.newTreeSet(
+          Comparator.reverseOrder(),
+          holders
+      )
+  );
}
public void add(ServerHolder serverHolder)

View File

@@ -21,11 +21,11 @@ package org.apache.druid.server.coordinator.cost;
import com.google.common.base.Preconditions;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CollectionUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
public class ClusterCostCache
{
@@ -82,10 +82,7 @@ public class ClusterCostCache
public ClusterCostCache build()
{
  return new ClusterCostCache(
-      serversCostCache
-          .entrySet()
-          .stream()
-          .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()))
+      CollectionUtils.mapValues(serversCostCache, ServerCostCache.Builder::build)
  );
}
}

View File

@@ -21,10 +21,10 @@ package org.apache.druid.server.coordinator.cost;
import com.google.common.base.Preconditions;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CollectionUtils;
import java.util.HashMap;
import java.util.Map;
-import java.util.stream.Collectors;
public class ServerCostCache
{
@@ -89,10 +89,7 @@ public class ServerCostCache
{
  return new ServerCostCache(
      allSegmentsCostCache.build(),
-      segmentsPerDataSource
-          .entrySet()
-          .stream()
-          .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()))
+      CollectionUtils.mapValues(segmentsPerDataSource, SegmentsCostCache.Builder::build)
  );
}
}