diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java index 08fb188389..e08a3acce3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java @@ -18,6 +18,7 @@ package org.apache.nifi.remote; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.remote.protocol.FlowFileTransaction; +import org.apache.nifi.remote.protocol.HandshakenProperties; import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; @@ -90,10 +91,12 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { private class TransactionWrapper { private final FlowFileTransaction transaction; + private final HandshakenProperties handshakenProperties; private long lastCommunicationAt; - private TransactionWrapper(final FlowFileTransaction transaction) { + private TransactionWrapper(final FlowFileTransaction transaction, final HandshakenProperties handshakenProperties) { this.transaction = transaction; + this.handshakenProperties = handshakenProperties; this.lastCommunicationAt = System.currentTimeMillis(); } @@ -166,13 +169,17 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { public String createTransaction() { final String transactionId = UUID.randomUUID().toString(); - transactions.put(transactionId, new TransactionWrapper(null)); + transactions.put(transactionId, new TransactionWrapper(null, null)); logger.debug("Created a new transaction: {}", transactionId); return transactionId; } public boolean isTransactionActive(final String transactionId) { TransactionWrapper transaction = transactions.get(transactionId); + return isTransactionActive(transaction); + } + + private boolean isTransactionActive(TransactionWrapper transaction) { if (transaction == null) { return false; } @@ -182,7 +189,23 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { return true; } - public void holdTransaction(final String transactionId, final FlowFileTransaction transaction) throws IllegalStateException { + /** + * @param transactionId transactionId to check + * @return Returns a HandshakenProperties instance which is created when this transaction is started, + * only if the transaction is active, + * and it holds a HandshakenProperties, + * otherwise return null + */ + public HandshakenProperties getHandshakenProperties(final String transactionId) { + TransactionWrapper transaction = transactions.get(transactionId); + if (isTransactionActive(transaction)) { + return transaction.handshakenProperties; + } + return null; + } + + public void holdTransaction(final String transactionId, final FlowFileTransaction transaction, + final HandshakenProperties handshakenProperties) throws IllegalStateException { // We don't check expiration of the transaction here, to support large file transport or slow network. // The availability of current transaction is already checked when the HTTP request was received at SiteToSiteResource. TransactionWrapper currentTransaction = transactions.remove(transactionId); @@ -197,7 +220,7 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { logger.debug("Holding a transaction: {}", transactionId); // Server has received or sent all data, and transaction TTL count down starts here. // However, if the client doesn't consume data fast enough, server might expire and rollback the transaction. - transactions.put(transactionId, new TransactionWrapper(transaction)); + transactions.put(transactionId, new TransactionWrapper(transaction, handshakenProperties)); } public FlowFileTransaction finalizeTransaction(final String transactionId) throws IllegalStateException { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java index ebbee17c44..660c49803e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java @@ -34,6 +34,7 @@ import org.apache.nifi.remote.protocol.Response; import org.apache.nifi.remote.protocol.ResponseCode; import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.util.StringUtils; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -65,11 +66,21 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr @Override protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException { - HandshakenProperties confirmed = new HandshakenProperties(); HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); - confirmed.setCommsIdentifier(commsSession.getTransactionId()); - validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams()); + final String transactionId = commsSession.getTransactionId(); + + HandshakenProperties confirmed = null; + if (!StringUtils.isEmpty(transactionId)) { + // If handshake is already done, use it. + confirmed = transactionManager.getHandshakenProperties(transactionId); + } + if (confirmed == null) { + // If it's not, then do handshake. + confirmed = new HandshakenProperties(); + confirmed.setCommsIdentifier(transactionId); + validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams()); + } logger.debug("{} Done handshake, confirmed={}", this, confirmed); return confirmed; @@ -168,7 +179,7 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); String transactionId = commSession.getTransactionId(); logger.debug("{} Holding transaction. transactionId={}", this, transactionId); - transactionManager.holdTransaction(transactionId, transaction); + transactionManager.holdTransaction(transactionId, transaction, handshakenProperties); return transaction.getFlowFilesSent().size(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java index ded2042552..6ab8988db2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java @@ -18,6 +18,7 @@ package org.apache.nifi.remote; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.remote.protocol.FlowFileTransaction; +import org.apache.nifi.remote.protocol.HandshakenProperties; import org.apache.nifi.util.NiFiProperties; import org.junit.BeforeClass; import org.junit.Test; @@ -45,7 +46,9 @@ public class TestHttpRemoteSiteListener { ProcessSession processSession = Mockito.mock(ProcessSession.class); FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null); - transactionManager.holdTransaction(transactionId, transaction); + transactionManager.holdTransaction(transactionId, transaction, new HandshakenProperties()); + + assertNotNull(transactionManager.getHandshakenProperties(transactionId)); transaction = transactionManager.finalizeTransaction(transactionId); assertNotNull(transaction); @@ -63,10 +66,10 @@ public class TestHttpRemoteSiteListener { ProcessSession processSession = Mockito.mock(ProcessSession.class); FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null); - transactionManager.holdTransaction(transactionId, transaction); + transactionManager.holdTransaction(transactionId, transaction, null); try { - transactionManager.holdTransaction(transactionId, transaction); + transactionManager.holdTransaction(transactionId, transaction, null); fail("The same transaction id can't hold another transaction"); } catch (IllegalStateException e) { } @@ -83,7 +86,7 @@ public class TestHttpRemoteSiteListener { ProcessSession processSession = Mockito.mock(ProcessSession.class); FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null); try { - transactionManager.holdTransaction(transactionId, transaction); + transactionManager.holdTransaction(transactionId, transaction, null); } catch (IllegalStateException e) { fail("Transaction can be held even if the transaction id is not valid anymore," + " in order to support large file or slow network."); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java index 3034e1efd8..e2de588a68 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java @@ -316,8 +316,6 @@ public class DataTransferResource extends ApplicationResource { HttpFlowFileServerProtocol serverProtocol = getHttpFlowFileServerProtocol(versionNegotiator); HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol); - // TODO: How should I pass cluster information? - // serverProtocol.setNodeInformant(clusterManager); serverProtocol.handshake(peer); return serverProtocol; } @@ -833,15 +831,6 @@ public class DataTransferResource extends ApplicationResource { return result; } - // TODO: NCM no longer exists. - /* - if (properties.isClusterManager()) { - result.errResponse = responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " is not available on a NiFi Cluster Manager."); - return result; - } - */ - - try { result.transportProtocolVersion = negotiateTransportProtocolVersion(req, transportProtocolVersionNegotiator); } catch (BadRequestException e) {