NIFI-2259: HTTP Site-to-Site can't handle DEST_FULL

HTTP Site-to-Site can't handle TRANSACTION_FINISHED_BUT_DESTINATION_FULL
scenario as expected.

That happens if the remote NiFi's input port destination relationship
becomes full during Site-to-Site client sends data. The data which has
already sent to the remote NiFi has to be committed successfully.
However, the remote NiFi returns 503 as a response of commit HTTP
request. Because it does check port availability.

The port availability check shouldn't be called at commit request, since
the session at source NiFi has already been committed. The remote NiFi
should commit its session as well, and return
TRANSACTION_FINISHED_BUT_DESTINATION_FULL response.

This fix makes a remote NiFi to keep the handshaken properties when it holds
transaction to be committed. Then if a transaction already has
handshaken properties, then use it, instead of doing a handshake process
again.
This commit is contained in:
Koji Kawamura 2016-07-25 12:14:12 +09:00 committed by joewitt
parent 809f042353
commit aae2d27879
4 changed files with 49 additions and 23 deletions

View File

@ -18,6 +18,7 @@ package org.apache.nifi.remote;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.remote.protocol.FlowFileTransaction; import org.apache.nifi.remote.protocol.FlowFileTransaction;
import org.apache.nifi.remote.protocol.HandshakenProperties;
import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol; import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
@ -90,10 +91,12 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
private class TransactionWrapper { private class TransactionWrapper {
private final FlowFileTransaction transaction; private final FlowFileTransaction transaction;
private final HandshakenProperties handshakenProperties;
private long lastCommunicationAt; private long lastCommunicationAt;
private TransactionWrapper(final FlowFileTransaction transaction) { private TransactionWrapper(final FlowFileTransaction transaction, final HandshakenProperties handshakenProperties) {
this.transaction = transaction; this.transaction = transaction;
this.handshakenProperties = handshakenProperties;
this.lastCommunicationAt = System.currentTimeMillis(); this.lastCommunicationAt = System.currentTimeMillis();
} }
@ -166,13 +169,17 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
public String createTransaction() { public String createTransaction() {
final String transactionId = UUID.randomUUID().toString(); final String transactionId = UUID.randomUUID().toString();
transactions.put(transactionId, new TransactionWrapper(null)); transactions.put(transactionId, new TransactionWrapper(null, null));
logger.debug("Created a new transaction: {}", transactionId); logger.debug("Created a new transaction: {}", transactionId);
return transactionId; return transactionId;
} }
public boolean isTransactionActive(final String transactionId) { public boolean isTransactionActive(final String transactionId) {
TransactionWrapper transaction = transactions.get(transactionId); TransactionWrapper transaction = transactions.get(transactionId);
return isTransactionActive(transaction);
}
private boolean isTransactionActive(TransactionWrapper transaction) {
if (transaction == null) { if (transaction == null) {
return false; return false;
} }
@ -182,7 +189,23 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
return true; return true;
} }
public void holdTransaction(final String transactionId, final FlowFileTransaction transaction) throws IllegalStateException { /**
* @param transactionId transactionId to check
* @return Returns a HandshakenProperties instance which is created when this transaction is started,
* only if the transaction is active,
* and it holds a HandshakenProperties,
* otherwise return null
*/
public HandshakenProperties getHandshakenProperties(final String transactionId) {
TransactionWrapper transaction = transactions.get(transactionId);
if (isTransactionActive(transaction)) {
return transaction.handshakenProperties;
}
return null;
}
public void holdTransaction(final String transactionId, final FlowFileTransaction transaction,
final HandshakenProperties handshakenProperties) throws IllegalStateException {
// We don't check expiration of the transaction here, to support large file transport or slow network. // We don't check expiration of the transaction here, to support large file transport or slow network.
// The availability of current transaction is already checked when the HTTP request was received at SiteToSiteResource. // The availability of current transaction is already checked when the HTTP request was received at SiteToSiteResource.
TransactionWrapper currentTransaction = transactions.remove(transactionId); TransactionWrapper currentTransaction = transactions.remove(transactionId);
@ -197,7 +220,7 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
logger.debug("Holding a transaction: {}", transactionId); logger.debug("Holding a transaction: {}", transactionId);
// Server has received or sent all data, and transaction TTL count down starts here. // Server has received or sent all data, and transaction TTL count down starts here.
// However, if the client doesn't consume data fast enough, server might expire and rollback the transaction. // However, if the client doesn't consume data fast enough, server might expire and rollback the transaction.
transactions.put(transactionId, new TransactionWrapper(transaction)); transactions.put(transactionId, new TransactionWrapper(transaction, handshakenProperties));
} }
public FlowFileTransaction finalizeTransaction(final String transactionId) throws IllegalStateException { public FlowFileTransaction finalizeTransaction(final String transactionId) throws IllegalStateException {

View File

@ -34,6 +34,7 @@ import org.apache.nifi.remote.protocol.Response;
import org.apache.nifi.remote.protocol.ResponseCode; import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.StringUtils;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
@ -65,11 +66,21 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr
@Override @Override
protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException { protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException {
HandshakenProperties confirmed = new HandshakenProperties();
HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
confirmed.setCommsIdentifier(commsSession.getTransactionId()); final String transactionId = commsSession.getTransactionId();
validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams());
HandshakenProperties confirmed = null;
if (!StringUtils.isEmpty(transactionId)) {
// If handshake is already done, use it.
confirmed = transactionManager.getHandshakenProperties(transactionId);
}
if (confirmed == null) {
// If it's not, then do handshake.
confirmed = new HandshakenProperties();
confirmed.setCommsIdentifier(transactionId);
validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams());
}
logger.debug("{} Done handshake, confirmed={}", this, confirmed); logger.debug("{} Done handshake, confirmed={}", this, confirmed);
return confirmed; return confirmed;
@ -168,7 +179,7 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr
HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
String transactionId = commSession.getTransactionId(); String transactionId = commSession.getTransactionId();
logger.debug("{} Holding transaction. transactionId={}", this, transactionId); logger.debug("{} Holding transaction. transactionId={}", this, transactionId);
transactionManager.holdTransaction(transactionId, transaction); transactionManager.holdTransaction(transactionId, transaction, handshakenProperties);
return transaction.getFlowFilesSent().size(); return transaction.getFlowFilesSent().size();
} }

View File

@ -18,6 +18,7 @@ package org.apache.nifi.remote;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.remote.protocol.FlowFileTransaction; import org.apache.nifi.remote.protocol.FlowFileTransaction;
import org.apache.nifi.remote.protocol.HandshakenProperties;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -45,7 +46,9 @@ public class TestHttpRemoteSiteListener {
ProcessSession processSession = Mockito.mock(ProcessSession.class); ProcessSession processSession = Mockito.mock(ProcessSession.class);
FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null); FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null);
transactionManager.holdTransaction(transactionId, transaction); transactionManager.holdTransaction(transactionId, transaction, new HandshakenProperties());
assertNotNull(transactionManager.getHandshakenProperties(transactionId));
transaction = transactionManager.finalizeTransaction(transactionId); transaction = transactionManager.finalizeTransaction(transactionId);
assertNotNull(transaction); assertNotNull(transaction);
@ -63,10 +66,10 @@ public class TestHttpRemoteSiteListener {
ProcessSession processSession = Mockito.mock(ProcessSession.class); ProcessSession processSession = Mockito.mock(ProcessSession.class);
FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null); FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null);
transactionManager.holdTransaction(transactionId, transaction); transactionManager.holdTransaction(transactionId, transaction, null);
try { try {
transactionManager.holdTransaction(transactionId, transaction); transactionManager.holdTransaction(transactionId, transaction, null);
fail("The same transaction id can't hold another transaction"); fail("The same transaction id can't hold another transaction");
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
} }
@ -83,7 +86,7 @@ public class TestHttpRemoteSiteListener {
ProcessSession processSession = Mockito.mock(ProcessSession.class); ProcessSession processSession = Mockito.mock(ProcessSession.class);
FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null); FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null);
try { try {
transactionManager.holdTransaction(transactionId, transaction); transactionManager.holdTransaction(transactionId, transaction, null);
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
fail("Transaction can be held even if the transaction id is not valid anymore," + fail("Transaction can be held even if the transaction id is not valid anymore," +
" in order to support large file or slow network."); " in order to support large file or slow network.");

View File

@ -316,8 +316,6 @@ public class DataTransferResource extends ApplicationResource {
HttpFlowFileServerProtocol serverProtocol = getHttpFlowFileServerProtocol(versionNegotiator); HttpFlowFileServerProtocol serverProtocol = getHttpFlowFileServerProtocol(versionNegotiator);
HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol); HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol);
// TODO: How should I pass cluster information?
// serverProtocol.setNodeInformant(clusterManager);
serverProtocol.handshake(peer); serverProtocol.handshake(peer);
return serverProtocol; return serverProtocol;
} }
@ -833,15 +831,6 @@ public class DataTransferResource extends ApplicationResource {
return result; return result;
} }
// TODO: NCM no longer exists.
/*
if (properties.isClusterManager()) {
result.errResponse = responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " is not available on a NiFi Cluster Manager.");
return result;
}
*/
try { try {
result.transportProtocolVersion = negotiateTransportProtocolVersion(req, transportProtocolVersionNegotiator); result.transportProtocolVersion = negotiateTransportProtocolVersion(req, transportProtocolVersionNegotiator);
} catch (BadRequestException e) { } catch (BadRequestException e) {