NIFI-9654 Added Queue Logging to ListenTCP

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #6116.
This commit is contained in:
exceptionfactory 2022-06-10 08:36:15 -05:00 committed by Pierre Villard
parent f91d877043
commit 554f648f00
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
1 changed files with 25 additions and 2 deletions

View File

@ -48,6 +48,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processor.util.listen.queue.TrackingLinkedBlockingQueue;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
@ -57,6 +58,7 @@ import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -67,6 +69,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@ -135,10 +138,14 @@ public class ListenTCP extends AbstractProcessor {
.description("Messages received successfully will be sent out this relationship.")
.build();
private static final long TRACKING_LOG_INTERVAL = 60000;
private final AtomicLong nextTrackingLog = new AtomicLong();
private int eventsCapacity;
protected List<PropertyDescriptor> descriptors;
protected Set<Relationship> relationships;
protected volatile int port;
protected volatile BlockingQueue<ByteArrayMessage> events;
protected volatile TrackingLinkedBlockingQueue<ByteArrayMessage> events;
protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
protected volatile EventServer eventServer;
protected volatile byte[] messageDemarcatorBytes;
@ -177,7 +184,8 @@ public class ListenTCP extends AbstractProcessor {
final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
eventsCapacity = context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger();
events = new TrackingLinkedBlockingQueue<>(eventsCapacity);
errorEvents = new LinkedBlockingQueue<>();
final String msgDemarcator = getMessageDemarcator(context);
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
@ -209,6 +217,7 @@ public class ListenTCP extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
processTrackingLog();
final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
Map<String, FlowFileEventBatch<ByteArrayMessage>> batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
processEvents(session, batches);
@ -313,4 +322,18 @@ public class ListenTCP extends AbstractProcessor {
attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, sslSessionStatus.getIssuer().getName());
}
}
private void processTrackingLog() {
final long now = Instant.now().toEpochMilli();
if (now > nextTrackingLog.get()) {
getLogger().debug("Event Queue Capacity [{}] Remaining [{}] Size [{}] Largest Size [{}]",
eventsCapacity,
events.remainingCapacity(),
events.size(),
events.getLargestSize()
);
final long nextTrackingLogScheduled = now + TRACKING_LOG_INTERVAL;
nextTrackingLog.getAndSet(nextTrackingLogScheduled);
}
}
}