NIFI-7286 ListenTCPRecord cleanup changed from @OnStopped to @OnUnscheduled

This commit is contained in:
Tamas Palfy 2020-03-26 15:28:03 +01:00 committed by Bryan Bende
parent 483f23a8aa
commit c79473baf5
2 changed files with 9 additions and 5 deletions

View File

@ -25,7 +25,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
@ -300,8 +300,8 @@ public class ListenTCPRecord extends AbstractProcessor {
readerThread.start(); readerThread.start();
} }
@OnStopped @OnUnscheduled
public void onStopped() { public void onUnscheduled() {
if (dispatcher != null) { if (dispatcher != null) {
dispatcher.close(); dispatcher.close();
dispatcher = null; dispatcher = null;
@ -309,7 +309,11 @@ public class ListenTCPRecord extends AbstractProcessor {
SocketChannelRecordReader socketRecordReader; SocketChannelRecordReader socketRecordReader;
while ((socketRecordReader = socketReaders.poll()) != null) { while ((socketRecordReader = socketReaders.poll()) != null) {
IOUtils.closeQuietly(socketRecordReader.getRecordReader()); try {
socketRecordReader.close();
} catch (Exception e) {
getLogger().error("Couldn't close " + socketRecordReader, e);
}
} }
} }

View File

@ -258,7 +258,7 @@ public class TestListenTCPRecord {
runner.assertTransferCount(ListenTCPRecord.REL_SUCCESS, expectedTransferred); runner.assertTransferCount(ListenTCPRecord.REL_SUCCESS, expectedTransferred);
} finally { } finally {
// unschedule to close connections // unschedule to close connections
proc.onStopped(); proc.onUnscheduled();
IOUtils.closeQuietly(sender); IOUtils.closeQuietly(sender);
} }
} }