NIFI-2459: Site-to-Site bootstrap node failure

Refresh remote peer statuses even if the bootstrap node goes down.

Migrate the existing code that handles this situation from
EndpointConnectionPool to PeerSelector, so that both the RAW and HTTP
transport protocols have the same capability.

This closes #927.
This commit is contained in:
Koji Kawamura 2016-08-24 17:59:25 +09:00 committed by Mark Payne
parent 1a9d505b4e
commit a3586e04d9
5 changed files with 216 additions and 57 deletions

View File

@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.nifi.remote.util.EventReportUtil.error;
import static org.apache.nifi.remote.util.EventReportUtil.warn;
@ -61,6 +62,7 @@ public class PeerSelector {
private final ReentrantLock peerRefreshLock = new ReentrantLock();
private volatile List<PeerStatus> peerStatuses;
private volatile Set<PeerStatus> lastFetchedQueryablePeers;
private volatile long peerRefreshTime = 0L;
private final AtomicLong peerIndex = new AtomicLong(0L);
private volatile PeerStatusCache peerStatusCache;
@ -71,6 +73,22 @@ public class PeerSelector {
private final PeerStatusProvider peerStatusProvider;
private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
/**
* Source of the current time used by PeerSelector.
* It is a separate, overridable class so that setSystemTime(SystemTime) can substitute another implementation.
*/
static class SystemTime {
// Returns the current wall-clock time in milliseconds, as reported by System.currentTimeMillis().
long currentTimeMillis() {
return System.currentTimeMillis();
}
}
private SystemTime systemTime = new SystemTime();
/**
* Replaces the SystemTime instance that this PeerSelector reads the current time from.
* This method is intended only for unit tests, which use it to emulate the passing of the peer refresh period.
* @param systemTime the time source to use from now on
*/
void setSystemTime(final SystemTime systemTime) {
logger.info("Replacing systemTime instance to {}.", systemTime);
this.systemTime = systemTime;
}
public PeerSelector(final PeerStatusProvider peerStatusProvider, final File persistenceFile) {
this.peerStatusProvider = peerStatusProvider;
this.persistenceFile = persistenceFile;
@ -213,13 +231,13 @@ public class PeerSelector {
expiration = Long.valueOf(0L);
}
final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
final long newExpiration = Math.max(expiration, systemTime.currentTimeMillis() + penalizationMillis);
peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration));
}
public boolean isPenalized(final PeerStatus peerStatus) {
final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription());
return (expirationEnd != null && expirationEnd > System.currentTimeMillis());
return (expirationEnd != null && expirationEnd > systemTime.currentTimeMillis());
}
public void clear() {
@ -227,7 +245,7 @@ public class PeerSelector {
}
private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
return (peerList == null || peerList.isEmpty() || systemTime.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
}
/**
@ -258,7 +276,7 @@ public class PeerSelector {
}
this.peerStatuses = peerList;
peerRefreshTime = System.currentTimeMillis();
peerRefreshTime = systemTime.currentTimeMillis();
}
} finally {
peerRefreshLock.unlock();
@ -305,7 +323,7 @@ public class PeerSelector {
return null;
}
if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) {
if (cache.getTimestamp() + PEER_CACHE_MILLIS < systemTime.currentTimeMillis()) {
final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
for (final PeerStatus status : cache.getStatuses()) {
final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers());
@ -320,12 +338,12 @@ public class PeerSelector {
public void refreshPeers() {
final PeerStatusCache existingCache = peerStatusCache;
if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) {
if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > systemTime.currentTimeMillis())) {
return;
}
try {
final Set<PeerStatus> statuses = peerStatusProvider.fetchRemotePeerStatuses();
final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
persistPeerStatuses(statuses);
peerStatusCache = new PeerStatusCache(statuses);
logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
@ -340,4 +358,43 @@ public class PeerSelector {
public void setEventReporter(EventReporter eventReporter) {
this.eventReporter = eventReporter;
}
/**
 * Fetches the peer statuses of the remote cluster, asking each candidate node in turn.
 * The candidates are the query-able peers remembered from the last successful fetch plus the
 * bootstrap peer reported by the PeerStatusProvider. The statuses returned by the first candidate
 * that responds are returned, and the query-able peers among them are remembered for the next call.
 *
 * @return the peer statuses returned by the first candidate that responded
 * @throws IOException if the bootstrap peer description cannot be obtained, or if no candidate responded;
 *                     in the latter case the most recent failure is attached as a suppressed exception
 */
private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
    final Set<PeerDescription> candidates = new HashSet<>();

    // Start with the query-able peers remembered from the previous successful fetch, if there are any.
    final Set<PeerStatus> previouslyFetched = lastFetchedQueryablePeers;
    if (previouslyFetched != null) {
        for (final PeerStatus previous : previouslyFetched) {
            candidates.add(previous.getPeerDescription());
        }
    }

    // The configured bootstrap node is always a candidate, whatever was fetched before.
    candidates.add(peerStatusProvider.getBootstrapPeerDescription());
    logger.debug("Fetching remote peer statuses from: {}", candidates);

    Exception mostRecentFailure = null;
    for (final PeerDescription candidate : candidates) {
        try {
            final Set<PeerStatus> fetched = peerStatusProvider.fetchRemotePeerStatuses(candidate);
            // Remember only the peers that can themselves be queried for peer statuses.
            lastFetchedQueryablePeers = fetched.stream()
                    .filter(PeerStatus::isQueryForPeers)
                    .collect(Collectors.toSet());
            return fetched;
        } catch (final Exception e) {
            // A failing candidate is not fatal; log it and move on to the next one.
            logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster, due to {}",
                    candidate.getHostname(), candidate.getPort(), e.toString());
            mostRecentFailure = e;
        }
    }

    // Every candidate failed.
    final IOException failure = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster");
    if (mostRecentFailure != null) {
        failure.addSuppressed(mostRecentFailure);
    }
    throw failure;
}
}

View File

@ -16,12 +16,45 @@
*/
package org.apache.nifi.remote.client;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import java.io.IOException;
import java.util.Set;
/**
* This interface defines the methods used by {@link PeerSelector}.
*/
public interface PeerStatusProvider {
Set<PeerStatus> fetchRemotePeerStatuses() throws IOException;
/**
* <p>
* Returns a PeerDescription instance which represents a bootstrap remote NiFi node.
* The bootstrap node is always used to fetch remote peer statuses.
* </p>
* <p>
* Once the PeerSelector has successfully fetched remote peer statuses, it periodically fetches them again,
* so that it can detect remote NiFi cluster topology changes such as the addition or removal of nodes.
* To refresh remote peer statuses, PeerSelector calls {@link #fetchRemotePeerStatuses} with one of the query-able nodes
* most recently fetched from the remote NiFi cluster, until it gets a successful result,
* or throws IOException if none of them responds successfully.
* </p>
* <p>
* This mechanism lets PeerSelector work even if the bootstrap remote NiFi node goes down.
* </p>
* @return peer description of a bootstrap remote NiFi node
* @throws IOException thrown when it fails to retrieve the bootstrap remote node information
*/
PeerDescription getBootstrapPeerDescription() throws IOException;
/**
* Fetches peer statuses from a remote NiFi cluster.
* Implementations of this method should fetch peer statuses from the node
* represented by the passed PeerDescription, using their own transport protocol.
* @param peerDescription a bootstrap node, or one of the query-able nodes most recently fetched successfully
* @return Remote peer statuses
* @throws IOException thrown when it fails to fetch peer statuses of the remote cluster from the specified peer
*/
Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException;
}

View File

@ -84,12 +84,11 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
}
@Override
public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
public PeerDescription getBootstrapPeerDescription() throws IOException {
if (siteInfoProvider.getSiteToSiteHttpPort() == null) {
throw new IOException("Remote instance of NiFi is not configured to allow HTTP site-to-site communications");
}
final String scheme = siteInfoProvider.isSecure() ? "https" : "http";
final URI clusterUrl;
try {
clusterUrl = new URI(config.getUrl());
@ -97,8 +96,15 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
throw new IllegalArgumentException("Specified clusterUrl was: " + config.getUrl(), e);
}
return new PeerDescription(clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort(), siteInfoProvider.isSecure());
}
@Override
public Set<PeerStatus> fetchRemotePeerStatuses(PeerDescription peerDescription) throws IOException {
// Each node should have the same URL structure and network reachability with the proxy configuration.
try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())) {
final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort());
final String scheme = peerDescription.isSecure() ? "https" : "http";
final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, peerDescription.getHostname(), peerDescription.getPort());
final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
apiClient.setConnectTimeoutMillis(timeoutMillis);

View File

@ -41,7 +41,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
@ -360,7 +359,20 @@ public class EndpointConnectionPool implements PeerStatusProvider {
}
}
private Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException {
/**
 * Builds the description of the bootstrap node from the host of clusterUrl and the RAW
 * site-to-site port and secure flag reported by siteInfoProvider.
 *
 * @return the peer description of the bootstrap node
 * @throws IOException if siteInfoProvider reports no RAW site-to-site port
 */
@Override
public PeerDescription getBootstrapPeerDescription() throws IOException {
    final String bootstrapHost = clusterUrl.getHost();
    final Integer rawPort = siteInfoProvider.getSiteToSitePort();
    if (rawPort != null) {
        return new PeerDescription(bootstrapHost, rawPort, siteInfoProvider.isSecure());
    }
    // No port means the remote instance does not expose RAW site-to-site at all.
    throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications");
}
@Override
public Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException {
final String hostname = peerDescription.getHostname();
final int port = peerDescription.getPort();
@ -414,50 +426,6 @@ public class EndpointConnectionPool implements PeerStatusProvider {
return peerStatuses;
}
/**
* Fetches peer statuses from the remote cluster by asking, in turn, each query-able peer remembered from the
* last successful fetch plus the node built from the clusterUrl host and the RAW site-to-site port.
* The statuses returned by the first peer that responds are returned, and the query-able peers among them
* are remembered for the next call.
* @return the peer statuses returned by the first peer that responded
* @throws IOException if no RAW site-to-site port is available, or if none of the peers could be reached;
* in the latter case the most recent failure is attached as a suppressed exception
*/
@Override
public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
final Set<PeerDescription> peersToRequestClusterInfoFrom = new HashSet<>();
// Look at all of the peers that we fetched last time.
final Set<PeerStatus> lastFetched = lastFetchedQueryablePeers;
if (lastFetched != null && !lastFetched.isEmpty()) {
lastFetched.stream().map(peer -> peer.getPeerDescription())
.forEach(desc -> peersToRequestClusterInfoFrom.add(desc));
}
// Always add the configured node info to the list of peers to communicate with
final String hostname = clusterUrl.getHost();
final Integer port = siteInfoProvider.getSiteToSitePort();
if (port == null) {
throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications");
}
final boolean secure = siteInfoProvider.isSecure();
peersToRequestClusterInfoFrom.add(new PeerDescription(hostname, port, secure));
// Try each candidate until one responds; remember the most recent failure in case all of them fail.
Exception lastFailure = null;
for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) {
try {
final Set<PeerStatus> statuses = fetchRemotePeerStatuses(peerDescription);
// Keep only the peers that can themselves be queried, for use by the next call.
lastFetchedQueryablePeers = statuses.stream()
.filter(p -> p.isQueryForPeers())
.collect(Collectors.toSet());
return statuses;
} catch (final Exception e) {
logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster", peerDescription.getHostname(), peerDescription.getPort());
lastFailure = e;
}
}
// None of the candidates responded.
final IOException ioe = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster");
if (lastFailure != null) {
ioe.addSuppressed(lastFailure);
}
throw ioe;
}
private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
final PeerDescription description = peerStatus.getPeerDescription();
return establishSiteToSiteConnection(description.getHostname(), description.getPort());

View File

@ -29,11 +29,19 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.reducing;
import static java.util.stream.Collectors.toMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
public class TestPeerSelector {
@ -122,4 +130,91 @@ public class TestPeerSelector {
logger.info("selectedCounts={}", selectedCounts);
assertTrue("HasLots should get little", selectedCounts.get("HasLots") < selectedCounts.get("HasLittle"));
}
/**
 * SystemTime whose reported time is the real time shifted by a test-controlled offset.
 */
private static class UnitTestSystemTime extends PeerSelector.SystemTime {
    // Milliseconds added on top of the real clock; tests increase this to simulate elapsed time.
    private long offset = 0;

    @Override
    long currentTimeMillis() {
        final long realNow = super.currentTimeMillis();
        return realNow + offset;
    }
}
/**
 * This test simulates a failure scenario of a remote NiFi cluster. It confirms that:
 * <ol>
 * <li>PeerSelector uses the bootstrap node to fetch remote peer statuses at the initial attempt</li>
 * <li>PeerSelector uses one of the query-able nodes most recently fetched successfully</li>
 * <li>PeerSelector can refresh remote peer statuses even if the bootstrap node is down</li>
 * <li>PeerSelector returns null as next peer when there's no peer available</li>
 * <li>PeerSelector always tries to fetch peer statuses at least from the bootstrap node, so that it can
 * recover when the node gets back online</li>
 * </ol>
 */
@Test
public void testFetchRemotePeerStatuses() throws IOException {
    final Set<PeerStatus> peerStatuses = new HashSet<>();
    final PeerDescription bootstrapNode = new PeerDescription("Node1", 1111, true);
    final PeerDescription node2 = new PeerDescription("Node2", 2222, true);
    final PeerStatus bootstrapNodeStatus = new PeerStatus(bootstrapNode, 10, true);
    final PeerStatus node2Status = new PeerStatus(node2, 10, true);
    peerStatuses.add(bootstrapNodeStatus);
    peerStatuses.add(node2Status);

    final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
    final PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
    // Install a shiftable clock so that the test can jump past the refresh period without sleeping.
    final UnitTestSystemTime systemTime = new UnitTestSystemTime();
    peerSelector.setSystemTime(systemTime);

    doReturn(bootstrapNode).when(peerStatusProvider).getBootstrapPeerDescription();
    doAnswer(invocation -> {
        final PeerDescription peerFetchStatusesFrom = invocation.getArgumentAt(0, PeerDescription.class);
        if (peerStatuses.stream().filter(ps -> ps.getPeerDescription().equals(peerFetchStatusesFrom)).collect(Collectors.toSet()).size() > 0) {
            // If the remote peer is running, then return available peer statuses.
            return peerStatuses;
        }
        throw new IOException("Connection refused. " + peerFetchStatusesFrom + " is not running.");
    }).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class));

    // 1st attempt. It uses the bootstrap node.
    peerSelector.refreshPeers();
    PeerStatus peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
    assertNotNull(peerStatus);

    // Take the bootstrap node down, then advance time so that the peer selector refreshes the statuses.
    peerStatuses.remove(bootstrapNodeStatus);
    systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;

    // 2nd attempt.
    peerSelector.refreshPeers();
    peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
    assertNotNull(peerStatus);
    assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription());

    // Advance time so that the peer selector refreshes the statuses.
    systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;

    // 3rd attempt.
    peerSelector.refreshPeers();
    peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
    assertNotNull(peerStatus);
    assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription());

    // Remove node2 to simulate that it goes down. There's no available node at this point.
    peerStatuses.remove(node2Status);
    systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
    peerSelector.refreshPeers();
    peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
    assertNull("PeerSelector should return null as next peer status, since there's no available peer", peerStatus);

    // Add node1 back. PeerSelector should be able to fetch peer statuses because it always tries to fetch at least from the bootstrap node.
    peerStatuses.add(bootstrapNodeStatus);
    systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
    peerSelector.refreshPeers();
    peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
    // Guard the dereference below so that a missing peer fails as an assertion rather than as a NullPointerException.
    assertNotNull(peerStatus);
    assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peerStatus.getPeerDescription());
}
}