From bab430990538cd9c453fef15e3bac748e32e4708 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Fri, 4 Feb 2022 17:41:38 -0600 Subject: [PATCH] NIFI-9655 Add Queue Logging to ListenUDP (#5747) * NIFI-9655 Added Queue Logging to ListenUDP - Added TrackingLinkedBlockingQueue to track the largest queue size - Updated AbstractListenEventProcessor to write debug log at most once per minute - AbstractListenEventProcessor updates support both ListenUDP and ListenUDPRecord --- .../listen/AbstractListenEventProcessor.java | 27 ++++++- .../queue/TrackingLinkedBlockingQueue.java | 73 ++++++++++++++++++ .../TrackingLinkedBlockingQueueTest.java | 77 +++++++++++++++++++ 3 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/queue/TrackingLinkedBlockingQueue.java create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-event-listen/src/test/java/org/apache/nifi/processor/util/listen/queue/TrackingLinkedBlockingQueueTest.java diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java index 03334ba962..d91a585627 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java @@ -29,11 +29,13 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.queue.TrackingLinkedBlockingQueue; import org.apache.nifi.remote.io.socket.NetworkUtils; import java.io.IOException; import java.net.InetAddress; import java.nio.charset.Charset; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -42,6 +44,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; @@ -121,9 +124,13 @@ public abstract class AbstractListenEventProcessor extends Abst protected volatile int port; protected volatile Charset charset; protected volatile ChannelDispatcher dispatcher; - protected volatile BlockingQueue events; + protected volatile TrackingLinkedBlockingQueue events; protected volatile BlockingQueue errorEvents = new LinkedBlockingQueue<>(); + private static final long TRACKING_LOG_INTERVAL = 60000; + private final AtomicLong nextTrackingLog = new AtomicLong(); + private int eventsCapacity; + @Override protected void init(final ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); @@ -174,7 +181,8 @@ public abstract class AbstractListenEventProcessor extends Abst public void onScheduled(final ProcessContext context) throws IOException { charset = Charset.forName(context.getProperty(CHARSET).getValue()); port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); - events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger()); + eventsCapacity = context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger(); + events = new TrackingLinkedBlockingQueue<>(eventsCapacity); final String interfaceName = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); final InetAddress interfaceAddress = NetworkUtils.getInterfaceAddress(interfaceName); @@ -233,6 +241,7 @@ public abstract class AbstractListenEventProcessor extends Abst * @return an event from one of the queues, or null if none are available */ protected E getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) { + processTrackingLog(); E event = null; if (pollErrorQueue) { event = errorEvents.poll(); @@ -263,4 +272,18 @@ public abstract class AbstractListenEventProcessor extends Abst protected long getLongPollTimeout() { return POLL_TIMEOUT_MS; } + + private void processTrackingLog() { + final long now = Instant.now().toEpochMilli(); + if (now > nextTrackingLog.get()) { + getLogger().debug("Event Queue Capacity [{}] Remaining [{}] Size [{}] Largest Size [{}]", + eventsCapacity, + events.remainingCapacity(), + events.size(), + events.getLargestSize() + ); + final long nextTrackingLogScheduled = now + TRACKING_LOG_INTERVAL; + nextTrackingLog.getAndSet(nextTrackingLogScheduled); + } + } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/queue/TrackingLinkedBlockingQueue.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/queue/TrackingLinkedBlockingQueue.java new file mode 100644 index 0000000000..858b79ab52 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/queue/TrackingLinkedBlockingQueue.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.queue; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tracking extension of LinkedBlockingQueue to provide additional statistics + */ +public class TrackingLinkedBlockingQueue extends LinkedBlockingQueue { + private final AtomicInteger largestSize = new AtomicInteger(); + + public TrackingLinkedBlockingQueue() { + super(); + } + + public TrackingLinkedBlockingQueue(final int capacity) { + super(capacity); + } + + @Override + public boolean offer(final E element) { + final boolean success = super.offer(element); + if (success) { + updateLargestSize(); + } + return success; + } + + @Override + public boolean offer(final E element, final long timeout, final TimeUnit unit) throws InterruptedException { + final boolean success = super.offer(element, timeout, unit); + if (success) { + updateLargestSize(); + } + return success; + } + + @Override + public void put(final E element) throws InterruptedException { + super.put(element); + updateLargestSize(); + } + + /** + * Get the largest size recorded + * + * @return Current largest size + */ + public int getLargestSize() { + return largestSize.get(); + } + + private void updateLargestSize() { + largestSize.getAndAccumulate(size(), Math::max); + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-listen/src/test/java/org/apache/nifi/processor/util/listen/queue/TrackingLinkedBlockingQueueTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-listen/src/test/java/org/apache/nifi/processor/util/listen/queue/TrackingLinkedBlockingQueueTest.java new file mode 100644 index 0000000000..9a3e19a345 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-listen/src/test/java/org/apache/nifi/processor/util/listen/queue/TrackingLinkedBlockingQueueTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.queue; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TrackingLinkedBlockingQueueTest { + private static final String ELEMENT = String.class.getSimpleName(); + + private static final long OFFER_TIMEOUT = 1; + + private static final int INITIAL_CAPACITY = 1; + + @Test + public void testAddRemoveGetLargestSize() { + final TrackingLinkedBlockingQueue queue = new TrackingLinkedBlockingQueue<>(INITIAL_CAPACITY); + + final boolean added = queue.add(ELEMENT); + assertTrue(added); + + final int largestSize = queue.getLargestSize(); + assertEquals(queue.size(), largestSize); + + final boolean removed = queue.remove(ELEMENT); + assertTrue(removed); + + assertEquals(largestSize, queue.getLargestSize()); + } + + @Test + public void testOfferGetLargestSize() { + final TrackingLinkedBlockingQueue queue = new TrackingLinkedBlockingQueue<>(); + + final boolean success = queue.offer(ELEMENT); + assertTrue(success); + + assertEquals(queue.size(), queue.getLargestSize()); + } + + @Test + public void testOfferTimeoutGetLargestSize() throws InterruptedException { + final TrackingLinkedBlockingQueue queue = new TrackingLinkedBlockingQueue<>(); + + final boolean success = queue.offer(ELEMENT, OFFER_TIMEOUT, TimeUnit.SECONDS); + assertTrue(success); + + assertEquals(queue.size(), queue.getLargestSize()); + } + + @Test + public void testPutGetLargestSize() throws InterruptedException { + final TrackingLinkedBlockingQueue queue = new TrackingLinkedBlockingQueue<>(); + + queue.put(ELEMENT); + + assertEquals(queue.size(), queue.getLargestSize()); + } +}