NIFI-9384 Corrected usage and generics in ListenTCP

- Addressed compiler warnings in ListenTCP and EventBatcher
- Adjusted ListenTCP property order to match previous version

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #5526.
This commit is contained in:
exceptionfactory 2021-11-16 15:22:09 -06:00 committed by Nathan Gough
parent 77f235bf1c
commit 0cf515c9c0
2 changed files with 29 additions and 38 deletions

View File

@ -20,10 +20,7 @@ import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.OutputStreamCallback;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@ -34,11 +31,11 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
public static final int POLL_TIMEOUT_MS = 20;
private volatile BlockingQueue<E> events;
private volatile BlockingQueue<E> errorEvents;
private final BlockingQueue<E> events;
private final BlockingQueue<E> errorEvents;
private final ComponentLog logger;
public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) {
public EventBatcher(final ComponentLog logger, final BlockingQueue<E> events, final BlockingQueue<E> errorEvents) {
this.logger = logger;
this.events = events;
this.errorEvents = errorEvents;
@ -56,10 +53,10 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
* @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all
* the batches will be <= batchSize
*/
public Map<String, FlowFileEventBatch> getBatches(final ProcessSession session, final int totalBatchSize,
public Map<String, FlowFileEventBatch<E>> getBatches(final ProcessSession session, final int totalBatchSize,
final byte[] messageDemarcatorBytes) {
final Map<String, FlowFileEventBatch> batches = new HashMap<String, FlowFileEventBatch>();
final Map<String, FlowFileEventBatch<E>> batches = new HashMap<>();
for (int i = 0; i < totalBatchSize; i++) {
final E event = getMessage(true, true, session);
if (event == null) {
@ -67,11 +64,11 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
}
final String batchKey = getBatchKey(event);
FlowFileEventBatch batch = batches.get(batchKey);
FlowFileEventBatch<E> batch = batches.get(batchKey);
// if we don't have a batch for this key then create a new one
if (batch == null) {
batch = new FlowFileEventBatch(session.create(), new ArrayList<E>());
batch = new FlowFileEventBatch<>(session.create(), new ArrayList<>());
batches.put(batchKey, batch);
}
@ -82,15 +79,12 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
final boolean writeDemarcator = (i > 0);
try {
final byte[] rawMessage = event.getMessage();
FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
if (writeDemarcator) {
out.write(messageDemarcatorBytes);
}
out.write(rawMessage);
FlowFile appendedFlowFile = session.append(batch.getFlowFile(), out -> {
if (writeDemarcator) {
out.write(messageDemarcatorBytes);
}
out.write(rawMessage);
});
// update the FlowFile reference in the batch object
@ -99,7 +93,7 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
} catch (final Exception e) {
logger.error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again",
e.getMessage(), e);
errorEvents.offer(event);
errorEvents.add(event);
break;
}
}

View File

@ -130,11 +130,12 @@ public class ListenTCP extends AbstractProcessor {
protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
protected volatile EventServer eventServer;
protected volatile byte[] messageDemarcatorBytes;
protected volatile EventBatcher eventBatcher;
protected volatile EventBatcher<ByteArrayMessage> eventBatcher;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
descriptors.add(ListenerProperties.PORT);
descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
@ -148,9 +149,8 @@ public class ListenTCP extends AbstractProcessor {
descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
// Deprecated
descriptors.add(POOL_RECV_BUFFERS);
descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
descriptors.add(CLIENT_AUTH);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(CLIENT_AUTH);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
@ -163,14 +163,14 @@ public class ListenTCP extends AbstractProcessor {
int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
errorEvents = new LinkedBlockingQueue<>();
final String msgDemarcator = getMessageDemarcator(context);
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
final NettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port, TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
final NettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(getLogger(), address, port, TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
@ -183,23 +183,24 @@ public class ListenTCP extends AbstractProcessor {
eventFactory.setSocketReceiveBuffer(bufferSize);
eventFactory.setWorkerThreads(maxConnections);
eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
try {
eventServer = eventFactory.getEventServer();
} catch (EventException e) {
getLogger().error("Failed to bind to [{}:{}].", hostname.getHostAddress(), port);
getLogger().error("Failed to bind to [{}:{}]", address, port, e);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
Map<String, FlowFileEventBatch> batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
Map<String, FlowFileEventBatch<ByteArrayMessage>> batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
processEvents(session, batches);
}
private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch> batches) {
for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet()) {
private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch<ByteArrayMessage>> batches) {
for (Map.Entry<String, FlowFileEventBatch<ByteArrayMessage>> entry : batches.entrySet()) {
FlowFile flowFile = entry.getValue().getFlowFile();
final List<ByteArrayMessage> events = entry.getValue().getEvents();
@ -245,7 +246,7 @@ public class ListenTCP extends AbstractProcessor {
return results;
}
protected Map<String, String> getAttributes(final FlowFileEventBatch batch) {
protected Map<String, String> getAttributes(final FlowFileEventBatch<ByteArrayMessage> batch) {
final List<ByteArrayMessage> events = batch.getEvents();
final String sender = events.get(0).getSender();
final Map<String,String> attributes = new HashMap<>(3);
@ -254,13 +255,11 @@ public class ListenTCP extends AbstractProcessor {
return attributes;
}
protected String getTransitUri(FlowFileEventBatch batch) {
protected String getTransitUri(final FlowFileEventBatch<ByteArrayMessage> batch) {
final List<ByteArrayMessage> events = batch.getEvents();
final String sender = events.get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("tcp").append("://").append(senderHost).append(":")
.append(port).toString();
return transitUri;
return String.format("tcp://%s:%d", senderHost, port);
}
@Override
@ -279,17 +278,15 @@ public class ListenTCP extends AbstractProcessor {
.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
}
private EventBatcher getEventBatcher() {
if (eventBatcher != null) {
return eventBatcher;
} else {
private EventBatcher<ByteArrayMessage> getEventBatcher() {
if (eventBatcher == null) {
eventBatcher = new EventBatcher<ByteArrayMessage>(getLogger(), events, errorEvents) {
@Override
protected String getBatchKey(ByteArrayMessage event) {
return event.getSender();
}
};
return eventBatcher;
}
return eventBatcher;
}
}