mirror of https://github.com/apache/nifi.git
parent
aae2d27879
commit
e23b235617
|
@ -18,7 +18,7 @@ package org.apache.nifi.remote;
|
||||||
|
|
||||||
import org.apache.nifi.groups.ProcessGroup;
|
import org.apache.nifi.groups.ProcessGroup;
|
||||||
import org.apache.nifi.remote.protocol.FlowFileTransaction;
|
import org.apache.nifi.remote.protocol.FlowFileTransaction;
|
||||||
import org.apache.nifi.remote.protocol.HandshakenProperties;
|
import org.apache.nifi.remote.protocol.HandshakeProperties;
|
||||||
import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
|
import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
|
||||||
import org.apache.nifi.util.FormatUtils;
|
import org.apache.nifi.util.FormatUtils;
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
|
@ -91,10 +91,10 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
|
||||||
|
|
||||||
private class TransactionWrapper {
|
private class TransactionWrapper {
|
||||||
private final FlowFileTransaction transaction;
|
private final FlowFileTransaction transaction;
|
||||||
private final HandshakenProperties handshakenProperties;
|
private final HandshakeProperties handshakenProperties;
|
||||||
private long lastCommunicationAt;
|
private long lastCommunicationAt;
|
||||||
|
|
||||||
private TransactionWrapper(final FlowFileTransaction transaction, final HandshakenProperties handshakenProperties) {
|
private TransactionWrapper(final FlowFileTransaction transaction, final HandshakeProperties handshakenProperties) {
|
||||||
this.transaction = transaction;
|
this.transaction = transaction;
|
||||||
this.handshakenProperties = handshakenProperties;
|
this.handshakenProperties = handshakenProperties;
|
||||||
this.lastCommunicationAt = System.currentTimeMillis();
|
this.lastCommunicationAt = System.currentTimeMillis();
|
||||||
|
@ -191,12 +191,12 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param transactionId transactionId to check
|
* @param transactionId transactionId to check
|
||||||
* @return Returns a HandshakenProperties instance which is created when this transaction is started,
|
* @return Returns a HandshakeProperties instance which is created when this transaction is started,
|
||||||
* only if the transaction is active,
|
only if the transaction is active,
|
||||||
* and it holds a HandshakenProperties,
|
and it holds a HandshakeProperties,
|
||||||
* otherwise return null
|
otherwise return null
|
||||||
*/
|
*/
|
||||||
public HandshakenProperties getHandshakenProperties(final String transactionId) {
|
public HandshakeProperties getHandshakenProperties(final String transactionId) {
|
||||||
TransactionWrapper transaction = transactions.get(transactionId);
|
TransactionWrapper transaction = transactions.get(transactionId);
|
||||||
if (isTransactionActive(transaction)) {
|
if (isTransactionActive(transaction)) {
|
||||||
return transaction.handshakenProperties;
|
return transaction.handshakenProperties;
|
||||||
|
@ -205,7 +205,7 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void holdTransaction(final String transactionId, final FlowFileTransaction transaction,
|
public void holdTransaction(final String transactionId, final FlowFileTransaction transaction,
|
||||||
final HandshakenProperties handshakenProperties) throws IllegalStateException {
|
final HandshakeProperties handshakenProperties) throws IllegalStateException {
|
||||||
// We don't check expiration of the transaction here, to support large file transport or slow network.
|
// We don't check expiration of the transaction here, to support large file transport or slow network.
|
||||||
// The availability of current transaction is already checked when the HTTP request was received at SiteToSiteResource.
|
// The availability of current transaction is already checked when the HTTP request was received at SiteToSiteResource.
|
||||||
TransactionWrapper currentTransaction = transactions.remove(transactionId);
|
TransactionWrapper currentTransaction = transactions.remove(transactionId);
|
||||||
|
|
|
@ -63,7 +63,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
|
||||||
protected boolean shutdown = false;
|
protected boolean shutdown = false;
|
||||||
protected FlowFileCodec negotiatedFlowFileCodec = null;
|
protected FlowFileCodec negotiatedFlowFileCodec = null;
|
||||||
|
|
||||||
protected HandshakenProperties handshakenProperties;
|
protected HandshakeProperties handshakenProperties;
|
||||||
|
|
||||||
protected static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L);
|
protected static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L);
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
|
||||||
return handshakeCompleted;
|
return handshakeCompleted;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void validateHandshakeRequest(HandshakenProperties confirmed, final Peer peer, final Map<String, String> properties) throws HandshakeException {
|
protected void validateHandshakeRequest(HandshakeProperties confirmed, final Peer peer, final Map<String, String> properties) throws HandshakeException {
|
||||||
Boolean useGzip = null;
|
Boolean useGzip = null;
|
||||||
for (final Map.Entry<String, String> entry : properties.entrySet()) {
|
for (final Map.Entry<String, String> entry : properties.entrySet()) {
|
||||||
final String propertyName = entry.getKey();
|
final String propertyName = entry.getKey();
|
||||||
|
@ -201,7 +201,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
|
||||||
handshakeCompleted = true;
|
handshakeCompleted = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract protected HandshakenProperties doHandshake(final Peer peer) throws IOException, HandshakeException;
|
abstract protected HandshakeProperties doHandshake(final Peer peer) throws IOException, HandshakeException;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
|
public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
|
||||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.nifi.remote.protocol;
|
||||||
|
|
||||||
import org.apache.nifi.remote.exception.HandshakeException;
|
import org.apache.nifi.remote.exception.HandshakeException;
|
||||||
|
|
||||||
public class HandshakenProperties {
|
public class HandshakeProperties {
|
||||||
|
|
||||||
private String commsIdentifier;
|
private String commsIdentifier;
|
||||||
private String transitUriPrefix = null;
|
private String transitUriPrefix = null;
|
|
@ -28,7 +28,7 @@ import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
|
||||||
import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol;
|
import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol;
|
||||||
import org.apache.nifi.remote.protocol.CommunicationsSession;
|
import org.apache.nifi.remote.protocol.CommunicationsSession;
|
||||||
import org.apache.nifi.remote.protocol.FlowFileTransaction;
|
import org.apache.nifi.remote.protocol.FlowFileTransaction;
|
||||||
import org.apache.nifi.remote.protocol.HandshakenProperties;
|
import org.apache.nifi.remote.protocol.HandshakeProperties;
|
||||||
import org.apache.nifi.remote.protocol.RequestType;
|
import org.apache.nifi.remote.protocol.RequestType;
|
||||||
import org.apache.nifi.remote.protocol.Response;
|
import org.apache.nifi.remote.protocol.Response;
|
||||||
import org.apache.nifi.remote.protocol.ResponseCode;
|
import org.apache.nifi.remote.protocol.ResponseCode;
|
||||||
|
@ -65,19 +65,19 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException {
|
protected HandshakeProperties doHandshake(Peer peer) throws IOException, HandshakeException {
|
||||||
|
|
||||||
HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
|
HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
|
||||||
final String transactionId = commsSession.getTransactionId();
|
final String transactionId = commsSession.getTransactionId();
|
||||||
|
|
||||||
HandshakenProperties confirmed = null;
|
HandshakeProperties confirmed = null;
|
||||||
if (!StringUtils.isEmpty(transactionId)) {
|
if (!StringUtils.isEmpty(transactionId)) {
|
||||||
// If handshake is already done, use it.
|
// If handshake is already done, use it.
|
||||||
confirmed = transactionManager.getHandshakenProperties(transactionId);
|
confirmed = transactionManager.getHandshakenProperties(transactionId);
|
||||||
}
|
}
|
||||||
if (confirmed == null) {
|
if (confirmed == null) {
|
||||||
// If it's not, then do handshake.
|
// If it's not, then do handshake.
|
||||||
confirmed = new HandshakenProperties();
|
confirmed = new HandshakeProperties();
|
||||||
confirmed.setCommsIdentifier(transactionId);
|
confirmed.setCommsIdentifier(transactionId);
|
||||||
validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams());
|
validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams());
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.nifi.remote.exception.HandshakeException;
|
||||||
import org.apache.nifi.remote.exception.ProtocolException;
|
import org.apache.nifi.remote.exception.ProtocolException;
|
||||||
import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol;
|
import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol;
|
||||||
import org.apache.nifi.remote.protocol.CommunicationsSession;
|
import org.apache.nifi.remote.protocol.CommunicationsSession;
|
||||||
import org.apache.nifi.remote.protocol.HandshakenProperties;
|
import org.apache.nifi.remote.protocol.HandshakeProperties;
|
||||||
import org.apache.nifi.remote.protocol.RequestType;
|
import org.apache.nifi.remote.protocol.RequestType;
|
||||||
import org.apache.nifi.remote.protocol.ResponseCode;
|
import org.apache.nifi.remote.protocol.ResponseCode;
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
|
@ -51,9 +51,9 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
|
||||||
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1);
|
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException {
|
protected HandshakeProperties doHandshake(Peer peer) throws IOException, HandshakeException {
|
||||||
|
|
||||||
HandshakenProperties confirmed = new HandshakenProperties();
|
HandshakeProperties confirmed = new HandshakeProperties();
|
||||||
|
|
||||||
final CommunicationsSession commsSession = peer.getCommunicationsSession();
|
final CommunicationsSession commsSession = peer.getCommunicationsSession();
|
||||||
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
|
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
|
||||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.nifi.remote;
|
||||||
|
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.remote.protocol.FlowFileTransaction;
|
import org.apache.nifi.remote.protocol.FlowFileTransaction;
|
||||||
import org.apache.nifi.remote.protocol.HandshakenProperties;
|
import org.apache.nifi.remote.protocol.HandshakeProperties;
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -46,7 +46,7 @@ public class TestHttpRemoteSiteListener {
|
||||||
|
|
||||||
ProcessSession processSession = Mockito.mock(ProcessSession.class);
|
ProcessSession processSession = Mockito.mock(ProcessSession.class);
|
||||||
FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null);
|
FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null);
|
||||||
transactionManager.holdTransaction(transactionId, transaction, new HandshakenProperties());
|
transactionManager.holdTransaction(transactionId, transaction, new HandshakeProperties());
|
||||||
|
|
||||||
assertNotNull(transactionManager.getHandshakenProperties(transactionId));
|
assertNotNull(transactionManager.getHandshakenProperties(transactionId));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue