NIFI-2578:

- Ensuring NiFi still attempts to issue a registration request to support 1.x -> 0.x site to site.
This commit is contained in:
Matt Gilman 2016-08-17 11:03:52 -04:00 committed by Mark Payne
parent a0fab15eb7
commit 3d1a24ab7d
7 changed files with 22 additions and 371 deletions

View File

@ -16,31 +16,18 @@
*/ */
package org.apache.nifi.remote; package org.apache.nifi.remote;
import java.io.IOException; import com.sun.jersey.api.client.Client;
import java.net.URI; import com.sun.jersey.api.client.ClientResponse;
import java.net.URISyntaxException; import com.sun.jersey.api.client.WebResource;
import java.util.Map; import com.sun.jersey.core.util.MultivaluedMapImpl;
import org.apache.nifi.web.util.WebUtils;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import java.net.URI;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.entity.ControllerEntity;
import org.apache.nifi.web.util.WebUtils;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.core.util.MultivaluedMapImpl;
public class RemoteNiFiUtils { public class RemoteNiFiUtils {
public static final String CONTROLLER_URI_PATH = "/controller";
private static final int CONNECT_TIMEOUT = 10000; private static final int CONNECT_TIMEOUT = 10000;
private static final int READ_TIMEOUT = 10000; private static final int READ_TIMEOUT = 10000;
@ -50,61 +37,6 @@ public class RemoteNiFiUtils {
this.client = getClient(sslContext); this.client = getClient(sslContext);
} }
/**
* Gets the content at the specified URI.
*
* @param uri uri to retrieve
* @param timeoutMillis time to wait in millis
* @return response
* @throws ClientHandlerException ex
* @throws UniformInterfaceException ex
*/
public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
return get(uri, timeoutMillis, null);
}
/**
* Gets the content at the specified URI using the given query parameters.
*
* @param uri to retrieve
* @param timeoutMillis wait period in millis
* @param queryParams query parameters
* @return response
* @throws ClientHandlerException ex
* @throws UniformInterfaceException ex
*/
public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
// perform the request
WebResource webResource = client.resource(uri);
if (queryParams != null) {
for (final Map.Entry<String, String> queryEntry : queryParams.entrySet()) {
webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
}
}
webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
}
/**
* Performs a HEAD request to the specified URI.
*
* @param uri to retrieve
* @param timeoutMillis wait time in millis
* @return response
* @throws ClientHandlerException ex
* @throws UniformInterfaceException ex
*/
public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
// perform the request
WebResource webResource = client.resource(uri);
webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
return webResource.head();
}
private Client getClient(final SSLContext sslContext) { private Client getClient(final SSLContext sslContext) {
final Client client; final Client client;
if (sslContext == null) { if (sslContext == null) {
@ -120,69 +52,7 @@ public class RemoteNiFiUtils {
} }
/** /**
* Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance is not configured to use Site-to-Site transfers. * Issues a registration request for this NiFi instance for the instance at the baseApiUri.
*
* @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
* @param timeoutMillis wait time in millis
* @return port number
* @throws IOException ex
*/
public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
try {
final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
return getRemoteListeningPort(uriObject, timeoutMillis);
} catch (URISyntaxException e) {
throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
}
}
public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
try {
final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
return getRemoteRootGroupId(uriObject, timeoutMillis);
} catch (URISyntaxException e) {
throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
}
}
public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
try {
final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
return getController(uriObject, timeoutMillis).getInstanceId();
} catch (URISyntaxException e) {
throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
}
}
/**
* Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance is not configured to use Site-to-Site transfers.
*
* @param uri the full URI to fetch, including the path.
* @return port
* @throws IOException ex
*/
private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
}
private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
return getController(uri, timeoutMillis).getId();
}
public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
final ClientResponse response = get(uri, timeoutMillis);
if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
final ControllerEntity entity = response.getEntity(ControllerEntity.class);
return entity.getController();
} else {
final String responseMessage = response.getEntity(String.class);
throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
}
}
/**
* Issues a registration request on behalf of the current user.
* *
* @param baseApiUri uri to register with * @param baseApiUri uri to register with
* @return response * @return response

View File

@ -17,6 +17,7 @@
package org.apache.nifi.remote; package org.apache.nifi.remote;
import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status; import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.api.client.UniformInterfaceException;
import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.Resource;
@ -52,6 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
@ -1135,6 +1137,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
} }
} }
private boolean isWebApiSecure() {
return targetUri.toString().toLowerCase().startsWith("https");
}
@Override @Override
public boolean isSiteToSiteEnabled() { public boolean isSiteToSiteEnabled() {
readLock.lock(); readLock.lock();
@ -1182,9 +1188,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
} catch (SiteToSiteRestApiClient.HttpGetFailedException e) { } catch (SiteToSiteRestApiClient.HttpGetFailedException e) {
if (e.getResponseCode() == UNAUTHORIZED_STATUS_CODE) { if (e.getResponseCode() == UNAUTHORIZED_STATUS_CODE) {
// TODO: implement registration request
/*
try { try {
// attempt to issue a registration request in case the target instance is a 0.x
final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString()); final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString());
if (Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily())) { if (Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily())) {
logger.info("{} Issued a Request to communicate with remote instance", this); logger.info("{} Issued a Request to communicate with remote instance", this);
@ -1192,21 +1198,20 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{ logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{
this, requestAccountResponse.getStatus(), requestAccountResponse.getStatusInfo().getReasonPhrase()}); this, requestAccountResponse.getStatus(), requestAccountResponse.getStatusInfo().getReasonPhrase()});
} }
} catch (final Exception e) { } catch (final Exception re) {
logger.error("{} Failed to request account due to {}", this, e.toString()); logger.error("{} Failed to request account due to {}", this, re.toString());
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.error("", e); logger.error("", re);
} }
} }
*/
authorizationIssue = e.getDescription();
authorizationIssue = e.getDescription();
} else if (e.getResponseCode() == FORBIDDEN_STATUS_CODE) { } else if (e.getResponseCode() == FORBIDDEN_STATUS_CODE) {
authorizationIssue = e.getDescription(); authorizationIssue = e.getDescription();
} else { } else {
final String message = e.getDescription(); final String message = e.getDescription();
logger.warn("{} When communicating with remote instance, got unexpected result. {}", logger.warn("{} When communicating with remote instance, got unexpected result. {}",
new Object[]{this, e.getMessage()}); new Object[]{this, message});
authorizationIssue = "Unable to determine Site-to-Site availability."; authorizationIssue = "Unable to determine Site-to-Site availability.";
} }
} }

View File

@ -1,218 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.util;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.MediaType;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.entity.ControllerEntity;
import org.apache.nifi.web.util.WebUtils;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.core.util.MultivaluedMapImpl;
/**
*
*/
public class RemoteProcessGroupUtils {
public static final String CONTROLLER_URI_PATH = "/controller";
private static final int CONNECT_TIMEOUT = 10000;
private static final int READ_TIMEOUT = 10000;
private final Client client;
public RemoteProcessGroupUtils(final SSLContext sslContext) {
this.client = getClient(sslContext);
}
/**
* Gets the content at the specified URI.
*
* @param uri the URI to get the content of
* @param timeoutMillis the period of time to wait
* @return the response of the request
* @throws ClientHandlerException issues handling the response
* @throws UniformInterfaceException issues with the request
*/
public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
return get(uri, timeoutMillis, null);
}
/**
* Gets the content at the specified URI using the given query parameters.
*
* @param uri the uri to get the content of
* @param timeoutMillis the period of time to wait for a response
* @param queryParams the query params of the request
* @return the client response of the request
* @throws ClientHandlerException issues with the response
* @throws UniformInterfaceException issues with the request
*/
public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
// perform the request
WebResource webResource = client.resource(uri);
if (queryParams != null) {
for (final Map.Entry<String, String> queryEntry : queryParams.entrySet()) {
webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
}
}
webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
}
/**
* Performs a HEAD request to the specified URI.
*
* @param uri the uri to request the head of
* @param timeoutMillis the period of time to wait for a reponse
* @return the client response
* @throws ClientHandlerException issues with the request
* @throws UniformInterfaceException issues with the response
*/
public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
// perform the request
WebResource webResource = client.resource(uri);
webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
return webResource.head();
}
/**
* Gets a client for the given sslContext
*
* @param sslContext the ssl context to get a client for
* @return the client
*/
private Client getClient(final SSLContext sslContext) {
final Client client;
if (sslContext == null) {
client = WebUtils.createClient(null);
} else {
client = WebUtils.createClient(null, sslContext);
}
client.setReadTimeout(READ_TIMEOUT);
client.setConnectTimeout(CONNECT_TIMEOUT);
return client;
}
/**
* Returns the port on which the remote instance is listening for Flow File
* transfers, or <code>null</code> if the remote instance is not configured
* to use Site-to-Site transfers.
*
* @param uri the base URI of the remote instance. This should include the
* path only to the nifi-api level, as well as the protocol, host, and port.
* @param timeoutMillis the period of time to wait for the port
* @return the port
* @throws IOException if an error occurs getting the remote port
* information
*/
public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
try {
final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
return getRemoteListeningPort(uriObject, timeoutMillis);
} catch (URISyntaxException e) {
throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
}
}
public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
try {
final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
return getRemoteRootGroupId(uriObject, timeoutMillis);
} catch (URISyntaxException e) {
throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
}
}
public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
try {
final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
return getController(uriObject, timeoutMillis).getInstanceId();
} catch (URISyntaxException e) {
throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
}
}
/**
* Returns the port on which the remote instance is listening for Flow File
* transfers, or <code>null</code> if the remote instance is not configured
* to use Site-to-Site transfers.
*
* @param uri the full URI to fetch, including the path.
* @return the remote listening port
* @throws IOException if unable to get port
*/
private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
}
private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
return getController(uri, timeoutMillis).getId();
}
private ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
final ClientResponse response = get(uri, timeoutMillis);
if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
final ControllerEntity entity = response.getEntity(ControllerEntity.class);
return entity.getController();
} else {
final String responseMessage = response.getEntity(String.class);
throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
}
}
/**
* Issues a registration request on behalf of the current user.
*
* @param baseApiUri the URI to issue a request to
* @return the response of the request
*/
public ClientResponse issueRegistrationRequest(String baseApiUri) {
final URI uri = URI.create(String.format("%s/controller/users", baseApiUri));
// set up the query params
MultivaluedMapImpl entity = new MultivaluedMapImpl();
entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first.");
// create the web resource
WebResource webResource = client.resource(uri);
// get the client utils and make the request
return webResource.type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
}
}

View File

@ -509,7 +509,6 @@ public class ControllerResource extends ApplicationResource {
@Consumes(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
@Path("cluster/nodes/{id}") @Path("cluster/nodes/{id}")
// TODO - @PreAuthorize("hasAnyRole('ROLE_ADMIN')")
@ApiOperation( @ApiOperation(
value = "Updates a node in the cluster", value = "Updates a node in the cluster",
response = NodeEntity.class, response = NodeEntity.class,

View File

@ -208,7 +208,6 @@ public class CountersResource extends ApplicationResource {
@Consumes(MediaType.WILDCARD) @Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
@Path("{id}") @Path("{id}")
// TODO - @PreAuthorize("hasRole('ROLE_DFM')")
@ApiOperation( @ApiOperation(
value = "Updates the specified counter. This will reset the counter value to 0", value = "Updates the specified counter. This will reset the counter value to 0",
notes = NON_GUARANTEED_ENDPOINT, notes = NON_GUARANTEED_ENDPOINT,

View File

@ -235,7 +235,6 @@ public class FlowResource extends ApplicationResource {
@Consumes(MediaType.WILDCARD) @Consumes(MediaType.WILDCARD)
@Produces(MediaType.TEXT_PLAIN) @Produces(MediaType.TEXT_PLAIN)
@Path("client-id") @Path("client-id")
// TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@ApiOperation( @ApiOperation(
value = "Generates a client id.", value = "Generates a client id.",
response = String.class, response = String.class,

View File

@ -16,10 +16,6 @@
*/ */
package org.apache.nifi.web.api; package org.apache.nifi.web.api;
import java.net.InetAddress;
import java.net.UnknownHostException;
import com.wordnik.swagger.annotations.Api; import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation; import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponse;
@ -60,6 +56,8 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -173,7 +171,6 @@ public class SiteToSiteResource extends ApplicationResource {
@Path("/peers") @Path("/peers")
@Consumes(MediaType.WILDCARD) @Consumes(MediaType.WILDCARD)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
// TODO: @PreAuthorize("hasRole('ROLE_NIFI')")
@ApiOperation( @ApiOperation(
value = "Returns the available Peers and its status of this NiFi", value = "Returns the available Peers and its status of this NiFi",
response = PeersEntity.class, response = PeersEntity.class,