NIFI-3963: - Ensuring the RemoteGroupPort yields when the details cannot be refreshed from any of the configured remote instances.

This closes #1853.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Matt Gilman 2017-05-24 14:28:51 -04:00 committed by Bryan Bende
parent ae3db82303
commit f97b3fe455
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
3 changed files with 90 additions and 44 deletions

View File

@ -16,9 +16,32 @@
*/
package org.apache.nifi.remote.client.socket;
import static org.apache.nifi.remote.util.EventReportUtil.error;
import static org.apache.nifi.remote.util.EventReportUtil.warn;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.PeerSelector;
import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteInfoProvider;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
@ -44,31 +67,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.PeerSelector;
import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteInfoProvider;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.nifi.remote.util.EventReportUtil.error;
import static org.apache.nifi.remote.util.EventReportUtil.warn;
public class EndpointConnectionPool implements PeerStatusProvider {
@ -157,7 +157,13 @@ public class EndpointConnectionPool implements PeerStatusProvider {
SocketClientProtocol protocol = null;
EndpointConnection connection;
Peer peer = null;
URI clusterUrl = siteInfoProvider.getActiveClusterUrl();
final URI clusterUrl;
try {
clusterUrl = siteInfoProvider.getActiveClusterUrl();
} catch (final IOException ioe) {
throw new UnreachableClusterException("Unable to refresh details from any of the configured remote instances.", ioe);
}
do {
final List<EndpointConnection> addBack = new ArrayList<>();

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.exception;
/**
* UnreachableClusterException when none of the target clusterUrls are reachable.
*/
public class UnreachableClusterException extends ProtocolException {
private static final long serialVersionUID = 6147433671708846798L;
public UnreachableClusterException(final String message) {
super(message);
}
public UnreachableClusterException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -16,22 +16,6 @@
*/
package org.apache.nifi.remote;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
@ -56,6 +40,7 @@ import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
@ -69,6 +54,21 @@ import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class StandardRemoteGroupPort extends RemoteGroupPort {
private static final long BATCH_SEND_NANOS = TimeUnit.MILLISECONDS.toNanos(500L); // send batches of up to 500 millis
@ -239,6 +239,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
session.rollback();
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
return;
} catch (final UnreachableClusterException e) {
context.yield();
final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
logger.error(message);
session.rollback();
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
return;
} catch (final IOException e) {
// we do not yield here because the 'peer' will be penalized, and we won't communicate with that particular nifi instance
// for a while due to penalization, but we can continue to talk to other nifi instances