NIFI-2525: Fix Proxy auth issue with async send.

Without this fix, NiFi fails to send data via HTTP Site-to-Site through
Proxy which requires authentication due to AsynchronousCloseException.
It happens when async client replays producing contents in order to re-send the
request with auth credential for the proxy server, however the
connection is already closed.
This fix makes NiFi to send actual data only at the second round of requests, so that flow-file
contents can be sent without reading it twice.

Signed-off-by: Yolanda M. Davis <ymdavis@apache.org>

This closes #915
This commit is contained in:
Koji Kawamura 2016-08-23 10:57:57 +09:00 committed by Yolanda M. Davis
parent 2ceb5c8097
commit 671301193b
4 changed files with 507 additions and 124 deletions

View File

@ -79,5 +79,11 @@
<artifactId>jetty-servlet</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.littleshoot</groupId>
<artifactId>littleproxy</artifactId>
<version>1.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -296,7 +296,7 @@ public abstract class AbstractTransaction implements Transaction {
transactionResponse = readTransactionResponse();
} catch (final IOException e) {
throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. "
+ "It is unknown whether or not the peer successfully received/processed the data.", e);
+ "It is unknown whether or not the peer successfully received/processed the data. " + e, e);
}
logger.debug("{} Received {} from {}", this, transactionResponse, peer);

View File

@ -27,6 +27,7 @@ import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.StatusLine;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.AuthState;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
@ -138,6 +139,8 @@ public class SiteToSiteRestApiClient implements Closeable {
private String baseUrl;
protected final SSLContext sslContext;
protected final HttpProxy proxy;
private final AtomicBoolean proxyAuthRequiresResend = new AtomicBoolean(false);
private RequestConfig requestConfig;
private CredentialsProvider credentialsProvider;
private CloseableHttpClient httpClient;
@ -300,8 +303,7 @@ public class SiteToSiteRestApiClient implements Closeable {
public ControllerDTO getController() throws IOException {
try {
final HttpGet get = createGet("/site-to-site");
get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
final HttpGet get = createGetControllerRequest();
return execute(get, ControllerEntity.class).getController();
} catch (final HttpGetFailedException e) {
@ -314,6 +316,12 @@ public class SiteToSiteRestApiClient implements Closeable {
}
}
private HttpGet createGetControllerRequest() {
final HttpGet get = createGet("/site-to-site");
get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
return get;
}
public Collection<PeerDTO> getPeers() throws IOException {
final HttpGet get = createGet("/site-to-site/peers");
get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
@ -321,15 +329,10 @@ public class SiteToSiteRestApiClient implements Closeable {
}
public String initiateTransaction(final TransferDirection direction, final String portId) throws IOException {
if (TransferDirection.RECEIVE.equals(direction)) {
return initiateTransaction("output-ports", portId);
} else {
return initiateTransaction("input-ports", portId);
}
}
private String initiateTransaction(final String portType, final String portId) throws IOException {
final String portType = TransferDirection.RECEIVE.equals(direction) ? "output-ports" : "input-ports";
logger.debug("initiateTransaction handshaking portType={}, portId={}", portType, portId);
final HttpPost post = createPost("/data-transfer/" + portType + "/" + portId + "/transactions");
post.setHeader("Accept", "application/json");
@ -337,43 +340,223 @@ public class SiteToSiteRestApiClient implements Closeable {
setHandshakeProperties(post);
try (CloseableHttpResponse response = getHttpClient().execute(post)) {
final int responseCode = response.getStatusLine().getStatusCode();
logger.debug("initiateTransaction responseCode={}", responseCode);
String transactionUrl;
switch (responseCode) {
case RESPONSE_CODE_CREATED:
EntityUtils.consume(response.getEntity());
transactionUrl = readTransactionUrl(response);
if (isEmpty(transactionUrl)) {
throw new ProtocolException("Server returned RESPONSE_CODE_CREATED without Location header");
}
final Header transportProtocolVersionHeader = response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION);
if (transportProtocolVersionHeader == null) {
throw new ProtocolException("Server didn't return confirmed protocol version");
}
final Integer protocolVersionConfirmedByServer = Integer.valueOf(transportProtocolVersionHeader.getValue());
logger.debug("Finished version negotiation, protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer);
transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer);
final Header serverTransactionTtlHeader = response.getFirstHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
if (serverTransactionTtlHeader == null) {
throw new ProtocolException("Server didn't return " + HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
}
serverTransactionTtl = Integer.parseInt(serverTransactionTtlHeader.getValue());
break;
default:
try (InputStream content = response.getEntity().getContent()) {
throw handleErrResponse(responseCode, content);
}
}
logger.debug("initiateTransaction handshaking finished, transactionUrl={}", transactionUrl);
return transactionUrl;
final HttpResponse response;
if (TransferDirection.RECEIVE.equals(direction)) {
response = initiateTransactionForReceive(post);
} else {
response = initiateTransactionForSend(post);
}
final int responseCode = response.getStatusLine().getStatusCode();
logger.debug("initiateTransaction responseCode={}", responseCode);
String transactionUrl;
switch (responseCode) {
case RESPONSE_CODE_CREATED:
EntityUtils.consume(response.getEntity());
transactionUrl = readTransactionUrl(response);
if (isEmpty(transactionUrl)) {
throw new ProtocolException("Server returned RESPONSE_CODE_CREATED without Location header");
}
final Header transportProtocolVersionHeader = response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION);
if (transportProtocolVersionHeader == null) {
throw new ProtocolException("Server didn't return confirmed protocol version");
}
final Integer protocolVersionConfirmedByServer = Integer.valueOf(transportProtocolVersionHeader.getValue());
logger.debug("Finished version negotiation, protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer);
transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer);
final Header serverTransactionTtlHeader = response.getFirstHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
if (serverTransactionTtlHeader == null) {
throw new ProtocolException("Server didn't return " + HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
}
serverTransactionTtl = Integer.parseInt(serverTransactionTtlHeader.getValue());
break;
default:
try (InputStream content = response.getEntity().getContent()) {
throw handleErrResponse(responseCode, content);
}
}
logger.debug("initiateTransaction handshaking finished, transactionUrl={}", transactionUrl);
return transactionUrl;
}
/**
* Initiate a transaction for receiving data.
* @param post a POST request to establish transaction
* @return POST request response
* @throws IOException thrown if the post request failed
*/
private HttpResponse initiateTransactionForReceive(final HttpPost post) throws IOException {
return getHttpClient().execute(post);
}
/**
* <p>
* Initiate a transaction for sending data.
* </p>
*
* <p>
* If a proxy server requires auth, the proxy server returns 407 response with available auth schema such as basic or digest.
* Then client has to resend the same request with its credential added.
* This mechanism is problematic for sending data from NiFi.
* </p>
*
* <p>
* In order to resend a POST request with auth param,
* NiFi has to either read flow-file contents to send again, or keep the POST body somewhere.
* If we store that in memory, it would causes OOM, or storing it on disk slows down performance.
* Rolling back processing session would be overkill.
* Reading flow-file contents only when it's ready to send in a streaming way is ideal.
* </p>
*
* <p>
* Additionally, the way proxy authentication is done is vary among Proxy server software.
* Some requires 407 and resend cycle for every requests, while others keep a connection between a client and
* the proxy server, then consecutive requests skip auth steps.
* The problem is, that how should we behave is only told after sending a request to the proxy.
* </p>
*
* In order to handle above concerns correctly and efficiently, this method do the followings:
*
* <ol>
* <li>Send a GET request to controller resource, to initiate an HttpAsyncClient. The instance will be used for further requests.
* This is not required by the Site-to-Site protocol, but it can setup proxy auth state safely.</li>
* <li>Send a POST request to initiate a transaction. While doing so, it captures how a proxy server works.
* If 407 and resend cycle occurs here, it implies that we need to do the same thing again when we actually send the data.
* Because if the proxy keeps using the same connection and doesn't require an auth step, it doesn't do so here.</li>
* <li>Then this method stores whether the final POST request should wait for the auth step.
* So that {@link #openConnectionForSend} can determine when to produce contents.</li>
* </ol>
*
* <p>
* The above special sequence is only executed when a proxy instance is set, and its username is set.
* </p>
*
* @param post a POST request to establish transaction
* @return POST request response
* @throws IOException thrown if the post request failed
*/
private HttpResponse initiateTransactionForSend(final HttpPost post) throws IOException {
if (shouldCheckProxyAuth()) {
final CloseableHttpAsyncClient asyncClient = getHttpAsyncClient();
final HttpGet get = createGetControllerRequest();
final Future<HttpResponse> getResult = asyncClient.execute(get, null);
try {
final HttpResponse getResponse = getResult.get(readTimeoutMillis, TimeUnit.MILLISECONDS);
logger.debug("Proxy auth check has done. getResponse={}", getResponse.getStatusLine());
} catch (final ExecutionException e) {
logger.debug("Something has happened at get controller requesting thread for proxy auth check. {}", e.getMessage());
throw toIOException(e);
} catch (TimeoutException | InterruptedException e) {
throw new IOException(e);
}
}
final HttpAsyncRequestProducer asyncRequestProducer = new HttpAsyncRequestProducer() {
private boolean requestHasBeenReset = false;
@Override
public HttpHost getTarget() {
return URIUtils.extractHost(post.getURI());
}
@Override
public HttpRequest generateRequest() throws IOException, HttpException {
final BasicHttpEntity entity = new BasicHttpEntity();
post.setEntity(entity);
return post;
}
@Override
public void produceContent(ContentEncoder encoder, IOControl ioctrl) throws IOException {
encoder.complete();
if (shouldCheckProxyAuth() && requestHasBeenReset) {
logger.debug("Produced content again, assuming the proxy server requires authentication.");
proxyAuthRequiresResend.set(true);
}
}
@Override
public void requestCompleted(HttpContext context) {
debugProxyAuthState(context);
}
@Override
public void failed(Exception ex) {
logger.error("Create transaction for {} has failed", post.getURI(), ex);
}
@Override
public boolean isRepeatable() {
return true;
}
@Override
public void resetRequest() throws IOException {
requestHasBeenReset = true;
}
@Override
public void close() throws IOException {
}
};
final Future<HttpResponse> responseFuture = getHttpAsyncClient().execute(asyncRequestProducer, new BasicAsyncResponseConsumer(), null);
final HttpResponse response;
try {
response = responseFuture.get(readTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (final ExecutionException e) {
logger.debug("Something has happened at initiate transaction requesting thread. {}", e.getMessage());
throw toIOException(e);
} catch (TimeoutException | InterruptedException e) {
throw new IOException(e);
}
return response;
}
/**
* Print AuthState in HttpContext for debugging purpose.
* <p>
* If the proxy server requires 407 and resend cycle, this method logs as followings, for Basic Auth:
* <ul><li>state:UNCHALLENGED;</li>
* <li>state:CHALLENGED;auth scheme:basic;credentials present</li></ul>
* </p>
* <p>
* For Digest Auth:
* <ul><li>state:UNCHALLENGED;</li>
* <li>state:CHALLENGED;auth scheme:digest;credentials present</li></ul>
* </p>
* <p>
* But if the proxy uses the same connection, it doesn't return 407, in such case
* this method is called only once with:
* <ul><li>state:UNCHALLENGED</li></ul>
* </p>
*/
private void debugProxyAuthState(HttpContext context) {
final AuthState proxyAuthState;
if (shouldCheckProxyAuth()
&& logger.isDebugEnabled()
&& (proxyAuthState = (AuthState)context.getAttribute("http.auth.proxy-scope")) != null){
logger.debug("authProxyScope={}", proxyAuthState);
}
}
private IOException toIOException(ExecutionException e) {
final Throwable cause = e.getCause();
if (cause instanceof IOException) {
return (IOException) cause;
} else {
return new IOException(cause);
}
}
private boolean shouldCheckProxyAuth() {
return proxy != null && !isEmpty(proxy.getUsername());
}
public boolean openConnectionForReceive(final String transactionUrl, final Peer peer) throws IOException {
@ -464,9 +647,13 @@ public class SiteToSiteRestApiClient implements Closeable {
final HttpAsyncRequestProducer asyncRequestProducer = new HttpAsyncRequestProducer() {
private final ByteBuffer buffer = ByteBuffer.allocate(DATA_PACKET_CHANNEL_READ_BUFFER_SIZE);
private int totalRead = 0;
private int totalProduced = 0;
private boolean requestHasBeenReset = false;
@Override
public HttpHost getTarget() {
return URIUtils.extractHost(requestUri);
@ -490,9 +677,22 @@ public class SiteToSiteRestApiClient implements Closeable {
private final AtomicBoolean bufferHasRemainingData = new AtomicBoolean(false);
/**
* If the proxy server requires authentication, the same POST request has to be sent again.
* The first request will result 407, then the next one will be sent with auth headers and actual data.
* This method produces a content only when it's need to be sent, to avoid producing the flow-file contents twice.
* Whether we need to wait auth is determined heuristically by the previous POST request which creates transaction.
* See {@link SiteToSiteRestApiClient#initiateTransactionForSend(HttpPost)} for further detail.
*/
@Override
public void produceContent(final ContentEncoder encoder, final IOControl ioControl) throws IOException {
if (shouldCheckProxyAuth() && proxyAuthRequiresResend.get() && !requestHasBeenReset) {
logger.debug("Need authentication with proxy server. Postpone producing content.");
encoder.complete();
return;
}
if (bufferHasRemainingData.get()) {
// If there's remaining buffer last time, send it first.
writeBuffer(encoder);
@ -546,6 +746,7 @@ public class SiteToSiteRestApiClient implements Closeable {
@Override
public void requestCompleted(final HttpContext context) {
logger.debug("Sending data to {} completed.", flowFilesPath);
debugProxyAuthState(context);
}
@Override
@ -562,6 +763,7 @@ public class SiteToSiteRestApiClient implements Closeable {
@Override
public void resetRequest() throws IOException {
logger.debug("Sending data request to {} has been reset...", flowFilesPath);
requestHasBeenReset = true;
}
@Override
@ -617,13 +819,8 @@ public class SiteToSiteRestApiClient implements Closeable {
try {
response = postResult.get(readTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (final ExecutionException e) {
logger.debug("Something has happened at sending thread. {}", e.getMessage());
final Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException) cause;
} else {
throw new IOException(cause);
}
logger.debug("Something has happened at sending data thread. {}", e.getMessage());
throw toIOException(e);
} catch (TimeoutException | InterruptedException e) {
throw new IOException(e);
}
@ -765,7 +962,7 @@ public class SiteToSiteRestApiClient implements Closeable {
}
}
private String readTransactionUrl(final CloseableHttpResponse response) {
private String readTransactionUrl(final HttpResponse response) {
final Header locationUriIntentHeader = response.getFirstHeader(LOCATION_URI_INTENT_NAME);
logger.debug("locationUriIntentHeader={}", locationUriIntentHeader);

View File

@ -29,6 +29,7 @@ import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpHeaders;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
@ -55,6 +56,9 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.littleshoot.proxy.HttpProxyServer;
import org.littleshoot.proxy.ProxyAuthenticator;
import org.littleshoot.proxy.impl.DefaultHttpProxyServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,6 +70,7 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.HashMap;
@ -95,6 +100,9 @@ public class TestHttpClient {
private static ServerConnector sslConnector;
final private static AtomicBoolean isTestCaseFinished = new AtomicBoolean(false);
private static HttpProxyServer proxyServer;
private static HttpProxyServer proxyServerWithAuth;
private static Set<PortDTO> inputPorts;
private static Set<PortDTO> outputPorts;
private static Set<PeerDTO> peers;
@ -451,12 +459,64 @@ public class TestHttpClient {
server.start();
logger.info("Starting server on port {} for HTTP, and {} for HTTPS", httpConnector.getLocalPort(), sslConnector.getLocalPort());
startProxyServer();
startProxyServerWithAuth();
}
private static void startProxyServer() throws IOException {
int proxyServerPort;
try (final ServerSocket serverSocket = new ServerSocket(0)) {
proxyServerPort = serverSocket.getLocalPort();
}
proxyServer = DefaultHttpProxyServer.bootstrap()
.withPort(proxyServerPort)
.withAllowLocalOnly(true)
.start();
}
private static final String PROXY_USER = "proxy user";
private static final String PROXY_PASSWORD = "proxy password";
private static void startProxyServerWithAuth() throws IOException {
int proxyServerPort;
try (final ServerSocket serverSocket = new ServerSocket(0)) {
proxyServerPort = serverSocket.getLocalPort();
}
proxyServerWithAuth = DefaultHttpProxyServer.bootstrap()
.withPort(proxyServerPort)
.withAllowLocalOnly(true)
.withProxyAuthenticator(new ProxyAuthenticator() {
@Override
public boolean authenticate(String userName, String password) {
return PROXY_USER.equals(userName) && PROXY_PASSWORD.equals(password);
}
@Override
public String getRealm() {
return "NiFi Unit Test";
}
})
.start();
}
@AfterClass
public static void teardown() throws Exception {
logger.info("Stopping server.");
server.stop();
logger.info("Stopping servers.");
try {
server.stop();
} catch (Exception e) {
logger.error("Failed to stop Jetty server due to " + e, e);
}
try {
proxyServer.stop();
} catch (Exception e) {
logger.error("Failed to stop Proxy server due to " + e, e);
}
try {
proxyServerWithAuth.stop();
} catch (Exception e) {
logger.error("Failed to stop Proxy server with auth due to " + e, e);
}
}
private static class DataPacketBuilder {
@ -486,7 +546,6 @@ public class TestHttpClient {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "TRACE");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote.protocol.http.HttpClientTransaction", "DEBUG");
final URI uri = server.getURI();
isTestCaseFinished.set(false);
final PeerDTO peer = new PeerDTO();
@ -648,35 +707,82 @@ public class TestHttpClient {
}
}
private void testSend(SiteToSiteClient client) throws Exception {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
assertNotNull(transaction);
serverChecksum = "1071206772";
for (int i = 0; i < 20; i++) {
DataPacket packet = new DataPacketBuilder()
.contents("Example contents from client.")
.attr("Client attr 1", "Client attr 1 value")
.attr("Client attr 2", "Client attr 2 value")
.build();
transaction.send(packet);
long written = ((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten();
logger.info("{}: {} bytes have been written.", i, written);
}
transaction.confirm();
transaction.complete();
}
@Test
public void testSendSuccess() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilder()
.portName("input-running")
.build()
final SiteToSiteClient client = getDefaultBuilder()
.portName("input-running")
.build()
) {
testSend(client);
}
}
@Test
public void testSendSuccessWithProxy() throws Exception {
try (
final SiteToSiteClient client = getDefaultBuilder()
.portName("input-running")
.httpProxy(new HttpProxy("localhost", proxyServer.getListenAddress().getPort(), null, null))
.build()
) {
testSend(client);
}
}
@Test
public void testSendProxyAuthFailed() throws Exception {
try (
final SiteToSiteClient client = getDefaultBuilder()
.portName("input-running")
.httpProxy(new HttpProxy("localhost", proxyServerWithAuth.getListenAddress().getPort(), null, null))
.build()
) {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
assertNull("createTransaction should fail at peer selection and return null.", transaction);
}
assertNotNull(transaction);
}
serverChecksum = "1071206772";
@Test
public void testSendSuccessWithProxyAuth() throws Exception {
for (int i = 0; i < 20; i++) {
DataPacket packet = new DataPacketBuilder()
.contents("Example contents from client.")
.attr("Client attr 1", "Client attr 1 value")
.attr("Client attr 2", "Client attr 2 value")
.build();
transaction.send(packet);
long written = ((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten();
logger.info("{}: {} bytes have been written.", i, written);
}
transaction.confirm();
transaction.complete();
try (
final SiteToSiteClient client = getDefaultBuilder()
.portName("input-running")
.httpProxy(new HttpProxy("localhost", proxyServerWithAuth.getListenAddress().getPort(), PROXY_USER, PROXY_PASSWORD))
.build()
) {
testSend(client);
}
}
@ -685,31 +791,11 @@ public class TestHttpClient {
public void testSendSuccessHTTPS() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilderHTTPS()
final SiteToSiteClient client = getDefaultBuilderHTTPS()
.portName("input-running")
.build()
) {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
assertNotNull(transaction);
serverChecksum = "1071206772";
for (int i = 0; i < 20; i++) {
DataPacket packet = new DataPacketBuilder()
.contents("Example contents from client.")
.attr("Client attr 1", "Client attr 1 value")
.attr("Client attr 2", "Client attr 2 value")
.build();
transaction.send(packet);
long written = ((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten();
logger.info("{}: {} bytes have been written.", i, written);
}
transaction.confirm();
transaction.complete();
testSend(client);
}
}
@ -754,6 +840,34 @@ public class TestHttpClient {
}
@Test
public void testSendLargeFileHTTPWithProxy() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilder()
.portName("input-running")
.httpProxy(new HttpProxy("localhost", proxyServer.getListenAddress().getPort(), null, null))
.build()
) {
testSendLargeFile(client);
}
}
@Test
public void testSendLargeFileHTTPWithProxyAuth() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilder()
.portName("input-running")
.httpProxy(new HttpProxy("localhost", proxyServerWithAuth.getListenAddress().getPort(), PROXY_USER, PROXY_PASSWORD))
.build()
) {
testSendLargeFile(client);
}
}
@Test
public void testSendLargeFileHTTPS() throws Exception {
@ -767,6 +881,34 @@ public class TestHttpClient {
}
@Test
public void testSendLargeFileHTTPSWithProxy() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilderHTTPS()
.portName("input-running")
.httpProxy(new HttpProxy("localhost", proxyServer.getListenAddress().getPort(), null, null))
.build()
) {
testSendLargeFile(client);
}
}
@Test
public void testSendLargeFileHTTPSWithProxyAuth() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilderHTTPS()
.portName("input-running")
.httpProxy(new HttpProxy("localhost", proxyServerWithAuth.getListenAddress().getPort(), PROXY_USER, PROXY_PASSWORD))
.build()
) {
testSendLargeFile(client);
}
}
@Test
public void testSendSuccessCompressed() throws Exception {
@ -947,6 +1089,19 @@ public class TestHttpClient {
}
}
private void testReceive(SiteToSiteClient client) throws IOException {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
assertNotNull(transaction);
DataPacket packet;
while ((packet = transaction.receive()) != null) {
consumeDataPacket(packet);
}
transaction.confirm();
transaction.complete();
}
@Test
public void testReceiveSuccess() throws Exception {
@ -955,16 +1110,33 @@ public class TestHttpClient {
.portName("output-running")
.build()
) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
testReceive(client);
}
}
assertNotNull(transaction);
@Test
public void testReceiveSuccessWithProxy() throws Exception {
DataPacket packet;
while ((packet = transaction.receive()) != null) {
consumeDataPacket(packet);
}
transaction.confirm();
transaction.complete();
try (
SiteToSiteClient client = getDefaultBuilder()
.portName("output-running")
.httpProxy(new HttpProxy("localhost", proxyServer.getListenAddress().getPort(), null, null))
.build()
) {
testReceive(client);
}
}
@Test
public void testReceiveSuccessWithProxyAuth() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilder()
.portName("output-running")
.httpProxy(new HttpProxy("localhost", proxyServerWithAuth.getListenAddress().getPort(), PROXY_USER, PROXY_PASSWORD))
.build()
) {
testReceive(client);
}
}
@ -976,16 +1148,33 @@ public class TestHttpClient {
.portName("output-running")
.build()
) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
testReceive(client);
}
}
assertNotNull(transaction);
@Test
public void testReceiveSuccessHTTPSWithProxy() throws Exception {
DataPacket packet;
while ((packet = transaction.receive()) != null) {
consumeDataPacket(packet);
}
transaction.confirm();
transaction.complete();
try (
SiteToSiteClient client = getDefaultBuilderHTTPS()
.portName("output-running")
.httpProxy(new HttpProxy("localhost", proxyServer.getListenAddress().getPort(), null, null))
.build()
) {
testReceive(client);
}
}
@Test
public void testReceiveSuccessHTTPSWithProxyAuth() throws Exception {
try (
SiteToSiteClient client = getDefaultBuilderHTTPS()
.portName("output-running")
.httpProxy(new HttpProxy("localhost", proxyServerWithAuth.getListenAddress().getPort(), PROXY_USER, PROXY_PASSWORD))
.build()
) {
testReceive(client);
}
}
@ -998,16 +1187,7 @@ public class TestHttpClient {
.useCompression(true)
.build()
) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
assertNotNull(transaction);
DataPacket packet;
while ((packet = transaction.receive()) != null) {
consumeDataPacket(packet);
}
transaction.confirm();
transaction.complete();
testReceive(client);
}
}