NIFI-2863: S2S to allow cluster URL more leniently. This closes #1122

- Consolidated the target cluster URL resolving logic into
  SiteToSiteRestApiClient's as a common method
- Changed to more descriptive error message
- Added more unit test cases
This commit is contained in:
Koji Kawamura 2016-10-11 17:07:05 +09:00 committed by Matt Gilman
parent 6dc2f14198
commit c470fae065
6 changed files with 167 additions and 31 deletions

View File

@ -59,7 +59,7 @@ public class SiteInfoProvider {
final ControllerDTO controller;
try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP)) {
apiClient.resolveBaseUrl(clusterUrl);
apiClient.setBaseUrl(SiteToSiteRestApiClient.resolveBaseUrl(clusterUrl));
apiClient.setConnectTimeoutMillis(connectTimeoutMillis);
apiClient.setReadTimeoutMillis(readTimeoutMillis);
controller = apiClient.getController();

View File

@ -107,7 +107,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
// Each node should has the same URL structure and network reach-ability with the proxy configuration.
try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy(), config.getEventReporter())) {
final String scheme = peerDescription.isSecure() ? "https" : "http";
final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, peerDescription.getHostname(), peerDescription.getPort());
apiClient.setBaseUrl(scheme, peerDescription.getHostname(), peerDescription.getPort());
final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
apiClient.setConnectTimeoutMillis(timeoutMillis);
@ -115,7 +115,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
final Collection<PeerDTO> peers = apiClient.getPeers();
if(peers == null || peers.size() == 0){
throw new IOException("Couldn't get any peer to communicate with. " + clusterApiUrl + " returned zero peers.");
throw new IOException("Couldn't get any peer to communicate with. " + apiClient.getBaseUrl() + " returned zero peers.");
}
// Convert the PeerDTO's to PeerStatus objects. Use 'true' for the query-peer-for-peers flag because Site-to-Site over HTTP

View File

@ -107,6 +107,7 @@ import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@ -1129,8 +1130,9 @@ public class SiteToSiteRestApiClient implements Closeable {
try {
return mapper.readValue(responseMessage, entityClass);
} catch (JsonParseException e) {
logger.warn("Failed to parse Json, response={}", responseMessage);
throw e;
final String msg = "Failed to parse Json. The specified URL " + baseUrl + " is not a proper remote NiFi endpoint for Site-to-Site communication.";
logger.warn("{} requestedUrl={}, response={}", msg, get.getURI(), responseMessage);
throw new IOException(msg, e);
}
}
@ -1138,6 +1140,12 @@ public class SiteToSiteRestApiClient implements Closeable {
return baseUrl;
}
/**
* Set the baseUrl as it is, without altering or adjusting the specified url string.
* If the url is specified by user input, and if it needs to be resolved with leniency,
* then use {@link #resolveBaseUrl(String)} method before passing it to this method.
* @param baseUrl url to set
*/
public void setBaseUrl(final String baseUrl) {
this.baseUrl = baseUrl;
}
@ -1150,29 +1158,54 @@ public class SiteToSiteRestApiClient implements Closeable {
this.readTimeoutMillis = readTimeoutMillis;
}
public String resolveBaseUrl(final String clusterUrl) {
public static String resolveBaseUrl(final String clusterUrl) {
Objects.requireNonNull(clusterUrl, "clusterUrl cannot be null.");
URI clusterUri;
try {
clusterUri = new URI(clusterUrl);
clusterUri = new URI(clusterUrl.trim());
} catch (final URISyntaxException e) {
throw new IllegalArgumentException("Specified clusterUrl was: " + clusterUrl, e);
}
return this.resolveBaseUrl(clusterUri);
return resolveBaseUrl(clusterUri);
}
public String resolveBaseUrl(final URI clusterUrl) {
String urlPath = clusterUrl.getPath();
if (urlPath.endsWith("/")) {
urlPath = urlPath.substring(0, urlPath.length() - 1);
/**
* Resolve NiFi API url with leniency. This method does following conversion on uri path:
* <ul>
* <li>/ to /nifi-api</li>
* <li>/nifi to /nifi-api</li>
* <li>/some/path/ to /some/path/nifi-api</li>
* </ul>
* @param clusterUrl url to be resolved
* @return resolved url
*/
public static String resolveBaseUrl(final URI clusterUrl) {
String uriPath = clusterUrl.getPath().trim();
if (StringUtils.isEmpty(uriPath) || uriPath.equals("/")) {
uriPath = "/nifi";
} else if (uriPath.endsWith("/")) {
uriPath = uriPath.substring(0, uriPath.length() - 1);
}
if (uriPath.endsWith("/nifi")) {
uriPath += "-api";
} else if (!uriPath.endsWith("/nifi-api")) {
uriPath += "/nifi-api";
}
try {
return new URL(clusterUrl.getScheme(), clusterUrl.getHost(), clusterUrl.getPort(), uriPath).toURI().toString();
} catch (MalformedURLException|URISyntaxException e) {
throw new IllegalArgumentException(e);
}
return resolveBaseUrl(clusterUrl.getScheme(), clusterUrl.getHost(), clusterUrl.getPort(), urlPath + "-api");
}
public String resolveBaseUrl(final String scheme, final String host, final int port) {
return resolveBaseUrl(scheme, host, port, "/nifi-api");
public void setBaseUrl(final String scheme, final String host, final int port) {
setBaseUrl(scheme, host, port, "/nifi-api");
}
private String resolveBaseUrl(final String scheme, final String host, final int port, final String path) {
private void setBaseUrl(final String scheme, final String host, final int port, final String path) {
final String baseUri;
try {
baseUri = new URL(scheme, host, port, path).toURI().toString();
@ -1180,7 +1213,6 @@ public class SiteToSiteRestApiClient implements Closeable {
throw new IllegalArgumentException(e);
}
this.setBaseUrl(baseUri);
return baseUri;
}
public void setCompress(final boolean compress) {

View File

@ -43,12 +43,14 @@ import org.apache.nifi.web.api.entity.PeersEntity;
import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.codehaus.jackson.map.ObjectMapper;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -144,6 +146,15 @@ public class TestHttpClient {
}
}
public static class WrongSiteInfoServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// This response simulates when a Site-to-Site is given an URL which has wrong path.
respondWithText(resp, "<p class=\"message-pane-content\">You may have mistyped...</p>", 200);
}
}
public static class PeersServlet extends HttpServlet {
@Override
@ -420,21 +431,32 @@ public class TestHttpClient {
// Create embedded Jetty server
server = new Server(0);
ServletContextHandler contextHandler = new ServletContextHandler();
contextHandler.setContextPath("/nifi-api");
server.setHandler(contextHandler);
final ContextHandlerCollection handlerCollection = new ContextHandlerCollection();
ServletHandler servletHandler = new ServletHandler();
final ServletContextHandler contextHandler = new ServletContextHandler();
contextHandler.setContextPath("/nifi-api");
final ServletContextHandler wrongPathContextHandler = new ServletContextHandler();
wrongPathContextHandler.setContextPath("/wrong/nifi-api");
handlerCollection.setHandlers(new Handler[]{contextHandler, wrongPathContextHandler});
server.setHandler(handlerCollection);
final ServletHandler servletHandler = new ServletHandler();
contextHandler.insertHandler(servletHandler);
SslContextFactory sslContextFactory = new SslContextFactory();
final ServletHandler wrongPathServletHandler = new ServletHandler();
wrongPathContextHandler.insertHandler(wrongPathServletHandler);
final SslContextFactory sslContextFactory = new SslContextFactory();
sslContextFactory.setKeyStorePath("src/test/resources/certs/localhost-ks.jks");
sslContextFactory.setKeyStorePassword("localtest");
sslContextFactory.setKeyStoreType("JKS");
httpConnector = new ServerConnector(server);
HttpConfiguration https = new HttpConfiguration();
final HttpConfiguration https = new HttpConfiguration();
https.addCustomizer(new SecureRequestCustomizer());
sslConnector = new ServerConnector(server,
new SslConnectionFactory(sslContextFactory, "http/1.1"),
@ -442,6 +464,8 @@ public class TestHttpClient {
server.setConnectors(new Connector[] { httpConnector, sslConnector });
wrongPathServletHandler.addServletWithMapping(WrongSiteInfoServlet.class, "/site-to-site");
servletHandler.addServletWithMapping(SiteInfoServlet.class, "/site-to-site");
servletHandler.addServletWithMapping(PeersServlet.class, "/site-to-site/peers");
@ -644,12 +668,14 @@ public class TestHttpClient {
private SiteToSiteClient.Builder getDefaultBuilder() {
return new SiteToSiteClient.Builder().transportProtocol(SiteToSiteTransportProtocol.HTTP)
.url("http://localhost:" + httpConnector.getLocalPort() + "/nifi")
.timeout(3, TimeUnit.MINUTES)
;
}
private SiteToSiteClient.Builder getDefaultBuilderHTTPS() {
return new SiteToSiteClient.Builder().transportProtocol(SiteToSiteTransportProtocol.HTTP)
.url("https://localhost:" + sslConnector.getLocalPort() + "/nifi")
.timeout(3, TimeUnit.MINUTES)
.keystoreFilename("src/test/resources/certs/localhost-ks.jks")
.keystorePass("localtest")
.keystoreType(KeystoreType.JKS)
@ -686,6 +712,25 @@ public class TestHttpClient {
}
@Test
public void testWrongPath() throws Exception {
final URI uri = server.getURI();
try (
SiteToSiteClient client = getDefaultBuilder()
.url("http://" + uri.getHost() + ":" + uri.getPort() + "/wrong")
.portName("input-running")
.build()
) {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
assertNull(transaction);
}
}
@Test
public void testNoAvailablePeer() throws Exception {

View File

@ -20,6 +20,8 @@ import org.apache.nifi.events.EventReporter;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestSiteToSiteRestApiClient {
@Test
@ -65,4 +67,68 @@ public class TestSiteToSiteRestApiClient {
Assert.assertEquals("https://nifi.example.com:8443/nifi-api", baseUrl);
}
@Test
public void testResolveBaseUrlLeniency() {
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
String expectedUri = "http://localhost:8080/nifi-api";
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080 "));
assertEquals(expectedUri, apiClient.resolveBaseUrl(" http://localhost:8080 "));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi/ "));
assertEquals(expectedUri, apiClient.resolveBaseUrl(" http://localhost:8080/nifi/ "));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi-api"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi-api/"));
expectedUri = "http://localhost/nifi-api";
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost/nifi"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost/nifi-api"));
expectedUri = "http://localhost:8080/some/path/nifi-api";
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path"));
assertEquals(expectedUri, apiClient.resolveBaseUrl(" http://localhost:8080/some/path"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path "));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi-api"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi-api/"));
}
@Test
public void testResolveBaseUrlLeniencyHttps() {
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
String expectedUri = "https://localhost:8443/nifi-api";
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443 "));
assertEquals(expectedUri, apiClient.resolveBaseUrl(" https://localhost:8443 "));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi/ "));
assertEquals(expectedUri, apiClient.resolveBaseUrl(" https://localhost:8443/nifi/ "));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi-api"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi-api/"));
expectedUri = "https://localhost/nifi-api";
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost/nifi"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost/nifi-api"));
expectedUri = "https://localhost:8443/some/path/nifi-api";
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path"));
assertEquals(expectedUri, apiClient.resolveBaseUrl(" https://localhost:8443/some/path"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path "));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi-api"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi-api/"));
}
}

View File

@ -147,15 +147,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try {
uri = new URI(requireNonNull(targetUri.trim()));
// Trim the trailing /
String uriPath = uri.getPath();
if (uriPath == null || uriPath.equals("/") || uriPath.trim().isEmpty()) {
uriPath = "/nifi";
} else if (uriPath.endsWith("/")) {
uriPath = uriPath.substring(0, uriPath.length() - 1);
}
final String apiPath = SiteToSiteRestApiClient.resolveBaseUrl(uri);
final String apiPath = uri.getScheme() + "://" + uri.getHost() + ":" + uri.getPort() + uriPath.trim() + "-api";
apiUri = new URI(apiPath);
} catch (final URISyntaxException e) {
throw new IllegalArgumentException(e);