From c79473baf5a8bb3dc054b13c57eefef1a4088d4c Mon Sep 17 00:00:00 2001 From: Tamas Palfy Date: Thu, 26 Mar 2020 15:28:03 +0100 Subject: [PATCH] NIFI-7286 ListenTCPRecord cleanup changed from @OnStopped to @OnUnscheduled --- .../nifi/processors/standard/ListenTCPRecord.java | 12 ++++++++---- .../processors/standard/TestListenTCPRecord.java | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java index c6b6af9ab1..6eaf7286e3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java @@ -25,7 +25,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -300,8 +300,8 @@ public class ListenTCPRecord extends AbstractProcessor { readerThread.start(); } - @OnStopped - public void onStopped() { + @OnUnscheduled + public void onUnscheduled() { if (dispatcher != null) { dispatcher.close(); dispatcher = null; @@ -309,7 +309,11 @@ public class ListenTCPRecord extends AbstractProcessor { SocketChannelRecordReader socketRecordReader; while ((socketRecordReader = socketReaders.poll()) != null) { - IOUtils.closeQuietly(socketRecordReader.getRecordReader()); + try { + socketRecordReader.close(); + } catch (Exception e) { + getLogger().error("Couldn't close " + socketRecordReader, e); + } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java index 10e77c2136..69dc5d0a59 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java @@ -258,7 +258,7 @@ public class TestListenTCPRecord { runner.assertTransferCount(ListenTCPRecord.REL_SUCCESS, expectedTransferred); } finally { // unschedule to close connections - proc.onStopped(); + proc.onUnscheduled(); IOUtils.closeQuietly(sender); } }