NIFI-9217 - avoid deadlock on cluster operation (#5390)

This commit is contained in:
greyp9 2021-09-15 15:40:33 -04:00 committed by GitHub
parent bb638b13aa
commit 4af3fac07a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 41 additions and 32 deletions

View File

@ -64,10 +64,10 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
private volatile boolean stopped = true; private volatile boolean stopped = true;
private final Map<String, LeaderRole> leaderRoles = new HashMap<>(); private final ConcurrentMap<String, LeaderRole> leaderRoles = new ConcurrentHashMap<>();
private final Map<String, RegisteredRole> registeredRoles = new HashMap<>(); private final ConcurrentMap<String, RegisteredRole> registeredRoles = new ConcurrentHashMap<>();
private final Map<String, TimedBuffer<TimestampedLong>> leaderChanges = new HashMap<>(); private final ConcurrentMap<String, TimedBuffer<TimestampedLong>> leaderChanges = new ConcurrentHashMap<>();
private final TimedBuffer<TimestampedLongAggregation> pollTimes = new TimedBuffer<>(TimeUnit.SECONDS, 300, new CountSumMinMaxAccess()); private final TimedBuffer<TimestampedLongAggregation> pollTimes = new TimedBuffer<>(TimeUnit.SECONDS, 300, new CountSumMinMaxAccess());
private final ConcurrentMap<String, String> lastKnownLeader = new ConcurrentHashMap<>(); private final ConcurrentMap<String, String> lastKnownLeader = new ConcurrentHashMap<>();
@ -221,16 +221,16 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
return "CuratorLeaderElectionManager[stopped=" + isStopped() + "]"; return "CuratorLeaderElectionManager[stopped=" + isStopped() + "]";
} }
private synchronized LeaderRole getLeaderRole(final String roleName) { private LeaderRole getLeaderRole(final String roleName) {
return leaderRoles.get(roleName); return leaderRoles.get(roleName);
} }
private synchronized void onLeaderChanged(final String roleName) { private void onLeaderChanged(final String roleName) {
final TimedBuffer<TimestampedLong> buffer = leaderChanges.computeIfAbsent(roleName, key -> new TimedBuffer<>(TimeUnit.HOURS, 24, new LongEntityAccess())); final TimedBuffer<TimestampedLong> buffer = leaderChanges.computeIfAbsent(roleName, key -> new TimedBuffer<>(TimeUnit.HOURS, 24, new LongEntityAccess()));
buffer.add(new TimestampedLong(1L)); buffer.add(new TimestampedLong(1L));
} }
public synchronized Map<String, Integer> getLeadershipChangeCount(final long duration, final TimeUnit unit) { public Map<String, Integer> getLeadershipChangeCount(final long duration, final TimeUnit unit) {
final Map<String, Integer> leadershipChangesPerRole = new HashMap<>(); final Map<String, Integer> leadershipChangesPerRole = new HashMap<>();
for (final Map.Entry<String, TimedBuffer<TimestampedLong>> entry : leaderChanges.entrySet()) { for (final Map.Entry<String, TimedBuffer<TimestampedLong>> entry : leaderChanges.entrySet()) {
@ -301,49 +301,58 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
return participantId; return participantId;
} }
private synchronized void registerPollTime(final long nanos) { private void registerPollTime(final long nanos) {
synchronized (pollTimes) {
pollTimes.add(TimestampedLongAggregation.newValue(nanos)); pollTimes.add(TimestampedLongAggregation.newValue(nanos));
} }
}
public synchronized long getAveragePollTime(final TimeUnit timeUnit) { public long getAveragePollTime(final TimeUnit timeUnit) {
final long averageNanos;
synchronized (pollTimes) {
final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation(); final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
if (aggregation == null || aggregation.getCount() == 0) { if (aggregation == null || aggregation.getCount() == 0) {
return 0L; return 0L;
} }
averageNanos = aggregation.getSum() / aggregation.getCount();
final long averageNanos = aggregation.getSum() / aggregation.getCount(); }
return timeUnit.convert(averageNanos, TimeUnit.NANOSECONDS); return timeUnit.convert(averageNanos, TimeUnit.NANOSECONDS);
} }
public synchronized long getMinPollTime(final TimeUnit timeUnit) { public long getMinPollTime(final TimeUnit timeUnit) {
final long minNanos;
synchronized (pollTimes) {
final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation(); final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
if (aggregation == null) { if (aggregation == null) {
return 0L; return 0L;
} }
minNanos = aggregation.getMin();
final long minNanos = aggregation.getMin(); }
return timeUnit.convert(minNanos, TimeUnit.NANOSECONDS); return timeUnit.convert(minNanos, TimeUnit.NANOSECONDS);
} }
public synchronized long getMaxPollTime(final TimeUnit timeUnit) { public long getMaxPollTime(final TimeUnit timeUnit) {
final long maxNanos;
synchronized (pollTimes) {
final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation(); final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
if (aggregation == null) { if (aggregation == null) {
return 0L; return 0L;
} }
maxNanos = aggregation.getMax();
final long maxNanos = aggregation.getMax(); }
return timeUnit.convert(maxNanos, TimeUnit.NANOSECONDS); return timeUnit.convert(maxNanos, TimeUnit.NANOSECONDS);
} }
@Override @Override
public synchronized long getPollCount() { public long getPollCount() {
synchronized (pollTimes) {
final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation(); final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
if (aggregation == null) { if (aggregation == null) {
return 0L; return 0L;
} }
return aggregation.getCount(); return aggregation.getCount();
} }
}
/** /**
* Determines whether or not leader election has already begun for the role with the given name * Determines whether or not leader election has already begun for the role with the given name