From f97b3fe455e7cf8bd051c7a40f3e3f7ee1d1a16a Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Wed, 24 May 2017 14:28:51 -0400 Subject: [PATCH] NIFI-3963: - Ensuring the RemoteGroupPort yields when the details cannot be refreshed from any of the configured remote instances. This closes #1853. Signed-off-by: Bryan Bende --- .../client/socket/EndpointConnectionPool.java | 62 ++++++++++--------- .../UnreachableClusterException.java | 33 ++++++++++ .../nifi/remote/StandardRemoteGroupPort.java | 39 +++++++----- 3 files changed, 90 insertions(+), 44 deletions(-) create mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnreachableClusterException.java diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 926e4b4d75..a360c873de 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -16,9 +16,32 @@ */ package org.apache.nifi.remote.client.socket; -import static org.apache.nifi.remote.util.EventReportUtil.error; -import static org.apache.nifi.remote.util.EventReportUtil.warn; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerDescription; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.RemoteDestination; +import org.apache.nifi.remote.RemoteResourceInitiator; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.PeerSelector; +import org.apache.nifi.remote.client.PeerStatusProvider; +import org.apache.nifi.remote.client.SiteInfoProvider; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.PortNotRunningException; +import org.apache.nifi.remote.exception.TransmissionDisabledException; +import org.apache.nifi.remote.exception.UnknownPortException; +import org.apache.nifi.remote.exception.UnreachableClusterException; +import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; +import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -44,31 +67,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import javax.net.ssl.SSLContext; - -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.remote.Peer; -import org.apache.nifi.remote.PeerDescription; -import org.apache.nifi.remote.PeerStatus; -import org.apache.nifi.remote.RemoteDestination; -import org.apache.nifi.remote.RemoteResourceInitiator; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.PeerSelector; -import org.apache.nifi.remote.client.PeerStatusProvider; -import org.apache.nifi.remote.client.SiteInfoProvider; -import org.apache.nifi.remote.client.SiteToSiteClientConfig; -import org.apache.nifi.remote.codec.FlowFileCodec; -import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.PortNotRunningException; -import org.apache.nifi.remote.exception.TransmissionDisabledException; -import org.apache.nifi.remote.exception.UnknownPortException; -import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; -import org.apache.nifi.remote.protocol.CommunicationsSession; -import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.apache.nifi.remote.util.EventReportUtil.error; +import static org.apache.nifi.remote.util.EventReportUtil.warn; public class EndpointConnectionPool implements PeerStatusProvider { @@ -157,7 +157,13 @@ public class EndpointConnectionPool implements PeerStatusProvider { SocketClientProtocol protocol = null; EndpointConnection connection; Peer peer = null; - URI clusterUrl = siteInfoProvider.getActiveClusterUrl(); + + final URI clusterUrl; + try { + clusterUrl = siteInfoProvider.getActiveClusterUrl(); + } catch (final IOException ioe) { + throw new UnreachableClusterException("Unable to refresh details from any of the configured remote instances.", ioe); + } do { final List addBack = new ArrayList<>(); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnreachableClusterException.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnreachableClusterException.java new file mode 100644 index 0000000000..5e9452fb30 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnreachableClusterException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.exception; + +/** + * UnreachableClusterException when none of the target clusterUrls are reachable. + */ +public class UnreachableClusterException extends ProtocolException { + + private static final long serialVersionUID = 6147433671708846798L; + + public UnreachableClusterException(final String message) { + super(message); + } + + public UnreachableClusterException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index b1288f3d36..1be5b42214 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -16,22 +16,6 @@ */ package org.apache.nifi.remote; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import javax.net.ssl.SSLContext; - import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; @@ -56,6 +40,7 @@ import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; +import org.apache.nifi.remote.exception.UnreachableClusterException; import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.remote.util.SiteToSiteRestApiClient; @@ -69,6 +54,21 @@ import org.apache.nifi.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + public class StandardRemoteGroupPort extends RemoteGroupPort { private static final long BATCH_SEND_NANOS = TimeUnit.MILLISECONDS.toNanos(500L); // send batches of up to 500 millis @@ -239,6 +239,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { session.rollback(); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; + } catch (final UnreachableClusterException e) { + context.yield(); + final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString()); + logger.error(message); + session.rollback(); + remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); + return; } catch (final IOException e) { // we do not yield here because the 'peer' will be penalized, and we won't communicate with that particular nifi instance // for a while due to penalization, but we can continue to talk to other nifi instances