diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java index 0752fa161a..826cf00df0 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java @@ -99,7 +99,7 @@ public abstract class AbstractTransaction implements Transaction { } @Override - public Communicant getCommunicant() { + public Peer getCommunicant() { return peer; } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java index 0f0d4a59de..8821edb604 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java @@ -42,6 +42,8 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -55,6 +57,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr private final ScheduledExecutorService taskExecutor; private final PeerSelector peerSelector; + private final Set activeTransactions = Collections.synchronizedSet(new HashSet<>()); public HttpClient(final SiteToSiteClientConfig config) { super(config); @@ -177,15 +180,26 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr // We found a valid peer to communicate with. final Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion(); final HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction, - config.isUseCompression(), portId, penaltyMillis, config.getEventReporter()); + config.isUseCompression(), portId, penaltyMillis, config.getEventReporter()) { + + @Override + protected void close() throws IOException { + try { + super.close(); + } finally { + activeTransactions.remove(this); + } + } + }; + transaction.initialize(apiClient, transactionUrl); + activeTransactions.add(transaction); return transaction; } logger.info("Couldn't find a valid peer to communicate with."); return null; - } private String resolveNodeApiUrl(final PeerDescription description) { @@ -201,5 +215,9 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr public void close() throws IOException { taskExecutor.shutdown(); peerSelector.clear(); + + for (final HttpClientTransaction transaction : activeTransactions) { + transaction.getCommunicant().getCommunicationsSession().interrupt(); + } } } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java index 868fb36c95..95b3314010 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java @@ -74,6 +74,8 @@ public class HttpCommunicationsSession extends AbstractCommunicationsSession { @Override public void interrupt() { + input.interrupt(); + output.interrupt(); } @Override diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java index 5048306688..77804cd4e5 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.remote.io.http; +import org.apache.nifi.remote.io.InterruptableInputStream; import org.apache.nifi.remote.protocol.CommunicationsInput; import org.apache.nifi.stream.io.ByteCountingInputStream; @@ -25,6 +26,7 @@ import java.io.InputStream; public class HttpInput implements CommunicationsInput { private ByteCountingInputStream countingIn; + private InterruptableInputStream interruptableIn; @Override public InputStream getInputStream() throws IOException { @@ -53,6 +55,13 @@ public class HttpInput implements CommunicationsInput { } public void setInputStream(InputStream inputStream) { - this.countingIn = new ByteCountingInputStream(inputStream); + interruptableIn = new InterruptableInputStream(inputStream); + this.countingIn = new ByteCountingInputStream(interruptableIn); + } + + public void interrupt() { + if (interruptableIn != null) { + interruptableIn.interrupt(); + } } } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java index b78be18d06..54c105f293 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.remote.io.http; +import org.apache.nifi.remote.io.InterruptableOutputStream; import org.apache.nifi.remote.protocol.CommunicationsOutput; import org.apache.nifi.stream.io.ByteCountingOutputStream; @@ -25,6 +26,7 @@ import java.io.OutputStream; public class HttpOutput implements CommunicationsOutput { private ByteCountingOutputStream countingOut; + private InterruptableOutputStream interruptableOut; @Override public OutputStream getOutputStream() throws IOException { @@ -40,6 +42,13 @@ public class HttpOutput implements CommunicationsOutput { } public void setOutputStream(OutputStream outputStream) { - this.countingOut = new ByteCountingOutputStream(outputStream); + interruptableOut = new InterruptableOutputStream(outputStream); + this.countingOut = new ByteCountingOutputStream(interruptableOut); + } + + public void interrupt() { + if (interruptableOut != null) { + interruptableOut.interrupt(); + } } }