NIFI-161: Removed references to deprecated methods

This commit is contained in:
Mark Payne 2014-12-11 09:44:38 -05:00
parent cbea1f1936
commit 74c7940487
1 changed files with 52 additions and 48 deletions

View File

@ -54,6 +54,7 @@ import java.util.regex.Pattern;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.security.cert.CertificateExpiredException; import javax.security.cert.CertificateExpiredException;
import javax.security.cert.CertificateNotYetValidException; import javax.security.cert.CertificateNotYetValidException;
import javax.ws.rs.core.Response;
import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Connection;
@ -88,7 +89,6 @@ import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.ControllerDTO; import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.entity.ControllerEntity; import org.apache.nifi.web.api.entity.ControllerEntity;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -110,6 +110,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public static final String ROOT_GROUP_STATUS_URI_PATH = "/controller/process-groups/root/status"; public static final String ROOT_GROUP_STATUS_URI_PATH = "/controller/process-groups/root/status";
public static final long LISTENING_PORT_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); public static final long LISTENING_PORT_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
// status codes
public static final int OK_STATUS_CODE = Status.OK.getStatusCode();
public static final int UNAUTHORIZED_STATUS_CODE = Status.UNAUTHORIZED.getStatusCode();
public static final int FORBIDDEN_STATUS_CODE = Status.FORBIDDEN.getStatusCode();
private final String id; private final String id;
private final URI targetUri; private final URI targetUri;
@ -860,7 +865,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try { try {
// perform the request // perform the request
final ClientResponse response = utils.get(uri, getCommunicationsTimeout(TimeUnit.MILLISECONDS)); final ClientResponse response = utils.get(uri, getCommunicationsTimeout(TimeUnit.MILLISECONDS));
if (!Status.OK.equals(response.getClientResponseStatus())) {
if (!Response.Status.Family.SUCCESSFUL.equals(response.getStatusInfo().getFamily())) {
writeLock.lock(); writeLock.lock();
try { try {
for (final Iterator<StandardRemoteGroupPort> iter = inputPorts.values().iterator(); iter.hasNext();) { for (final Iterator<StandardRemoteGroupPort> iter = inputPorts.values().iterator(); iter.hasNext();) {
@ -882,7 +888,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
// consume the entity entirely // consume the entity entirely
response.getEntity(String.class); response.getEntity(String.class);
throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + uriVal + ". Got HTTP Error Code " + response.getClientResponseStatus().getStatusCode() + ": " + response.getClientResponseStatus().getReasonPhrase()); throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + uriVal + ". Got HTTP Error Code " + response.getStatus() + ": " + response.getStatusInfo().getReasonPhrase());
} }
final ControllerEntity entity = response.getEntity(ControllerEntity.class); final ControllerEntity entity = response.getEntity(ControllerEntity.class);
@ -1303,56 +1309,54 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try { try {
final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null); final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS)); final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
switch (response.getClientResponseStatus()) {
case OK:
final ControllerEntity entity = response.getEntity(ControllerEntity.class);
final ControllerDTO dto = entity.getController();
if (dto.getRemoteSiteListeningPort() == null) { final int statusCode = response.getStatus();
authorizationIssue = "Remote instance is not configured to allow Site-to-Site communications at this time.";
if ( statusCode == OK_STATUS_CODE ) {
final ControllerEntity entity = response.getEntity(ControllerEntity.class);
final ControllerDTO dto = entity.getController();
if (dto.getRemoteSiteListeningPort() == null) {
authorizationIssue = "Remote instance is not configured to allow Site-to-Site communications at this time.";
} else {
authorizationIssue = null;
}
writeLock.lock();
try {
listeningPort = dto.getRemoteSiteListeningPort();
destinationSecure = dto.isSiteToSiteSecure();
} finally {
writeLock.unlock();
}
final String remoteInstanceId = dto.getInstanceId();
boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId);
pointsToCluster.set(isPointingToCluster);
} else if ( statusCode == UNAUTHORIZED_STATUS_CODE ) {
try {
final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString());
if (Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily()) ) {
logger.info("{} Issued a Request to communicate with remote instance", this);
} else { } else {
authorizationIssue = null; logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{
this, requestAccountResponse.getStatus(), requestAccountResponse.getStatusInfo().getReasonPhrase()});
} }
} catch (final Exception e) {
writeLock.lock(); logger.error("{} Failed to request account due to {}", this, e.toString());
try { if (logger.isDebugEnabled()) {
listeningPort = dto.getRemoteSiteListeningPort(); logger.error("", e);
destinationSecure = dto.isSiteToSiteSecure();
} finally {
writeLock.unlock();
} }
}
final String remoteInstanceId = dto.getInstanceId(); authorizationIssue = response.getEntity(String.class);
boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId); } else if ( statusCode == FORBIDDEN_STATUS_CODE ) {
pointsToCluster.set(isPointingToCluster); authorizationIssue = response.getEntity(String.class);
break; } else {
case UNAUTHORIZED: final String message = response.getEntity(String.class);
try { logger.warn("{} When communicating with remote instance, got unexpected response code {}:{} with entity: {}",
final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString()); new Object[]{this, response.getStatus(), response.getStatusInfo().getReasonPhrase(), message});
if (requestAccountResponse.getClientResponseStatus() == Status.OK) { authorizationIssue = "Unable to determine Site-to-Site availability.";
logger.info("{} Issued a Request to communicate with remote instance", this);
} else {
logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{
this, requestAccountResponse.getClientResponseStatus().getStatusCode(), requestAccountResponse.getClientResponseStatus().getReasonPhrase()});
}
} catch (final Exception e) {
logger.error("{} Failed to request account due to {}", this, e.toString());
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
authorizationIssue = response.getEntity(String.class);
break;
case FORBIDDEN:
authorizationIssue = response.getEntity(String.class);
break;
default:
final String message = response.getEntity(String.class);
logger.warn("{} When communicating with remote instance, got unexpected response code {}:{} with entity: {}",
new Object[]{this, response.getClientResponseStatus().getStatusCode(), response.getClientResponseStatus().getReasonPhrase(), message});
authorizationIssue = "Unable to determine Site-to-Site availability.";
break;
} }
} catch (Exception e) { } catch (Exception e) {
logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e)); logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e));