NIFI-9448 Improved S2S HTTP Extend Transaction Exception Handling

- Refactor background transaction extension to ExtendTransactionCommand
- Avoid closing S2S HTTP client for IllegalStateExceptions
- Avoid creating additional S2S HTTP client instance for transaction extension commands
- Add check for extend transaction requests received in client test class
- Add null check for Peer Persistence implementation in PeerSelector

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5577.
This commit is contained in:
exceptionfactory 2021-12-06 13:24:40 -06:00 committed by Joe Gresock
parent 12015a17dd
commit 563df24067
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
6 changed files with 216 additions and 167 deletions

View File

@ -540,7 +540,9 @@ public class PeerSelector {
this.peerStatusCache = peerStatusCache; this.peerStatusCache = peerStatusCache;
// The #save mechanism persists the cache to stateful or file-based storage // The #save mechanism persists the cache to stateful or file-based storage
peerPersistence.save(peerStatusCache); if (peerPersistence != null) {
peerPersistence.save(peerStatusCache);
}
} catch (final IOException e) { } catch (final IOException e) {
error(logger, eventReporter, "Failed to persist list of peers due to {}; if restarted" + error(logger, eventReporter, "Failed to persist list of peers due to {}; if restarted" +
" and the nodes specified at the remote instance are down," + " and the nodes specified at the remote instance are down," +

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.util;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Site-to-Site Extend Transaction Command executes background requests for transfer transactions
*/
public class ExtendTransactionCommand implements Runnable {
private static final String CATEGORY = "Site-to-Site";
private static final Logger logger = LoggerFactory.getLogger(ExtendTransactionCommand.class);
private final SiteToSiteRestApiClient client;
private final String transactionUrl;
private final EventReporter eventReporter;
ExtendTransactionCommand(final SiteToSiteRestApiClient client, final String transactionUrl, final EventReporter eventReporter) {
this.client = client;
this.transactionUrl = transactionUrl;
this.eventReporter = eventReporter;
}
/**
* Run Command and attempt to extend transaction
*/
@Override
public void run() {
try {
final TransactionResultEntity entity = client.extendTransaction(transactionUrl);
logger.debug("Extend Transaction Completed [{}] Code [{}] FlowFiles Sent [{}]", transactionUrl, entity.getResponseCode(), entity.getFlowFileSent());
} catch (final Throwable e) {
if (e instanceof IllegalStateException) {
logger.debug("Extend Transaction Failed [{}] client connection pool shutdown", transactionUrl, e);
} else {
logger.warn("Extend Transaction Failed [{}]", transactionUrl, e);
final String message = String.format("Extend Transaction Failed [%s]: %s", transactionUrl, e.getMessage());
eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
try {
client.close();
} catch (final Exception closeException) {
logger.warn("Extend Transaction [{}] Close Client Failed", transactionUrl, closeException);
}
}
}
}
}

View File

@ -170,12 +170,11 @@ public class SiteToSiteRestApiClient implements Closeable {
private int batchCount = 0; private int batchCount = 0;
private long batchSize = 0; private long batchSize = 0;
private long batchDurationMillis = 0; private long batchDurationMillis = 0;
private TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1); private final TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
private String trustedPeerDn; private String trustedPeerDn;
private final ScheduledExecutorService ttlExtendTaskExecutor; private final ScheduledExecutorService ttlExtendTaskExecutor;
private ScheduledFuture<?> ttlExtendingFuture; private ScheduledFuture<?> ttlExtendingFuture;
private SiteToSiteRestApiClient extendingApiClient;
private int connectTimeoutMillis; private int connectTimeoutMillis;
private int readTimeoutMillis; private int readTimeoutMillis;
@ -199,7 +198,7 @@ public class SiteToSiteRestApiClient implements Closeable {
@Override @Override
public Thread newThread(final Runnable r) { public Thread newThread(final Runnable r) {
final Thread thread = defaultFactory.newThread(r); final Thread thread = defaultFactory.newThread(r);
thread.setName(Thread.currentThread().getName() + " TTLExtend"); thread.setName(Thread.currentThread().getName() + " Site-to-Site Extend Transactions");
thread.setDaemon(true); thread.setDaemon(true);
return thread; return thread;
} }
@ -208,7 +207,7 @@ public class SiteToSiteRestApiClient implements Closeable {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
stopExtendingTtl(); stopExtendingTransaction();
closeSilently(httpClient); closeSilently(httpClient);
closeSilently(httpAsyncClient); closeSilently(httpAsyncClient);
} }
@ -376,7 +375,7 @@ public class SiteToSiteRestApiClient implements Closeable {
private ControllerDTO getController() throws IOException { private ControllerDTO getController() throws IOException {
// first check cache and prune any old values. // first check cache and prune any old values.
// Periodically prune the map so that we are not keeping entries around forever, in case an RPG is removed // Periodically prune the map so that we are not keeping entries around forever, in case an RPG is removed
// from he canvas, etc. We want to ensure that we avoid memory leaks, even if they are likely to not cause a problem. // from the canvas, etc. We want to ensure that we avoid memory leaks, even if they are likely to not cause a problem.
if (System.currentTimeMillis() > lastPruneTimestamp + TimeUnit.MINUTES.toMillis(5)) { if (System.currentTimeMillis() > lastPruneTimestamp + TimeUnit.MINUTES.toMillis(5)) {
pruneCache(); pruneCache();
} }
@ -487,7 +486,7 @@ public class SiteToSiteRestApiClient implements Closeable {
if (transportProtocolVersionHeader == null) { if (transportProtocolVersionHeader == null) {
throw new ProtocolException("Server didn't return confirmed protocol version"); throw new ProtocolException("Server didn't return confirmed protocol version");
} }
final Integer protocolVersionConfirmedByServer = Integer.valueOf(transportProtocolVersionHeader.getValue()); final int protocolVersionConfirmedByServer = Integer.parseInt(transportProtocolVersionHeader.getValue());
logger.debug("Finished version negotiation, protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer); logger.debug("Finished version negotiation, protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer);
transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer); transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer);
@ -590,7 +589,7 @@ public class SiteToSiteRestApiClient implements Closeable {
} }
@Override @Override
public HttpRequest generateRequest() throws IOException, HttpException { public HttpRequest generateRequest() {
final BasicHttpEntity entity = new BasicHttpEntity(); final BasicHttpEntity entity = new BasicHttpEntity();
post.setEntity(entity); post.setEntity(entity);
return post; return post;
@ -623,12 +622,12 @@ public class SiteToSiteRestApiClient implements Closeable {
} }
@Override @Override
public void resetRequest() throws IOException { public void resetRequest() {
requestHasBeenReset = true; requestHasBeenReset = true;
} }
@Override @Override
public void close() throws IOException { public void close() {
} }
}; };
@ -722,7 +721,7 @@ public class SiteToSiteRestApiClient implements Closeable {
if (r < 0) { if (r < 0) {
closed = true; closed = true;
logger.debug("Reached to end of input stream. Closing resources..."); logger.debug("Reached to end of input stream. Closing resources...");
stopExtendingTtl(); stopExtendingTransaction();
closeSilently(httpIn); closeSilently(httpIn);
closeSilently(response); closeSilently(response);
} }
@ -731,7 +730,7 @@ public class SiteToSiteRestApiClient implements Closeable {
}; };
((HttpInput) peer.getCommunicationsSession().getInput()).setInputStream(streamCapture); ((HttpInput) peer.getCommunicationsSession().getInput()).setInputStream(streamCapture);
startExtendingTtl(transactionUrl, httpIn, response); startExtendingTransaction(transactionUrl);
keepItOpen = true; keepItOpen = true;
return true; return true;
@ -784,7 +783,7 @@ public class SiteToSiteRestApiClient implements Closeable {
} }
@Override @Override
public HttpRequest generateRequest() throws IOException, HttpException { public HttpRequest generateRequest() {
// Pass the output stream so that Site-to-Site client thread can send // Pass the output stream so that Site-to-Site client thread can send
// data packet through this connection. // data packet through this connection.
@ -888,17 +887,17 @@ public class SiteToSiteRestApiClient implements Closeable {
} }
@Override @Override
public void resetRequest() throws IOException { public void resetRequest() {
logger.debug("Sending data request to {} has been reset...", flowFilesPath); logger.debug("Sending data request to {} has been reset...", flowFilesPath);
requestHasBeenReset = true; requestHasBeenReset = true;
} }
@Override @Override
public void close() throws IOException { public void close() {
logger.debug("Closing sending data request to {}", flowFilesPath); logger.debug("Closing sending data request to {}", flowFilesPath);
closeSilently(outputStream); closeSilently(outputStream);
closeSilently(dataPacketChannel); closeSilently(dataPacketChannel);
stopExtendingTtl(); stopExtendingTransaction();
} }
}; };
@ -912,7 +911,7 @@ public class SiteToSiteRestApiClient implements Closeable {
// Started. // Started.
transferDataLatch = new CountDownLatch(1); transferDataLatch = new CountDownLatch(1);
startExtendingTtl(transactionUrl, dataPacketChannel, null); startExtendingTransaction(transactionUrl);
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
throw new IOException("Awaiting initConnectionLatch has been interrupted.", e); throw new IOException("Awaiting initConnectionLatch has been interrupted.", e);
@ -927,7 +926,7 @@ public class SiteToSiteRestApiClient implements Closeable {
} }
// No more data can be sent. // No more data can be sent.
// Close PipedOutputStream so that dataPacketChannel doesn't blocked. // Close PipedOutputStream so that dataPacketChannel doesn't get blocked.
// If we don't close this output stream, then PipedInputStream loops infinitely at read(). // If we don't close this output stream, then PipedInputStream loops infinitely at read().
commSession.getOutput().getOutputStream().close(); commSession.getOutput().getOutputStream().close();
logger.debug("{} FinishTransferFlowFiles no more data can be sent", this); logger.debug("{} FinishTransferFlowFiles no more data can be sent", this);
@ -940,7 +939,7 @@ public class SiteToSiteRestApiClient implements Closeable {
throw new IOException("Awaiting transferDataLatch has been interrupted.", e); throw new IOException("Awaiting transferDataLatch has been interrupted.", e);
} }
stopExtendingTtl(); stopExtendingTransaction();
final HttpResponse response; final HttpResponse response;
try { try {
@ -968,37 +967,15 @@ public class SiteToSiteRestApiClient implements Closeable {
} }
} }
private void startExtendingTtl(final String transactionUrl, final Closeable stream, final CloseableHttpResponse response) { private void startExtendingTransaction(final String transactionUrl) {
if (ttlExtendingFuture != null) { if (ttlExtendingFuture != null) {
// Already started.
return; return;
} }
logger.debug("Starting extending TTL thread...");
extendingApiClient = new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP);
extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator;
extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis;
extendingApiClient.readTimeoutMillis = this.readTimeoutMillis;
extendingApiClient.localAddress = this.localAddress;
final int extendFrequency = serverTransactionTtl / 2; final int extendFrequency = serverTransactionTtl / 2;
logger.debug("Extend Transaction Started [{}] Frequency [{} seconds]", transactionUrl, extendFrequency);
ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> { final Runnable command = new ExtendTransactionCommand(this, transactionUrl, eventReporter);
try { ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(command, extendFrequency, extendFrequency, TimeUnit.SECONDS);
extendingApiClient.extendTransaction(transactionUrl);
} catch (final Exception e) {
logger.warn("Failed to extend transaction ttl", e);
try {
// Without disconnecting, Site-to-Site client keep reading data packet,
// while server has already rollback.
this.close();
} catch (final IOException ec) {
logger.warn("Failed to close", e);
}
}
}, extendFrequency, extendFrequency, TimeUnit.SECONDS);
} }
private void closeSilently(final Closeable closeable) { private void closeSilently(final Closeable closeable) {
@ -1040,17 +1017,15 @@ public class SiteToSiteRestApiClient implements Closeable {
} }
private void stopExtendingTtl() { private void stopExtendingTransaction() {
if (!ttlExtendTaskExecutor.isShutdown()) { if (!ttlExtendTaskExecutor.isShutdown()) {
ttlExtendTaskExecutor.shutdown(); ttlExtendTaskExecutor.shutdown();
} }
if (ttlExtendingFuture != null && !ttlExtendingFuture.isCancelled()) { if (ttlExtendingFuture != null && !ttlExtendingFuture.isCancelled()) {
logger.debug("Cancelling extending ttl..."); final boolean cancelled = ttlExtendingFuture.cancel(true);
ttlExtendingFuture.cancel(true); logger.debug("Extend Transaction Cancelled [{}]", cancelled);
} }
closeSilently(extendingApiClient);
} }
private IOException handleErrResponse(final int responseCode, final InputStream in) throws IOException { private IOException handleErrResponse(final int responseCode, final InputStream in) throws IOException {
@ -1451,7 +1426,7 @@ public class SiteToSiteRestApiClient implements Closeable {
logger.debug("Sending commitReceivingFlowFiles request to transactionUrl: {}, clientResponse={}, checksum={}", logger.debug("Sending commitReceivingFlowFiles request to transactionUrl: {}, clientResponse={}, checksum={}",
transactionUrl, clientResponse, checksum); transactionUrl, clientResponse, checksum);
stopExtendingTtl(); stopExtendingTransaction();
final StringBuilder urlBuilder = new StringBuilder(transactionUrl).append("?responseCode=").append(clientResponse.getCode()); final StringBuilder urlBuilder = new StringBuilder(transactionUrl).append("?responseCode=").append(clientResponse.getCode());
if (ResponseCode.CONFIRM_TRANSACTION.equals(clientResponse)) { if (ResponseCode.CONFIRM_TRANSACTION.equals(clientResponse)) {

View File

@ -18,6 +18,7 @@ package org.apache.nifi.remote.client.http;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
@ -62,10 +63,13 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS; import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.extension.ExtendWith;
import org.littleshoot.proxy.HttpProxyServer; import org.littleshoot.proxy.HttpProxyServer;
import org.littleshoot.proxy.ProxyAuthenticator; import org.littleshoot.proxy.ProxyAuthenticator;
import org.littleshoot.proxy.impl.DefaultHttpProxyServer; import org.littleshoot.proxy.impl.DefaultHttpProxyServer;
import org.littleshoot.proxy.impl.ThreadPoolConfiguration; import org.littleshoot.proxy.impl.ThreadPoolConfiguration;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -82,6 +86,7 @@ import java.io.OutputStream;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
@ -89,6 +94,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME; import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
@ -96,13 +102,15 @@ import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTE
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE; import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.PROTOCOL_VERSION; import static org.apache.nifi.remote.protocol.http.HttpHeaders.PROTOCOL_VERSION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.SERVER_SIDE_TRANSACTION_TTL; import static org.apache.nifi.remote.protocol.http.HttpHeaders.SERVER_SIDE_TRANSACTION_TTL;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
@ExtendWith(MockitoExtension.class)
public class TestHttpClient { public class TestHttpClient {
private static final Logger logger = LoggerFactory.getLogger(TestHttpClient.class); private static final Logger logger = LoggerFactory.getLogger(TestHttpClient.class);
@ -123,6 +131,13 @@ public class TestHttpClient {
private static TlsConfiguration tlsConfiguration; private static TlsConfiguration tlsConfiguration;
private static final int INITIAL_TRANSACTIONS = 0;
private static final AtomicInteger outputExtendTransactions = new AtomicInteger(INITIAL_TRANSACTIONS);
@Mock
private EventReporter eventReporter;
public static class SiteInfoServlet extends HttpServlet { public static class SiteInfoServlet extends HttpServlet {
@Override @Override
@ -161,7 +176,7 @@ public class TestHttpClient {
@Override @Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// This response simulates when a Site-to-Site is given an URL which has wrong path. // This response simulates when a Site-to-Site is given a URL which has wrong path.
respondWithText(resp, "<p class=\"message-pane-content\">You may have mistyped...</p>", 200); respondWithText(resp, "<p class=\"message-pane-content\">You may have mistyped...</p>", 200);
} }
} }
@ -252,6 +267,7 @@ public class TestHttpClient {
@Override @Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws IOException { protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws IOException {
outputExtendTransactions.incrementAndGet();
final int reqProtocolVersion = getReqProtocolVersion(req); final int reqProtocolVersion = getReqProtocolVersion(req);
final TransactionResultEntity entity = new TransactionResultEntity(); final TransactionResultEntity entity = new TransactionResultEntity();
@ -295,7 +311,7 @@ public class TestHttpClient {
} }
logger.info("finish receiving data packets."); logger.info("finish receiving data packets.");
assertNotNull("Test case should set <serverChecksum> depending on the test scenario.", serverChecksum); assertNotNull(serverChecksum, "Test case should set <serverChecksum> depending on the test scenario.");
respondWithText(resp, serverChecksum, HttpServletResponse.SC_ACCEPTED); respondWithText(resp, serverChecksum, HttpServletResponse.SC_ACCEPTED);
} }
@ -372,6 +388,7 @@ public class TestHttpClient {
fail("Test case timeout."); fail("Test case timeout.");
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
fail("Test interrupted");
} }
} }
@ -387,7 +404,7 @@ public class TestHttpClient {
private static OutputStream getOutputStream(HttpServletRequest req, HttpServletResponse resp) throws IOException { private static OutputStream getOutputStream(HttpServletRequest req, HttpServletResponse resp) throws IOException {
OutputStream outputStream = resp.getOutputStream(); OutputStream outputStream = resp.getOutputStream();
if (Boolean.valueOf(req.getHeader(HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION))){ if (Boolean.parseBoolean(req.getHeader(HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION))){
outputStream = new CompressionOutputStream(outputStream); outputStream = new CompressionOutputStream(outputStream);
} }
return outputStream; return outputStream;
@ -396,7 +413,7 @@ public class TestHttpClient {
private static DataPacket readIncomingPacket(HttpServletRequest req) throws IOException { private static DataPacket readIncomingPacket(HttpServletRequest req) throws IOException {
final StandardFlowFileCodec codec = new StandardFlowFileCodec(); final StandardFlowFileCodec codec = new StandardFlowFileCodec();
InputStream inputStream = req.getInputStream(); InputStream inputStream = req.getInputStream();
if (Boolean.valueOf(req.getHeader(HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION))){ if (Boolean.parseBoolean(req.getHeader(HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION))){
inputStream = new CompressionInputStream(inputStream); inputStream = new CompressionInputStream(inputStream);
} }
@ -605,6 +622,7 @@ public class TestHttpClient {
@BeforeEach @BeforeEach
public void before() throws Exception { public void before() throws Exception {
outputExtendTransactions.set(INITIAL_TRANSACTIONS);
testCaseFinished = new CountDownLatch(1); testCaseFinished = new CountDownLatch(1);
final PeerDTO peer = new PeerDTO(); final PeerDTO peer = new PeerDTO();
@ -708,7 +726,7 @@ public class TestHttpClient {
private static void consumeDataPacket(DataPacket packet) throws IOException { private static void consumeDataPacket(DataPacket packet) throws IOException {
final ByteArrayOutputStream bos = new ByteArrayOutputStream(); final ByteArrayOutputStream bos = new ByteArrayOutputStream();
StreamUtils.copy(packet.getData(), bos); StreamUtils.copy(packet.getData(), bos);
String contents = new String(bos.toByteArray()); String contents = new String(bos.toByteArray(), StandardCharsets.UTF_8);
logger.info("received: {}, {}", contents, packet.getAttributes()); logger.info("received: {}, {}", contents, packet.getAttributes());
} }
@ -924,7 +942,6 @@ public class TestHttpClient {
transaction.complete(); transaction.complete();
} catch (final IOException e) { } catch (final IOException e) {
if (isProxyEnabled && e.getMessage().contains("504")) { if (isProxyEnabled && e.getMessage().contains("504")) {
// Gateway Timeout happens sometimes at Travis CI.
logger.warn("Request timeout. Most likely an environment dependent issue.", e); logger.warn("Request timeout. Most likely an environment dependent issue.", e);
} else { } else {
throw e; throw e;
@ -1106,7 +1123,7 @@ public class TestHttpClient {
} }
private void completeShouldFail(Transaction transaction) { private void completeShouldFail(Transaction transaction) {
assertThrows(IllegalStateException.class, () -> transaction.complete()); assertThrows(IllegalStateException.class, transaction::complete);
} }
private void confirmShouldFail(Transaction transaction) throws IOException { private void confirmShouldFail(Transaction transaction) throws IOException {
@ -1139,7 +1156,7 @@ public class TestHttpClient {
serverChecksum = "1345413116"; serverChecksum = "1345413116";
transaction.send(packet); transaction.send(packet);
IOException e = assertThrows(IOException.class, () -> transaction.confirm()); IOException e = assertThrows(IOException.class, transaction::confirm);
assertTrue(e.getMessage().contains("TimeoutException")); assertTrue(e.getMessage().contains("TimeoutException"));
completeShouldFail(transaction); completeShouldFail(transaction);
@ -1313,7 +1330,7 @@ public class TestHttpClient {
DataPacket packet; DataPacket packet;
while ((packet = transaction.receive()) != null) { while ((packet = transaction.receive()) != null) {
consumeDataPacket(packet); consumeDataPacket(packet);
Thread.sleep(500); TimeUnit.MILLISECONDS.sleep(500);
} }
transaction.confirm(); transaction.confirm();
transaction.complete(); transaction.complete();
@ -1336,12 +1353,11 @@ public class TestHttpClient {
@Test @Test
public void testReceiveTimeoutAfterDataExchange() throws Exception { public void testReceiveTimeoutAfterDataExchange() throws Exception {
try (final SiteToSiteClient client = getDefaultBuilder()
try ( .timeout(3, TimeUnit.SECONDS)
SiteToSiteClient client = getDefaultBuilder() .portName("output-timeout-data-ex")
.timeout(5, TimeUnit.SECONDS) .eventReporter(eventReporter)
.portName("output-timeout-data-ex") .build()
.build()
) { ) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
assertNotNull(transaction); assertNotNull(transaction);
@ -1350,11 +1366,13 @@ public class TestHttpClient {
assertNotNull(packet); assertNotNull(packet);
consumeDataPacket(packet); consumeDataPacket(packet);
IOException e = assertThrows(IOException.class, () -> transaction.receive()); IOException e = assertThrows(IOException.class, transaction::receive);
assertTrue(e.getCause() instanceof SocketTimeoutException); assertTrue(e.getCause() instanceof SocketTimeoutException);
confirmShouldFail(transaction); confirmShouldFail(transaction);
completeShouldFail(transaction); completeShouldFail(transaction);
assertNotSame(INITIAL_TRANSACTIONS, outputExtendTransactions.get());
} }
} }

View File

@ -1,99 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.client.socket;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.stream.io.StreamUtils;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class SiteToSiteClientIT {
@Test
public void testReceive() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("cba")
.requestBatchCount(10)
.build();
try {
for (int i = 0; i < 1000; i++) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
assertNotNull(transaction);
DataPacket packet;
while (true) {
packet = transaction.receive();
if (packet == null) {
break;
}
final InputStream in = packet.getData();
final long size = packet.getSize();
final byte[] buff = new byte[(int) size];
StreamUtils.fillBuffer(in, buff);
}
transaction.confirm();
transaction.complete();
}
} finally {
client.close();
}
}
@Test
public void testSend() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("input")
.build();
try {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
assertNotNull(transaction);
final Map<String, String> attrs = new HashMap<>();
attrs.put("site-to-site", "yes, please!");
final byte[] bytes = "Hello".getBytes();
final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
transaction.send(packet);
transaction.confirm();
transaction.complete();
} finally {
client.close();
}
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.util;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
import java.net.SocketTimeoutException;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class TestExtendTransactionCommand {
private static final String TRANSACTION_URL = "https://localhost:8443/nifi-api/transaction-id";
private static final TransactionResultEntity RESULT_ENTITY = new TransactionResultEntity();
@Mock
private SiteToSiteRestApiClient client;
@Mock
private EventReporter eventReporter;
private ExtendTransactionCommand command;
@BeforeEach
public void setCommand() {
command = new ExtendTransactionCommand(client, TRANSACTION_URL, eventReporter);
}
@Test
public void testRun() throws IOException {
when(client.extendTransaction(eq(TRANSACTION_URL))).thenReturn(RESULT_ENTITY);
command.run();
verifyNoInteractions(eventReporter);
}
@Test
public void testRunIllegalStateExceptionClientNotClosed() throws IOException {
when(client.extendTransaction(eq(TRANSACTION_URL))).thenThrow(new IllegalStateException());
command.run();
verifyNoInteractions(eventReporter);
verify(client, never()).close();
}
@Test
public void testRunSocketTimeoutExceptionClientClosed() throws IOException {
when(client.extendTransaction(eq(TRANSACTION_URL))).thenThrow(new SocketTimeoutException());
command.run();
verify(eventReporter).reportEvent(eq(Severity.WARNING), anyString(), anyString());
verify(client).close();
}
}