NIFI-2567: Site-to-Site to send large data via HTTPS

- It couldn't send data larger than about 7KB due to the mis-use of
  httpasyncclient library
- Updated httpasyncclient from 4.1.1 to 4.1.2
- Let httpasyncclient framework to call produceContent multiple times as
  it gets ready to send more data via SSL session
- Added HTTPS test cases to TestHttpClient, which failed without this
  fix
This commit is contained in:
Koji Kawamura 2016-08-14 22:01:52 +09:00 committed by Mark Payne
parent 02071103d0
commit a919844461
5 changed files with 219 additions and 66 deletions

View File

@ -55,7 +55,7 @@
<dependency> <dependency>
<groupId>org.apache.httpcomponents</groupId> <groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId> <artifactId>httpasyncclient</artifactId>
<version>4.1.1</version> <version>4.1.2</version>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -112,6 +112,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isEmpty;
@ -463,6 +464,8 @@ public class SiteToSiteRestApiClient implements Closeable {
final HttpAsyncRequestProducer asyncRequestProducer = new HttpAsyncRequestProducer() { final HttpAsyncRequestProducer asyncRequestProducer = new HttpAsyncRequestProducer() {
private final ByteBuffer buffer = ByteBuffer.allocate(DATA_PACKET_CHANNEL_READ_BUFFER_SIZE); private final ByteBuffer buffer = ByteBuffer.allocate(DATA_PACKET_CHANNEL_READ_BUFFER_SIZE);
private int totalRead = 0;
private int totalProduced = 0;
@Override @Override
public HttpHost getTarget() { public HttpHost getTarget() {
@ -485,43 +488,59 @@ public class SiteToSiteRestApiClient implements Closeable {
return post; return post;
} }
private final AtomicBoolean bufferHasRemainingData = new AtomicBoolean(false);
@Override @Override
public void produceContent(final ContentEncoder encoder, final IOControl ioControl) throws IOException { public void produceContent(final ContentEncoder encoder, final IOControl ioControl) throws IOException {
int totalRead = 0; if (bufferHasRemainingData.get()) {
int totalProduced = 0; // If there's remaining buffer last time, send it first.
writeBuffer(encoder);
if (bufferHasRemainingData.get()) {
return;
}
}
int read; int read;
// This read() blocks until data becomes available, // This read() blocks until data becomes available,
// or corresponding outputStream is closed. // or corresponding outputStream is closed.
while ((read = dataPacketChannel.read(buffer)) > -1) { if ((read = dataPacketChannel.read(buffer)) > -1) {
buffer.flip();
while (buffer.hasRemaining()) {
totalProduced += encoder.write(buffer);
}
buffer.clear();
logger.trace("Read {} bytes from dataPacketChannel. {}", read, flowFilesPath); logger.trace("Read {} bytes from dataPacketChannel. {}", read, flowFilesPath);
totalRead += read; totalRead += read;
buffer.flip();
writeBuffer(encoder);
} else {
final long totalWritten = commSession.getOutput().getBytesWritten();
logger.debug("sending data to {} has reached to its end. produced {} bytes by reading {} bytes from channel. {} bytes written in this transaction.",
flowFilesPath, totalProduced, totalRead, totalWritten);
if (totalRead != totalWritten || totalProduced != totalWritten) {
final String msg = "Sending data to %s has reached to its end, but produced : read : wrote byte sizes (%d : %d : %d) were not equal. Something went wrong.";
throw new RuntimeException(String.format(msg, flowFilesPath, totalProduced, totalRead, totalWritten));
}
transferDataLatch.countDown();
encoder.complete();
dataPacketChannel.close();
} }
// There might be remaining bytes in buffer. Make sure it's fully drained. }
buffer.flip();
private void writeBuffer(ContentEncoder encoder) throws IOException {
while (buffer.hasRemaining()) { while (buffer.hasRemaining()) {
totalProduced += encoder.write(buffer); final int written = encoder.write(buffer);
logger.trace("written {} bytes to encoder.", written);
if (written == 0) {
logger.trace("Buffer still has remaining. {}", buffer);
bufferHasRemainingData.set(true);
return;
}
totalProduced += written;
} }
bufferHasRemainingData.set(false);
final long totalWritten = commSession.getOutput().getBytesWritten(); buffer.clear();
logger.debug("sending data to {} has reached to its end. produced {} bytes by reading {} bytes from channel. {} bytes written in this transaction.",
flowFilesPath, totalProduced, totalRead, totalWritten);
if (totalRead != totalWritten || totalProduced != totalWritten) {
final String msg = "Sending data to %s has reached to its end, but produced : read : wrote byte sizes (%d : $d : %d) were not equal. Something went wrong.";
throw new RuntimeException(String.format(msg, flowFilesPath, totalProduced, totalRead, totalWritten));
}
transferDataLatch.countDown();
encoder.complete();
dataPacketChannel.close();
} }
@Override @Override

View File

@ -20,6 +20,7 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.KeystoreType;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.io.CompressionInputStream; import org.apache.nifi.remote.io.CompressionInputStream;
@ -39,9 +40,16 @@ import org.apache.nifi.web.api.entity.ControllerEntity;
import org.apache.nifi.web.api.entity.PeersEntity; import org.apache.nifi.web.api.entity.PeersEntity;
import org.apache.nifi.web.api.entity.TransactionResultEntity; import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHandler; import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -83,11 +91,14 @@ public class TestHttpClient {
private static Logger logger = LoggerFactory.getLogger(TestHttpClient.class); private static Logger logger = LoggerFactory.getLogger(TestHttpClient.class);
private static Server server; private static Server server;
private static ServerConnector httpConnector;
private static ServerConnector sslConnector;
final private static AtomicBoolean isTestCaseFinished = new AtomicBoolean(false); final private static AtomicBoolean isTestCaseFinished = new AtomicBoolean(false);
private static Set<PortDTO> inputPorts; private static Set<PortDTO> inputPorts;
private static Set<PortDTO> outputPorts; private static Set<PortDTO> outputPorts;
private static Set<PeerDTO> peers; private static Set<PeerDTO> peers;
private static Set<PeerDTO> peersSecure;
private static String serverChecksum; private static String serverChecksum;
public static class SiteInfoServlet extends HttpServlet { public static class SiteInfoServlet extends HttpServlet {
@ -96,11 +107,18 @@ public class TestHttpClient {
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
final ControllerDTO controller = new ControllerDTO(); final ControllerDTO controller = new ControllerDTO();
controller.setRemoteSiteHttpListeningPort(server.getURI().getPort());
if (req.getLocalPort() == httpConnector.getLocalPort()) {
controller.setRemoteSiteHttpListeningPort(httpConnector.getLocalPort());
controller.setSiteToSiteSecure(false);
} else {
controller.setRemoteSiteHttpListeningPort(sslConnector.getLocalPort());
controller.setSiteToSiteSecure(true);
}
controller.setId("remote-controller-id"); controller.setId("remote-controller-id");
controller.setInstanceId("remote-instance-id"); controller.setInstanceId("remote-instance-id");
controller.setName("Remote NiFi Flow"); controller.setName("Remote NiFi Flow");
controller.setSiteToSiteSecure(false);
assertNotNull("Test case should set <inputPorts> depending on the test scenario.", inputPorts); assertNotNull("Test case should set <inputPorts> depending on the test scenario.", inputPorts);
controller.setInputPorts(inputPorts); controller.setInputPorts(inputPorts);
@ -124,8 +142,13 @@ public class TestHttpClient {
final PeersEntity peersEntity = new PeersEntity(); final PeersEntity peersEntity = new PeersEntity();
assertNotNull("Test case should set <peers> depending on the test scenario.", peers); if (req.getLocalPort() == httpConnector.getLocalPort()) {
peersEntity.setPeers(peers); assertNotNull("Test case should set <peers> depending on the test scenario.", peers);
peersEntity.setPeers(peers);
} else {
assertNotNull("Test case should set <peersSecure> depending on the test scenario.", peersSecure);
peersEntity.setPeers(peersSecure);
}
respondWithJson(resp, peersEntity); respondWithJson(resp, peersEntity);
} }
@ -383,6 +406,21 @@ public class TestHttpClient {
ServletHandler servletHandler = new ServletHandler(); ServletHandler servletHandler = new ServletHandler();
contextHandler.insertHandler(servletHandler); contextHandler.insertHandler(servletHandler);
SslContextFactory sslContextFactory = new SslContextFactory();
sslContextFactory.setKeyStorePath("src/test/resources/certs/localhost-ks.jks");
sslContextFactory.setKeyStorePassword("localtest");
sslContextFactory.setKeyStoreType("JKS");
httpConnector = new ServerConnector(server);
HttpConfiguration https = new HttpConfiguration();
https.addCustomizer(new SecureRequestCustomizer());
sslConnector = new ServerConnector(server,
new SslConnectionFactory(sslContextFactory, "http/1.1"),
new HttpConnectionFactory(https));
server.setConnectors(new Connector[] { httpConnector, sslConnector });
servletHandler.addServletWithMapping(SiteInfoServlet.class, "/site-to-site"); servletHandler.addServletWithMapping(SiteInfoServlet.class, "/site-to-site");
servletHandler.addServletWithMapping(PeersServlet.class, "/site-to-site/peers"); servletHandler.addServletWithMapping(PeersServlet.class, "/site-to-site/peers");
@ -412,8 +450,7 @@ public class TestHttpClient {
server.start(); server.start();
int serverPort = server.getURI().getPort(); logger.info("Starting server on port {} for HTTP, and {} for HTTPS", httpConnector.getLocalPort(), sslConnector.getLocalPort());
logger.info("Starting server on port {}", serverPort);
} }
@AfterClass @AfterClass
@ -450,17 +487,26 @@ public class TestHttpClient {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote.protocol.http.HttpClientTransaction", "DEBUG"); System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote.protocol.http.HttpClientTransaction", "DEBUG");
final URI uri = server.getURI(); final URI uri = server.getURI();
isTestCaseFinished.set(false);
final PeerDTO peer = new PeerDTO(); final PeerDTO peer = new PeerDTO();
peer.setHostname(uri.getHost()); peer.setHostname("localhost");
peer.setPort(uri.getPort()); peer.setPort(httpConnector.getLocalPort());
peer.setFlowFileCount(10); peer.setFlowFileCount(10);
peer.setSecure(false); peer.setSecure(false);
isTestCaseFinished.set(false);
peers = new HashSet<>(); peers = new HashSet<>();
peers.add(peer); peers.add(peer);
final PeerDTO peerSecure = new PeerDTO();
peerSecure.setHostname("localhost");
peerSecure.setPort(sslConnector.getLocalPort());
peerSecure.setFlowFileCount(10);
peerSecure.setSecure(true);
peersSecure = new HashSet<>();
peersSecure.add(peerSecure);
inputPorts = new HashSet<>(); inputPorts = new HashSet<>();
final PortDTO runningInputPort = new PortDTO(); final PortDTO runningInputPort = new PortDTO();
@ -522,9 +568,20 @@ public class TestHttpClient {
} }
private SiteToSiteClient.Builder getDefaultBuilder() { private SiteToSiteClient.Builder getDefaultBuilder() {
final URI uri = server.getURI();
return new SiteToSiteClient.Builder().transportProtocol(SiteToSiteTransportProtocol.HTTP) return new SiteToSiteClient.Builder().transportProtocol(SiteToSiteTransportProtocol.HTTP)
.url("http://" + uri.getHost() + ":" + uri.getPort() + "/nifi") .url("http://localhost:" + httpConnector.getLocalPort() + "/nifi")
;
}
private SiteToSiteClient.Builder getDefaultBuilderHTTPS() {
return new SiteToSiteClient.Builder().transportProtocol(SiteToSiteTransportProtocol.HTTP)
.url("https://localhost:" + sslConnector.getLocalPort() + "/nifi")
.keystoreFilename("src/test/resources/certs/localhost-ks.jks")
.keystorePass("localtest")
.keystoreType(KeystoreType.JKS)
.truststoreFilename("src/test/resources/certs/localhost-ts.jks")
.truststorePass("localtest")
.truststoreType(KeystoreType.JKS)
; ;
} }
@ -594,9 +651,6 @@ public class TestHttpClient {
@Test @Test
public void testSendSuccess() throws Exception { public void testSendSuccess() throws Exception {
final URI uri = server.getURI();
logger.info("uri={}", uri);
try ( try (
SiteToSiteClient client = getDefaultBuilder() SiteToSiteClient client = getDefaultBuilder()
.portName("input-running") .portName("input-running")
@ -627,12 +681,95 @@ public class TestHttpClient {
} }
@Test
public void testSendSuccessHTTPS() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilderHTTPS()
.portName("input-running")
.build()
) {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
assertNotNull(transaction);
serverChecksum = "1071206772";
for (int i = 0; i < 20; i++) {
DataPacket packet = new DataPacketBuilder()
.contents("Example contents from client.")
.attr("Client attr 1", "Client attr 1 value")
.attr("Client attr 2", "Client attr 2 value")
.build();
transaction.send(packet);
long written = ((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten();
logger.info("{}: {} bytes have been written.", i, written);
}
transaction.confirm();
transaction.complete();
}
}
private static void testSendLargeFile(SiteToSiteClient client) throws IOException {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
assertNotNull(transaction);
serverChecksum = "1527414060";
final int contentSize = 10_000;
final StringBuilder sb = new StringBuilder(contentSize);
for (int i = 0; i < contentSize; i++) {
sb.append("a");
}
DataPacket packet = new DataPacketBuilder()
.contents(sb.toString())
.attr("Client attr 1", "Client attr 1 value")
.attr("Client attr 2", "Client attr 2 value")
.build();
transaction.send(packet);
long written = ((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten();
logger.info("{} bytes have been written.", written);
transaction.confirm();
transaction.complete();
}
@Test
public void testSendLargeFileHTTP() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilder()
.portName("input-running")
.build()
) {
testSendLargeFile(client);
}
}
@Test
public void testSendLargeFileHTTPS() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilderHTTPS()
.portName("input-running")
.build()
) {
testSendLargeFile(client);
}
}
@Test @Test
public void testSendSuccessCompressed() throws Exception { public void testSendSuccessCompressed() throws Exception {
final URI uri = server.getURI();
logger.info("uri={}", uri);
try ( try (
SiteToSiteClient client = getDefaultBuilder() SiteToSiteClient client = getDefaultBuilder()
.portName("input-running") .portName("input-running")
@ -667,9 +804,6 @@ public class TestHttpClient {
@Test @Test
public void testSendSlowClientSuccess() throws Exception { public void testSendSlowClientSuccess() throws Exception {
final URI uri = server.getURI();
logger.info("uri={}", uri);
try ( try (
SiteToSiteClient client = getDefaultBuilder() SiteToSiteClient client = getDefaultBuilder()
.idleExpiration(1000, TimeUnit.MILLISECONDS) .idleExpiration(1000, TimeUnit.MILLISECONDS)
@ -722,9 +856,6 @@ public class TestHttpClient {
@Test @Test
public void testSendTimeout() throws Exception { public void testSendTimeout() throws Exception {
final URI uri = server.getURI();
logger.info("uri={}", uri);
try ( try (
SiteToSiteClient client = getDefaultBuilder() SiteToSiteClient client = getDefaultBuilder()
.timeout(1, TimeUnit.SECONDS) .timeout(1, TimeUnit.SECONDS)
@ -761,9 +892,6 @@ public class TestHttpClient {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote.protocol.http.HttpClientTransaction", "INFO"); System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote.protocol.http.HttpClientTransaction", "INFO");
final URI uri = server.getURI();
logger.info("uri={}", uri);
try ( try (
SiteToSiteClient client = getDefaultBuilder() SiteToSiteClient client = getDefaultBuilder()
.idleExpiration(500, TimeUnit.MILLISECONDS) .idleExpiration(500, TimeUnit.MILLISECONDS)
@ -822,9 +950,6 @@ public class TestHttpClient {
@Test @Test
public void testReceiveSuccess() throws Exception { public void testReceiveSuccess() throws Exception {
final URI uri = server.getURI();
logger.info("uri={}", uri);
try ( try (
SiteToSiteClient client = getDefaultBuilder() SiteToSiteClient client = getDefaultBuilder()
.portName("output-running") .portName("output-running")
@ -843,12 +968,30 @@ public class TestHttpClient {
} }
} }
@Test
public void testReceiveSuccessHTTPS() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilderHTTPS()
.portName("output-running")
.build()
) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
assertNotNull(transaction);
DataPacket packet;
while ((packet = transaction.receive()) != null) {
consumeDataPacket(packet);
}
transaction.confirm();
transaction.complete();
}
}
@Test @Test
public void testReceiveSuccessCompressed() throws Exception { public void testReceiveSuccessCompressed() throws Exception {
final URI uri = server.getURI();
logger.info("uri={}", uri);
try ( try (
SiteToSiteClient client = getDefaultBuilder() SiteToSiteClient client = getDefaultBuilder()
.portName("output-running") .portName("output-running")
@ -871,9 +1014,6 @@ public class TestHttpClient {
@Test @Test
public void testReceiveSlowClientSuccess() throws Exception { public void testReceiveSlowClientSuccess() throws Exception {
final URI uri = server.getURI();
logger.info("uri={}", uri);
try ( try (
SiteToSiteClient client = getDefaultBuilder() SiteToSiteClient client = getDefaultBuilder()
.portName("output-running") .portName("output-running")
@ -896,9 +1036,6 @@ public class TestHttpClient {
@Test @Test
public void testReceiveTimeout() throws Exception { public void testReceiveTimeout() throws Exception {
final URI uri = server.getURI();
logger.info("uri={}", uri);
try ( try (
SiteToSiteClient client = getDefaultBuilder() SiteToSiteClient client = getDefaultBuilder()
.timeout(1, TimeUnit.SECONDS) .timeout(1, TimeUnit.SECONDS)
@ -918,9 +1055,6 @@ public class TestHttpClient {
@Test @Test
public void testReceiveTimeoutAfterDataExchange() throws Exception { public void testReceiveTimeoutAfterDataExchange() throws Exception {
final URI uri = server.getURI();
logger.info("uri={}", uri);
try ( try (
SiteToSiteClient client = getDefaultBuilder() SiteToSiteClient client = getDefaultBuilder()
.timeout(1, TimeUnit.SECONDS) .timeout(1, TimeUnit.SECONDS)