From 0cf515c9c0d58ae41218135a331ca09fe3bb4fec Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Tue, 16 Nov 2021 15:22:09 -0600 Subject: [PATCH] NIFI-9384 Corrected usage and generics in ListenTCP - Addressed compiler warnings in ListenTCP and EventBatcher - Adjusted ListenTCP property order to match previous version Signed-off-by: Nathan Gough This closes #5526. --- .../processor/util/listen/EventBatcher.java | 32 +++++++---------- .../nifi/processors/standard/ListenTCP.java | 35 +++++++++---------- 2 files changed, 29 insertions(+), 38 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java index bcdb598d39..7a8fff2802 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java @@ -20,10 +20,7 @@ import org.apache.nifi.event.transport.message.ByteArrayMessage; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.io.OutputStreamCallback; -import java.io.IOException; -import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -34,11 +31,11 @@ public abstract class EventBatcher { public static final int POLL_TIMEOUT_MS = 20; - private volatile BlockingQueue events; - private volatile BlockingQueue errorEvents; + private final BlockingQueue events; + private final BlockingQueue errorEvents; private final ComponentLog logger; - public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) { + public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) { this.logger = logger; this.events = events; this.errorEvents = errorEvents; @@ -56,10 +53,10 @@ public abstract class EventBatcher { * @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all * the batches will be <= batchSize */ - public Map getBatches(final ProcessSession session, final int totalBatchSize, + public Map> getBatches(final ProcessSession session, final int totalBatchSize, final byte[] messageDemarcatorBytes) { - final Map batches = new HashMap(); + final Map> batches = new HashMap<>(); for (int i = 0; i < totalBatchSize; i++) { final E event = getMessage(true, true, session); if (event == null) { @@ -67,11 +64,11 @@ public abstract class EventBatcher { } final String batchKey = getBatchKey(event); - FlowFileEventBatch batch = batches.get(batchKey); + FlowFileEventBatch batch = batches.get(batchKey); // if we don't have a batch for this key then create a new one if (batch == null) { - batch = new FlowFileEventBatch(session.create(), new ArrayList()); + batch = new FlowFileEventBatch<>(session.create(), new ArrayList<>()); batches.put(batchKey, batch); } @@ -82,15 +79,12 @@ public abstract class EventBatcher { final boolean writeDemarcator = (i > 0); try { final byte[] rawMessage = event.getMessage(); - FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - if (writeDemarcator) { - out.write(messageDemarcatorBytes); - } - - out.write(rawMessage); + FlowFile appendedFlowFile = session.append(batch.getFlowFile(), out -> { + if (writeDemarcator) { + out.write(messageDemarcatorBytes); } + + out.write(rawMessage); }); // update the FlowFile reference in the batch object @@ -99,7 +93,7 @@ public abstract class EventBatcher { } catch (final Exception e) { logger.error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again", e.getMessage(), e); - errorEvents.offer(event); + errorEvents.add(event); break; } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java index eb7f6687e7..e6f29c900a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java @@ -130,11 +130,12 @@ public class ListenTCP extends AbstractProcessor { protected volatile BlockingQueue errorEvents; protected volatile EventServer eventServer; protected volatile byte[] messageDemarcatorBytes; - protected volatile EventBatcher eventBatcher; + protected volatile EventBatcher eventBatcher; @Override protected void init(final ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); + descriptors.add(ListenerProperties.NETWORK_INTF_NAME); descriptors.add(ListenerProperties.PORT); descriptors.add(ListenerProperties.RECV_BUFFER_SIZE); descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE); @@ -148,9 +149,8 @@ public class ListenTCP extends AbstractProcessor { descriptors.add(MAX_RECV_THREAD_POOL_SIZE); // Deprecated descriptors.add(POOL_RECV_BUFFERS); - descriptors.add(ListenerProperties.NETWORK_INTF_NAME); - descriptors.add(CLIENT_AUTH); descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(CLIENT_AUTH); this.descriptors = Collections.unmodifiableList(descriptors); final Set relationships = new HashSet<>(); @@ -163,14 +163,14 @@ public class ListenTCP extends AbstractProcessor { int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger(); int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); - InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface); + InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface); Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue()); port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger(); events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger()); errorEvents = new LinkedBlockingQueue<>(); final String msgDemarcator = getMessageDemarcator(context); messageDemarcatorBytes = msgDemarcator.getBytes(charset); - final NettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port, TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events); + final NettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(getLogger(), address, port, TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); if (sslContextService != null) { @@ -183,23 +183,24 @@ public class ListenTCP extends AbstractProcessor { eventFactory.setSocketReceiveBuffer(bufferSize); eventFactory.setWorkerThreads(maxConnections); + eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier())); try { eventServer = eventFactory.getEventServer(); } catch (EventException e) { - getLogger().error("Failed to bind to [{}:{}].", hostname.getHostAddress(), port); + getLogger().error("Failed to bind to [{}:{}]", address, port, e); } } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger(); - Map batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes); + Map> batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes); processEvents(session, batches); } - private void processEvents(final ProcessSession session, final Map batches) { - for (Map.Entry entry : batches.entrySet()) { + private void processEvents(final ProcessSession session, final Map> batches) { + for (Map.Entry> entry : batches.entrySet()) { FlowFile flowFile = entry.getValue().getFlowFile(); final List events = entry.getValue().getEvents(); @@ -245,7 +246,7 @@ public class ListenTCP extends AbstractProcessor { return results; } - protected Map getAttributes(final FlowFileEventBatch batch) { + protected Map getAttributes(final FlowFileEventBatch batch) { final List events = batch.getEvents(); final String sender = events.get(0).getSender(); final Map attributes = new HashMap<>(3); @@ -254,13 +255,11 @@ public class ListenTCP extends AbstractProcessor { return attributes; } - protected String getTransitUri(FlowFileEventBatch batch) { + protected String getTransitUri(final FlowFileEventBatch batch) { final List events = batch.getEvents(); final String sender = events.get(0).getSender(); final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; - final String transitUri = new StringBuilder().append("tcp").append("://").append(senderHost).append(":") - .append(port).toString(); - return transitUri; + return String.format("tcp://%s:%d", senderHost, port); } @Override @@ -279,17 +278,15 @@ public class ListenTCP extends AbstractProcessor { .replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); } - private EventBatcher getEventBatcher() { - if (eventBatcher != null) { - return eventBatcher; - } else { + private EventBatcher getEventBatcher() { + if (eventBatcher == null) { eventBatcher = new EventBatcher(getLogger(), events, errorEvents) { @Override protected String getBatchKey(ByteArrayMessage event) { return event.getSender(); } }; - return eventBatcher; } + return eventBatcher; } } \ No newline at end of file