From a3586e04d9978e105cc5645e893dc6d77b79b86e Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 24 Aug 2016 17:59:25 +0900 Subject: [PATCH] NIFI-2459: Site-to-Site bootstrap node failure Refresh remote peer statuses even if the bootstrap node goes down. Migrate existing code which handles the situation from EndpointConnectionPool to PeerSelector, so that both RAW and HTTP transport protocol has the same capability. This closes #927. --- .../nifi/remote/client/PeerSelector.java | 71 ++++++++++++-- .../remote/client/PeerStatusProvider.java | 35 ++++++- .../nifi/remote/client/http/HttpClient.java | 12 ++- .../client/socket/EndpointConnectionPool.java | 60 +++--------- .../nifi/remote/client/TestPeerSelector.java | 95 +++++++++++++++++++ 5 files changed, 216 insertions(+), 57 deletions(-) diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java index e452b0fa12..0ec8951a43 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java @@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.apache.nifi.remote.util.EventReportUtil.error; import static org.apache.nifi.remote.util.EventReportUtil.warn; @@ -61,6 +62,7 @@ public class PeerSelector { private final ReentrantLock peerRefreshLock = new ReentrantLock(); private volatile List peerStatuses; + private volatile Set lastFetchedQueryablePeers; private volatile long peerRefreshTime = 0L; private final AtomicLong peerIndex = new AtomicLong(0L); private volatile PeerStatusCache peerStatusCache; @@ -71,6 +73,22 @@ public class 
PeerSelector { private final PeerStatusProvider peerStatusProvider; private final ConcurrentMap peerTimeoutExpirations = new ConcurrentHashMap<>(); + static class SystemTime { + long currentTimeMillis() { + return System.currentTimeMillis(); + } + } + private SystemTime systemTime = new SystemTime(); + + /** + * Replace the SystemTime instance. + * This method is purely used by unit testing, to emulate peer refresh period. + */ + void setSystemTime(final SystemTime systemTime) { + logger.info("Replacing systemTime instance to {}.", systemTime); + this.systemTime = systemTime; + } + public PeerSelector(final PeerStatusProvider peerStatusProvider, final File persistenceFile) { this.peerStatusProvider = peerStatusProvider; this.persistenceFile = persistenceFile; @@ -213,13 +231,13 @@ public class PeerSelector { expiration = Long.valueOf(0L); } - final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis); + final long newExpiration = Math.max(expiration, systemTime.currentTimeMillis() + penalizationMillis); peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration)); } public boolean isPenalized(final PeerStatus peerStatus) { final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription()); - return (expirationEnd != null && expirationEnd > System.currentTimeMillis()); + return (expirationEnd != null && expirationEnd > systemTime.currentTimeMillis()); } public void clear() { @@ -227,7 +245,7 @@ public class PeerSelector { } private boolean isPeerRefreshNeeded(final List peerList) { - return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD); + return (peerList == null || peerList.isEmpty() || systemTime.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD); } /** @@ -258,7 +276,7 @@ public class PeerSelector { } this.peerStatuses = peerList; - peerRefreshTime = System.currentTimeMillis(); + peerRefreshTime = 
systemTime.currentTimeMillis(); } } finally { peerRefreshLock.unlock(); @@ -305,7 +323,7 @@ public class PeerSelector { return null; } - if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) { + if (cache.getTimestamp() + PEER_CACHE_MILLIS < systemTime.currentTimeMillis()) { final Set equalizedSet = new HashSet<>(cache.getStatuses().size()); for (final PeerStatus status : cache.getStatuses()) { final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers()); @@ -320,12 +338,12 @@ public class PeerSelector { public void refreshPeers() { final PeerStatusCache existingCache = peerStatusCache; - if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) { + if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > systemTime.currentTimeMillis())) { return; } try { - final Set statuses = peerStatusProvider.fetchRemotePeerStatuses(); + final Set statuses = fetchRemotePeerStatuses(); persistPeerStatuses(statuses); peerStatusCache = new PeerStatusCache(statuses); logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size()); @@ -340,4 +358,43 @@ public class PeerSelector { public void setEventReporter(EventReporter eventReporter) { this.eventReporter = eventReporter; } + + private Set fetchRemotePeerStatuses() throws IOException { + final Set peersToRequestClusterInfoFrom = new HashSet<>(); + + // Look at all of the peers that we fetched last time. 
+ final Set lastFetched = lastFetchedQueryablePeers; + if (lastFetched != null && !lastFetched.isEmpty()) { + lastFetched.stream().map(peer -> peer.getPeerDescription()) + .forEach(desc -> peersToRequestClusterInfoFrom.add(desc)); + } + + // Always add the configured node info to the list of peers to communicate with + peersToRequestClusterInfoFrom.add(peerStatusProvider.getBootstrapPeerDescription()); + + logger.debug("Fetching remote peer statuses from: {}", peersToRequestClusterInfoFrom); + Exception lastFailure = null; + for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) { + try { + final Set statuses = peerStatusProvider.fetchRemotePeerStatuses(peerDescription); + lastFetchedQueryablePeers = statuses.stream() + .filter(p -> p.isQueryForPeers()) + .collect(Collectors.toSet()); + + return statuses; + } catch (final Exception e) { + logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster, due to {}", + peerDescription.getHostname(), peerDescription.getPort(), e.toString()); + lastFailure = e; + } + } + + final IOException ioe = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster"); + if (lastFailure != null) { + ioe.addSuppressed(lastFailure); + } + + throw ioe; + } + } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java index 68c30af14a..817bccf071 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java @@ -16,12 +16,45 @@ */ package org.apache.nifi.remote.client; +import org.apache.nifi.remote.PeerDescription; import org.apache.nifi.remote.PeerStatus; import 
java.io.IOException; import java.util.Set; +/** + * This interface defines methods used from {@link PeerSelector}. + */ public interface PeerStatusProvider { - Set fetchRemotePeerStatuses() throws IOException; + /** + *

<p>
+ * Returns a PeerDescription instance, which represents a bootstrap remote NiFi node.
+ * The bootstrap node is always used to fetch remote peer statuses.
+ * </p>
+ * <p>
+ * Once the PeerSelector has successfully fetched remote peer statuses, it periodically fetches them again,
+ * so that it can detect remote NiFi cluster topology changes such as the addition or removal of nodes.
+ * To refresh remote peer statuses, PeerSelector calls {@link #fetchRemotePeerStatuses} with each query-able node
+ * most recently fetched from the remote NiFi cluster, as well as the bootstrap node, until it gets a successful result,
+ * or throws IOException if none of them responds successfully.
+ * </p>
+ * <p>
+ * This mechanism lets PeerSelector work even if the bootstrap remote NiFi node goes down.
+ * </p>

+ * @return peer description of a bootstrap remote NiFi node + * @throws IOException thrown when it fails to retrieve the bootstrap remote node information + */ + PeerDescription getBootstrapPeerDescription() throws IOException; + + /** + * Fetch peer statuses from a remote NiFi cluster. + * Implementation of this method should fetch peer statuses from the node + * represented by the passed PeerDescription using its transport protocol. + * @param peerDescription a bootstrap node or one of query-able nodes lastly fetched successfully + * @return Remote peer statuses + * @throws IOException thrown when it fails to fetch peer statuses of the remote cluster from the specified peer + */ + Set fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException; + } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java index 3c92acd16f..0f0d4a59de 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java @@ -84,12 +84,11 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr } @Override - public Set fetchRemotePeerStatuses() throws IOException { + public PeerDescription getBootstrapPeerDescription() throws IOException { if (siteInfoProvider.getSiteToSiteHttpPort() == null) { throw new IOException("Remote instance of NiFi is not configured to allow HTTP site-to-site communications"); } - final String scheme = siteInfoProvider.isSecure() ? 
"https" : "http"; final URI clusterUrl; try { clusterUrl = new URI(config.getUrl()); @@ -97,8 +96,15 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr throw new IllegalArgumentException("Specified clusterUrl was: " + config.getUrl(), e); } + return new PeerDescription(clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort(), siteInfoProvider.isSecure()); + } + + @Override + public Set fetchRemotePeerStatuses(PeerDescription peerDescription) throws IOException { + // Each node should has the same URL structure and network reach-ability with the proxy configuration. try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())) { - final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort()); + final String scheme = peerDescription.isSecure() ? "https" : "http"; + final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, peerDescription.getHostname(), peerDescription.getPort()); final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); apiClient.setConnectTimeoutMillis(timeoutMillis); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index f90aed9c2b..a17deaabe5 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -41,7 +41,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import javax.net.ssl.SSLContext; @@ -360,7 
+359,20 @@ public class EndpointConnectionPool implements PeerStatusProvider { } } - private Set fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException { + @Override + public PeerDescription getBootstrapPeerDescription() throws IOException { + final String hostname = clusterUrl.getHost(); + final Integer port = siteInfoProvider.getSiteToSitePort(); + if (port == null) { + throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications"); + } + + final boolean secure = siteInfoProvider.isSecure(); + return new PeerDescription(hostname, port, secure); + } + + @Override + public Set fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException { final String hostname = peerDescription.getHostname(); final int port = peerDescription.getPort(); @@ -414,50 +426,6 @@ public class EndpointConnectionPool implements PeerStatusProvider { return peerStatuses; } - @Override - public Set fetchRemotePeerStatuses() throws IOException { - final Set peersToRequestClusterInfoFrom = new HashSet<>(); - - // Look at all of the peers that we fetched last time. 
- final Set lastFetched = lastFetchedQueryablePeers; - if (lastFetched != null && !lastFetched.isEmpty()) { - lastFetched.stream().map(peer -> peer.getPeerDescription()) - .forEach(desc -> peersToRequestClusterInfoFrom.add(desc)); - } - - // Always add the configured node info to the list of peers to communicate with - final String hostname = clusterUrl.getHost(); - final Integer port = siteInfoProvider.getSiteToSitePort(); - if (port == null) { - throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications"); - } - - final boolean secure = siteInfoProvider.isSecure(); - peersToRequestClusterInfoFrom.add(new PeerDescription(hostname, port, secure)); - - Exception lastFailure = null; - for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) { - try { - final Set statuses = fetchRemotePeerStatuses(peerDescription); - lastFetchedQueryablePeers = statuses.stream() - .filter(p -> p.isQueryForPeers()) - .collect(Collectors.toSet()); - - return statuses; - } catch (final Exception e) { - logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster", peerDescription.getHostname(), peerDescription.getPort()); - lastFailure = e; - } - } - - final IOException ioe = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster"); - if (lastFailure != null) { - ioe.addSuppressed(lastFailure); - } - - throw ioe; - } - private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException { final PeerDescription description = peerStatus.getPeerDescription(); return establishSiteToSiteConnection(description.getHostname(), description.getPort()); diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java 
index 4c0f0d6074..c434c7b4cd 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java @@ -29,11 +29,19 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.reducing; import static java.util.stream.Collectors.toMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; public class TestPeerSelector { @@ -122,4 +130,91 @@ public class TestPeerSelector { logger.info("selectedCounts={}", selectedCounts); assertTrue("HasLots should get little", selectedCounts.get("HasLots") < selectedCounts.get("HasLittle")); } + + private static class UnitTestSystemTime extends PeerSelector.SystemTime { + private long offset = 0; + + @Override + long currentTimeMillis() { + return super.currentTimeMillis() + offset; + } + } + + /** + * This test simulates a failure scenario of a remote NiFi cluster. It confirms that: + *
<ol>
+ * <li>PeerSelector uses the bootstrap node to fetch remote peer statuses at the initial attempt</li>
+ * <li>PeerSelector uses one of the query-able nodes most recently fetched successfully</li>
+ * <li>PeerSelector can refresh remote peer statuses even if the bootstrap node is down</li>
+ * <li>PeerSelector returns null as next peer when there's no peer available</li>
+ * <li>PeerSelector always tries to fetch peer statuses at least from the bootstrap node, so that it can
+ * recover when the node gets back online</li>
+ * </ol>
+ */ + @Test + public void testFetchRemotePeerStatuses() throws IOException { + + final Set peerStatuses = new HashSet<>(); + final PeerDescription bootstrapNode = new PeerDescription("Node1", 1111, true); + final PeerDescription node2 = new PeerDescription("Node2", 2222, true); + final PeerStatus bootstrapNodeStatus = new PeerStatus(bootstrapNode, 10, true); + final PeerStatus node2Status = new PeerStatus(node2, 10, true); + peerStatuses.add(bootstrapNodeStatus); + peerStatuses.add(node2Status); + + final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class); + final PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null); + final UnitTestSystemTime systemTime = new UnitTestSystemTime(); + peerSelector.setSystemTime(systemTime); + + doReturn(bootstrapNode).when(peerStatusProvider).getBootstrapPeerDescription(); + doAnswer(invocation -> { + final PeerDescription peerFetchStatusesFrom = invocation.getArgumentAt(0, PeerDescription.class); + if (peerStatuses.stream().filter(ps -> ps.getPeerDescription().equals(peerFetchStatusesFrom)).collect(Collectors.toSet()).size() > 0) { + // If the remote peer is running, then return available peer statuses. + return peerStatuses; + } + throw new IOException("Connection refused. " + peerFetchStatusesFrom + " is not running."); + }).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class)); + + // 1st attempt. It uses the bootstrap node. + peerSelector.refreshPeers(); + PeerStatus peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); + assertNotNull(peerStatus); + + // Proceed time so that peer selector refresh statuses. + peerStatuses.remove(bootstrapNodeStatus); + systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; + + // 2nd attempt. 
+ peerSelector.refreshPeers(); + peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); + assertNotNull(peerStatus); + assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription()); + + // Proceed time so that peer selector refresh statuses. + systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; + + // 3rd attempt. + peerSelector.refreshPeers(); + peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); + assertNotNull(peerStatus); + assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription()); + + // Remove node2 to simulate that it goes down. There's no available node at this point. + peerStatuses.remove(node2Status); + systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; + + peerSelector.refreshPeers(); + peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); + assertNull("PeerSelector should return null as next peer status, since there's no available peer", peerStatus); + + // Add node1 back. PeerSelector should be able to fetch peer statuses because it always tries to fetch at least from the bootstrap node. + peerStatuses.add(bootstrapNodeStatus); + systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1; + + peerSelector.refreshPeers(); + peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE); + assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peerStatus.getPeerDescription()); + } }