NIFI-2028: Fixed Site-to-Site Transit URI

Fixed Site-to-Site Transit URI for HTTP to be consistent with RAW socket.

- Removed url from CommunicationsSession since it's redundant as we have
  Peer.url, too. The value was not used from anywhere other than HTTP
Site-to-Site.
- Added createTransitUri method in Communicant interface, so that
  implementation can customize transitUri while providing consistent
interface.
This commit is contained in:
Koji Kawamura 2016-07-25 11:36:23 +09:00 committed by joewitt
parent 09840027a3
commit 809f042353
22 changed files with 404 additions and 84 deletions

View File

@ -22,27 +22,6 @@ public abstract class AbstractCommunicationsSession implements CommunicationsSes
private String userDn;
private volatile String uri;
public AbstractCommunicationsSession(final String uri) {
this.uri = uri;
}
@Override
public String toString() {
return uri;
}
@Override
public void setUri(final String uri) {
this.uri = uri;
}
@Override
public String getUri() {
return uri;
}
@Override
public String getUserDn() {
return userDn;
@ -52,4 +31,10 @@ public abstract class AbstractCommunicationsSession implements CommunicationsSes
public void setUserDn(final String dn) {
this.userDn = dn;
}
@Override
public String createTransitUri(String communicantUrl, String sourceFlowFileIdentifier) {
return communicantUrl + (communicantUrl.endsWith("/") ? "" : "/") + sourceFlowFileIdentifier;
}
}

View File

@ -43,4 +43,10 @@ public interface Communicant {
* if the Distinguished Name is unknown
*/
String getDistinguishedName();
/**
* @return When data is transferred via Site-to-Site, provenance events are generated.
* This method returns a transit url used for the provenance event.
*/
String createTransitUri(final String sourceFlowFileIdentifier);
}

View File

@ -148,4 +148,9 @@ public class Peer implements Communicant {
public String getDistinguishedName() {
return commsSession.getUserDn();
}
@Override
public String createTransitUri(String sourceFlowFileIdentifier) {
return commsSession.createTransitUri(url, sourceFlowFileIdentifier);
}
}

View File

@ -126,7 +126,6 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
final CommunicationsSession commSession = new HttpCommunicationsSession();
final String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription());
commSession.setUri(nodeApiUrl);
final String clusterUrl = config.getUrl();
final Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl);

View File

@ -465,7 +465,6 @@ public class EndpointConnectionPool implements PeerStatusProvider {
private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException {
final boolean siteToSiteSecure = siteInfoProvider.isSecure();
final String destinationUri = "nifi://" + hostname + ":" + port;
CommunicationsSession commsSession = null;
try {
@ -478,7 +477,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
socketChannel.connect();
commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
commsSession = new SSLSocketChannelCommunicationsSession(socketChannel);
try {
commsSession.setUserDn(socketChannel.getDn());
@ -490,11 +489,10 @@ public class EndpointConnectionPool implements PeerStatusProvider {
socketChannel.socket().connect(new InetSocketAddress(hostname, port), commsTimeout);
socketChannel.socket().setSoTimeout(commsTimeout);
commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
commsSession = new SocketChannelCommunicationsSession(socketChannel);
}
commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
commsSession.setUri(destinationUri);
} catch (final IOException ioe) {
if (commsSession != null) {
commsSession.close();

View File

@ -29,9 +29,10 @@ public class HttpCommunicationsSession extends AbstractCommunicationsSession {
protected final HttpInput input;
protected final HttpOutput output;
protected String checksum;
private String dataTransferUrl;
public HttpCommunicationsSession(){
super(null);
super();
this.input = new HttpInput();
this.output = new HttpOutput();
}
@ -93,5 +94,15 @@ public class HttpCommunicationsSession extends AbstractCommunicationsSession {
this.checksum = checksum;
}
/**
* @param dataTransferUrl Set data transfer url to use as provenance event transit url.
*/
public void setDataTransferUrl(String dataTransferUrl) {
this.dataTransferUrl = dataTransferUrl;
}
@Override
public String createTransitUri(String communicantUrl, String sourceFlowFileIdentifier) {
return dataTransferUrl;
}
}

View File

@ -28,8 +28,8 @@ public class SocketChannelCommunicationsSession extends AbstractCommunicationsSe
private final SocketChannelOutput response;
private int timeout = 30000;
public SocketChannelCommunicationsSession(final SocketChannel socketChannel, final String uri) throws IOException {
super(uri);
public SocketChannelCommunicationsSession(final SocketChannel socketChannel) throws IOException {
super();
request = new SocketChannelInput(socketChannel);
response = new SocketChannelOutput(socketChannel);
channel = socketChannel;

View File

@ -26,8 +26,8 @@ public class SSLSocketChannelCommunicationsSession extends AbstractCommunication
private final SSLSocketChannelInput request;
private final SSLSocketChannelOutput response;
public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) {
super(uri);
public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel) {
super();
request = new SSLSocketChannelInput(channel);
response = new SSLSocketChannelOutput(channel);
this.channel = channel;

View File

@ -31,10 +31,6 @@ public interface CommunicationsSession extends Closeable {
int getTimeout() throws IOException;
void setUri(String uri);
String getUri();
String getUserDn();
void setUserDn(String dn);
@ -59,4 +55,11 @@ public interface CommunicationsSession extends Closeable {
* otherwise
*/
boolean isClosed();
/**
* @param communicantUrl Communicant's url that this session is assigned to.
* @param sourceFlowFileIdentifier Source Flow-file's uuid.
* @return A transit uri to be used in a provenance event.
*/
String createTransitUri(final String communicantUrl, final String sourceFlowFileIdentifier);
}

View File

@ -48,9 +48,9 @@ public class HttpClientTransaction extends AbstractTransaction {
this.transactionUrl = transactionUrl;
this.apiClient = apiUtil;
if(TransferDirection.RECEIVE.equals(direction)){
dataAvailable = apiUtil.openConnectionForReceive(transactionUrl, peer.getCommunicationsSession());
dataAvailable = apiUtil.openConnectionForReceive(transactionUrl, peer);
} else {
apiUtil.openConnectionForSend(transactionUrl, peer.getCommunicationsSession());
apiUtil.openConnectionForSend(transactionUrl, peer);
}
}

View File

@ -54,6 +54,7 @@ import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpCoreContext;
import org.apache.http.util.EntityUtils;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
import org.apache.nifi.remote.exception.PortNotRunningException;
@ -374,9 +375,12 @@ public class SiteToSiteRestApiClient implements Closeable {
}
public boolean openConnectionForReceive(final String transactionUrl, final CommunicationsSession commSession) throws IOException {
public boolean openConnectionForReceive(final String transactionUrl, final Peer peer) throws IOException {
final HttpGet get = createGet(transactionUrl + "/flow-files");
// Set uri so that it'll be used as transit uri.
((HttpCommunicationsSession)peer.getCommunicationsSession()).setDataTransferUrl(get.getURI().toString());
get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
setHandshakeProperties(get);
@ -414,7 +418,7 @@ public class SiteToSiteRestApiClient implements Closeable {
return r;
}
};
((HttpInput) commSession.getInput()).setInputStream(streamCapture);
((HttpInput) peer.getCommunicationsSession().getInput()).setInputStream(streamCapture);
startExtendingTtl(transactionUrl, httpIn, response);
keepItOpen = true;
@ -436,10 +440,13 @@ public class SiteToSiteRestApiClient implements Closeable {
private Future<HttpResponse> postResult;
private CountDownLatch transferDataLatch = new CountDownLatch(1);
public void openConnectionForSend(final String transactionUrl, final CommunicationsSession commSession) throws IOException {
public void openConnectionForSend(final String transactionUrl, final Peer peer) throws IOException {
final CommunicationsSession commSession = peer.getCommunicationsSession();
final String flowFilesPath = transactionUrl + "/flow-files";
final HttpPost post = createPost(flowFilesPath);
// Set uri so that it'll be used as transit uri.
((HttpCommunicationsSession)peer.getCommunicationsSession()).setDataTransferUrl(post.getURI().toString());
post.setHeader("Content-Type", "application/octet-stream");
post.setHeader("Accept", "text/plain");

View File

@ -101,7 +101,7 @@ public class TestHttpClientTransaction {
SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
doReturn(false).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(CommunicationsSession.class));
doReturn(false).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(Peer.class));
ByteArrayInputStream serverResponse = new ByteArrayInputStream(new byte[0]);
ByteArrayOutputStream clientRequest = new ByteArrayOutputStream();
@ -117,7 +117,7 @@ public class TestHttpClientTransaction {
SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(CommunicationsSession.class));
doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(Peer.class));
TransactionResultEntity resultEntity = new TransactionResultEntity();
resultEntity.setResponseCode(CONFIRM_TRANSACTION.getCode());
doReturn(resultEntity).when(apiClient).commitReceivingFlowFiles(eq(transactionUrl), eq(CONFIRM_TRANSACTION), eq("3680976076"));
@ -139,7 +139,7 @@ public class TestHttpClientTransaction {
SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(CommunicationsSession.class));
doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(Peer.class));
TransactionResultEntity resultEntity = new TransactionResultEntity();
resultEntity.setResponseCode(CONFIRM_TRANSACTION.getCode());
doReturn(resultEntity).when(apiClient).commitReceivingFlowFiles(eq(transactionUrl), eq(CONFIRM_TRANSACTION), eq("2969091230"));
@ -162,7 +162,7 @@ public class TestHttpClientTransaction {
SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(CommunicationsSession.class));
doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), any(Peer.class));
// The checksum is correct, but here we simulate as if it's wrong, BAD_CHECKSUM.
TransactionResultEntity resultEntity = new TransactionResultEntity();
resultEntity.setResponseCode(ResponseCode.BAD_CHECKSUM.getCode());
@ -186,7 +186,7 @@ public class TestHttpClientTransaction {
SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(CommunicationsSession.class));
doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(Peer.class));
ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
ByteArrayInputStream serverResponse = new ByteArrayInputStream(serverResponseBos.toByteArray());
@ -203,7 +203,7 @@ public class TestHttpClientTransaction {
SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(CommunicationsSession.class));
doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(Peer.class));
// Emulate that server returns correct checksum.
doAnswer(new Answer() {
@Override
@ -237,7 +237,7 @@ public class TestHttpClientTransaction {
SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
doNothing().when(apiClient).openConnectionForSend(eq("portId"), any(CommunicationsSession.class));
doNothing().when(apiClient).openConnectionForSend(eq("portId"), any(Peer.class));
// Emulate that server returns correct checksum.
doAnswer(new Answer() {
@Override
@ -272,7 +272,7 @@ public class TestHttpClientTransaction {
public void testSendWithInvalidChecksum() throws IOException {
SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(CommunicationsSession.class));
doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), any(Peer.class));
// Emulate that server returns incorrect checksum.
doAnswer(new Answer() {
@Override
@ -313,7 +313,7 @@ public class TestHttpClientTransaction {
SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
final String transactionUrl = "http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
doNothing().when(apiClient).openConnectionForSend(eq("portId"), any(CommunicationsSession.class));
doNothing().when(apiClient).openConnectionForSend(eq("portId"), any(Peer.class));
// Emulate that server returns correct checksum.
doAnswer(new Answer() {
@Override

View File

@ -154,12 +154,12 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
sslSocketChannel.connect();
LOG.trace("Channel connected");
commsSession = new SSLSocketChannelCommunicationsSession(sslSocketChannel, peerUri);
commsSession = new SSLSocketChannelCommunicationsSession(sslSocketChannel);
dn = sslSocketChannel.getDn();
commsSession.setUserDn(dn);
} else {
LOG.trace("{} Channel is not secure", this);
commsSession = new SocketChannelCommunicationsSession(socketChannel, peerUri);
commsSession = new SocketChannelCommunicationsSession(socketChannel);
dn = null;
}
} catch (final Exception e) {

View File

@ -76,6 +76,9 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
private final TransferDirection transferDirection;
private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference<>();
SiteToSiteClient getSiteToSiteClient() {
return clientRef.get();
}
public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler) {
@ -118,7 +121,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
public void shutdown() {
super.shutdown();
final SiteToSiteClient client = clientRef.get();
final SiteToSiteClient client = getSiteToSiteClient();
if (client != null) {
try {
client.close();
@ -175,7 +178,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
firstFlowFile = null;
}
final SiteToSiteClient client = clientRef.get();
final SiteToSiteClient client = getSiteToSiteClient();
final Transaction transaction;
try {
transaction = client.createTransaction(transferDirection);
@ -275,7 +278,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
bytesSent += flowFile.getSize();
logger.debug("{} Sent {} to {}", this, flowFile, transaction.getCommunicant().getUrl());
final String transitUri = transaction.getCommunicant().getUrl() + "/" + flowFile.getAttribute(CoreAttributes.UUID.key());
final String transitUri = transaction.getCommunicant().createTransitUri(flowFile.getAttribute(CoreAttributes.UUID.key()));
session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false);
session.remove(flowFile);
@ -331,13 +334,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
flowFile = session.importFrom(dataPacket.getData(), flowFile);
final long receiveNanos = System.nanoTime() - start;
flowFilesReceived.add(flowFile);
String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
if (sourceFlowFileIdentifier == null) {
sourceFlowFileIdentifier = "<Unknown Identifier>";
}
final String transitUri = transaction.getCommunicant().getUrl() + sourceFlowFileIdentifier;
final String transitUri = transaction.getCommunicant().createTransitUri(sourceFlowFileIdentifier);
session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier,
"Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));

View File

@ -271,8 +271,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
flowFilesSent.add(flowFile);
bytesSent += flowFile.getSize();
String transitUriPrefix = handshakenProperties.getTransitUriPrefix();
final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
final String transitUri = createTransitUri(peer, flowFile.getAttribute(CoreAttributes.UUID.key()));
session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
session.remove(flowFile);
@ -319,6 +318,10 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
}
protected String createTransitUri(Peer peer, String sourceFlowFileIdentifier) {
return peer.createTransitUri(sourceFlowFileIdentifier);
}
protected int commitTransferTransaction(Peer peer, FlowFileTransaction transaction) throws IOException {
ProcessSession session = transaction.getSession();
Set<FlowFile> flowFilesSent = transaction.getFlowFilesSent();
@ -446,8 +449,7 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
String transitUriPrefix = handshakenProperties.getTransitUriPrefix();
final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid;
final String transitUri = createTransitUri(peer, sourceSystemFlowFileUuid);
session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null
? null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis);
session.transfer(flowFile, Relationship.ANONYMOUS);

View File

@ -48,7 +48,7 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr
private final VersionNegotiator versionNegotiator;
private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance();
public StandardHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) {
public StandardHttpFlowFileServerProtocol(final VersionNegotiator versionNegotiator) {
super();
this.versionNegotiator = versionNegotiator;
}
@ -222,4 +222,5 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr
public String getResourceName() {
return RESOURCE_NAME;
}
}

View File

@ -217,4 +217,10 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
public VersionNegotiator getVersionNegotiator() {
return versionNegotiator;
}
@Override
protected String createTransitUri(Peer peer, String sourceFlowFileIdentifier) {
String transitUriPrefix = handshakenProperties.getTransitUriPrefix();
return (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
}
}

View File

@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.InputStream;
import java.net.URI;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
public class TestStandardRemoteGroupPort {
private static final String ID = "remote-group-port-id";
private static final String NAME = "remote-group-port-name";
private RemoteProcessGroup remoteGroup;
private ProcessScheduler scheduler;
private SiteToSiteClient siteToSiteClient;
private Transaction transaction;
private EventReporter eventReporter;
private ProcessGroup processGroup;
public static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi";
private StandardRemoteGroupPort port;
private ProcessContext context;
private ProcessSession session;
private ProvenanceReporter provenanceReporter;
@BeforeClass
public static void setup() throws Exception {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
}
private void setupMock(final SiteToSiteTransportProtocol protocol,
final TransferDirection direction) throws Exception {
setupMock(protocol, direction, mock(Transaction.class));
}
private void setupMock(final SiteToSiteTransportProtocol protocol,
final TransferDirection direction,
final Transaction transaction) throws Exception {
processGroup = null;
remoteGroup = mock(RemoteProcessGroup.class);
scheduler = null;
siteToSiteClient = mock(SiteToSiteClient.class);
this.transaction = transaction;
eventReporter = mock(EventReporter.class);
final ConnectableType connectableType;
switch (direction) {
case SEND:
connectableType = ConnectableType.REMOTE_INPUT_PORT;
break;
case RECEIVE:
connectableType = ConnectableType.OUTPUT_PORT;
break;
default:
connectableType = null;
break;
}
port = spy(new StandardRemoteGroupPort(ID, NAME,
processGroup, remoteGroup, direction, connectableType, null, scheduler));
doReturn(true).when(remoteGroup).isTransmitting();
doReturn(protocol).when(remoteGroup).getTransportProtocol();
doReturn(new URI(REMOTE_CLUSTER_URL)).when(remoteGroup).getTargetUri();
doReturn(siteToSiteClient).when(port).getSiteToSiteClient();
doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction));
doReturn(eventReporter).when(remoteGroup).getEventReporter();
context = null;
session = mock(ProcessSession.class);
provenanceReporter = mock(ProvenanceReporter.class);
doReturn(provenanceReporter).when(session).getProvenanceReporter();
}
@Test
public void testSendRaw() throws Exception {
setupMock(SiteToSiteTransportProtocol.RAW, TransferDirection.SEND);
final String peerUrl = "nifi://node1.example.com:9090";
final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, false);
try (final SocketChannel socketChannel = SocketChannel.open()) {
final CommunicationsSession commsSession = new SocketChannelCommunicationsSession(socketChannel);
final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
doReturn(peer).when(transaction).getCommunicant();
final QueueSize queueSize = new QueueSize(1, 10);
final FlowFile flowFile = mock(FlowFile.class);
doReturn(queueSize).when(session).getQueueSize();
// Return null when it gets called second time.
doReturn(flowFile).doReturn(null).when(session).get();
final String flowFileUuid = "flowfile-uuid";
doReturn(flowFileUuid).when(flowFile).getAttribute(eq(CoreAttributes.UUID.key()));
port.onTrigger(context, session);
// Transit uri can be customized if necessary.
verify(provenanceReporter).send(eq(flowFile), eq(peerUrl + "/" + flowFileUuid), any(String.class),
any(Long.class), eq(false));
}
}
@Test
public void testReceiveRaw() throws Exception {
setupMock(SiteToSiteTransportProtocol.RAW, TransferDirection.RECEIVE);
final String peerUrl = "nifi://node1.example.com:9090";
final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, false);
try (final SocketChannel socketChannel = SocketChannel.open()) {
final CommunicationsSession commsSession = new SocketChannelCommunicationsSession(socketChannel);
final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
doReturn(peer).when(transaction).getCommunicant();
final FlowFile flowFile = mock(FlowFile.class);
final String sourceFlowFileUuid = "flowfile-uuid";
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.UUID.key(), sourceFlowFileUuid);
final byte[] dataPacketContents = "DataPacket Contents".getBytes();
final ByteArrayInputStream dataPacketInputStream = new ByteArrayInputStream(dataPacketContents);
final DataPacket dataPacket = new StandardDataPacket(attributes,
dataPacketInputStream, dataPacketContents.length);
doReturn(flowFile).when(session).create();
// Return null when it gets called second time.
doReturn(dataPacket).doReturn(null).when(this.transaction).receive();
doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes));
doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
port.onTrigger(context, session);
// Transit uri can be customized if necessary.
verify(provenanceReporter).receive(eq(flowFile), eq(peerUrl + "/" + sourceFlowFileUuid), any(String.class),
any(String.class), any(Long.class));
}
}
@Test
public void testSendHttp() throws Exception {
setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND);
final String peerUrl = "http://node1.example.com:8080/nifi";
final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, false);
final HttpCommunicationsSession commsSession = new HttpCommunicationsSession();
final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
final String flowFileEndpointUri = "http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files";
doReturn(peer).when(transaction).getCommunicant();
commsSession.setDataTransferUrl(flowFileEndpointUri);
final QueueSize queueSize = new QueueSize(1, 10);
final FlowFile flowFile = mock(FlowFile.class);
doReturn(queueSize).when(session).getQueueSize();
// Return null when it's called second time.
doReturn(flowFile).doReturn(null).when(session).get();
port.onTrigger(context, session);
// peerUrl should be used as the transit url.
verify(provenanceReporter).send(eq(flowFile), eq(flowFileEndpointUri), any(String.class),
any(Long.class), eq(false));
}
@Test
public void testReceiveHttp() throws Exception {
setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.RECEIVE);
final String peerUrl = "http://node1.example.com:8080/nifi";
final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, false);
final HttpCommunicationsSession commsSession = new HttpCommunicationsSession();
final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
final String flowFileEndpointUri = "http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files";
doReturn(peer).when(transaction).getCommunicant();
commsSession.setDataTransferUrl(flowFileEndpointUri);
final FlowFile flowFile = mock(FlowFile.class);
final Map<String, String> attributes = new HashMap<>();
final byte[] dataPacketContents = "DataPacket Contents".getBytes();
final ByteArrayInputStream dataPacketInputStream = new ByteArrayInputStream(dataPacketContents);
final DataPacket dataPacket = new StandardDataPacket(attributes,
dataPacketInputStream, dataPacketContents.length);
doReturn(flowFile).when(session).create();
// Return null when it's called second time.
doReturn(dataPacket).doReturn(null).when(transaction).receive();
doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes));
doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
port.onTrigger(context, session);
// peerUrl should be used as the transit url.
verify(provenanceReporter).receive(eq(flowFile), eq(flowFileEndpointUri), any(String.class),
any(String.class), any(Long.class));
}
}

View File

@ -297,8 +297,11 @@ public class TestHttpFlowFileServerProtocol {
final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance();
final Peer peer = getDefaultPeer(transactionId);
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
final String endpointUri = "https://peer-host:8443/nifi-api/output-ports/port-id/transactions/"
+ transactionId + "/flow-files";
commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
commsSession.setUserDn("unit-test");
commsSession.setDataTransferUrl(endpointUri);
serverProtocol.handshake(peer);
@ -312,9 +315,9 @@ public class TestHttpFlowFileServerProtocol {
doReturn(flowFile).when(processSession).get();
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
final String peerUrl = (String)invocation.getArguments()[1];
final String transitUri = (String)invocation.getArguments()[1];
final String detail = (String)invocation.getArguments()[2];
assertEquals("http://peer-host:8080/", peerUrl);
assertEquals(endpointUri, transitUri);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter).send(eq(flowFile), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
@ -336,13 +339,16 @@ public class TestHttpFlowFileServerProtocol {
@Test
public void testTransferTwoFiles() throws Exception {
final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance();
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
final String transactionId = "testTransferTwoFiles";
final Peer peer = getDefaultPeer(transactionId);
final String endpointUri = "https://peer-host:8443/nifi-api/output-ports/port-id/transactions/"
+ transactionId + "/flow-files";
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "2");
commsSession.setUserDn("unit-test");
commsSession.setDataTransferUrl(endpointUri);
serverProtocol.handshake(peer);
@ -360,18 +366,18 @@ public class TestHttpFlowFileServerProtocol {
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
final String peerUrl = (String)invocation.getArguments()[1];
final String transitUri = (String)invocation.getArguments()[1];
final String detail = (String)invocation.getArguments()[2];
assertEquals("http://peer-host:8080/", peerUrl);
assertEquals(endpointUri, transitUri);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter).send(eq(flowFile1), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
final String peerUrl = (String)invocation.getArguments()[1];
final String transitUri = (String)invocation.getArguments()[1];
final String detail = (String)invocation.getArguments()[2];
assertEquals("http://peer-host:8080/", peerUrl);
assertEquals(endpointUri, transitUri);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter).send(eq(flowFile2), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
@ -465,9 +471,12 @@ public class TestHttpFlowFileServerProtocol {
private void receiveOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId, final Peer peer) throws IOException {
final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance();
final String endpointUri = "https://peer-host:8443/nifi-api/input-ports/port-id/transactions/"
+ transactionId + "/flow-files";
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
commsSession.setUserDn("unit-test");
commsSession.setDataTransferUrl(endpointUri);
serverProtocol.handshake(peer);
@ -499,9 +508,9 @@ public class TestHttpFlowFileServerProtocol {
doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
final String peerUrl = (String)invocation.getArguments()[1];
final String transitUri = (String)invocation.getArguments()[1];
final String detail = (String)invocation.getArguments()[3];
assertEquals("http://peer-host:8080/", peerUrl);
assertEquals(endpointUri, transitUri);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter)
@ -522,13 +531,16 @@ public class TestHttpFlowFileServerProtocol {
@Test
public void testReceiveTwoFiles() throws Exception {
final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance();
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
final String transactionId = "testReceiveTwoFile";
final String endpointUri = "https://peer-host:8443/nifi-api/input-ports/port-id/transactions/"
+ transactionId + "/flow-files";
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
final Peer peer = getDefaultPeer(transactionId);
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "2");
commsSession.setUserDn("unit-test");
commsSession.setDataTransferUrl(endpointUri);
serverProtocol.handshake(peer);
@ -562,9 +574,9 @@ public class TestHttpFlowFileServerProtocol {
.when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
final String peerUrl = (String)invocation.getArguments()[1];
final String transitUri = (String)invocation.getArguments()[1];
final String detail = (String)invocation.getArguments()[3];
assertEquals("http://peer-host:8080/", peerUrl);
assertEquals(endpointUri, transitUri);
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
return null;
}).when(provenanceReporter)

View File

@ -45,6 +45,7 @@ import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.NotAuthorizedException;
import org.apache.nifi.remote.exception.RequestExpiredException;
import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
import org.apache.nifi.remote.io.http.HttpOutput;
import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
import org.apache.nifi.remote.protocol.HandshakeProperty;
@ -212,7 +213,7 @@ public class DataTransferResource extends ApplicationResource {
try {
// Execute handshake.
initiateServerProtocol(peer, transportProtocolVersion);
initiateServerProtocol(req, peer, transportProtocolVersion);
TransactionResultEntity entity = new TransactionResultEntity();
entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode());
@ -280,7 +281,7 @@ public class DataTransferResource extends ApplicationResource {
final int transportProtocolVersion = validationResult.transportProtocolVersion;
try {
HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion);
HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(req, peer, transportProtocolVersion);
int numOfFlowFiles = serverProtocol.getPort().receiveFlowFiles(peer, serverProtocol);
logger.debug("finished receiving flow files, numOfFlowFiles={}", numOfFlowFiles);
if (numOfFlowFiles < 1) {
@ -304,10 +305,15 @@ public class DataTransferResource extends ApplicationResource {
return responseCreator.acceptedResponse(transactionManager, serverChecksum, transportProtocolVersion);
}
private HttpFlowFileServerProtocol initiateServerProtocol(Peer peer, Integer transportProtocolVersion) throws IOException {
private HttpFlowFileServerProtocol initiateServerProtocol(final HttpServletRequest req, final Peer peer,
final Integer transportProtocolVersion) throws IOException {
// Switch transaction protocol version based on transport protocol version.
TransportProtocolVersionNegotiator negotiatedTransportProtocolVersion = new TransportProtocolVersionNegotiator(transportProtocolVersion);
VersionNegotiator versionNegotiator = new StandardVersionNegotiator(negotiatedTransportProtocolVersion.getTransactionProtocolVersion());
final String dataTransferUrl = req.getRequestURL().toString();
((HttpCommunicationsSession)peer.getCommunicationsSession()).setDataTransferUrl(dataTransferUrl);
HttpFlowFileServerProtocol serverProtocol = getHttpFlowFileServerProtocol(versionNegotiator);
HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol);
// TODO: How should I pass cluster information?
@ -316,11 +322,12 @@ public class DataTransferResource extends ApplicationResource {
return serverProtocol;
}
HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) {
HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(final VersionNegotiator versionNegotiator) {
return new StandardHttpFlowFileServerProtocol(versionNegotiator);
}
private Peer constructPeer(HttpServletRequest req, InputStream inputStream, OutputStream outputStream, String portId, String transactionId) {
private Peer constructPeer(final HttpServletRequest req, final InputStream inputStream,
final OutputStream outputStream, final String portId, final String transactionId) {
final String clientHostName = req.getRemoteHost();
final int clientPort = req.getRemotePort();
@ -357,7 +364,7 @@ public class DataTransferResource extends ApplicationResource {
commSession.putHandshakeParam(BATCH_DURATION, batchDuration);
}
if(peerDescription.isSecure()){
if (peerDescription.isSecure()) {
final NiFiUser nifiUser = NiFiUserUtils.getNiFiUser();
logger.debug("initiating peer, nifiUser={}", nifiUser);
commSession.setUserDn(nifiUser.getIdentity());
@ -366,6 +373,7 @@ public class DataTransferResource extends ApplicationResource {
// TODO: Followed how SocketRemoteSiteListener define peerUrl and clusterUrl, but it can be more meaningful values, especially for clusterUrl.
final String peerUrl = "nifi://" + clientHostName + ":" + clientPort;
final String clusterUrl = "nifi://localhost:" + req.getLocalPort();
return new Peer(peerDescription, commSession, peerUrl, clusterUrl);
}
@ -434,7 +442,7 @@ public class DataTransferResource extends ApplicationResource {
final TransactionResultEntity entity = new TransactionResultEntity();
try {
HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion);
HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(req, peer, transportProtocolVersion);
String inputErrMessage = null;
if (responseCode == null) {
@ -540,7 +548,7 @@ public class DataTransferResource extends ApplicationResource {
final TransactionResultEntity entity = new TransactionResultEntity();
try {
HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion);
HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(req, peer, transportProtocolVersion);
HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
// Pass the response code sent from the client.
String inputErrMessage = null;
@ -653,7 +661,7 @@ public class DataTransferResource extends ApplicationResource {
final Peer peer = constructPeer(req, inputStream, tempBos, portId, transactionId);
final int transportProtocolVersion = validationResult.transportProtocolVersion;
try {
final HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion);
final HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(req, peer, transportProtocolVersion);
StreamingOutput flowFileContent = new StreamingOutput() {
@Override
@ -792,7 +800,7 @@ public class DataTransferResource extends ApplicationResource {
try {
// Do handshake
initiateServerProtocol(peer, transportProtocolVersion);
initiateServerProtocol(req, peer, transportProtocolVersion);
transactionManager.extendTransaction(transactionId);
final TransactionResultEntity entity = new TransactionResultEntity();

View File

@ -18,6 +18,8 @@ package org.apache.nifi.web.api;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@ -214,10 +216,22 @@ public class SiteToSiteResource extends ApplicationResource {
} else {
// Standalone mode.
final PeerDTO peer = new PeerDTO();
// req.getLocalName returns private IP address, that can't be accessed from client in some environments.
// Private IP address or hostname may not be accessible from client in some environments.
// So, use the value defined in nifi.properties instead when it is defined.
final String remoteInputHost = properties.getRemoteInputHost();
peer.setHostname(isEmpty(remoteInputHost) ? req.getLocalName() : remoteInputHost);
String localName;
try {
// Get local host name using InetAddress if available, same as RAW socket does.
localName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get local host name using InetAddress.", e);
}
localName = req.getLocalName();
}
peer.setHostname(isEmpty(remoteInputHost) ? localName : remoteInputHost);
peer.setPort(properties.getRemoteInputHttpPort());
peer.setSecure(properties.isSiteToSiteSecure());
peer.setFlowFileCount(0); // doesn't matter how many FlowFiles we have, because we're the only host.

View File

@ -64,6 +64,9 @@ public class TestDataTransferResource {
private HttpServletRequest createCommonHttpServletRequest() {
final HttpServletRequest req = mock(HttpServletRequest.class);
doReturn("1").when(req).getHeader(eq(HttpHeaders.PROTOCOL_VERSION));
doReturn(new StringBuffer("http://nifi.example.com:8080")
.append("/nifi-api/data-transfer/output-ports/port-id/transactions/tx-id/flow-files"))
.when(req).getRequestURL();
return req;
}