diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java index 99ac506a7d..5fe8153cc5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@ -64,10 +64,10 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { private volatile boolean stopped = true; - private final Map leaderRoles = new HashMap<>(); - private final Map registeredRoles = new HashMap<>(); + private final ConcurrentMap leaderRoles = new ConcurrentHashMap<>(); + private final ConcurrentMap registeredRoles = new ConcurrentHashMap<>(); - private final Map> leaderChanges = new HashMap<>(); + private final ConcurrentMap> leaderChanges = new ConcurrentHashMap<>(); private final TimedBuffer pollTimes = new TimedBuffer<>(TimeUnit.SECONDS, 300, new CountSumMinMaxAccess()); private final ConcurrentMap lastKnownLeader = new ConcurrentHashMap<>(); @@ -221,16 +221,16 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { return "CuratorLeaderElectionManager[stopped=" + isStopped() + "]"; } - private synchronized LeaderRole getLeaderRole(final String roleName) { + private LeaderRole getLeaderRole(final String roleName) { return leaderRoles.get(roleName); } - private synchronized void onLeaderChanged(final String roleName) { + private void onLeaderChanged(final String roleName) { final TimedBuffer buffer = leaderChanges.computeIfAbsent(roleName, key -> new TimedBuffer<>(TimeUnit.HOURS, 24, new LongEntityAccess())); buffer.add(new TimestampedLong(1L)); } - public synchronized Map getLeadershipChangeCount(final long duration, final TimeUnit unit) { + public Map getLeadershipChangeCount(final long duration, final TimeUnit unit) { final Map leadershipChangesPerRole = new HashMap<>(); for (final Map.Entry> entry : leaderChanges.entrySet()) { @@ -301,48 +301,57 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { return participantId; } - private synchronized void registerPollTime(final long nanos) { - pollTimes.add(TimestampedLongAggregation.newValue(nanos)); + private void registerPollTime(final long nanos) { + synchronized (pollTimes) { + pollTimes.add(TimestampedLongAggregation.newValue(nanos)); + } } - public synchronized long getAveragePollTime(final TimeUnit timeUnit) { - final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation(); - if (aggregation == null || aggregation.getCount() == 0) { - return 0L; + public long getAveragePollTime(final TimeUnit timeUnit) { + final long averageNanos; + synchronized (pollTimes) { + final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation(); + if (aggregation == null || aggregation.getCount() == 0) { + return 0L; + } + averageNanos = aggregation.getSum() / aggregation.getCount(); } - - final long averageNanos = aggregation.getSum() / aggregation.getCount(); return timeUnit.convert(averageNanos, TimeUnit.NANOSECONDS); } - public synchronized long getMinPollTime(final TimeUnit timeUnit) { - final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation(); - if (aggregation == null) { - return 0L; + public long getMinPollTime(final TimeUnit timeUnit) { + final long minNanos; + synchronized (pollTimes) { + final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation(); + if (aggregation == null) { + return 0L; + } + minNanos = aggregation.getMin(); } - - final long minNanos = aggregation.getMin(); return timeUnit.convert(minNanos, TimeUnit.NANOSECONDS); } - public synchronized long getMaxPollTime(final TimeUnit timeUnit) { - final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation(); - if (aggregation == null) { - return 0L; + public long getMaxPollTime(final TimeUnit timeUnit) { + final long maxNanos; + synchronized (pollTimes) { + final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation(); + if (aggregation == null) { + return 0L; + } + maxNanos = aggregation.getMax(); } - - final long maxNanos = aggregation.getMax(); return timeUnit.convert(maxNanos, TimeUnit.NANOSECONDS); } @Override - public synchronized long getPollCount() { - final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation(); - if (aggregation == null) { - return 0L; + public long getPollCount() { + synchronized (pollTimes) { + final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation(); + if (aggregation == null) { + return 0L; + } + return aggregation.getCount(); } - - return aggregation.getCount(); } /**