NIFI-3657 This closes #1634. Fix HTTP S2S to use local address.

- Fixed SiteInfoProvider and HttpClient to use specified local address with its SiteToSiteRestApiClient
- Removed setupRequestConfig method call from connection and read timeout setter methods at SiteToSiteRestApiClient, because it created config object before local address was set
- Null clear StandardRemoteProcessGroup localAddress when user clears Local Network Interface

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Koji Kawamura 2017-03-29 15:18:57 +09:00 committed by joewitt
parent 0ddb90cd2d
commit 8ce2a1b3a7
5 changed files with 16 additions and 11 deletions

View File

@ -33,7 +33,7 @@ public abstract class AbstractSiteToSiteClient implements SiteToSiteClient {
siteInfoProvider.setConnectTimeoutMillis(commsTimeout); siteInfoProvider.setConnectTimeoutMillis(commsTimeout);
siteInfoProvider.setReadTimeoutMillis(commsTimeout); siteInfoProvider.setReadTimeoutMillis(commsTimeout);
siteInfoProvider.setProxy(config.getHttpProxy()); siteInfoProvider.setProxy(config.getHttpProxy());
siteInfoProvider.setLocalAddress(config.getLocalAddress());
} }
@Override @Override

View File

@ -17,6 +17,7 @@
package org.apache.nifi.remote.client; package org.apache.nifi.remote.client;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.HashMap; import java.util.HashMap;
@ -48,6 +49,7 @@ public class SiteInfoProvider {
private Boolean siteToSiteSecure; private Boolean siteToSiteSecure;
private long remoteRefreshTime; private long remoteRefreshTime;
private HttpProxy proxy; private HttpProxy proxy;
private InetAddress localAddress;
private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier
private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier
@ -63,8 +65,6 @@ public class SiteInfoProvider {
final ControllerDTO controller; final ControllerDTO controller;
final URI connectedClusterUrl; final URI connectedClusterUrl;
try (final SiteToSiteRestApiClient apiClient = createSiteToSiteRestApiClient(sslContext, proxy)) { try (final SiteToSiteRestApiClient apiClient = createSiteToSiteRestApiClient(sslContext, proxy)) {
apiClient.setConnectTimeoutMillis(connectTimeoutMillis);
apiClient.setReadTimeoutMillis(readTimeoutMillis);
controller = apiClient.getController(clusterUrls); controller = apiClient.getController(clusterUrls);
try { try {
connectedClusterUrl = new URI(apiClient.getBaseUrl()); connectedClusterUrl = new URI(apiClient.getBaseUrl());
@ -100,7 +100,11 @@ public class SiteInfoProvider {
} }
protected SiteToSiteRestApiClient createSiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy) { protected SiteToSiteRestApiClient createSiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy) {
return new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP); final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP);
apiClient.setConnectTimeoutMillis(connectTimeoutMillis);
apiClient.setReadTimeoutMillis(readTimeoutMillis);
apiClient.setLocalAddress(localAddress);
return apiClient;
} }
public boolean isWebInterfaceSecure() { public boolean isWebInterfaceSecure() {
@ -271,4 +275,8 @@ public class SiteInfoProvider {
public void setProxy(HttpProxy proxy) { public void setProxy(HttpProxy proxy) {
this.proxy = proxy; this.proxy = proxy;
} }
public void setLocalAddress(InetAddress localAddress) {
this.localAddress = localAddress;
}
} }

View File

@ -106,6 +106,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
apiClient.setConnectTimeoutMillis(timeoutMillis); apiClient.setConnectTimeoutMillis(timeoutMillis);
apiClient.setReadTimeoutMillis(timeoutMillis); apiClient.setReadTimeoutMillis(timeoutMillis);
apiClient.setLocalAddress(config.getLocalAddress());
final Collection<PeerDTO> peers = apiClient.getPeers(); final Collection<PeerDTO> peers = apiClient.getPeers();
if(peers == null || peers.size() == 0){ if(peers == null || peers.size() == 0){
@ -153,6 +154,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
apiClient.setBaseUrl(peer.getUrl()); apiClient.setBaseUrl(peer.getUrl());
apiClient.setConnectTimeoutMillis(timeoutMillis); apiClient.setConnectTimeoutMillis(timeoutMillis);
apiClient.setReadTimeoutMillis(timeoutMillis); apiClient.setReadTimeoutMillis(timeoutMillis);
apiClient.setLocalAddress(config.getLocalAddress());
apiClient.setCompress(config.isUseCompression()); apiClient.setCompress(config.isUseCompression());
apiClient.setRequestExpirationMillis(config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS)); apiClient.setRequestExpirationMillis(config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS));

View File

@ -1206,12 +1206,10 @@ public class SiteToSiteRestApiClient implements Closeable {
public void setConnectTimeoutMillis(final int connectTimeoutMillis) { public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis; this.connectTimeoutMillis = connectTimeoutMillis;
setupRequestConfig();
} }
public void setReadTimeoutMillis(final int readTimeoutMillis) { public void setReadTimeoutMillis(final int readTimeoutMillis) {
this.readTimeoutMillis = readTimeoutMillis; this.readTimeoutMillis = readTimeoutMillis;
setupRequestConfig();
} }
public static String getFirstUrl(final String clusterUrlStr) { public static String getFirstUrl(final String clusterUrlStr) {

View File

@ -880,6 +880,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try { try {
this.networkInterfaceName = interfaceName; this.networkInterfaceName = interfaceName;
if (interfaceName == null) { if (interfaceName == null) {
this.localAddress = null;
this.nicValidationResult = null; this.nicValidationResult = null;
} else { } else {
try { try {
@ -930,11 +931,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword), getEventReporter()); SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword), getEventReporter());
apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
apiClient.setLocalAddress(getLocalAddress());
final InetAddress localAddress = getLocalAddress();
if (localAddress != null) {
apiClient.setLocalAddress(localAddress);
}
return apiClient; return apiClient;
} }