diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java index e5da2520d5..8dec493f64 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java @@ -217,7 +217,7 @@ public class FormatUtils { durationLong = Math.round(durationVal); } else { // Try reducing the size of the units to make the input a long - List wholeResults = makeWholeNumberTime(durationVal, specifiedTimeUnit); + List wholeResults = makeWholeNumberTime(durationVal, specifiedTimeUnit); durationLong = (long) wholeResults.get(0); specifiedTimeUnit = (TimeUnit) wholeResults.get(1); } @@ -247,7 +247,8 @@ public class FormatUtils { protected static List makeWholeNumberTime(double decimal, TimeUnit timeUnit) { // If the value is already a whole number, return it and the current time unit if (decimal == Math.rint(decimal)) { - return Arrays.asList(new Object[]{(long) decimal, timeUnit}); + final long rounded = Math.round(decimal); + return Arrays.asList(new Object[]{rounded, timeUnit}); } else if (TimeUnit.NANOSECONDS == timeUnit) { // The time unit is as small as possible if (decimal < 1.0) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java index c8f51267ec..feff921299 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java @@ -57,7 +57,7 @@ public enum ProcessorStatusDescriptor { "FlowFiles In (5 mins)", "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.COUNT, - s -> Long.valueOf(s.getInputCount())), + s -> (long) s.getInputCount()), OUTPUT_BYTES( "outputBytes", @@ -71,14 +71,14 @@ public enum ProcessorStatusDescriptor { "FlowFiles Out (5 mins)", "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.COUNT, - s -> Long.valueOf(s.getOutputCount())), + s -> (long) s.getOutputCount()), TASK_COUNT( "taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes", Formatter.COUNT, - s -> Long.valueOf(s.getInvocations())), + s -> (long) s.getInvocations()), TASK_MILLIS( "taskMillis", @@ -100,7 +100,7 @@ public enum ProcessorStatusDescriptor { "FlowFiles Removed (5 mins)", "The total number of FlowFiles removed by this Processor in the last 5 minutes", Formatter.COUNT, - s -> Long.valueOf(s.getFlowFilesRemoved())), + s -> (long) s.getFlowFilesRemoved()), AVERAGE_LINEAGE_DURATION( "averageLineageDuration", @@ -108,27 +108,27 @@ public enum ProcessorStatusDescriptor { "The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.", Formatter.DURATION, s -> s.getAverageLineageDuration(TimeUnit.MILLISECONDS), - new ValueReducer() { - @Override - public Long reduce(final List values) { - long millis = 0L; - int count = 0; + new ValueReducer<>() { + @Override + public Long reduce(final List values) { + long millis = 0L; + long count = 0; - for (final StatusSnapshot snapshot : values) { - final long removed = snapshot.getStatusMetric(FLOWFILES_REMOVED.getDescriptor()).longValue(); - final long outputCount = snapshot.getStatusMetric(OUTPUT_COUNT.getDescriptor()).longValue(); - final long processed = removed + outputCount; + for (final StatusSnapshot snapshot : values) { + final long removed = snapshot.getStatusMetric(FLOWFILES_REMOVED.getDescriptor()); + final long outputCount = snapshot.getStatusMetric(OUTPUT_COUNT.getDescriptor()); + final long processed = removed + outputCount; - count += processed; + count += processed; - final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue(); - final long totalMillis = avgMillis * processed; - millis += totalMillis; + final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()); + final long totalMillis = avgMillis * processed; + millis += totalMillis; + } + + return count == 0 ? 0 : millis / count; } - - return count == 0 ? 0 : millis / count; - } - }, + }, true ), @@ -138,31 +138,31 @@ public enum ProcessorStatusDescriptor { "The average number of nanoseconds it took this Processor to complete a task, over the past 5 minutes", Formatter.COUNT, s -> s.getInvocations() == 0 ? 0 : s.getProcessingNanos() / s.getInvocations(), - new ValueReducer() { - @Override - public Long reduce(final List values) { - long procNanos = 0L; - int invocations = 0; + new ValueReducer<>() { + @Override + public Long reduce(final List values) { + long procNanos = 0L; + int invocations = 0; - for (final StatusSnapshot snapshot : values) { - final Long taskNanos = snapshot.getStatusMetric(TASK_NANOS.getDescriptor()); - if (taskNanos != null) { - procNanos += taskNanos.longValue(); + for (final StatusSnapshot snapshot : values) { + final Long taskNanos = snapshot.getStatusMetric(TASK_NANOS.getDescriptor()); + if (taskNanos != null) { + procNanos += taskNanos; + } + + final Long taskInvocations = snapshot.getStatusMetric(TASK_COUNT.getDescriptor()); + if (taskInvocations != null) { + invocations += taskInvocations.intValue(); + } } - final Long taskInvocations = snapshot.getStatusMetric(TASK_COUNT.getDescriptor()); - if (taskInvocations != null) { - invocations += taskInvocations.intValue(); + if (invocations == 0) { + return 0L; } - } - if (invocations == 0) { - return 0L; + return procNanos / invocations; } - - return procNanos / invocations; - } - }, + }, true ); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java index 6b131d2055..c5c70c7798 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java @@ -57,25 +57,20 @@ public enum RemoteProcessGroupStatusDescriptor { "Received Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, - s -> s.getReceivedContentSize().longValue() / 300L), + s -> s.getReceivedContentSize() / 300L), SENT_BYTES_PER_SECOND( "sentBytesPerSecond", "Sent Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, - s -> s.getSentContentSize().longValue() / 300L), + s -> s.getSentContentSize() / 300L), TOTAL_BYTES_PER_SECOND("totalBytesPerSecond", "Total Bytes Per Second", "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, - new ValueMapper() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L); - } - }), + status -> (status.getReceivedContentSize() + status.getSentContentSize()) / 300L), AVERAGE_LINEAGE_DURATION( "averageLineageDuration", @@ -83,24 +78,24 @@ public enum RemoteProcessGroupStatusDescriptor { "The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.", Formatter.DURATION, s -> s.getAverageLineageDuration(TimeUnit.MILLISECONDS), - new ValueReducer() { - @Override - public Long reduce(final List values) { - long millis = 0L; - int count = 0; + new ValueReducer<>() { + @Override + public Long reduce(final List values) { + long millis = 0L; + long count = 0; - for (final StatusSnapshot snapshot : values) { - final long sent = snapshot.getStatusMetric(SENT_COUNT.getDescriptor()).longValue(); - count += sent; + for (final StatusSnapshot snapshot : values) { + final long sent = snapshot.getStatusMetric(SENT_COUNT.getDescriptor()); + count += sent; - final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue(); - final long totalMillis = avgMillis * sent; - millis += totalMillis; + final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()); + final long totalMillis = avgMillis * sent; + millis += totalMillis; + } + + return count == 0 ? 0 : millis / count; } - - return count == 0 ? 0 : millis / count; - } - }); + }); private final MetricDescriptor descriptor; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java index 2c9c9fdfcc..e18a1f4776 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java @@ -70,7 +70,7 @@ public class WaitNotifyProtocol { transient private AtomicCacheEntry cachedEntry; private Map counts = new HashMap<>(); private Map attributes = new HashMap<>(); - private int releasableCount = 0; + private long releasableCount = 0; public Map getCounts() { return counts; @@ -110,11 +110,11 @@ public class WaitNotifyProtocol { return count != null ? count : 0; } - public int getReleasableCount() { + public long getReleasableCount() { return releasableCount; } - public void setReleasableCount(int releasableCount) { + public void setReleasableCount(long releasableCount) { this.releasableCount = releasableCount; } @@ -155,7 +155,8 @@ public class WaitNotifyProtocol { } } - int releaseCount = Math.min(releasableCount, candidateSize); + // Convert to integer for list index sizing + final int releaseCount = Math.toIntExact(Math.min(releasableCount, candidateSize)); released.accept(candidates.subList(0, releaseCount)); waiting.accept(candidates.subList(releaseCount, candidateSize)); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java index 5429bf3f43..d68f6699e3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java @@ -44,8 +44,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -56,8 +56,8 @@ public class TestWaitNotifyProtocol { private final ObjectMapper mapper = new ObjectMapper(); private AtomicDistributedMapCacheClient cache; - @SuppressWarnings("unchecked") - private final Answer successfulReplace = invocation -> { + + private final Answer successfulReplace = invocation -> { final AtomicCacheEntry entry = invocation.getArgument(0); cacheEntries.put(entry.getKey(), new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), entry.getRevision().orElse(0L) + 1)); return true; @@ -83,11 +83,7 @@ public class TestWaitNotifyProtocol { final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); final String signalId = "signal-id"; - try { - protocol.notify(signalId, "a", 1, null); - fail("Notify should fail after retrying few times."); - } catch (ConcurrentModificationException e) { - } + assertThrows(ConcurrentModificationException.class, () -> protocol.notify(signalId, "a", 1, null)); } @Test @@ -237,7 +233,7 @@ public class TestWaitNotifyProtocol { attributesSerializer.serialize(cachedAttributes, bos); final String signalId = "old-entry"; - cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, new String(bos.toByteArray(), StandardCharsets.UTF_8), 0L)); + cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, bos.toString(StandardCharsets.UTF_8), 0L)); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); final Signal signal = protocol.getSignal(signalId); @@ -248,12 +244,7 @@ public class TestWaitNotifyProtocol { assertEquals("value3", signal.getAttributes().get("key3")); cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, "UNSUPPORTED_FORMAT", 0L)); - try { - protocol.getSignal(signalId); - fail("Should fail since cached value was not in expected format."); - } catch (DeserializationException e) { - } - + assertThrows(DeserializationException.class, () -> protocol.getSignal(signalId)); } @Test @@ -263,14 +254,11 @@ public class TestWaitNotifyProtocol { final List released = new ArrayList<>(); final List waiting = new ArrayList<>(); - // Test default name. - final String counterName = DEFAULT_COUNT_NAME; - final BiConsumer releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> { released.clear(); waiting.clear(); - signal.releaseCandidates(counterName, requiredCountForPass, releasableCandidatePerPass, candidates, - r -> released.addAll(r), w -> waiting.addAll(w)); + signal.releaseCandidates(DEFAULT_COUNT_NAME, requiredCountForPass, releasableCandidatePerPass, candidates, + released::addAll, waiting::addAll); }; final Field releasableCount = Signal.class.getDeclaredField("releasableCount"); @@ -281,7 +269,7 @@ public class TestWaitNotifyProtocol { assertEquals(0, released.size()); assertEquals(10, waiting.size()); assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); - assertEquals(0, releasableCount.getInt(signal)); + assertEquals(0, releasableCount.getLong(signal)); // Counter is not enough yet. signal.getCounts().put(DEFAULT_COUNT_NAME, 1L); @@ -289,7 +277,7 @@ public class TestWaitNotifyProtocol { assertEquals(0, released.size()); assertEquals(10, waiting.size()); assertEquals(1, signal.getCount(DEFAULT_COUNT_NAME)); // Counter incremented, but not enough - assertEquals(0, releasableCount.getInt(signal)); + assertEquals(0, releasableCount.getLong(signal)); // Counter reached the target. signal.getCounts().put(DEFAULT_COUNT_NAME, 3L); @@ -297,7 +285,7 @@ public class TestWaitNotifyProtocol { assertEquals(1, released.size()); assertEquals(9, waiting.size()); assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 3 was converted into 1 release - assertEquals(0, releasableCount.getInt(signal)); + assertEquals(0, releasableCount.getLong(signal)); // Counter reached the target for two candidates. signal.getCounts().put(DEFAULT_COUNT_NAME, 6L); @@ -305,7 +293,7 @@ public class TestWaitNotifyProtocol { assertEquals(2, released.size()); assertEquals(8, waiting.size()); assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 3 was converted into 1 release - assertEquals(0, releasableCount.getInt(signal)); + assertEquals(0, releasableCount.getLong(signal)); // Counter reached the target for two candidates, and reminder is 2. signal.getCounts().put(DEFAULT_COUNT_NAME, 11L); @@ -313,7 +301,7 @@ public class TestWaitNotifyProtocol { assertEquals(3, released.size()); // 11 / 3 = 3 assertEquals(7, waiting.size()); assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 11 % 3 = 2 - assertEquals(0, releasableCount.getInt(signal)); + assertEquals(0, releasableCount.getLong(signal)); // Counter reached the target for two pass count and each pass can release 2 candidates. signal.getCounts().put(DEFAULT_COUNT_NAME, 6L); @@ -321,7 +309,7 @@ public class TestWaitNotifyProtocol { assertEquals(4, released.size()); // (6 / 3) * 2 = 4 assertEquals(6, waiting.size()); assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // 6 % 3 = 0 - assertEquals(0, releasableCount.getInt(signal)); + assertEquals(0, releasableCount.getLong(signal)); // If there are counts more than enough to release current candidates, unused releasableCount should remain. signal.getCounts().put(DEFAULT_COUNT_NAME, 50L); @@ -329,11 +317,9 @@ public class TestWaitNotifyProtocol { assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10. assertEquals(0, waiting.size()); assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 50 % 3 = 2. - assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22. - + assertEquals(22, releasableCount.getLong(signal)); // 32 - 10 = 22. } - @Test public void testReleaseCandidateTotal() throws Exception { final List candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList()); @@ -348,7 +334,7 @@ public class TestWaitNotifyProtocol { released.clear(); waiting.clear(); signal.releaseCandidates(emptyCounterName, requiredCountForPass, releasableCandidatePerPass, candidates, - r -> released.addAll(r), w -> waiting.addAll(w)); + released::addAll, waiting::addAll); }; final String counterA = "counterA"; @@ -364,7 +350,7 @@ public class TestWaitNotifyProtocol { assertEquals(10, waiting.size()); assertEquals(0, signal.getCount(emptyCounterName)); assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME)); - assertEquals(0, releasableCount.getInt(signal)); + assertEquals(0, releasableCount.getLong(signal)); // Counter is not enough yet. signal.getCounts().put(counterA, 1L); @@ -374,7 +360,7 @@ public class TestWaitNotifyProtocol { assertEquals(10, waiting.size()); assertEquals(1, signal.getCount(emptyCounterName)); // Counter incremented, but not enough assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME)); - assertEquals(0, releasableCount.getInt(signal)); + assertEquals(0, releasableCount.getLong(signal)); // Counter reached the target. signal.getCounts().put(counterA, 1L); @@ -386,7 +372,7 @@ public class TestWaitNotifyProtocol { assertEquals(9, waiting.size()); assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release assertEquals(-3, signal.getCount(CONSUMED_COUNT_NAME)); - assertEquals(0, releasableCount.getInt(signal)); + assertEquals(0, releasableCount.getLong(signal)); // Counter reached the target for two candidates. signal.getCounts().put(counterA, 1L); @@ -398,7 +384,7 @@ public class TestWaitNotifyProtocol { assertEquals(8, waiting.size()); assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME)); - assertEquals(0, releasableCount.getInt(signal)); + assertEquals(0, releasableCount.getLong(signal)); // Counter reached the target for two candidates, and reminder is 2. signal.getCounts().put(counterA, 3L); @@ -410,7 +396,7 @@ public class TestWaitNotifyProtocol { assertEquals(7, waiting.size()); assertEquals(2, signal.getCount(emptyCounterName)); assertEquals(-9, signal.getCount(CONSUMED_COUNT_NAME)); - assertEquals(0, releasableCount.getInt(signal)); + assertEquals(0, releasableCount.getLong(signal)); // Counter reached the target for two pass count and each pass can release 2 candidates. signal.getCounts().put(counterA, 1L); @@ -422,7 +408,7 @@ public class TestWaitNotifyProtocol { assertEquals(6, waiting.size()); assertEquals(0, signal.getCount(emptyCounterName)); assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME)); - assertEquals(0, releasableCount.getInt(signal)); + assertEquals(0, releasableCount.getLong(signal)); // If there are counts more than enough to release current candidates, unused releasableCount should remain. signal.getCounts().put(counterA, 10L); @@ -434,7 +420,7 @@ public class TestWaitNotifyProtocol { assertEquals(0, waiting.size()); assertEquals(2, signal.getCount(emptyCounterName)); // 50 % 3 = 2. assertEquals(-48, signal.getCount(CONSUMED_COUNT_NAME)); // 50 % 3 = 2. - assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22. + assertEquals(22, releasableCount.getLong(signal)); // 32 - 10 = 22. }