NIFI-2651: Ensure that when we disable transmission on an RPG that we interrupt any transactions in progress for http-based site-to-site

This closes #937.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Mark Payne 2016-08-24 21:06:30 -04:00 committed by Koji Kawamura
parent c2bfc4ef24
commit 8536ad65f4
5 changed files with 43 additions and 5 deletions

View File

@ -99,7 +99,7 @@ public abstract class AbstractTransaction implements Transaction {
}
@Override
public Communicant getCommunicant() {
public Peer getCommunicant() {
return peer;
}

View File

@ -42,6 +42,8 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -55,6 +57,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
private final ScheduledExecutorService taskExecutor;
private final PeerSelector peerSelector;
private final Set<HttpClientTransaction> activeTransactions = Collections.synchronizedSet(new HashSet<>());
public HttpClient(final SiteToSiteClientConfig config) {
super(config);
@ -177,15 +180,26 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
// We found a valid peer to communicate with.
final Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion();
final HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction,
config.isUseCompression(), portId, penaltyMillis, config.getEventReporter());
config.isUseCompression(), portId, penaltyMillis, config.getEventReporter()) {
@Override
protected void close() throws IOException {
try {
super.close();
} finally {
activeTransactions.remove(this);
}
}
};
transaction.initialize(apiClient, transactionUrl);
activeTransactions.add(transaction);
return transaction;
}
logger.info("Couldn't find a valid peer to communicate with.");
return null;
}
private String resolveNodeApiUrl(final PeerDescription description) {
@ -201,5 +215,9 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
public void close() throws IOException {
taskExecutor.shutdown();
peerSelector.clear();
for (final HttpClientTransaction transaction : activeTransactions) {
transaction.getCommunicant().getCommunicationsSession().interrupt();
}
}
}

View File

@ -74,6 +74,8 @@ public class HttpCommunicationsSession extends AbstractCommunicationsSession {
@Override
public void interrupt() {
input.interrupt();
output.interrupt();
}
@Override

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.remote.io.http;
import org.apache.nifi.remote.io.InterruptableInputStream;
import org.apache.nifi.remote.protocol.CommunicationsInput;
import org.apache.nifi.stream.io.ByteCountingInputStream;
@ -25,6 +26,7 @@ import java.io.InputStream;
public class HttpInput implements CommunicationsInput {
private ByteCountingInputStream countingIn;
private InterruptableInputStream interruptableIn;
@Override
public InputStream getInputStream() throws IOException {
@ -53,6 +55,13 @@ public class HttpInput implements CommunicationsInput {
}
public void setInputStream(InputStream inputStream) {
this.countingIn = new ByteCountingInputStream(inputStream);
interruptableIn = new InterruptableInputStream(inputStream);
this.countingIn = new ByteCountingInputStream(interruptableIn);
}
public void interrupt() {
if (interruptableIn != null) {
interruptableIn.interrupt();
}
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.remote.io.http;
import org.apache.nifi.remote.io.InterruptableOutputStream;
import org.apache.nifi.remote.protocol.CommunicationsOutput;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
@ -25,6 +26,7 @@ import java.io.OutputStream;
public class HttpOutput implements CommunicationsOutput {
private ByteCountingOutputStream countingOut;
private InterruptableOutputStream interruptableOut;
@Override
public OutputStream getOutputStream() throws IOException {
@ -40,6 +42,13 @@ public class HttpOutput implements CommunicationsOutput {
}
public void setOutputStream(OutputStream outputStream) {
this.countingOut = new ByteCountingOutputStream(outputStream);
interruptableOut = new InterruptableOutputStream(outputStream);
this.countingOut = new ByteCountingOutputStream(interruptableOut);
}
public void interrupt() {
if (interruptableOut != null) {
interruptableOut.interrupt();
}
}
}