diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi-commons/nifi-site-to-site-client/pom.xml
index 63e5c125ee..c1857b52fa 100644
--- a/nifi-commons/nifi-site-to-site-client/pom.xml
+++ b/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -55,7 +55,7 @@
org.apache.httpcomponents
httpasyncclient
- 4.1.1
+ 4.1.2
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index 8a379b7f0e..d228378b94 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -112,6 +112,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.isEmpty;
@@ -463,6 +464,8 @@ public class SiteToSiteRestApiClient implements Closeable {
final HttpAsyncRequestProducer asyncRequestProducer = new HttpAsyncRequestProducer() {
private final ByteBuffer buffer = ByteBuffer.allocate(DATA_PACKET_CHANNEL_READ_BUFFER_SIZE);
+ private int totalRead = 0;
+ private int totalProduced = 0;
@Override
public HttpHost getTarget() {
@@ -485,43 +488,59 @@ public class SiteToSiteRestApiClient implements Closeable {
return post;
}
+ private final AtomicBoolean bufferHasRemainingData = new AtomicBoolean(false);
+
@Override
public void produceContent(final ContentEncoder encoder, final IOControl ioControl) throws IOException {
- int totalRead = 0;
- int totalProduced = 0;
+ if (bufferHasRemainingData.get()) {
+ // If there's remaining buffer last time, send it first.
+ writeBuffer(encoder);
+ if (bufferHasRemainingData.get()) {
+ return;
+ }
+ }
+
int read;
// This read() blocks until data becomes available,
// or corresponding outputStream is closed.
- while ((read = dataPacketChannel.read(buffer)) > -1) {
+ if ((read = dataPacketChannel.read(buffer)) > -1) {
- buffer.flip();
- while (buffer.hasRemaining()) {
- totalProduced += encoder.write(buffer);
- }
- buffer.clear();
logger.trace("Read {} bytes from dataPacketChannel. {}", read, flowFilesPath);
totalRead += read;
+ buffer.flip();
+ writeBuffer(encoder);
+
+ } else {
+
+ final long totalWritten = commSession.getOutput().getBytesWritten();
+ logger.debug("sending data to {} has reached to its end. produced {} bytes by reading {} bytes from channel. {} bytes written in this transaction.",
+ flowFilesPath, totalProduced, totalRead, totalWritten);
+ if (totalRead != totalWritten || totalProduced != totalWritten) {
+ final String msg = "Sending data to %s has reached to its end, but produced : read : wrote byte sizes (%d : %d : %d) were not equal. Something went wrong.";
+ throw new RuntimeException(String.format(msg, flowFilesPath, totalProduced, totalRead, totalWritten));
+ }
+ transferDataLatch.countDown();
+ encoder.complete();
+ dataPacketChannel.close();
}
- // There might be remaining bytes in buffer. Make sure it's fully drained.
- buffer.flip();
+ }
+
+ private void writeBuffer(ContentEncoder encoder) throws IOException {
while (buffer.hasRemaining()) {
- totalProduced += encoder.write(buffer);
+ final int written = encoder.write(buffer);
+ logger.trace("written {} bytes to encoder.", written);
+ if (written == 0) {
+ logger.trace("Buffer still has remaining. {}", buffer);
+ bufferHasRemainingData.set(true);
+ return;
+ }
+ totalProduced += written;
}
-
- final long totalWritten = commSession.getOutput().getBytesWritten();
- logger.debug("sending data to {} has reached to its end. produced {} bytes by reading {} bytes from channel. {} bytes written in this transaction.",
- flowFilesPath, totalProduced, totalRead, totalWritten);
- if (totalRead != totalWritten || totalProduced != totalWritten) {
- final String msg = "Sending data to %s has reached to its end, but produced : read : wrote byte sizes (%d : $d : %d) were not equal. Something went wrong.";
- throw new RuntimeException(String.format(msg, flowFilesPath, totalProduced, totalRead, totalWritten));
- }
- transferDataLatch.countDown();
- encoder.complete();
- dataPacketChannel.close();
-
+ bufferHasRemainingData.set(false);
+ buffer.clear();
}
@Override
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
index 3f6dd89208..3a1b1bde05 100644
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
@@ -20,6 +20,7 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.KeystoreType;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.io.CompressionInputStream;
@@ -39,9 +40,16 @@ import org.apache.nifi.web.api.entity.ControllerEntity;
import org.apache.nifi.web.api.entity.PeersEntity;
import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHandler;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -83,11 +91,14 @@ public class TestHttpClient {
private static Logger logger = LoggerFactory.getLogger(TestHttpClient.class);
private static Server server;
+ private static ServerConnector httpConnector;
+ private static ServerConnector sslConnector;
final private static AtomicBoolean isTestCaseFinished = new AtomicBoolean(false);
private static Set inputPorts;
private static Set outputPorts;
private static Set peers;
+ private static Set peersSecure;
private static String serverChecksum;
public static class SiteInfoServlet extends HttpServlet {
@@ -96,11 +107,18 @@ public class TestHttpClient {
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
final ControllerDTO controller = new ControllerDTO();
- controller.setRemoteSiteHttpListeningPort(server.getURI().getPort());
+
+ if (req.getLocalPort() == httpConnector.getLocalPort()) {
+ controller.setRemoteSiteHttpListeningPort(httpConnector.getLocalPort());
+ controller.setSiteToSiteSecure(false);
+ } else {
+ controller.setRemoteSiteHttpListeningPort(sslConnector.getLocalPort());
+ controller.setSiteToSiteSecure(true);
+ }
+
controller.setId("remote-controller-id");
controller.setInstanceId("remote-instance-id");
controller.setName("Remote NiFi Flow");
- controller.setSiteToSiteSecure(false);
assertNotNull("Test case should set depending on the test scenario.", inputPorts);
controller.setInputPorts(inputPorts);
@@ -124,8 +142,13 @@ public class TestHttpClient {
final PeersEntity peersEntity = new PeersEntity();
- assertNotNull("Test case should set depending on the test scenario.", peers);
- peersEntity.setPeers(peers);
+ if (req.getLocalPort() == httpConnector.getLocalPort()) {
+ assertNotNull("Test case should set depending on the test scenario.", peers);
+ peersEntity.setPeers(peers);
+ } else {
+ assertNotNull("Test case should set depending on the test scenario.", peersSecure);
+ peersEntity.setPeers(peersSecure);
+ }
respondWithJson(resp, peersEntity);
}
@@ -383,6 +406,21 @@ public class TestHttpClient {
ServletHandler servletHandler = new ServletHandler();
contextHandler.insertHandler(servletHandler);
+ SslContextFactory sslContextFactory = new SslContextFactory();
+ sslContextFactory.setKeyStorePath("src/test/resources/certs/localhost-ks.jks");
+ sslContextFactory.setKeyStorePassword("localtest");
+ sslContextFactory.setKeyStoreType("JKS");
+
+ httpConnector = new ServerConnector(server);
+
+ HttpConfiguration https = new HttpConfiguration();
+ https.addCustomizer(new SecureRequestCustomizer());
+ sslConnector = new ServerConnector(server,
+ new SslConnectionFactory(sslContextFactory, "http/1.1"),
+ new HttpConnectionFactory(https));
+
+ server.setConnectors(new Connector[] { httpConnector, sslConnector });
+
servletHandler.addServletWithMapping(SiteInfoServlet.class, "/site-to-site");
servletHandler.addServletWithMapping(PeersServlet.class, "/site-to-site/peers");
@@ -412,8 +450,7 @@ public class TestHttpClient {
server.start();
- int serverPort = server.getURI().getPort();
- logger.info("Starting server on port {}", serverPort);
+ logger.info("Starting server on port {} for HTTP, and {} for HTTPS", httpConnector.getLocalPort(), sslConnector.getLocalPort());
}
@AfterClass
@@ -450,17 +487,26 @@ public class TestHttpClient {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote.protocol.http.HttpClientTransaction", "DEBUG");
final URI uri = server.getURI();
+ isTestCaseFinished.set(false);
+
final PeerDTO peer = new PeerDTO();
- peer.setHostname(uri.getHost());
- peer.setPort(uri.getPort());
+ peer.setHostname("localhost");
+ peer.setPort(httpConnector.getLocalPort());
peer.setFlowFileCount(10);
peer.setSecure(false);
- isTestCaseFinished.set(false);
-
peers = new HashSet<>();
peers.add(peer);
+ final PeerDTO peerSecure = new PeerDTO();
+ peerSecure.setHostname("localhost");
+ peerSecure.setPort(sslConnector.getLocalPort());
+ peerSecure.setFlowFileCount(10);
+ peerSecure.setSecure(true);
+
+ peersSecure = new HashSet<>();
+ peersSecure.add(peerSecure);
+
inputPorts = new HashSet<>();
final PortDTO runningInputPort = new PortDTO();
@@ -522,9 +568,20 @@ public class TestHttpClient {
}
private SiteToSiteClient.Builder getDefaultBuilder() {
- final URI uri = server.getURI();
return new SiteToSiteClient.Builder().transportProtocol(SiteToSiteTransportProtocol.HTTP)
- .url("http://" + uri.getHost() + ":" + uri.getPort() + "/nifi")
+ .url("http://localhost:" + httpConnector.getLocalPort() + "/nifi")
+ ;
+ }
+
+ private SiteToSiteClient.Builder getDefaultBuilderHTTPS() {
+ return new SiteToSiteClient.Builder().transportProtocol(SiteToSiteTransportProtocol.HTTP)
+ .url("https://localhost:" + sslConnector.getLocalPort() + "/nifi")
+ .keystoreFilename("src/test/resources/certs/localhost-ks.jks")
+ .keystorePass("localtest")
+ .keystoreType(KeystoreType.JKS)
+ .truststoreFilename("src/test/resources/certs/localhost-ts.jks")
+ .truststorePass("localtest")
+ .truststoreType(KeystoreType.JKS)
;
}
@@ -594,9 +651,6 @@ public class TestHttpClient {
@Test
public void testSendSuccess() throws Exception {
- final URI uri = server.getURI();
-
- logger.info("uri={}", uri);
try (
SiteToSiteClient client = getDefaultBuilder()
.portName("input-running")
@@ -627,12 +681,95 @@ public class TestHttpClient {
}
+ @Test
+ public void testSendSuccessHTTPS() throws Exception {
+
+ try (
+ SiteToSiteClient client = getDefaultBuilderHTTPS()
+ .portName("input-running")
+ .build()
+ ) {
+ final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+
+ assertNotNull(transaction);
+
+ serverChecksum = "1071206772";
+
+
+ for (int i = 0; i < 20; i++) {
+ DataPacket packet = new DataPacketBuilder()
+ .contents("Example contents from client.")
+ .attr("Client attr 1", "Client attr 1 value")
+ .attr("Client attr 2", "Client attr 2 value")
+ .build();
+ transaction.send(packet);
+ long written = ((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten();
+ logger.info("{}: {} bytes have been written.", i, written);
+ }
+
+ transaction.confirm();
+
+ transaction.complete();
+ }
+
+ }
+
+ private static void testSendLargeFile(SiteToSiteClient client) throws IOException {
+ final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+
+ assertNotNull(transaction);
+
+ serverChecksum = "1527414060";
+
+ final int contentSize = 10_000;
+ final StringBuilder sb = new StringBuilder(contentSize);
+ for (int i = 0; i < contentSize; i++) {
+ sb.append("a");
+ }
+
+ DataPacket packet = new DataPacketBuilder()
+ .contents(sb.toString())
+ .attr("Client attr 1", "Client attr 1 value")
+ .attr("Client attr 2", "Client attr 2 value")
+ .build();
+ transaction.send(packet);
+ long written = ((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten();
+ logger.info("{} bytes have been written.", written);
+
+ transaction.confirm();
+
+ transaction.complete();
+ }
+
+ @Test
+ public void testSendLargeFileHTTP() throws Exception {
+
+ try (
+ SiteToSiteClient client = getDefaultBuilder()
+ .portName("input-running")
+ .build()
+ ) {
+ testSendLargeFile(client);
+ }
+
+ }
+
+ @Test
+ public void testSendLargeFileHTTPS() throws Exception {
+
+ try (
+ SiteToSiteClient client = getDefaultBuilderHTTPS()
+ .portName("input-running")
+ .build()
+ ) {
+ testSendLargeFile(client);
+ }
+
+ }
+
@Test
public void testSendSuccessCompressed() throws Exception {
- final URI uri = server.getURI();
-
- logger.info("uri={}", uri);
try (
SiteToSiteClient client = getDefaultBuilder()
.portName("input-running")
@@ -667,9 +804,6 @@ public class TestHttpClient {
@Test
public void testSendSlowClientSuccess() throws Exception {
- final URI uri = server.getURI();
-
- logger.info("uri={}", uri);
try (
SiteToSiteClient client = getDefaultBuilder()
.idleExpiration(1000, TimeUnit.MILLISECONDS)
@@ -722,9 +856,6 @@ public class TestHttpClient {
@Test
public void testSendTimeout() throws Exception {
- final URI uri = server.getURI();
-
- logger.info("uri={}", uri);
try (
SiteToSiteClient client = getDefaultBuilder()
.timeout(1, TimeUnit.SECONDS)
@@ -761,9 +892,6 @@ public class TestHttpClient {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote.protocol.http.HttpClientTransaction", "INFO");
- final URI uri = server.getURI();
-
- logger.info("uri={}", uri);
try (
SiteToSiteClient client = getDefaultBuilder()
.idleExpiration(500, TimeUnit.MILLISECONDS)
@@ -822,9 +950,6 @@ public class TestHttpClient {
@Test
public void testReceiveSuccess() throws Exception {
- final URI uri = server.getURI();
-
- logger.info("uri={}", uri);
try (
SiteToSiteClient client = getDefaultBuilder()
.portName("output-running")
@@ -843,12 +968,30 @@ public class TestHttpClient {
}
}
+ @Test
+ public void testReceiveSuccessHTTPS() throws Exception {
+
+ try (
+ SiteToSiteClient client = getDefaultBuilderHTTPS()
+ .portName("output-running")
+ .build()
+ ) {
+ final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+
+ assertNotNull(transaction);
+
+ DataPacket packet;
+ while ((packet = transaction.receive()) != null) {
+ consumeDataPacket(packet);
+ }
+ transaction.confirm();
+ transaction.complete();
+ }
+ }
+
@Test
public void testReceiveSuccessCompressed() throws Exception {
- final URI uri = server.getURI();
-
- logger.info("uri={}", uri);
try (
SiteToSiteClient client = getDefaultBuilder()
.portName("output-running")
@@ -871,9 +1014,6 @@ public class TestHttpClient {
@Test
public void testReceiveSlowClientSuccess() throws Exception {
- final URI uri = server.getURI();
-
- logger.info("uri={}", uri);
try (
SiteToSiteClient client = getDefaultBuilder()
.portName("output-running")
@@ -896,9 +1036,6 @@ public class TestHttpClient {
@Test
public void testReceiveTimeout() throws Exception {
- final URI uri = server.getURI();
-
- logger.info("uri={}", uri);
try (
SiteToSiteClient client = getDefaultBuilder()
.timeout(1, TimeUnit.SECONDS)
@@ -918,9 +1055,6 @@ public class TestHttpClient {
@Test
public void testReceiveTimeoutAfterDataExchange() throws Exception {
- final URI uri = server.getURI();
-
- logger.info("uri={}", uri);
try (
SiteToSiteClient client = getDefaultBuilder()
.timeout(1, TimeUnit.SECONDS)
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ks.jks b/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ks.jks
new file mode 100755
index 0000000000..df36197d92
Binary files /dev/null and b/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ks.jks differ
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ts.jks b/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ts.jks
new file mode 100755
index 0000000000..7824378a32
Binary files /dev/null and b/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ts.jks differ