NIFI-12288 This closes #7950. Improved Long and Integer handling in Utilities

- Added explicit round in FormatUtils.makeWholeNumberTime()
- Removed unnecessary boxing in component descriptors
- Maintained long number tracking for releasable counts in Wait and Notify Processors

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
exceptionfactory 2023-10-27 17:19:50 -05:00 committed by Joseph Witt
parent 211a3aa7c4
commit ae14738dea
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
5 changed files with 89 additions and 106 deletions

View File

@ -217,7 +217,7 @@ public class FormatUtils {
durationLong = Math.round(durationVal);
} else {
// Try reducing the size of the units to make the input a long
List wholeResults = makeWholeNumberTime(durationVal, specifiedTimeUnit);
List<?> wholeResults = makeWholeNumberTime(durationVal, specifiedTimeUnit);
durationLong = (long) wholeResults.get(0);
specifiedTimeUnit = (TimeUnit) wholeResults.get(1);
}
@ -247,7 +247,8 @@ public class FormatUtils {
protected static List<Object> makeWholeNumberTime(double decimal, TimeUnit timeUnit) {
// If the value is already a whole number, return it and the current time unit
if (decimal == Math.rint(decimal)) {
return Arrays.asList(new Object[]{(long) decimal, timeUnit});
final long rounded = Math.round(decimal);
return Arrays.asList(new Object[]{rounded, timeUnit});
} else if (TimeUnit.NANOSECONDS == timeUnit) {
// The time unit is as small as possible
if (decimal < 1.0) {

View File

@ -57,7 +57,7 @@ public enum ProcessorStatusDescriptor {
"FlowFiles In (5 mins)",
"The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getInputCount())),
s -> (long) s.getInputCount()),
OUTPUT_BYTES(
"outputBytes",
@ -71,14 +71,14 @@ public enum ProcessorStatusDescriptor {
"FlowFiles Out (5 mins)",
"The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getOutputCount())),
s -> (long) s.getOutputCount()),
TASK_COUNT(
"taskCount",
"Tasks (5 mins)",
"The number of tasks that this Processor has completed in the past 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getInvocations())),
s -> (long) s.getInvocations()),
TASK_MILLIS(
"taskMillis",
@ -100,7 +100,7 @@ public enum ProcessorStatusDescriptor {
"FlowFiles Removed (5 mins)",
"The total number of FlowFiles removed by this Processor in the last 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getFlowFilesRemoved())),
s -> (long) s.getFlowFilesRemoved()),
AVERAGE_LINEAGE_DURATION(
"averageLineageDuration",
@ -108,27 +108,27 @@ public enum ProcessorStatusDescriptor {
"The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.",
Formatter.DURATION,
s -> s.getAverageLineageDuration(TimeUnit.MILLISECONDS),
new ValueReducer<StatusSnapshot, Long>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long millis = 0L;
int count = 0;
new ValueReducer<>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long millis = 0L;
long count = 0;
for (final StatusSnapshot snapshot : values) {
final long removed = snapshot.getStatusMetric(FLOWFILES_REMOVED.getDescriptor()).longValue();
final long outputCount = snapshot.getStatusMetric(OUTPUT_COUNT.getDescriptor()).longValue();
final long processed = removed + outputCount;
for (final StatusSnapshot snapshot : values) {
final long removed = snapshot.getStatusMetric(FLOWFILES_REMOVED.getDescriptor());
final long outputCount = snapshot.getStatusMetric(OUTPUT_COUNT.getDescriptor());
final long processed = removed + outputCount;
count += processed;
count += processed;
final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
final long totalMillis = avgMillis * processed;
millis += totalMillis;
final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor());
final long totalMillis = avgMillis * processed;
millis += totalMillis;
}
return count == 0 ? 0 : millis / count;
}
return count == 0 ? 0 : millis / count;
}
},
},
true
),
@ -138,31 +138,31 @@ public enum ProcessorStatusDescriptor {
"The average number of nanoseconds it took this Processor to complete a task, over the past 5 minutes",
Formatter.COUNT,
s -> s.getInvocations() == 0 ? 0 : s.getProcessingNanos() / s.getInvocations(),
new ValueReducer<StatusSnapshot, Long>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long procNanos = 0L;
int invocations = 0;
new ValueReducer<>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long procNanos = 0L;
int invocations = 0;
for (final StatusSnapshot snapshot : values) {
final Long taskNanos = snapshot.getStatusMetric(TASK_NANOS.getDescriptor());
if (taskNanos != null) {
procNanos += taskNanos.longValue();
for (final StatusSnapshot snapshot : values) {
final Long taskNanos = snapshot.getStatusMetric(TASK_NANOS.getDescriptor());
if (taskNanos != null) {
procNanos += taskNanos;
}
final Long taskInvocations = snapshot.getStatusMetric(TASK_COUNT.getDescriptor());
if (taskInvocations != null) {
invocations += taskInvocations.intValue();
}
}
final Long taskInvocations = snapshot.getStatusMetric(TASK_COUNT.getDescriptor());
if (taskInvocations != null) {
invocations += taskInvocations.intValue();
if (invocations == 0) {
return 0L;
}
}
if (invocations == 0) {
return 0L;
return procNanos / invocations;
}
return procNanos / invocations;
}
},
},
true
);

View File

@ -57,25 +57,20 @@ public enum RemoteProcessGroupStatusDescriptor {
"Received Bytes Per Second",
"The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
Formatter.DATA_SIZE,
s -> s.getReceivedContentSize().longValue() / 300L),
s -> s.getReceivedContentSize() / 300L),
SENT_BYTES_PER_SECOND(
"sentBytesPerSecond",
"Sent Bytes Per Second",
"The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
Formatter.DATA_SIZE,
s -> s.getSentContentSize().longValue() / 300L),
s -> s.getSentContentSize() / 300L),
TOTAL_BYTES_PER_SECOND("totalBytesPerSecond",
"Total Bytes Per Second",
"The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second",
Formatter.DATA_SIZE,
new ValueMapper<RemoteProcessGroupStatus>() {
@Override
public Long getValue(final RemoteProcessGroupStatus status) {
return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L);
}
}),
status -> (status.getReceivedContentSize() + status.getSentContentSize()) / 300L),
AVERAGE_LINEAGE_DURATION(
"averageLineageDuration",
@ -83,24 +78,24 @@ public enum RemoteProcessGroupStatusDescriptor {
"The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.",
Formatter.DURATION,
s -> s.getAverageLineageDuration(TimeUnit.MILLISECONDS),
new ValueReducer<StatusSnapshot, Long>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long millis = 0L;
int count = 0;
new ValueReducer<>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long millis = 0L;
long count = 0;
for (final StatusSnapshot snapshot : values) {
final long sent = snapshot.getStatusMetric(SENT_COUNT.getDescriptor()).longValue();
count += sent;
for (final StatusSnapshot snapshot : values) {
final long sent = snapshot.getStatusMetric(SENT_COUNT.getDescriptor());
count += sent;
final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
final long totalMillis = avgMillis * sent;
millis += totalMillis;
final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor());
final long totalMillis = avgMillis * sent;
millis += totalMillis;
}
return count == 0 ? 0 : millis / count;
}
return count == 0 ? 0 : millis / count;
}
});
});
private final MetricDescriptor<RemoteProcessGroupStatus> descriptor;

View File

@ -70,7 +70,7 @@ public class WaitNotifyProtocol {
transient private AtomicCacheEntry<String, String, Object> cachedEntry;
private Map<String, Long> counts = new HashMap<>();
private Map<String, String> attributes = new HashMap<>();
private int releasableCount = 0;
private long releasableCount = 0;
public Map<String, Long> getCounts() {
return counts;
@ -110,11 +110,11 @@ public class WaitNotifyProtocol {
return count != null ? count : 0;
}
public int getReleasableCount() {
public long getReleasableCount() {
return releasableCount;
}
public void setReleasableCount(int releasableCount) {
public void setReleasableCount(long releasableCount) {
this.releasableCount = releasableCount;
}
@ -155,7 +155,8 @@ public class WaitNotifyProtocol {
}
}
int releaseCount = Math.min(releasableCount, candidateSize);
// Convert to integer for list index sizing
final int releaseCount = Math.toIntExact(Math.min(releasableCount, candidateSize));
released.accept(candidates.subList(0, releaseCount));
waiting.accept(candidates.subList(releaseCount, candidateSize));

View File

@ -44,8 +44,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@ -56,8 +56,8 @@ public class TestWaitNotifyProtocol {
private final ObjectMapper mapper = new ObjectMapper();
private AtomicDistributedMapCacheClient<Long> cache;
@SuppressWarnings("unchecked")
private final Answer successfulReplace = invocation -> {
private final Answer<?> successfulReplace = invocation -> {
final AtomicCacheEntry<String, String, Long> entry = invocation.getArgument(0);
cacheEntries.put(entry.getKey(), new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), entry.getRevision().orElse(0L) + 1));
return true;
@ -83,11 +83,7 @@ public class TestWaitNotifyProtocol {
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
final String signalId = "signal-id";
try {
protocol.notify(signalId, "a", 1, null);
fail("Notify should fail after retrying few times.");
} catch (ConcurrentModificationException e) {
}
assertThrows(ConcurrentModificationException.class, () -> protocol.notify(signalId, "a", 1, null));
}
@Test
@ -237,7 +233,7 @@ public class TestWaitNotifyProtocol {
attributesSerializer.serialize(cachedAttributes, bos);
final String signalId = "old-entry";
cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, new String(bos.toByteArray(), StandardCharsets.UTF_8), 0L));
cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, bos.toString(StandardCharsets.UTF_8), 0L));
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
final Signal signal = protocol.getSignal(signalId);
@ -248,12 +244,7 @@ public class TestWaitNotifyProtocol {
assertEquals("value3", signal.getAttributes().get("key3"));
cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, "UNSUPPORTED_FORMAT", 0L));
try {
protocol.getSignal(signalId);
fail("Should fail since cached value was not in expected format.");
} catch (DeserializationException e) {
}
assertThrows(DeserializationException.class, () -> protocol.getSignal(signalId));
}
@Test
@ -263,14 +254,11 @@ public class TestWaitNotifyProtocol {
final List<Integer> released = new ArrayList<>();
final List<Integer> waiting = new ArrayList<>();
// Test default name.
final String counterName = DEFAULT_COUNT_NAME;
final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
released.clear();
waiting.clear();
signal.releaseCandidates(counterName, requiredCountForPass, releasableCandidatePerPass, candidates,
r -> released.addAll(r), w -> waiting.addAll(w));
signal.releaseCandidates(DEFAULT_COUNT_NAME, requiredCountForPass, releasableCandidatePerPass, candidates,
released::addAll, waiting::addAll);
};
final Field releasableCount = Signal.class.getDeclaredField("releasableCount");
@ -281,7 +269,7 @@ public class TestWaitNotifyProtocol {
assertEquals(0, released.size());
assertEquals(10, waiting.size());
assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
assertEquals(0, releasableCount.getLong(signal));
// Counter is not enough yet.
signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
@ -289,7 +277,7 @@ public class TestWaitNotifyProtocol {
assertEquals(0, released.size());
assertEquals(10, waiting.size());
assertEquals(1, signal.getCount(DEFAULT_COUNT_NAME)); // Counter incremented, but not enough
assertEquals(0, releasableCount.getInt(signal));
assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target.
signal.getCounts().put(DEFAULT_COUNT_NAME, 3L);
@ -297,7 +285,7 @@ public class TestWaitNotifyProtocol {
assertEquals(1, released.size());
assertEquals(9, waiting.size());
assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 3 was converted into 1 release
assertEquals(0, releasableCount.getInt(signal));
assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target for two candidates.
signal.getCounts().put(DEFAULT_COUNT_NAME, 6L);
@ -305,7 +293,7 @@ public class TestWaitNotifyProtocol {
assertEquals(2, released.size());
assertEquals(8, waiting.size());
assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 3 was converted into 1 release
assertEquals(0, releasableCount.getInt(signal));
assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target for two candidates, and reminder is 2.
signal.getCounts().put(DEFAULT_COUNT_NAME, 11L);
@ -313,7 +301,7 @@ public class TestWaitNotifyProtocol {
assertEquals(3, released.size()); // 11 / 3 = 3
assertEquals(7, waiting.size());
assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 11 % 3 = 2
assertEquals(0, releasableCount.getInt(signal));
assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target for two pass count and each pass can release 2 candidates.
signal.getCounts().put(DEFAULT_COUNT_NAME, 6L);
@ -321,7 +309,7 @@ public class TestWaitNotifyProtocol {
assertEquals(4, released.size()); // (6 / 3) * 2 = 4
assertEquals(6, waiting.size());
assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // 6 % 3 = 0
assertEquals(0, releasableCount.getInt(signal));
assertEquals(0, releasableCount.getLong(signal));
// If there are counts more than enough to release current candidates, unused releasableCount should remain.
signal.getCounts().put(DEFAULT_COUNT_NAME, 50L);
@ -329,11 +317,9 @@ public class TestWaitNotifyProtocol {
assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10.
assertEquals(0, waiting.size());
assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 50 % 3 = 2.
assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22.
assertEquals(22, releasableCount.getLong(signal)); // 32 - 10 = 22.
}
@Test
public void testReleaseCandidateTotal() throws Exception {
final List<Integer> candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList());
@ -348,7 +334,7 @@ public class TestWaitNotifyProtocol {
released.clear();
waiting.clear();
signal.releaseCandidates(emptyCounterName, requiredCountForPass, releasableCandidatePerPass, candidates,
r -> released.addAll(r), w -> waiting.addAll(w));
released::addAll, waiting::addAll);
};
final String counterA = "counterA";
@ -364,7 +350,7 @@ public class TestWaitNotifyProtocol {
assertEquals(10, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName));
assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
assertEquals(0, releasableCount.getLong(signal));
// Counter is not enough yet.
signal.getCounts().put(counterA, 1L);
@ -374,7 +360,7 @@ public class TestWaitNotifyProtocol {
assertEquals(10, waiting.size());
assertEquals(1, signal.getCount(emptyCounterName)); // Counter incremented, but not enough
assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target.
signal.getCounts().put(counterA, 1L);
@ -386,7 +372,7 @@ public class TestWaitNotifyProtocol {
assertEquals(9, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release
assertEquals(-3, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target for two candidates.
signal.getCounts().put(counterA, 1L);
@ -398,7 +384,7 @@ public class TestWaitNotifyProtocol {
assertEquals(8, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release
assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target for two candidates, and reminder is 2.
signal.getCounts().put(counterA, 3L);
@ -410,7 +396,7 @@ public class TestWaitNotifyProtocol {
assertEquals(7, waiting.size());
assertEquals(2, signal.getCount(emptyCounterName));
assertEquals(-9, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
assertEquals(0, releasableCount.getLong(signal));
// Counter reached the target for two pass count and each pass can release 2 candidates.
signal.getCounts().put(counterA, 1L);
@ -422,7 +408,7 @@ public class TestWaitNotifyProtocol {
assertEquals(6, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName));
assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
assertEquals(0, releasableCount.getLong(signal));
// If there are counts more than enough to release current candidates, unused releasableCount should remain.
signal.getCounts().put(counterA, 10L);
@ -434,7 +420,7 @@ public class TestWaitNotifyProtocol {
assertEquals(0, waiting.size());
assertEquals(2, signal.getCount(emptyCounterName)); // 50 % 3 = 2.
assertEquals(-48, signal.getCount(CONSUMED_COUNT_NAME)); // 50 % 3 = 2.
assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22.
assertEquals(22, releasableCount.getLong(signal)); // 32 - 10 = 22.
}