NIFI-9655 Add Queue Logging to ListenUDP (#5747)

* NIFI-9655 Added Queue Logging to ListenUDP

- Added TrackingLinkedBlockingQueue to track the largest queue size
- Updated AbstractListenEventProcessor to write debug log at most once per minute
- AbstractListenEventProcessor updates support both ListenUDP and ListenUDPRecord
This commit is contained in:
exceptionfactory 2022-02-04 17:41:38 -06:00 committed by GitHub
parent 230ed9c98d
commit bab4309905
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 175 additions and 2 deletions

View File

@ -29,11 +29,13 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.queue.TrackingLinkedBlockingQueue;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -42,6 +44,7 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
@ -121,9 +124,13 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
protected volatile int port;
protected volatile Charset charset;
protected volatile ChannelDispatcher dispatcher;
protected volatile BlockingQueue<E> events;
protected volatile TrackingLinkedBlockingQueue<E> events;
protected volatile BlockingQueue<E> errorEvents = new LinkedBlockingQueue<>();
private static final long TRACKING_LOG_INTERVAL = 60000;
private final AtomicLong nextTrackingLog = new AtomicLong();
private int eventsCapacity;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
@ -174,7 +181,8 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
public void onScheduled(final ProcessContext context) throws IOException {
charset = Charset.forName(context.getProperty(CHARSET).getValue());
port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger());
eventsCapacity = context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger();
events = new TrackingLinkedBlockingQueue<>(eventsCapacity);
final String interfaceName = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final InetAddress interfaceAddress = NetworkUtils.getInterfaceAddress(interfaceName);
@ -233,6 +241,7 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
* @return an event from one of the queues, or null if none are available
*/
protected E getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) {
processTrackingLog();
E event = null;
if (pollErrorQueue) {
event = errorEvents.poll();
@ -263,4 +272,18 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
protected long getLongPollTimeout() {
return POLL_TIMEOUT_MS;
}
private void processTrackingLog() {
final long now = Instant.now().toEpochMilli();
if (now > nextTrackingLog.get()) {
getLogger().debug("Event Queue Capacity [{}] Remaining [{}] Size [{}] Largest Size [{}]",
eventsCapacity,
events.remainingCapacity(),
events.size(),
events.getLargestSize()
);
final long nextTrackingLogScheduled = now + TRACKING_LOG_INTERVAL;
nextTrackingLog.getAndSet(nextTrackingLogScheduled);
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Tracking extension of LinkedBlockingQueue to provide additional statistics
*/
public class TrackingLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
private final AtomicInteger largestSize = new AtomicInteger();
public TrackingLinkedBlockingQueue() {
super();
}
public TrackingLinkedBlockingQueue(final int capacity) {
super(capacity);
}
@Override
public boolean offer(final E element) {
final boolean success = super.offer(element);
if (success) {
updateLargestSize();
}
return success;
}
@Override
public boolean offer(final E element, final long timeout, final TimeUnit unit) throws InterruptedException {
final boolean success = super.offer(element, timeout, unit);
if (success) {
updateLargestSize();
}
return success;
}
@Override
public void put(final E element) throws InterruptedException {
super.put(element);
updateLargestSize();
}
/**
* Get the largest size recorded
*
* @return Current largest size
*/
public int getLargestSize() {
return largestSize.get();
}
private void updateLargestSize() {
largestSize.getAndAccumulate(size(), Math::max);
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.queue;
import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TrackingLinkedBlockingQueueTest {
private static final String ELEMENT = String.class.getSimpleName();
private static final long OFFER_TIMEOUT = 1;
private static final int INITIAL_CAPACITY = 1;
@Test
public void testAddRemoveGetLargestSize() {
final TrackingLinkedBlockingQueue<String> queue = new TrackingLinkedBlockingQueue<>(INITIAL_CAPACITY);
final boolean added = queue.add(ELEMENT);
assertTrue(added);
final int largestSize = queue.getLargestSize();
assertEquals(queue.size(), largestSize);
final boolean removed = queue.remove(ELEMENT);
assertTrue(removed);
assertEquals(largestSize, queue.getLargestSize());
}
@Test
public void testOfferGetLargestSize() {
final TrackingLinkedBlockingQueue<String> queue = new TrackingLinkedBlockingQueue<>();
final boolean success = queue.offer(ELEMENT);
assertTrue(success);
assertEquals(queue.size(), queue.getLargestSize());
}
@Test
public void testOfferTimeoutGetLargestSize() throws InterruptedException {
final TrackingLinkedBlockingQueue<String> queue = new TrackingLinkedBlockingQueue<>();
final boolean success = queue.offer(ELEMENT, OFFER_TIMEOUT, TimeUnit.SECONDS);
assertTrue(success);
assertEquals(queue.size(), queue.getLargestSize());
}
@Test
public void testPutGetLargestSize() throws InterruptedException {
final TrackingLinkedBlockingQueue<String> queue = new TrackingLinkedBlockingQueue<>();
queue.put(ELEMENT);
assertEquals(queue.size(), queue.getLargestSize());
}
}