diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index bb5efd7af6..d3fb41f412 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -54,6 +54,7 @@ import java.util.regex.Pattern; import javax.net.ssl.SSLContext; import javax.security.cert.CertificateExpiredException; import javax.security.cert.CertificateNotYetValidException; +import javax.ws.rs.core.Response; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -88,7 +89,6 @@ import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.ControllerDTO; import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.entity.ControllerEntity; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,6 +110,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { public static final String ROOT_GROUP_STATUS_URI_PATH = "/controller/process-groups/root/status"; public static final long LISTENING_PORT_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); + // status codes + public static final int OK_STATUS_CODE = Status.OK.getStatusCode(); + public static final int UNAUTHORIZED_STATUS_CODE = Status.UNAUTHORIZED.getStatusCode(); + public static final int FORBIDDEN_STATUS_CODE = Status.FORBIDDEN.getStatusCode(); + private final String id; private final URI targetUri; @@ -860,7 +865,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { try { // perform the request final ClientResponse response = utils.get(uri, getCommunicationsTimeout(TimeUnit.MILLISECONDS)); - if (!Status.OK.equals(response.getClientResponseStatus())) { + + if (!Response.Status.Family.SUCCESSFUL.equals(response.getStatusInfo().getFamily())) { writeLock.lock(); try { for (final Iterator iter = inputPorts.values().iterator(); iter.hasNext();) { @@ -882,7 +888,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { // consume the entity entirely response.getEntity(String.class); - throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + uriVal + ". Got HTTP Error Code " + response.getClientResponseStatus().getStatusCode() + ": " + response.getClientResponseStatus().getReasonPhrase()); + throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + uriVal + ". Got HTTP Error Code " + response.getStatus() + ": " + response.getStatusInfo().getReasonPhrase()); } final ControllerEntity entity = response.getEntity(ControllerEntity.class); @@ -1303,56 +1309,54 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { try { final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null); final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS)); - switch (response.getClientResponseStatus()) { - case OK: - final ControllerEntity entity = response.getEntity(ControllerEntity.class); - final ControllerDTO dto = entity.getController(); + + final int statusCode = response.getStatus(); + + if ( statusCode == OK_STATUS_CODE ) { + final ControllerEntity entity = response.getEntity(ControllerEntity.class); + final ControllerDTO dto = entity.getController(); - if (dto.getRemoteSiteListeningPort() == null) { - authorizationIssue = "Remote instance is not configured to allow Site-to-Site communications at this time."; + if (dto.getRemoteSiteListeningPort() == null) { + authorizationIssue = "Remote instance is not configured to allow Site-to-Site communications at this time."; + } else { + authorizationIssue = null; + } + + writeLock.lock(); + try { + listeningPort = dto.getRemoteSiteListeningPort(); + destinationSecure = dto.isSiteToSiteSecure(); + } finally { + writeLock.unlock(); + } + + final String remoteInstanceId = dto.getInstanceId(); + boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId); + pointsToCluster.set(isPointingToCluster); + } else if ( statusCode == UNAUTHORIZED_STATUS_CODE ) { + try { + final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString()); + if (Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily()) ) { + logger.info("{} Issued a Request to communicate with remote instance", this); } else { - authorizationIssue = null; + logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{ + this, requestAccountResponse.getStatus(), requestAccountResponse.getStatusInfo().getReasonPhrase()}); } - - writeLock.lock(); - try { - listeningPort = dto.getRemoteSiteListeningPort(); - destinationSecure = dto.isSiteToSiteSecure(); - } finally { - writeLock.unlock(); + } catch (final Exception e) { + logger.error("{} Failed to request account due to {}", this, e.toString()); + if (logger.isDebugEnabled()) { + logger.error("", e); } + } - final String remoteInstanceId = dto.getInstanceId(); - boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId); - pointsToCluster.set(isPointingToCluster); - break; - case UNAUTHORIZED: - try { - final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString()); - if (requestAccountResponse.getClientResponseStatus() == Status.OK) { - logger.info("{} Issued a Request to communicate with remote instance", this); - } else { - logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{ - this, requestAccountResponse.getClientResponseStatus().getStatusCode(), requestAccountResponse.getClientResponseStatus().getReasonPhrase()}); - } - } catch (final Exception e) { - logger.error("{} Failed to request account due to {}", this, e.toString()); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - } - - authorizationIssue = response.getEntity(String.class); - break; - case FORBIDDEN: - authorizationIssue = response.getEntity(String.class); - break; - default: - final String message = response.getEntity(String.class); - logger.warn("{} When communicating with remote instance, got unexpected response code {}:{} with entity: {}", - new Object[]{this, response.getClientResponseStatus().getStatusCode(), response.getClientResponseStatus().getReasonPhrase(), message}); - authorizationIssue = "Unable to determine Site-to-Site availability."; - break; + authorizationIssue = response.getEntity(String.class); + } else if ( statusCode == FORBIDDEN_STATUS_CODE ) { + authorizationIssue = response.getEntity(String.class); + } else { + final String message = response.getEntity(String.class); + logger.warn("{} When communicating with remote instance, got unexpected response code {}:{} with entity: {}", + new Object[]{this, response.getStatus(), response.getStatusInfo().getReasonPhrase(), message}); + authorizationIssue = "Unable to determine Site-to-Site availability."; } } catch (Exception e) { logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e));