NIFI-4932: Enable S2S work behind a Reverse Proxy

Adding S2S endpoint Reverse Proxy mapping capability.
Added license header to SVG files.
Incorporated review comments.
Use regex to check property key processing.
Catch AttributeExpressionLanguageParsingException.
This closes #2510
This commit is contained in:
Koji Kawamura 2018-02-06 11:37:06 +09:00 committed by Matt Gilman
parent 7c0ee014d3
commit 1913b1e2a8
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
13 changed files with 1025 additions and 31 deletions

View File

@ -2659,6 +2659,10 @@ RFC 5952 Sections link:https://tools.ietf.org/html/rfc5952#section-4[4] and link
_nifi.properties_. This property accepts a comma separated list of expected values. In the event an incoming request has an X-ProxyContextPath or X-Forwarded-Context header value that is not
present in the whitelist, the "An unexpected error has occurred" page will be shown and an error will be written to the nifi-app.log.
* Additional configurations at both proxy server and NiFi cluster are required to make NiFi Site-to-Site work behind reverse proxies. See <<site_to_site_reverse_proxy_properties>> for details.
** In order to transfer data via Site-to-Site protocol through reverse proxies, both proxy and Site-to-Site client NiFi users need to have following policies, 'retrieve site-to-site details', 'receive data via site-to-site' for input ports, and 'send data via site-to-site' for output ports.
[[kerberos_service]]
== Kerberos Service
NiFi can be configured to use Kerberos SPNEGO (or "Kerberos Service") for authentication. In this scenario, users will hit the REST endpoint `/access/kerberos` and the server will respond with a `401` status code and the challenge response header `WWW-Authenticate: Negotiate`. This communicates to the browser to use the GSS-API and load the user's Kerberos ticket and provide it as a Base64-encoded header value in the subsequent request. It will be of the form `Authorization: Negotiate YII...`. NiFi will attempt to validate this ticket with the KDC. If it is successful, the user's _principal_ will be returned as the identity, and the flow will follow login/credential authentication, in that a JWT will be issued in the response to prevent the unnecessary overhead of Kerberos authentication on every subsequent request. If the ticket cannot be validated, it will return with the appropriate error response code. The user will then be able to provide their Kerberos credentials to the login form if the `KerberosLoginIdentityProvider` has been configured. See <<kerberos_login_identity_provider>> login identity provider for more details.
@ -3069,6 +3073,259 @@ responses from the remote system for `30 secs`. This allows NiFi to avoid consta
has many instances of Remote Process Groups.
|====
[[site_to_site_reverse_proxy_properties]]
=== Site to Site Routing Properties for Reverse Proxies
Site-to-Site requires peer-to-peer communication between a client and a remote NiFi node. E.g. if a remote NiFi cluster has 3 nodes, nifi0, nifi1 and nifi2, then a client requests have to be reachable to each of those remote node.
If a NiFi cluster is planned to receive/transfer data from/to Site-to-Site clients over the internet or a company firewall, a reverse proxy server can be deployed in front of the NiFi cluster nodes as a gateway to route client requests to upstream NiFi nodes, to reduce number of servers and ports those have to be exposed.
In such environment, the same NiFi cluster would also be expected to be accessed by Site-to-Site clients within the same network. Sending FlowFiles to itself for load distribution among NiFi cluster nodes can be a typical example. In this case, client requests should be routed directly to a node without going through the reverse proxy.
In order to support such deployments, remote NiFi clusters need to expose its Site-to-Site endpoints dynamically based on client request contexts. Following properties configure how peers should be exposed to clients. A routing definition consists of 4 properties, 'when', 'hostname', 'port', and 'secure', grouped by 'protocol' and 'name'. Multiple routing definitions can be configured. 'protocol' represents Site-to-Site transport protocol, i.e. raw or http.
|====
|*Property*|*Description*
|nifi.remote.route.{protocol}.{name}.when|Boolean value, 'true' or 'false'. Controls whether the routing definition for this name should be used.
|nifi.remote.route.{protocol}.{name}.hostname|Specify hostname that will be introduced to Site-to-Site clients for further communications.
|nifi.remote.route.{protocol}.{name}.port|Specify port number that will be introduced to Site-to-Site clients for further communications.
|nifi.remote.route.{protocol}.{name}.secure|Boolean value, 'true' or 'false'. Specify whether the remote peer should be accessed via secure protocol. Defaults to 'false'.
|====
All of above routing properties can use NiFi Expression Language to compute target peer description from request context. Available variables are:
|===
|*Variable name*|*Description*
|s2s.{source\|target}.hostname|Hostname of the source where the request came from, and the original target.
|s2s.{source\|target}.port|Same as above, for ports. Source port may not be useful as it is just a client side TCP port.
|s2s.{source\|target}.secure|Same as above, for secure or not.
|s2s.protocol|The name of Site-to-Site protocol being used, RAW or HTTP.
|s2s.request|The name of current request type, SiteToSiteDetail or Peers. See Site-to-Site protocol sequence below for detail.
|HTTP request headers|HTTP request header values can be referred by its name.
|===
==== Site to Site protocol sequence
Configuring these properties correctly would require some understandings on Site-to-Site protocol sequence.
1. A client initiates Site-to-Site protocol by sending a HTTP(S) request to the specified remote URL to get remote cluster Site-to-Site information. Specifically, to '/nifi-api/site-to-site'. This request is called 'SiteToSiteDetail'.
2. A remote NiFi node responds with its input and output ports, and TCP port numbers for RAW and TCP transport protocols.
3. The client sends another request to get remote peers using the TCP port number returned at #2. From this request, raw socket communication is used for RAW transport protocol, while HTTP keeps using HTTP(S). This request is called 'Peers'.
4. A remote NiFi node responds with list of available remote peers containing hostname, port, secure and workload such as the number of queued FlowFiles. From this point, further communication is done between the client and the remote NiFi node.
5. The client decides which peer to transfer data from/to, based on workload information.
6. The client sends a request to create a transaction to a remote NiFi node.
7. The remote NiFi node accepts the transaction.
8. Data is sent to the target peer. Multiple Data packets can be sent in batch manner.
9. When there is no more data to send, or reached to batch limit, the transaction is confirmed on both end by calculating CRC32 hash of sent data.
10. The transaction is committed on both end.
==== Reverse Proxy Configurations
Most reverse proxy software implement HTTP and TCP proxy mode. For NiFi RAW Site-to-Site protocol, both HTTP and TCP proxy configurations are required, and at least 2 ports needed to be opened. NiFi HTTP Site-to-Site protocol can minimize the required number of open ports at the reverse proxy to 1.
Setting correct HTTP headers at reverse proxies are crucial for NiFi to work correctly, not only routing requests but also authorize client requests. See also <<proxy_configuration>> for details.
There are two types of requests-to-NiFi-node mapping techniques those can be applied at reverse proxy servers. One is 'Server name to Node' and the other is 'Port number to Node'.
With 'Server name to Node', the same port can be used to route requests to different upstream NiFi nodes based on the requested server name (e.g. nifi0.example.com, nifi1.example.com). Host name resolution should be configured to map different host names to the same reverse proxy address, that can be done by adding /etc/hosts file or DNS server entries. Also, if clients to reverse proxy uses HTTPS, reverse proxy server certificate should have wildcard common name or SAN to be accessed by different host names.
Some reverse proxy technologies do not support server name routing rules, in such case, use 'Port number to Node' technique. 'Port number to Node' mapping requires N open port at a reverse proxy for a NiFi cluster consists of N nodes.
Refer following examples for actual configurations.
==== Site to Site and Reverse Proxy Examples
Here are some example reverse proxy and NiFi setups to illustrate how configuration files look like.
Client1 in the following diagrams represents a client that does not have direct access to NiFi nodes, and it accesses through the reverse proxy, while Client2 has direct access.
In this example, Nginx is used as a reverse proxy.
===== Example 1: RAW - Server name to Node mapping
image:s2s-rproxy-servername.svg["Server name to Node mapping"]
1. Client1 initiates Site-to-Site protocol, the request is routed to one of upstream NiFi nodes. The NiFi node computes Site-to-Site port for RAW. By the routing rule 'example1' in nifi.properties shown below, port 10443 is returned.
2. Client1 asks peers to 'nifi.example.com:10443', the request is routed to 'nifi0:8081'. The NiFi node computes available peers, by 'example1' routing rule, 'nifi0:8081' is converted to 'nifi0.example.com:10443', so are nifi1 and nifi2. As a result, 'nifi0.example.com:10443', 'nifi1.example.com:10443' and 'nifi2.example.com:10443' are returned.
3. Client1 decides to use 'nifi2.example.com:10443' for further communication.
4. On the other hand, Client2 has two URIs for Site-to-Site bootstrap URIs, and initiates the protocol using one of them. The 'example1' routing does not match this for this request, and port 8081 is returned.
5. Client2 asks peers from 'nifi1:8081'. The 'example1' does not match, so the original 'nifi0:8081', 'nifi1:8081' and 'nifi2:8081' are returned as they are.
6. Client2 decides to use 'nifi2:8081' for further communication.
Routing rule 'example1' is defined in nifi.properties (all node has the same routing configuration):
....
# S2S Routing for RAW, using server name to node
nifi.remote.route.raw.example1.when=\
${X-ProxyHost:equals('nifi.example.com'):or(\
${s2s.source.hostname:equals('nifi.example.com'):or(\
${s2s.source.hostname:equals('192.168.99.100')})})}
nifi.remote.route.raw.example1.hostname=${s2s.target.hostname}.example.com
nifi.remote.route.raw.example1.port=10443
nifi.remote.route.raw.example1.secure=true
....
nginx.conf
....
http {
upstream nifi {
server nifi0:8443;
server nifi1:8443;
server nifi2:8443;
}
# Use dnsmasq so that hostnames such as 'nifi0' can be resolved by /etc/hosts
resolver 127.0.0.1;
server {
listen 443 ssl;
server_name nifi.example.com;
ssl_certificate /etc/nginx/nginx.crt;
ssl_certificate_key /etc/nginx/nginx.key;
proxy_ssl_certificate /etc/nginx/nginx.crt;
proxy_ssl_certificate_key /etc/nginx/nginx.key;
proxy_ssl_trusted_certificate /etc/nginx/nifi-cert.pem;
location / {
proxy_pass https://nifi;
proxy_set_header X-ProxyScheme https;
proxy_set_header X-ProxyHost nginx.example.com;
proxy_set_header X-ProxyPort 17590;
proxy_set_header X-ProxyContextPath /;
proxy_set_header X-ProxiedEntitiesChain $ssl_client_s_dn;
}
}
}
stream {
map $ssl_preread_server_name $nifi {
nifi0.example.com nifi0;
nifi1.example.com nifi1;
nifi2.example.com nifi2;
default nifi0;
}
resolver 127.0.0.1;
server {
listen 10443;
proxy_pass $nifi:8081;
}
}
....
===== Example 2: RAW - Port number to Node mapping
image:s2s-rproxy-portnumber.svg["Port number to Node mapping"]
The 'example2' routing maps original host names (nifi0, 1 and 2) to different proxy ports (10443, 10444 and 10445) using 'equals and 'ifElse' expressions.
nifi.properties (all node has the same routing configuration)
....
# S2S Routing for RAW, using port number to node
nifi.remote.route.raw.example2.when=\
${X-ProxyHost:equals('nifi.example.com'):or(\
${s2s.source.hostname:equals('nifi.example.com'):or(\
${s2s.source.hostname:equals('192.168.99.100')})})}
nifi.remote.route.raw.example2.hostname=nifi.example.com
nifi.remote.route.raw.example2.port=\
${s2s.target.hostname:equals('nifi0'):ifElse('10443',\
${s2s.target.hostname:equals('nifi1'):ifElse('10444',\
${s2s.target.hostname:equals('nifi2'):ifElse('10445',\
'undefined')})})}
nifi.remote.route.raw.example2.secure=true
....
nginx.conf
....
http {
# Same as example 1.
}
stream {
map $ssl_preread_server_name $nifi {
nifi0.example.com nifi0;
nifi1.example.com nifi1;
nifi2.example.com nifi2;
default nifi0;
}
resolver 127.0.0.1;
server {
listen 10443;
proxy_pass nifi0:8081;
}
server {
listen 10444;
proxy_pass nifi1:8081;
}
server {
listen 10445;
proxy_pass nifi2:8081;
}
}
....
===== Example 3: HTTP - Server name to Node mapping
image:s2s-rproxy-http.svg["Server name to Node mapping"]
nifi.properties (all node has the same routing configuration)
....
# S2S Routing for HTTP
nifi.remote.route.http.example3.when=${X-ProxyHost:contains('.example.com')}
nifi.remote.route.http.example3.hostname=${s2s.target.hostname}.example.com
nifi.remote.route.http.example3.port=443
nifi.remote.route.http.example3.secure=true
....
nginx.conf
....
http {
upstream nifi_cluster {
server nifi0:8443;
server nifi1:8443;
server nifi2:8443;
}
# If target node is not specified, use one from cluster.
map $http_host $nifi {
nifi0.example.com:443 "nifi0:8443";
nifi1.example.com:443 "nifi1:8443";
nifi2.example.com:443 "nifi2:8443";
default "nifi_cluster";
}
resolver 127.0.0.1;
server {
listen 443 ssl;
server_name ~^(.+\.example\.com)$;
ssl_certificate /etc/nginx/nginx.crt;
ssl_certificate_key /etc/nginx/nginx.key;
proxy_ssl_certificate /etc/nginx/nginx.crt;
proxy_ssl_certificate_key /etc/nginx/nginx.key;
proxy_ssl_trusted_certificate /etc/nginx/nifi-cert.pem;
location / {
proxy_pass https://$nifi;
proxy_set_header X-ProxyScheme https;
proxy_set_header X-ProxyHost $1;
proxy_set_header X-ProxyPort 443;
proxy_set_header X-ProxyContextPath /;
proxy_set_header X-ProxiedEntitiesChain $ssl_client_s_dn;
}
}
}
....
=== Web Properties
These properties pertain to the web-based User Interface.

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 186 KiB

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 190 KiB

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 193 KiB

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote;
/**
* This interface is used to determine whether a ServerProtocol implementation
* can utilize peer description modification for making S2S work behind a reverse proxy.
*/
public interface PeerDescriptionModifiable {
void setPeerDescriptionModifier(final PeerDescriptionModifier modifier);
}

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.isBlank;
public class PeerDescriptionModifier {
private static final Logger logger = LoggerFactory.getLogger(PeerDescriptionModifier.class);
public enum RequestType {
SiteToSiteDetail,
Peers
}
private static class Route {
private String name;
private SiteToSiteTransportProtocol protocol;
private PreparedQuery predicate;
private PreparedQuery hostname;
private PreparedQuery port;
private PreparedQuery secure;
private Route validate() {
if (hostname == null) {
throw new IllegalArgumentException(
format("Found an invalid Site-to-Site route definition [%s] 'hostname' is not specified.", name));
}
if (port == null) {
throw new IllegalArgumentException(
format("Found an invalid Site-to-Site route definition [%s] 'port' is not specified.", name));
}
return this;
}
private PeerDescription getTarget(final Map<String, String> variables) {
final String targetHostName = hostname.evaluateExpressions(variables, null);
if (isBlank(targetHostName)) {
throw new IllegalStateException("Target hostname was not resolved for the route definition " + name);
}
final String targetPortStr = port.evaluateExpressions(variables, null);
if (isBlank(targetPortStr)) {
throw new IllegalStateException("Target port was not resolved for the route definition " + name);
}
final String targetIsSecure = secure == null ? null : secure.evaluateExpressions(variables, null);
return new PeerDescription(targetHostName, Integer.valueOf(targetPortStr), Boolean.valueOf(targetIsSecure));
}
}
private Map<SiteToSiteTransportProtocol, List<Route>> routes;
private static final String PROPERTY_PREFIX = "nifi.remote.route.";
private static final Pattern PROPERTY_REGEX = Pattern.compile("^nifi\\.remote\\.route\\.(raw|http)\\.([^.]+)\\.(when|hostname|port|secure)$");
public PeerDescriptionModifier(final NiFiProperties properties) {
final Map<Tuple<String, String>, List<Tuple<String, String>>> routeDefinitions = properties.getPropertyKeys().stream()
.filter(propertyKey -> propertyKey.startsWith(PROPERTY_PREFIX))
.map(propertyKey -> {
final Matcher matcher = PROPERTY_REGEX.matcher(propertyKey);
if (!matcher.matches()) {
throw new IllegalArgumentException(
format("Found an invalid Site-to-Site route definition property '%s'." +
" Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
" Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.",
propertyKey));
}
return matcher;
})
.collect(Collectors.groupingBy(matcher -> new Tuple<>(matcher.group(1), matcher.group(2)),
Collectors.mapping(matcher -> new Tuple<>(matcher.group(3), matcher.group(0)), Collectors.toList())));
routes = routeDefinitions.entrySet().stream().map(routeDefinition -> {
final Route route = new Route();
// E.g. [raw, example1], [http, example2]
final Tuple<String, String> protocolAndRoutingName = routeDefinition.getKey();
route.protocol = SiteToSiteTransportProtocol.valueOf(protocolAndRoutingName.getKey().toUpperCase());
route.name = protocolAndRoutingName.getValue();
routeDefinition.getValue().forEach(routingConfigNameAndPropertyKey -> {
final String routingConfigName = routingConfigNameAndPropertyKey.getKey();
final String propertyKey = routingConfigNameAndPropertyKey.getValue();
final String routingConfigValue = properties.getProperty(propertyKey);
try {
switch (routingConfigName) {
case "when":
route.predicate = Query.prepare(routingConfigValue);
break;
case "hostname":
route.hostname = Query.prepare(routingConfigValue);
break;
case "port":
route.port = Query.prepare(routingConfigValue);
break;
case "secure":
route.secure = Query.prepare(routingConfigValue);
break;
}
} catch (AttributeExpressionLanguageParsingException e) {
throw new IllegalArgumentException(format("Failed to parse NiFi expression language configured" +
" for Site-to-Site routing property at '%s' due to '%s'", propertyKey, e.getMessage()), e);
}
});
return route;
}).map(Route::validate).collect(Collectors.groupingBy(r -> r.protocol));
}
private void addVariables(Map<String, String> map, String prefix, PeerDescription peer) {
map.put(format("%s.hostname", prefix), peer.getHostname());
map.put(format("%s.port", prefix), String.valueOf(peer.getPort()));
map.put(format("%s.secure", prefix), String.valueOf(peer.isSecure()));
}
public boolean isModificationNeeded(final SiteToSiteTransportProtocol protocol) {
return routes != null && routes.containsKey(protocol) && !routes.get(protocol).isEmpty();
}
/**
* Modifies target peer description so that subsequent request can go through the appropriate route
* @param source The source peer from which a request was sent, this can be any server host participated to relay the request,
* but should be the one which can contribute to derive the correct target peer.
* @param target The original target which should receive and process further incoming requests.
* @param protocol The S2S protocol being used.
* @param requestType The requested API type.
* @param variables Containing context variables those can be referred from Expression Language.
* @return A peer description. The original target peer can be returned if there is no intermediate peer such as reverse proxies needed.
*/
public PeerDescription modify(final PeerDescription source, final PeerDescription target,
final SiteToSiteTransportProtocol protocol, final RequestType requestType,
final Map<String, String> variables) {
addVariables(variables, "s2s.source", source);
addVariables(variables, "s2s.target", target);
variables.put("s2s.protocol", protocol.name());
variables.put("s2s.request", requestType.name());
logger.debug("Modifying PeerDescription, variables={}", variables);
return routes.get(protocol).stream().filter(r -> r.predicate == null
|| Boolean.valueOf(r.predicate.evaluateExpressions(variables, null)))
.map(r -> {
final PeerDescription t = r.getTarget(variables);
logger.debug("Route definition {} matched, {}", r.name, t);
return t;
})
// If a matched route was found, use it, else use the original target.
.findFirst().orElse(target);
}
}

View File

@ -64,6 +64,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
private final NodeInformant nodeInformant;
private final AtomicReference<ProcessGroup> rootGroup = new AtomicReference<>();
private final NiFiProperties nifiProperties;
private final PeerDescriptionModifier peerDescriptionModifier;
private final AtomicBoolean stopped = new AtomicBoolean(false);
@ -78,6 +79,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
this.sslContext = sslContext;
this.nifiProperties = nifiProperties;
this.nodeInformant = nodeInformant;
peerDescriptionModifier = new PeerDescriptionModifier(nifiProperties);
}
@Override
@ -218,6 +220,9 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
protocol = RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
protocol.setRootProcessGroup(rootGroup.get());
protocol.setNodeInformant(nodeInformant);
if (protocol instanceof PeerDescriptionModifiable) {
((PeerDescriptionModifiable)protocol).setPeerDescriptionModifier(peerDescriptionModifier);
}
final PeerDescription description = new PeerDescription(clientHostName, clientPort, sslContext != null);
peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());

View File

@ -26,6 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerDescriptionModifiable;
import org.apache.nifi.remote.PeerDescriptionModifier;
import org.apache.nifi.remote.RemoteResourceFactory;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
@ -39,14 +42,22 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.HandshakeProperties;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol {
public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol implements PeerDescriptionModifiable {
public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
// Version 6 added to support Zero-Master Clustering, which was introduced in NiFi 1.0.0
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1);
private PeerDescriptionModifier peerDescriptionModifier;
@Override
public void setPeerDescriptionModifier(PeerDescriptionModifier modifier) {
peerDescriptionModifier = modifier;
}
@Override
protected HandshakeProperties doHandshake(Peer peer) throws IOException, HandshakeException {
@ -189,9 +200,21 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
continue;
}
if (peerDescriptionModifier != null && peerDescriptionModifier.isModificationNeeded(SiteToSiteTransportProtocol.RAW)) {
final PeerDescription target = new PeerDescription(nodeInfo.getSiteToSiteHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure());
final PeerDescription modifiedTarget = peerDescriptionModifier.modify(peer.getDescription(), target,
SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.Peers, new HashMap<>());
dos.writeUTF(modifiedTarget.getHostname());
dos.writeInt(modifiedTarget.getPort());
dos.writeBoolean(modifiedTarget.isSecure());
} else {
dos.writeUTF(nodeInfo.getSiteToSiteHostname());
dos.writeInt(nodeInfo.getSiteToSitePort());
dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
}
dos.writeInt(nodeInfo.getTotalFlowFiles());
}

View File

@ -0,0 +1,321 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote;
import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
import org.apache.nifi.properties.StandardNiFiProperties;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestPeerDescriptionModifier {
@Test
public void testNoConfiguration() {
Properties props = new Properties();
final NiFiProperties properties = new StandardNiFiProperties(props);
final PeerDescriptionModifier modifier = new PeerDescriptionModifier(properties);
assertFalse(modifier.isModificationNeeded(SiteToSiteTransportProtocol.RAW));
assertFalse(modifier.isModificationNeeded(SiteToSiteTransportProtocol.HTTP));
}
@Test
public void testInvalidNoHostname() {
Properties props = new Properties();
props.put("nifi.remote.route.raw.no-host.when", "true");
final NiFiProperties properties = new StandardNiFiProperties(props);
try {
new PeerDescriptionModifier(properties);
fail("Should throw an Exception");
} catch (IllegalArgumentException e) {
assertEquals("Found an invalid Site-to-Site route definition [no-host] 'hostname' is not specified.", e.getMessage());
}
}
@Test
public void testInvalidNoPort() {
Properties props = new Properties();
props.put("nifi.remote.route.raw.no-port.when", "true");
props.put("nifi.remote.route.raw.no-port.hostname", "proxy.example.com");
final NiFiProperties properties = new StandardNiFiProperties(props);
try {
new PeerDescriptionModifier(properties);
fail("Should throw an Exception");
} catch (IllegalArgumentException e) {
assertEquals("Found an invalid Site-to-Site route definition [no-port] 'port' is not specified.", e.getMessage());
}
}
@Test
public void testInvalidConfigurationName() {
Properties props = new Properties();
props.put("nifi.remote.route.raw.invalid-name.when", "true");
props.put("nifi.remote.route.raw.invalid-name.hostname", "proxy.example.com");
props.put("nifi.remote.route.raw.invalid-name.port", "8081");
props.put("nifi.remote.route.raw.invalid-name.secure", "true");
props.put("nifi.remote.route.raw.invalid-name.unsupported", "true");
final NiFiProperties properties = new StandardNiFiProperties(props);
try {
new PeerDescriptionModifier(properties);
fail("Should throw an Exception");
} catch (IllegalArgumentException e) {
assertEquals("Found an invalid Site-to-Site route definition property 'nifi.remote.route.raw.invalid-name.unsupported'." +
" Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
" Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", e.getMessage());
}
}
@Test
public void testInvalidPropertyKeyNoProtocol() {
Properties props = new Properties();
props.put("nifi.remote.route.", "true");
final NiFiProperties properties = new StandardNiFiProperties(props);
try {
new PeerDescriptionModifier(properties);
fail("Should throw an Exception");
} catch (IllegalArgumentException e) {
assertEquals("Found an invalid Site-to-Site route definition property 'nifi.remote.route.'." +
" Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
" Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", e.getMessage());
}
}
@Test
public void testInvalidPropertyKeyNoName() {
Properties props = new Properties();
props.put("nifi.remote.route.http.", "true");
final NiFiProperties properties = new StandardNiFiProperties(props);
try {
new PeerDescriptionModifier(properties);
fail("Should throw an Exception");
} catch (IllegalArgumentException e) {
assertEquals("Found an invalid Site-to-Site route definition property 'nifi.remote.route.http.'." +
" Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
" Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", e.getMessage());
}
}
@Test
public void testInvalidExpression() {
Properties props = new Properties();
props.put("nifi.remote.route.raw.invalid-el.when", "${nonExistingFunction()}");
props.put("nifi.remote.route.raw.invalid-el.hostname", "proxy.example.com");
props.put("nifi.remote.route.raw.invalid-el.port", "8081");
final NiFiProperties properties = new StandardNiFiProperties(props);
final PeerDescriptionModifier modifier = new PeerDescriptionModifier(properties);
final PeerDescription source = new PeerDescription("client", 12345, true);
final PeerDescription target = new PeerDescription("nifi0", 8081, true);
try {
modifier.modify(source, target,
SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.Peers, new HashMap<>());
fail("Should throw an Exception");
} catch (AttributeExpressionLanguageException e) {
assertTrue(e.getMessage().startsWith("Invalid Expression"));
}
}
@Test
public void testDefaultIsNotSecure() {
Properties props = new Properties();
props.put("nifi.remote.route.raw.no-port.when", "true");
props.put("nifi.remote.route.raw.no-port.hostname", "proxy.example.com");
props.put("nifi.remote.route.raw.no-port.port", "8443");
final NiFiProperties properties = new StandardNiFiProperties(props);
final PeerDescriptionModifier modifier = new PeerDescriptionModifier(properties);
final PeerDescription source = new PeerDescription("client", 12345, true);
final PeerDescription target = new PeerDescription("nifi0", 8081, true);
final PeerDescription modifiedTarget = modifier.modify(source, target,
SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.Peers, new HashMap<>());
assertFalse(modifiedTarget.isSecure());
}
@Test
public void testRawPortToNode() {
Properties props = new Properties();
// RAW S2S route configs.
// Port number to Node
// proxy1.example.com:17491 -> nifi0:8081
// proxy1.example.com:17492 -> nifi1:8081
props.put("nifi.remote.route.raw.port-to-node.when", "${X-ProxyHost:equals('proxy1.example.com')" +
":or(${s2s.source.hostname:equals('proxy1.example.com')})}");
props.put("nifi.remote.route.raw.port-to-node.hostname", "proxy1.example.com");
props.put("nifi.remote.route.raw.port-to-node.port",
"${s2s.target.hostname:equals('nifi0'):ifElse('17491'," +
"${s2s.target.hostname:equals('nifi1'):ifElse('17492', 'undefined')})}");
props.put("nifi.remote.route.raw.port-to-node.secure", "true");
// Other S2S configs.
props.put("nifi.remote.input.host", "node0");
props.put("nifi.remote.input.secure", "true");
props.put("nifi.remote.input.socket.port", "8081");
props.put("nifi.remote.input.http.enabled", "true");
final NiFiProperties properties = new StandardNiFiProperties(props);
final PeerDescriptionModifier modifier = new PeerDescriptionModifier(properties);
// For requests coming from the proxy server, modify target description,
// so that client can send further request to the proxy.
// To nifi0.
PeerDescription source = new PeerDescription("proxy1.example.com", 12345, true);
PeerDescription target = new PeerDescription("nifi0", 8081, true);
PeerDescription modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
assertNotNull(modifiedTarget);
assertEquals("proxy1.example.com", modifiedTarget.getHostname());
assertEquals(17491, modifiedTarget.getPort());
assertEquals(true, modifiedTarget.isSecure());
// To nifi1.
target = new PeerDescription("nifi1", 8081, true);
modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
assertNotNull(modifiedTarget);
assertEquals("proxy1.example.com", modifiedTarget.getHostname());
assertEquals(17492, modifiedTarget.getPort());
assertEquals(true, modifiedTarget.isSecure());
// For requests coming directly, use the original target description.
source = new PeerDescription("192.168.1.101", 23456, true);
modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
assertNotNull(modifiedTarget);
assertEquals(target, modifiedTarget);
}
@Test
public void testRawServerNameToNode() {
Properties props = new Properties();
// RAW S2S route configs.
// Server name to Node
// nifi0.example.com:17491 -> nifi0:8081
// nifi1.example.com:17491 -> nifi1:8081
props.put("nifi.remote.route.raw.name-to-node.when", "${X-ProxyHost:contains('.example.com')" +
":or(${s2s.source.hostname:contains('.example.com')})}");
props.put("nifi.remote.route.raw.name-to-node.hostname", "${s2s.target.hostname}.example.com");
props.put("nifi.remote.route.raw.name-to-node.port", "17491");
props.put("nifi.remote.route.raw.name-to-node.secure", "true");
// Other S2S configs.
props.put("nifi.remote.input.host", "node0");
props.put("nifi.remote.input.secure", "true");
props.put("nifi.remote.input.socket.port", "8081");
props.put("nifi.remote.input.http.enabled", "true");
final NiFiProperties properties = new StandardNiFiProperties(props);
final PeerDescriptionModifier modifier = new PeerDescriptionModifier(properties);
// For requests coming from the proxy server, modify target description,
// so that client can send further request to the proxy.
// To nifi0.
PeerDescription source = new PeerDescription("nifi0.example.com", 12345, true);
PeerDescription target = new PeerDescription("nifi0", 8081, true);
PeerDescription modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
assertNotNull(modifiedTarget);
assertEquals("nifi0.example.com", modifiedTarget.getHostname());
assertEquals(17491, modifiedTarget.getPort());
assertEquals(true, modifiedTarget.isSecure());
// To nifi1.
target = new PeerDescription("nifi1", 8081, true);
modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
assertNotNull(modifiedTarget);
assertEquals("nifi1.example.com", modifiedTarget.getHostname());
assertEquals(17491, modifiedTarget.getPort());
assertEquals(true, modifiedTarget.isSecure());
// For requests coming directly, use the original target description.
source = new PeerDescription("192.168.1.101", 23456, true);
modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
assertNotNull(modifiedTarget);
assertEquals(target, modifiedTarget);
}
@Test
public void testHttpsTerminate() {
Properties props = new Properties();
// https://nifi0.example.com -> http://nifi0:8080
// https://nifi1.example.com -> http://nifi1:8080
// S2S HTTP configs.
props.put("nifi.remote.route.http.terminate.when", "${X-ProxyHost:contains('.example.com')" +
":or(${s2s.source.hostname:contains('.example.com')})}");
props.put("nifi.remote.route.http.terminate.hostname", "${s2s.target.hostname}.example.com");
props.put("nifi.remote.route.http.terminate.port", "443");
props.put("nifi.remote.route.http.terminate.secure", "true");
// Other S2S configs.
props.put("nifi.web.http.host", "nifi0");
props.put("nifi.web.http.port", "8080");
props.put("nifi.remote.input.host", "nifi0");
props.put("nifi.remote.input.secure", "false");
props.put("nifi.remote.input.socket.port", "");
props.put("nifi.remote.input.http.enabled", "true");
final NiFiProperties properties = new StandardNiFiProperties(props);
final PeerDescriptionModifier modifier = new PeerDescriptionModifier(properties);
// For requests coming from the proxy server, modify target description,
// so that client can send further request to the proxy.
// To nifi0.
PeerDescription source = new PeerDescription("nifi0.example.com", 12345, true);
PeerDescription target = new PeerDescription("nifi0", 8080, false);
final Map<String, String> proxyHeders = new HashMap<>();
proxyHeders.put("X-ProxyHost", "nifi0.example.com:443");
proxyHeders.put("X-Forwarded-For", "172.16.1.103");
PeerDescription modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.HTTP, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>(proxyHeders));
assertNotNull(modifiedTarget);
assertEquals("nifi0.example.com", modifiedTarget.getHostname());
assertEquals(443, modifiedTarget.getPort());
assertEquals(true, modifiedTarget.isSecure());
// To nifi1.
proxyHeders.put("X-ProxyHost", "nifi1.example.com:443");
target = new PeerDescription("nifi1", 8081, true);
modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.HTTP, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>(proxyHeders));
assertNotNull(modifiedTarget);
assertEquals("nifi1.example.com", modifiedTarget.getHostname());
assertEquals(443, modifiedTarget.getPort());
assertEquals(true, modifiedTarget.isSecure());
// For requests coming directly, use the original target description.
source = new PeerDescription("192.168.1.101", 23456, true);
modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.HTTP, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
assertNotNull(modifiedTarget);
assertEquals(target, modifiedTarget);
}
}

View File

@ -137,6 +137,11 @@ public abstract class ApplicationResource {
* @return resource uri
*/
protected String generateResourceUri(final String... path) {
URI uri = buildResourceUri(path);
return uri.toString();
}
private URI buildResourceUri(final String... path) {
final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
uriBuilder.segment(path);
URI uri = uriBuilder.build();
@ -179,7 +184,7 @@ public abstract class ApplicationResource {
} catch (final URISyntaxException use) {
throw new UriBuilderException(use);
}
return uri.toString();
return uri;
}
/**
@ -1226,9 +1231,9 @@ public abstract class ApplicationResource {
public Response locationResponse(UriInfo uriInfo, String portType, String portId, String transactionId, Object entity,
Integer protocolVersion, final HttpRemoteSiteListener transactionManager) {
String path = "/data-transfer/" + portType + "/" + portId + "/transactions/" + transactionId;
URI location = uriInfo.getBaseUriBuilder().path(path).build();
return noCache(setCommonHeaders(Response.created(location), protocolVersion, transactionManager)
final URI transactionUri = buildResourceUri("data-transfer", portType, portId, "transactions", transactionId);
return noCache(setCommonHeaders(Response.created(transactionUri), protocolVersion, transactionManager)
.header(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE))
.entity(entity).build();
}

View File

@ -48,7 +48,6 @@ import org.apache.nifi.remote.protocol.HandshakeProperty;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
import org.apache.nifi.remote.protocol.http.StandardHttpFlowFileServerProtocol;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.entity.TransactionResultEntity;
@ -74,6 +73,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

View File

@ -30,9 +30,12 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerDescriptionModifier;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpHeaders;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiServiceFacade;
@ -56,6 +59,8 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -80,9 +85,11 @@ public class SiteToSiteResource extends ApplicationResource {
private final ResponseCreator responseCreator = new ResponseCreator();
private final VersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
private final HttpRemoteSiteListener transactionManager;
private final PeerDescriptionModifier peerDescriptionModifier;
public SiteToSiteResource(final NiFiProperties nifiProperties) {
transactionManager = HttpRemoteSiteListener.getInstance(nifiProperties);
peerDescriptionModifier = new PeerDescriptionModifier(nifiProperties);
}
/**
@ -131,6 +138,34 @@ public class SiteToSiteResource extends ApplicationResource {
// get the controller dto
final ControllerDTO controller = serviceFacade.getSiteToSiteDetails();
// Alter s2s port.
final boolean modificationNeededRaw = peerDescriptionModifier.isModificationNeeded(SiteToSiteTransportProtocol.RAW);
final boolean modificationNeededHttp = peerDescriptionModifier.isModificationNeeded(SiteToSiteTransportProtocol.HTTP);
if (modificationNeededRaw || modificationNeededHttp) {
final PeerDescription source = getSourcePeerDescription(req);
final Boolean isSiteToSiteSecure = controller.isSiteToSiteSecure();
final String siteToSiteHostname = getSiteToSiteHostname(req);
final Map<String, String> httpHeaders = getHttpHeaders(req);
if (modificationNeededRaw) {
final PeerDescription rawTarget = new PeerDescription(siteToSiteHostname, controller.getRemoteSiteListeningPort(), isSiteToSiteSecure);
final PeerDescription modifiedRawTarget = peerDescriptionModifier.modify(source, rawTarget,
SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>(httpHeaders));
controller.setRemoteSiteListeningPort(modifiedRawTarget.getPort());
}
if (modificationNeededHttp) {
final PeerDescription httpTarget = new PeerDescription(siteToSiteHostname, controller.getRemoteSiteHttpListeningPort(), isSiteToSiteSecure);
final PeerDescription modifiedHttpTarget = peerDescriptionModifier.modify(source, httpTarget,
SiteToSiteTransportProtocol.HTTP, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>(httpHeaders));
controller.setRemoteSiteHttpListeningPort(modifiedHttpTarget.getPort());
if (!controller.isSiteToSiteSecure() && modifiedHttpTarget.isSecure()) {
// In order to enable TLS terminate at the reverse proxy server, even if NiFi itself is not secured, introduce the endpoint as secure.
controller.setSiteToSiteSecure(true);
}
}
}
// build the response entity
final ControllerEntity entity = new ControllerEntity();
entity.setController(controller);
@ -147,6 +182,20 @@ public class SiteToSiteResource extends ApplicationResource {
return noCache(Response.ok(entity)).build();
}
private PeerDescription getSourcePeerDescription(@Context HttpServletRequest req) {
return new PeerDescription(req.getRemoteHost(), req.getRemotePort(), req.isSecure());
}
private Map<String, String> getHttpHeaders(@Context HttpServletRequest req) {
final Map<String, String> headers = new HashMap<>();
final Enumeration<String> headerNames = req.getHeaderNames();
while (headerNames.hasMoreElements()) {
final String name = headerNames.nextElement();
headers.put(name, req.getHeader(name));
}
return headers;
}
/**
* Returns the available Peers and its status of this NiFi.
*
@ -187,18 +236,29 @@ public class SiteToSiteResource extends ApplicationResource {
}
final List<PeerDTO> peers = new ArrayList<>();
final PeerDescription source = getSourcePeerDescription(req);
final boolean modificationNeeded = peerDescriptionModifier.isModificationNeeded(SiteToSiteTransportProtocol.HTTP);
final Map<String, String> headers = modificationNeeded ? getHttpHeaders(req) : null;
if (properties.isNode()) {
try {
final Map<NodeIdentifier, NodeWorkload> clusterWorkload = clusterCoordinator.getClusterWorkload();
clusterWorkload.entrySet().stream().forEach(entry -> {
clusterWorkload.forEach((nodeId, workload) -> {
final String siteToSiteHostname = nodeId.getSiteToSiteAddress() == null ? nodeId.getApiAddress() : nodeId.getSiteToSiteAddress();
final int siteToSitePort = nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort();
PeerDescription target = new PeerDescription(siteToSiteHostname, siteToSitePort, nodeId.isSiteToSiteSecure());
if (modificationNeeded) {
target = peerDescriptionModifier.modify(source, target,
SiteToSiteTransportProtocol.HTTP, PeerDescriptionModifier.RequestType.Peers, new HashMap<>(headers));
}
final PeerDTO peer = new PeerDTO();
final NodeIdentifier nodeId = entry.getKey();
final String siteToSiteAddress = nodeId.getSiteToSiteAddress();
peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress);
peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort());
peer.setSecure(nodeId.isSiteToSiteSecure());
peer.setFlowFileCount(entry.getValue().getFlowFileCount());
peer.setHostname(target.getHostname());
peer.setPort(target.getPort());
peer.setSecure(target.isSecure());
peer.setFlowFileCount(workload.getFlowFileCount());
peers.add(peer);
});
} catch (IOException e) {
@ -208,7 +268,32 @@ public class SiteToSiteResource extends ApplicationResource {
} else {
// Standalone mode.
final PeerDTO peer = new PeerDTO();
final String siteToSiteHostname = getSiteToSiteHostname(req);
PeerDescription target = new PeerDescription(siteToSiteHostname,
properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure());
if (modificationNeeded) {
target = peerDescriptionModifier.modify(source, target,
SiteToSiteTransportProtocol.HTTP, PeerDescriptionModifier.RequestType.Peers, new HashMap<>(headers));
}
peer.setHostname(target.getHostname());
peer.setPort(target.getPort());
peer.setSecure(target.isSecure());
peer.setFlowFileCount(0); // doesn't matter how many FlowFiles we have, because we're the only host.
peers.add(peer);
}
final PeersEntity entity = new PeersEntity();
entity.setPeers(peers);
return noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion, transactionManager)).build();
}
private String getSiteToSiteHostname(final HttpServletRequest req) {
// Private IP address or hostname may not be accessible from client in some environments.
// So, use the value defined in nifi.properties instead when it is defined.
final String remoteInputHost = properties.getRemoteInputHost();
@ -223,18 +308,7 @@ public class SiteToSiteResource extends ApplicationResource {
localName = req.getLocalName();
}
peer.setHostname(isEmpty(remoteInputHost) ? localName : remoteInputHost);
peer.setPort(properties.getRemoteInputHttpPort());
peer.setSecure(properties.isSiteToSiteSecure());
peer.setFlowFileCount(0); // doesn't matter how many FlowFiles we have, because we're the only host.
peers.add(peer);
}
final PeersEntity entity = new PeersEntity();
entity.setPeers(peers);
return noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion, transactionManager)).build();
return isEmpty(remoteInputHost) ? localName : remoteInputHost;
}
// setters

View File

@ -41,10 +41,14 @@ import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import static org.apache.nifi.web.api.ApplicationResource.PROXY_HOST_HTTP_HEADER;
import static org.apache.nifi.web.api.ApplicationResource.PROXY_PORT_HTTP_HEADER;
import static org.apache.nifi.web.api.ApplicationResource.PROXY_SCHEME_HTTP_HEADER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@ -53,6 +57,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestDataTransferResource {
@ -156,6 +161,52 @@ public class TestDataTransferResource {
final ServletContext context = null;
final UriInfo uriInfo = mockUriInfo(locationUriStr);
final Field uriInfoField = resource.getClass().getSuperclass().getSuperclass()
.getDeclaredField("uriInfo");
uriInfoField.setAccessible(true);
uriInfoField.set(resource, uriInfo);
final HttpServletRequest request = mock(HttpServletRequest.class);
final Field httpServletRequestField = resource.getClass().getSuperclass().getSuperclass()
.getDeclaredField("httpServletRequest");
httpServletRequestField.setAccessible(true);
httpServletRequestField.set(resource, request);
final InputStream inputStream = null;
final Response response = resource.createPortTransaction("input-ports", "port-id", req, context, uriInfo, inputStream);
TransactionResultEntity resultEntity = (TransactionResultEntity) response.getEntity();
assertEquals(201, response.getStatus());
assertEquals(ResponseCode.PROPERTIES_OK.getCode(), resultEntity.getResponseCode());
assertEquals(locationUriStr, response.getMetadata().getFirst(HttpHeaders.LOCATION_HEADER_NAME).toString());
}
@Test
public void testCreateTransactionThroughReverseProxy() throws Exception {
final HttpServletRequest req = createCommonHttpServletRequest();
final DataTransferResource resource = getDataTransferResource();
final String locationUriStr = "https://nifi2.example.com:443/nifi-api/data-transfer/input-ports/port-id/transactions/transaction-id";
final ServletContext context = null;
final UriInfo uriInfo = mockUriInfo(locationUriStr);
final Field uriInfoField = resource.getClass().getSuperclass().getSuperclass()
.getDeclaredField("uriInfo");
uriInfoField.setAccessible(true);
uriInfoField.set(resource, uriInfo);
final HttpServletRequest request = mock(HttpServletRequest.class);
when(request.getHeader(PROXY_SCHEME_HTTP_HEADER)).thenReturn("https");
when(request.getHeader(PROXY_HOST_HTTP_HEADER)).thenReturn("nifi2.example.com");
when(request.getHeader(PROXY_PORT_HTTP_HEADER)).thenReturn("443");
final Field httpServletRequestField = resource.getClass().getSuperclass().getSuperclass()
.getDeclaredField("httpServletRequest");
httpServletRequestField.setAccessible(true);
httpServletRequestField.set(resource, request);
final InputStream inputStream = null;
final Response response = resource.createPortTransaction("input-ports", "port-id", req, context, uriInfo, inputStream);