NIFI-7467 Refactored S2S peer selection logic.

Removed list structure for peer selection as it was unnecessary and often wasteful (most clusters are 3 - 7 nodes, the list was always 128 elements).
Changed integer percentages to double to allow for better normalization.
Removed 80% cap on remote peers as it was due to legacy requirements.
Added unit tests for non-deterministic distribution calculations.
Added unit tests for edge cases due to rounding errors, single valid remotes, unbalanced clusters, and peer queue consecutive selection tracking.
Migrated all legacy PeerSelector unit tests to new API.
Removed unused System time manipulation as tests no longer need it.
Added class-level Javadoc to PeerSelector.
Removed S2S details request replication, as the responses were not being merged, which led to incorrect ports being returned and breaking S2S peer retrieval.
Fixed copy/paste error where input ports were being listed as output ports during remote flow refresh.
Fixed comments and added unbalanced cluster test scenarios.
Removed unnecessary marker interface.
Removed commented code.
Changed weighting & penalization behavior.
Changed dependency scope to test.

This closes #4289.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Andy LoPresto 2020-05-18 22:33:05 -07:00
parent dfefeb7b18
commit 845b66ab92
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
16 changed files with 1849 additions and 820 deletions

View File

@ -76,6 +76,15 @@
<artifactId>httpasyncclient</artifactId>
<version>4.1.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.remote;
import org.apache.nifi.web.api.dto.remote.PeerDTO;
public class PeerDescription {
private final String hostname;
@ -28,6 +30,12 @@ public class PeerDescription {
this.secure = secure;
}
public PeerDescription(final PeerDTO peerDTO) {
this.hostname = peerDTO.getHostname();
this.port = peerDTO.getPort();
this.secure = peerDTO.isSecure();
}
public String getHostname() {
return hostname;
}

View File

@ -16,6 +16,15 @@
*/
package org.apache.nifi.remote;
import org.apache.nifi.web.api.dto.remote.PeerDTO;
/**
* This class represents the state of a specific peer, both its identifying information (contained
* in the {@link PeerDescription}: hostname, port, and security) and its current status (number of
* flowfiles and whether it can query other peers for status). Equality is only based on the
* identifying information, so when iterating over multiple PeerStatus objects, more recent statuses
* will replace previously acquired statuses for a specific peer.
*/
public class PeerStatus {
private final PeerDescription description;
@ -28,6 +37,17 @@ public class PeerStatus {
this.queryForPeers = queryForPeers;
}
/**
* Copy constructor from a {@link PeerDTO}. {@link #isQueryForPeers()} is hard-coded to {@code true}.
*
* @param peerDTO the peer DTO object with hostname, port, security, and flowfile count
*/
public PeerStatus(final PeerDTO peerDTO) {
this.description = new PeerDescription(peerDTO);
this.numFlowFiles = peerDTO.getFlowFileCount();
this.queryForPeers = true;
}
public PeerDescription getPeerDescription() {
return description;
}

View File

@ -16,6 +16,26 @@
*/
package org.apache.nifi.remote.client;
import static org.apache.nifi.remote.util.EventReportUtil.error;
import static org.apache.nifi.remote.util.EventReportUtil.warn;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
@ -26,343 +46,522 @@ import org.apache.nifi.remote.util.PeerStatusCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.apache.nifi.remote.util.EventReportUtil.error;
import static org.apache.nifi.remote.util.EventReportUtil.warn;
/**
* Service which maintains state around peer (NiFi node(s) in a remote instance (cluster or
* standalone)). There is an internal cache which stores identifying information about each
* node and the current workload of each in number of flowfiles being processed. Individual
* nodes can be penalized for an amount of time (see {@link #penalize(Peer, long)}) to avoid
* sending/receiving data from them. Attempts are made to balance communications ("busier"
* nodes will {@code TransferDirection.SEND} more and {@code TransferDirection.RECEIVE} fewer
* flowfiles from this instance).
*/
public class PeerSelector {
private static final Logger logger = LoggerFactory.getLogger(PeerSelector.class);
// The timeout for the peer status cache
private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
private static final long PEER_REFRESH_PERIOD = 60000L;
private final ReentrantLock peerRefreshLock = new ReentrantLock();
private volatile List<PeerStatus> peerStatuses;
private volatile Set<PeerStatus> lastFetchedQueryablePeers;
private volatile long peerRefreshTime = 0L;
private final AtomicLong peerIndex = new AtomicLong(0L);
private volatile PeerStatusCache peerStatusCache;
// The service which saves the peer state to persistent storage
private final PeerPersistence peerPersistence;
// The service which retrieves peer state
private final PeerStatusProvider peerStatusProvider;
// Maps the peer description to a millisecond penalty expiration
private final ConcurrentMap<PeerDescription, Long> peerPenaltyExpirations = new ConcurrentHashMap<>();
// The most recently fetched peer statuses
private volatile PeerStatusCache peerStatusCache;
private EventReporter eventReporter;
private final PeerStatusProvider peerStatusProvider;
private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
static class SystemTime {
long currentTimeMillis() {
return System.currentTimeMillis();
}
}
private SystemTime systemTime = new SystemTime();
/**
* Replace the SystemTime instance.
* This method is purely used by unit testing, to emulate peer refresh period.
* Returns a peer selector with the provided collaborators.
*
* @param peerStatusProvider the service which retrieves peer state
* @param peerPersistence the service which persists peer state
*/
void setSystemTime(final SystemTime systemTime) {
logger.info("Replacing systemTime instance to {}.", systemTime);
this.systemTime = systemTime;
}
public PeerSelector(final PeerStatusProvider peerStatusProvider, final PeerPersistence peerPersistence) {
this.peerStatusProvider = peerStatusProvider;
this.peerPersistence = peerPersistence;
// On instantiation, retrieve the peer status cache
restoreInitialPeerStatusCache();
}
/**
* Populates the peer status cache from the peer persistence provider (e.g. the file system or
* persisted cluster state). If this fails, it will log a warning and continue, as it is not
* required for startup. If the cached protocol differs from the currently configured protocol,
* the cache will be cleared.
*/
private void restoreInitialPeerStatusCache() {
try {
PeerStatusCache restoredPeerStatusCache = null;
if (peerPersistence != null) {
restoredPeerStatusCache = peerPersistence.restore();
// If there is an existing cache, ensure that the protocol matches the current protocol
if (restoredPeerStatusCache != null) {
final SiteToSiteTransportProtocol currentProtocol = peerStatusProvider.getTransportProtocol();
final SiteToSiteTransportProtocol cachedProtocol = restoredPeerStatusCache.getTransportProtocol();
// If the protocols have changed, clear the cache
if (!currentProtocol.equals(cachedProtocol)) {
logger.info("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
logger.warn("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
restoredPeerStatusCache = null;
}
}
}
this.peerStatusCache = restoredPeerStatusCache;
} catch (final IOException ioe) {
logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file",
peerPersistence.getClass().getSimpleName(), ioe);
peerPersistence.getClass().getSimpleName(), ioe);
}
}
private void persistPeerStatuses() {
try {
peerPersistence.save(peerStatusCache);
} catch (final IOException e) {
error(logger, eventReporter, "Failed to persist list of Peers due to {}; if restarted" +
" and the nodes specified at the RPG are down," +
" may be unable to transfer data until communications with those nodes are restored", e.toString());
logger.error("", e);
}
}
List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses, final TransferDirection direction) {
final int numDestinations = Math.max(128, statuses.size());
final Map<PeerStatus, Integer> entryCountMap = new HashMap<>();
long totalFlowFileCount = 0L;
for (final PeerStatus nodeInfo : statuses) {
totalFlowFileCount += nodeInfo.getFlowFileCount();
/**
* Returns the normalized weight for this ratio of peer flowfiles to total flowfiles and the given direction. The number will be
* a Double between 0 and 100 indicating the percent of all flowfiles the peer
* should send/receive. The transfer direction is <em>from the perspective of this node to the peer</em>
* (i.e. how many flowfiles should <em>this node send</em> to the peer, or how many flowfiles
* should <em>this node receive</em> from the peer).
*
* @param direction the transfer direction ({@code SEND} weights the destinations higher if they have fewer flowfiles, {@code RECEIVE} weights them higher if they have more)
* @param totalFlowFileCount the total flowfile count in the remote instance (standalone or cluster)
* @param flowFileCount the flowfile count for the given peer
* @param peerCount the number of peers in the remote instance
* @return the normalized weight of this peer
*/
private static double calculateNormalizedWeight(TransferDirection direction, long totalFlowFileCount, int flowFileCount, int peerCount) {
// If there is only a single remote, send/receive all data to/from it
if (peerCount == 1) {
return 100;
}
int totalEntries = 0;
for (final PeerStatus nodeInfo : statuses) {
final int flowFileCount = nodeInfo.getFlowFileCount();
// don't allow any node to get more than 80% of the data
final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount));
final double relativeWeighting = (direction == TransferDirection.SEND) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles;
final int entries = Math.max(1, (int) (numDestinations * relativeWeighting));
double cappedPercent;
// If no flowfiles exist in the remote instance, evenly weight each node with 1/N
if (totalFlowFileCount == 0) {
cappedPercent = 1.0 / peerCount;
} else {
final double percentageOfFlowFiles = ((double) flowFileCount / totalFlowFileCount);
cappedPercent = percentageOfFlowFiles;
entryCountMap.put(nodeInfo, Math.max(1, entries));
totalEntries += entries;
}
final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
for (int i = 0; i < totalEntries; i++) {
destinations.add(null);
}
for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) {
final PeerStatus nodeInfo = entry.getKey();
final int numEntries = entry.getValue();
int skipIndex = numEntries;
for (int i = 0; i < numEntries; i++) {
int n = (skipIndex * i);
while (true) {
final int index = n % destinations.size();
PeerStatus status = destinations.get(index);
if (status == null) {
status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount(), nodeInfo.isQueryForPeers());
destinations.set(index, status);
break;
} else {
n++;
}
}
// If sending to the remote, allocate more flowfiles to the less-stressed peers
if (direction == TransferDirection.SEND) {
cappedPercent = (1 - percentageOfFlowFiles) / (peerCount - 1);
}
}
return new BigDecimal(cappedPercent * 100).setScale(2, RoundingMode.FLOOR).doubleValue();
}
// Shuffle destinations to provide better distribution.
// Without this, same host will be used continuously, especially when remote peers have the same number of queued files.
// Use Random(0) to provide consistent result for unit testing. Randomness is not important to shuffle destinations.
Collections.shuffle(destinations, new Random(0));
/**
* Returns an ordered map of peers sorted in descending order by value (relative weight).
*
* @param unsortedMap the unordered map of peers to weights
* @return the sorted (desc) map (by value)
*/
private static LinkedHashMap<PeerStatus, Double> sortMapByWeight(Map<PeerStatus, Double> unsortedMap) {
List<Map.Entry<PeerStatus, Double>> list = new ArrayList<>(unsortedMap.entrySet());
list.sort(Map.Entry.comparingByValue());
final StringBuilder distributionDescription = new StringBuilder();
distributionDescription.append("New Weighted Distribution of Nodes:");
for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) {
final double percentage = entry.getValue() * 100D / destinations.size();
distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
LinkedHashMap<PeerStatus, Double> result = new LinkedHashMap<>();
for (int i = list.size() - 1; i >= 0; i--) {
Map.Entry<PeerStatus, Double> entry = list.get(i);
result.put(entry.getKey(), entry.getValue());
}
logger.info(distributionDescription.toString());
// Jumble the list of destinations.
return destinations;
return result;
}
/**
* Prints the distribution of the peers to the logger.
*
* @param sortedPeerWorkloads the peers and relative weights
*/
private static void printDistributionStatistics(Map<PeerStatus, Double> sortedPeerWorkloads, TransferDirection direction) {
if (logger.isDebugEnabled() && sortedPeerWorkloads != null) {
DecimalFormat df = new DecimalFormat("##.##");
df.setRoundingMode(RoundingMode.FLOOR);
final StringBuilder distributionDescription = new StringBuilder();
distributionDescription.append("New weighted distribution of nodes:");
for (final Map.Entry<PeerStatus, Double> entry : sortedPeerWorkloads.entrySet()) {
final double percentage = entry.getValue();
distributionDescription.append("\n").append(entry.getKey())
.append(" will").append(direction == TransferDirection.RECEIVE ? " send " : " receive ")
.append(df.format(percentage)).append("% of data");
}
logger.debug(distributionDescription.toString());
}
}
/**
* Returns the total of all values in the map. This method is frequently used to calculate the total number of
* flowfiles in the instance from the respective peer flowfile counts or the total percentage from the relative weights.
*
* @param peerWeightMap the map of peers to flowfile counts or relative weights
* @return the total of the map values
*/
private static double sumMapValues(Map<PeerStatus, Double> peerWeightMap) {
return peerWeightMap.values().stream().mapToDouble(Double::doubleValue).sum();
}
/**
* Resets all penalization states for the peers.
*/
public void clear() {
peerPenaltyExpirations.clear();
}
/**
* Return status of a peer that will be used for the next communication.
* The peers with lower workloads will be selected with higher probability.
*
* @param direction the amount of workload is calculated based on transaction direction,
* for SEND, a peer with fewer flow files is preferred,
* for RECEIVE, a peer with more flow files is preferred
* @return a selected peer, if there is no available peer or all peers are penalized, then return null
*/
public PeerStatus getNextPeerStatus(final TransferDirection direction) {
Set<PeerStatus> peerStatuses = getPeerStatuses();
Map<PeerStatus, Double> orderedPeerStatuses = buildWeightedPeerMap(peerStatuses, direction);
return getAvailablePeerStatus(orderedPeerStatuses);
}
/**
* Returns {@code true} if this peer is currently penalized and should not send/receive flowfiles.
*
* @param peerStatus the peer status identifying the peer
* @return true if this peer is penalized
*/
public boolean isPenalized(final PeerStatus peerStatus) {
final Long expirationEnd = peerPenaltyExpirations.get(peerStatus.getPeerDescription());
return (expirationEnd != null && expirationEnd > System.currentTimeMillis());
}
/**
* Updates internal state map to penalize a PeerStatus that points to the
* specified peer
* specified peer.
*
* @param peer the peer
* @param penalizationMillis period of time to penalize a given peer
* @param peer the peer
* @param penalizationMillis period of time to penalize a given peer (relative time, not absolute)
*/
public void penalize(final Peer peer, final long penalizationMillis) {
penalize(peer.getDescription(), penalizationMillis);
}
/**
* Updates internal state map to penalize a PeerStatus that points to the
* specified peer.
*
* @param peerDescription the peer description (identifies the peer)
* @param penalizationMillis period of time to penalize a given peer (relative time, not absolute)
*/
public void penalize(final PeerDescription peerDescription, final long penalizationMillis) {
Long expiration = peerTimeoutExpirations.get(peerDescription);
Long expiration = peerPenaltyExpirations.get(peerDescription);
if (expiration == null) {
expiration = Long.valueOf(0L);
expiration = 0L;
}
final long newExpiration = Math.max(expiration, systemTime.currentTimeMillis() + penalizationMillis);
peerTimeoutExpirations.put(peerDescription, Long.valueOf(newExpiration));
}
public boolean isPenalized(final PeerStatus peerStatus) {
final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription());
return (expirationEnd != null && expirationEnd > systemTime.currentTimeMillis());
}
public void clear() {
peerTimeoutExpirations.clear();
}
private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
return (peerList == null || peerList.isEmpty() || systemTime.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
peerPenaltyExpirations.put(peerDescription, newExpiration);
}
/**
* Return status of a peer that will be used for the next communication.
* The peer with less workload will be selected with higher probability.
* @param direction the amount of workload is calculated based on transaction direction,
* for SEND, a peer with less flow files is preferred,
* for RECEIVE, a peer with more flow files is preferred
* @return a selected peer, if there is no available peer or all peers are penalized, then return null
* Allows for external callers to trigger a refresh of the internal peer status cache. Performs the refresh if the cache has expired. If the cache is still valid, skips the refresh.
*/
public PeerStatus getNextPeerStatus(final TransferDirection direction) {
List<PeerStatus> peerList = peerStatuses;
if (isPeerRefreshNeeded(peerList)) {
peerRefreshLock.lock();
try {
// now that we have the lock, check again that we need to refresh (because another thread
// could have been refreshing while we were waiting for the lock).
peerList = peerStatuses;
if (isPeerRefreshNeeded(peerList)) {
try {
peerList = createPeerStatusList(direction);
} catch (final Exception e) {
final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
warn(logger, eventReporter, message);
if (logger.isDebugEnabled()) {
logger.warn("", e);
}
}
this.peerStatuses = peerList;
peerRefreshTime = systemTime.currentTimeMillis();
}
} finally {
peerRefreshLock.unlock();
}
}
if (peerList == null || peerList.isEmpty()) {
return null;
}
PeerStatus peerStatus;
for (int i = 0; i < peerList.size(); i++) {
final long idx = peerIndex.getAndIncrement();
final int listIndex = (int) (idx % peerList.size());
peerStatus = peerList.get(listIndex);
if (isPenalized(peerStatus)) {
logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
} else {
return peerStatus;
}
}
logger.debug("{} All peers appear to be penalized; returning null", this);
return null;
}
private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException {
Set<PeerStatus> statuses = getPeerStatuses();
if (statuses == null) {
refreshPeers();
statuses = getPeerStatuses();
if (statuses == null) {
logger.debug("{} found no peers to connect to", this);
return Collections.emptyList();
}
}
return formulateDestinationList(statuses, direction);
}
private Set<PeerStatus> getPeerStatuses() {
final PeerStatusCache cache = this.peerStatusCache;
if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) {
return null;
}
if (cache.getTimestamp() + PEER_CACHE_MILLIS < systemTime.currentTimeMillis()) {
final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
for (final PeerStatus status : cache.getStatuses()) {
final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers());
equalizedSet.add(equalizedStatus);
}
return equalizedSet;
}
return cache.getStatuses();
}
public void refreshPeers() {
final PeerStatusCache existingCache = peerStatusCache;
if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > systemTime.currentTimeMillis())) {
return;
}
try {
final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
peerStatusCache = new PeerStatusCache(statuses, System.currentTimeMillis(), peerStatusProvider.getTransportProtocol());
persistPeerStatuses();
logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
} catch (Exception e) {
warn(logger, eventReporter, "{} Unable to refresh Remote Group's peers due to {}", this, e.getMessage());
if (logger.isDebugEnabled()) {
logger.debug("", e);
}
public void refresh() {
long cacheAgeMs = getCacheAge();
logger.debug("External refresh triggered. Last refresh was {} ms ago", cacheAgeMs);
if (isPeerRefreshNeeded()) {
logger.debug("Refreshing peer status cache");
refreshPeerStatusCache();
} else {
logger.debug("Cache is still valid; skipping refresh");
}
}
/**
* Sets the event reporter instance.
*
* @param eventReporter the event reporter
*/
public void setEventReporter(EventReporter eventReporter) {
this.eventReporter = eventReporter;
}
private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
final Set<PeerDescription> peersToRequestClusterInfoFrom = new HashSet<>();
/**
* Returns a map of peers prepared for flowfile transfer in the specified direction. Each peer is a key and the value is a
* weighted percentage of the total flowfiles in the remote instance. For example, in a cluster where the total number of flowfiles
* is 100, distributed across three nodes 20 in A, 30 in B, and 50 in C, the resulting map for
* {@code SEND} will be {@code [A:40.0, B:35.0, C:25.0]} (1 - .2 => .8 * 100 / (3-1)) => 40.0).
*
* @param statuses the set of all peers
* @param direction the direction of transfer ({@code SEND} weights the destinations higher if they have more flowfiles, {@code RECEIVE} weights them higher if they have fewer)
* @return the ordered map of each peer to its relative weight
*/
LinkedHashMap<PeerStatus, Double> buildWeightedPeerMap(final Set<PeerStatus> statuses, final TransferDirection direction) {
// Get all the destinations with their relative weights
final Map<PeerStatus, Double> peerWorkloads = createDestinationMap(statuses, direction);
// Look at all of the peers that we fetched last time.
final Set<PeerStatus> lastFetched = lastFetchedQueryablePeers;
if (lastFetched != null && !lastFetched.isEmpty()) {
lastFetched.stream().map(peer -> peer.getPeerDescription())
.forEach(desc -> peersToRequestClusterInfoFrom.add(desc));
if (!peerWorkloads.isEmpty()) {
// This map is sorted, but not by key, so it cannot use SortedMap
LinkedHashMap<PeerStatus, Double> sortedPeerWorkloads = sortMapByWeight(peerWorkloads);
// Print the expected distribution of the peers
printDistributionStatistics(sortedPeerWorkloads, direction);
return sortedPeerWorkloads;
} else {
logger.debug("No peers available");
return new LinkedHashMap<>();
}
}
/**
* Returns a map indexed by a peer to the normalized weight (number of flowfiles currently being
* processed by the peer as a percentage of the total). This is used to allocate flowfiles to
* the various peers as destinations.
*
* @param peerStatuses the set of peers, along with their current workload (number of flowfiles)
* @param direction whether sending flowfiles to these peers or receiving them
* @return the map of weighted peers
*/
@NotNull
private Map<PeerStatus, Double> createDestinationMap(Set<PeerStatus> peerStatuses, TransferDirection direction) {
final Map<PeerStatus, Double> peerWorkloads = new HashMap<>();
// Calculate the total number of flowfiles in the peers
long totalFlowFileCount = peerStatuses.stream().mapToLong(PeerStatus::getFlowFileCount).sum();
logger.debug("Building weighted map of peers with total remote NiFi flowfile count: {}", totalFlowFileCount);
// For each node, calculate the relative weight and store it in the map
for (final PeerStatus nodeInfo : peerStatuses) {
final int flowFileCount = nodeInfo.getFlowFileCount();
final double normalizedWeight = calculateNormalizedWeight(direction, totalFlowFileCount, flowFileCount, peerStatuses.size());
peerWorkloads.put(nodeInfo, normalizedWeight);
}
// Always add the configured node info to the list of peers to communicate with
peersToRequestClusterInfoFrom.add(peerStatusProvider.getBootstrapPeerDescription());
return peerWorkloads;
}
/**
* Returns a set of {@link PeerStatus} objects representing all remote peers for the provided
* {@link PeerDescription}s. If a queried peer returns updated state on a peer which has already
* been captured, the new state is used.
* <p>
* Example:
* <p>
* 3 node cluster with nodes A, B, C
* <p>
* Node A knows about Node B and Node C, B about A and C, etc.
*
* <pre>
* Action | Statuses
* query(A) -> B.status, C.status | Bs1, Cs1
* query(B) -> A.status, C.status | As1, Bs1, Cs2
* query(C) -> A.status, B.status | As2, Bs2, Cs2
* </pre>
*
* @param peersToRequestClusterInfoFrom the set of peers to query
* @return the complete set of statuses for each collection of peers
* @throws IOException if there is a problem fetching peer statuses
*/
private Set<PeerStatus> fetchRemotePeerStatuses(Set<PeerDescription> peersToRequestClusterInfoFrom) throws IOException {
logger.debug("Fetching remote peer statuses from: {}", peersToRequestClusterInfoFrom);
Exception lastFailure = null;
final Set<PeerStatus> allPeerStatuses = new HashSet<>();
// Iterate through all peers, getting (sometimes multiple) status(es) from each
for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) {
try {
final Set<PeerStatus> statuses = peerStatusProvider.fetchRemotePeerStatuses(peerDescription);
lastFetchedQueryablePeers = statuses.stream()
.filter(p -> p.isQueryForPeers())
// Retrieve the peer status(es) from each peer description
final Set<PeerStatus> statusesForPeerDescription = peerStatusProvider.fetchRemotePeerStatuses(peerDescription);
// Filter to remove any peers which are not queryable
final Set<PeerStatus> filteredStatuses = statusesForPeerDescription.stream()
.filter(PeerStatus::isQueryForPeers)
.collect(Collectors.toSet());
return statuses;
allPeerStatuses.addAll(filteredStatuses);
} catch (final Exception e) {
logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster, due to {}",
logger.warn("Could not communicate with {}:{} to determine which node(s) exist in the remote NiFi instance, due to {}",
peerDescription.getHostname(), peerDescription.getPort(), e.toString());
lastFailure = e;
}
}
final IOException ioe = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster");
if (lastFailure != null) {
ioe.addSuppressed(lastFailure);
// If no peers were fetched and an exception was the cause, throw an exception
if (allPeerStatuses.isEmpty() && lastFailure != null) {
throw new IOException("Unable to retrieve nodes from remote instance", lastFailure);
}
throw ioe;
return allPeerStatuses;
}
}
/**
* Returns the {@link PeerStatus} identifying the next peer to send/receive data. This uses random
* selection of peers, weighted by the relative desirability (i.e. for {@code SEND}, peers with more
* flowfiles are more likely to be selected, and for {@code RECEIVE}, peers with fewer flowfiles are
* more likely).
*
* @param orderedPeerStatuses the map of peers to relative weights, sorted in descending order by weight
* @return the peer to send/receive data
*/
private PeerStatus getAvailablePeerStatus(Map<PeerStatus, Double> orderedPeerStatuses) {
if (orderedPeerStatuses == null || orderedPeerStatuses.isEmpty()) {
logger.warn("Available peers collection is empty; no peer available");
return null;
}
// Only distribute to unpenalized peers
Map<PeerStatus, Double> unpenalizedPeers = orderedPeerStatuses.entrySet().stream()
.filter(e -> !isPenalized(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final double totalWeights = sumMapValues(unpenalizedPeers);
logger.debug("Determining next available peer ({} peers with total weight {})", unpenalizedPeers.keySet().size(), totalWeights);
final double random = Math.random() * Math.min(100, totalWeights);
logger.debug("Generated random value {}", random);
double threshold = 0.0;
for (Map.Entry<PeerStatus, Double> e : unpenalizedPeers.entrySet()) {
logger.debug("Initial threshold was {}; added peer value {}; total {}", threshold, e.getValue(), threshold + e.getValue());
threshold += e.getValue();
if (random <= threshold) {
return e.getKey();
}
}
logger.debug("Did not select a peer; r {}, t {}, w {}", random, threshold, orderedPeerStatuses.values());
logger.debug("All peers appear to be penalized; returning null");
return null;
}
/**
* Returns the cache age in milliseconds. If the cache is null or not set, returns {@code -1}.
*
* @return the cache age in millis
*/
private long getCacheAge() {
if (peerStatusCache == null) {
return -1;
}
return System.currentTimeMillis() - peerStatusCache.getTimestamp();
}
/**
* Returns the set of queryable peers ({@link PeerStatus#isQueryForPeers()}) most recently fetched.
*
* @return the set of queryable peers (empty set if the cache is {@code null})
*/
@NotNull
private Set<PeerStatus> getLastFetchedQueryablePeers() {
return peerStatusCache != null ? peerStatusCache.getStatuses() : Collections.emptySet();
}
/**
* Returns the set of peer statuses. If the cache is {@code null} or empty, refreshes the cache first and then returns the new peer status set.
*
* @return the most recent peer statuses (empty set if the cache is {@code null})
*/
@NotNull
private Set<PeerStatus> getPeerStatuses() {
if (isPeerRefreshNeeded()) {
refreshPeerStatusCache();
}
return getLastFetchedQueryablePeers();
}
/**
* Returns the set of {@link PeerDescription} objects uniquely identifying each NiFi node which should be queried for {@link PeerStatus}.
*
* @return the set of recently retrieved peers and the bootstrap peer
* @throws IOException if there is a problem retrieving the list of peers to query
*/
private Set<PeerDescription> getPeersToQuery() throws IOException {
final Set<PeerDescription> peersToRequestClusterInfoFrom = new HashSet<>();
// Use the peers fetched last time
final Set<PeerStatus> lastFetched = getLastFetchedQueryablePeers();
if (lastFetched != null && !lastFetched.isEmpty()) {
for (PeerStatus peerStatus : lastFetched) {
peersToRequestClusterInfoFrom.add(peerStatus.getPeerDescription());
}
}
// Always add the configured node info to the list of peers
peersToRequestClusterInfoFrom.add(peerStatusProvider.getBootstrapPeerDescription());
return peersToRequestClusterInfoFrom;
}
/**
* Returns {@code true} if this cache has expired.
*
* @param cache the peer status cache
* @return true if the cache is expired
*/
private boolean isCacheExpired(PeerStatusCache cache) {
return cache == null || cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis();
}
/**
* Returns {@code true} if the internal collection of peers is empty or the refresh time has passed.
*
* @return true if the peer statuses should be refreshed
*/
private boolean isPeerRefreshNeeded() {
return (peerStatusCache == null || peerStatusCache.isEmpty() || isCacheExpired(peerStatusCache));
}
/**
* Persists the provided cache instance (in memory and via the {@link PeerPersistence} (e.g. in cluster state or a local file)) for future retrieval.
*
* @param peerStatusCache the cache of current peer statuses to persist
*/
private void persistPeerStatuses(PeerStatusCache peerStatusCache) {
try {
this.peerStatusCache = peerStatusCache;
// The #save mechanism persists the cache to stateful or file-based storage
peerPersistence.save(peerStatusCache);
} catch (final IOException e) {
error(logger, eventReporter, "Failed to persist list of peers due to {}; if restarted" +
" and the nodes specified at the remote instance are down," +
" may be unable to transfer data until communications with those nodes are restored", e.toString());
logger.error("", e);
}
}
/**
* Refreshes the list of S2S peers that flowfiles can be sent to or received from. Uses the stateful
* cache to reduce network overhead.
*/
private void refreshPeerStatusCache() {
try {
// Splitting enumeration and querying into separate methods allows better testing and composition
final Set<PeerDescription> peersToQuery = getPeersToQuery();
final Set<PeerStatus> statuses = fetchRemotePeerStatuses(peersToQuery);
if (statuses.isEmpty()) {
logger.info("No peers were retrieved from the remote group {}", peersToQuery.stream().map(p -> p.getHostname() + ":" + p.getPort()).collect(Collectors.joining(",")));
}
// Persist the fetched peer statuses
PeerStatusCache peerStatusCache = new PeerStatusCache(statuses, System.currentTimeMillis(), peerStatusProvider.getTransportProtocol());
persistPeerStatuses(peerStatusCache);
logger.info("Successfully refreshed peer status cache; remote group consists of {} peers", statuses.size());
} catch (Exception e) {
warn(logger, eventReporter, "Unable to refresh remote group peers due to: {}", e.getMessage());
if (logger.isDebugEnabled() && e.getCause() != null) {
logger.warn("Caused by: ", e);
}
}
}
}

View File

@ -16,13 +16,12 @@
*/
package org.apache.nifi.remote.client;
import java.io.IOException;
import java.util.Set;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import java.io.IOException;
import java.util.Set;
/**
* This interface defines methods used from {@link PeerSelector}.
*/
@ -52,6 +51,7 @@ public interface PeerStatusProvider {
* Fetch peer statuses from a remote NiFi cluster.
* Implementation of this method should fetch peer statuses from the node
* represented by the passed PeerDescription using its transport protocol.
*
* @param peerDescription a bootstrap node or one of query-able nodes lastly fetched successfully
* @return Remote peer statuses
* @throws IOException thrown when it fails to fetch peer statuses of the remote cluster from the specified peer

View File

@ -16,6 +16,17 @@
*/
package org.apache.nifi.remote.client.http;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
@ -28,7 +39,6 @@ import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
@ -39,18 +49,6 @@ import org.apache.nifi.web.api.dto.remote.PeerDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusProvider {
private static final Logger logger = LoggerFactory.getLogger(HttpClient.class);
@ -80,7 +78,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
taskExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
peerSelector.refreshPeers();
peerSelector.refresh();
}
}, 0, 5, TimeUnit.SECONDS);
@ -99,7 +97,8 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
@Override
public Set<PeerStatus> fetchRemotePeerStatuses(PeerDescription peerDescription) throws IOException {
// Each node should has the same URL structure and network reach-ability with the proxy configuration.
// Each node should have the same URL structure and network reachability with the proxy configuration
// Construct API client and provide to retrieval method
try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy(), config.getEventReporter())) {
final String scheme = peerDescription.isSecure() ? "https" : "http";
apiClient.setBaseUrl(scheme, peerDescription.getHostname(), peerDescription.getPort());
@ -110,20 +109,26 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
apiClient.setCacheExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS));
apiClient.setLocalAddress(config.getLocalAddress());
final Collection<PeerDTO> peers = apiClient.getPeers();
if(peers == null || peers.size() == 0){
throw new IOException("Couldn't get any peer to communicate with. " + apiClient.getBaseUrl() + " returned zero peers.");
}
// Convert the PeerDTO's to PeerStatus objects. Use 'true' for the query-peer-for-peers flag because Site-to-Site over HTTP
// was added in NiFi 1.0.0, which means that peer-to-peer queries are always allowed.
return peers.stream().map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount(), true))
.collect(Collectors.toSet());
return fetchRemotePeerStatuses(apiClient);
}
}
private Set<PeerStatus> fetchRemotePeerStatuses(SiteToSiteRestApiClient apiClient) throws IOException {
// Each node should have the same URL structure and network reachability with the proxy configuration
final Collection<PeerDTO> peers = apiClient.getPeers();
logger.debug("Retrieved {} peers from {}: {}", peers.size(), apiClient.getBaseUrl(), peers);
if (peers.size() == 0) {
throw new IOException("Could not get any peer to communicate with. " + apiClient.getBaseUrl() + " returned zero peers.");
}
// Convert the PeerDTOs to PeerStatus objects
// Each PeerStatus will have the queryPeers flag set to true because Site-to-Site over HTTP
// was added in NiFi 1.0.0, which means that peer-to-peer queries are always allowed
return peers.stream().map(PeerStatus::new).collect(Collectors.toSet());
}
@Override
public Transaction createTransaction(final TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {
public Transaction createTransaction(final TransferDirection direction) throws IOException {
final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
PeerStatus peerStatus;
@ -188,7 +193,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
// We found a valid peer to communicate with.
final Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion();
final HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction,
config.isUseCompression(), portId, penaltyMillis, config.getEventReporter()) {
config.isUseCompression(), portId, penaltyMillis, config.getEventReporter()) {
@Override
protected void close() throws IOException {

View File

@ -16,6 +16,32 @@
*/
package org.apache.nifi.remote.client.socket;
import static org.apache.nifi.remote.util.EventReportUtil.error;
import static org.apache.nifi.remote.util.EventReportUtil.warn;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
@ -42,33 +68,6 @@ import org.apache.nifi.security.util.CertificateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.remote.util.EventReportUtil.error;
import static org.apache.nifi.remote.util.EventReportUtil.warn;
public class EndpointConnectionPool implements PeerStatusProvider {
private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
@ -122,19 +121,9 @@ public class EndpointConnectionPool implements PeerStatusProvider {
}
});
taskExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
peerSelector.refreshPeers();
}
}, 0, 5, TimeUnit.SECONDS);
taskExecutor.scheduleWithFixedDelay(() -> peerSelector.refresh(), 0, 5, TimeUnit.SECONDS);
taskExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
cleanupExpiredSockets();
}
}, 5, 5, TimeUnit.SECONDS);
taskExecutor.scheduleWithFixedDelay(() -> cleanupExpiredSockets(), 5, 5, TimeUnit.SECONDS);
}
private String getPortIdentifier(final TransferDirection transferDirection) throws IOException {

View File

@ -19,9 +19,10 @@ package org.apache.nifi.remote.protocol;
import java.io.Closeable;
import java.io.IOException;
// TODO: Possibly refactor shared interface between this class and SiteToSiteRestApiClient
public interface CommunicationsSession extends Closeable {
public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'};
byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'};
CommunicationsInput getInput();

View File

@ -17,7 +17,8 @@
package org.apache.nifi.remote.util;
import java.util.Set;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
@ -45,4 +46,19 @@ public class PeerStatusCache {
public SiteToSiteTransportProtocol getTransportProtocol() {
return transportProtocol;
}
public boolean isEmpty() {
return statuses == null || statuses.isEmpty();
}
@Override
public String toString() {
final ToStringBuilder builder = new ToStringBuilder(this);
ToStringBuilder.setDefaultStyle(ToStringStyle.SHORT_PREFIX_STYLE);
builder.append("Timestamp", timestamp);
builder.append("Transport protocol", transportProtocol);
builder.append("Peer status count", statuses != null ? statuses.size() : 0);
builder.append("Peer statuses", statuses);
return builder.toString();
}
}

View File

@ -16,10 +16,63 @@
*/
package org.apache.nifi.remote.util;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
@ -85,60 +138,6 @@ import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
public class SiteToSiteRestApiClient implements Closeable {
private static final String EVENT_CATEGORY = "Site-to-Site";
@ -1084,7 +1083,7 @@ public class SiteToSiteRestApiClient implements Closeable {
String responseMessage = null;
try {
responseMessage = new String(bos.toByteArray(), "UTF-8");
responseMessage = new String(bos.toByteArray(), StandardCharsets.UTF_8);
logger.debug("readResponse responseMessage={}", responseMessage);
final ObjectMapper mapper = new ObjectMapper();

View File

@ -1,383 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.client;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.reducing;
import static java.util.stream.Collectors.toMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
public class TestPeerSelector {
private static final Logger logger = LoggerFactory.getLogger(TestPeerSelector.class);
private Map<String, Integer> calculateAverageSelectedCount(Set<PeerStatus> collection, List<PeerStatus> destinations) {
// Calculate hostname entry, for average calculation. Because there're multiple entry with same host name, different port.
final Map<String, Integer> hostNameCounts
= collection.stream().collect(groupingBy(p -> p.getPeerDescription().getHostname(), reducing(0, p -> 1, Integer::sum)));
// Calculate how many times each hostname is selected.
return destinations.stream().collect(groupingBy(p -> p.getPeerDescription().getHostname(), reducing(0, p -> 1, Integer::sum)))
.entrySet().stream().collect(toMap(Map.Entry::getKey, e -> {
return e.getValue() / hostNameCounts.get(e.getKey());
}));
}
@Test
public void testFormulateDestinationListForOutputEven() throws IOException {
final Set<PeerStatus> collection = new HashSet<>();
collection.add(new PeerStatus(new PeerDescription("Node1", 1111, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("Node2", 2222, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("Node3", 3333, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("Node4", 4444, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("Node5", 5555, true), 4096, true));
PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations);
logger.info("selectedCounts={}", selectedCounts);
int consecutiveSamePeerCount = 0;
PeerStatus previousPeer = null;
for (PeerStatus peer : destinations) {
if (previousPeer != null && peer.getPeerDescription().equals(previousPeer.getPeerDescription())) {
consecutiveSamePeerCount++;
// The same peer shouldn't be used consecutively (number of nodes - 1) times or more.
if (consecutiveSamePeerCount >= (collection.size() - 1)) {
fail("The same peer is returned consecutively too frequently.");
}
} else {
consecutiveSamePeerCount = 0;
}
previousPeer = peer;
}
}
@Test
public void testFormulateDestinationListForOutput() throws IOException {
final Set<PeerStatus> collection = new HashSet<>();
collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 10240, true));
collection.add(new PeerStatus(new PeerDescription("HasLittle", 3333, true), 1024, true));
collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096, true));
PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations);
logger.info("selectedCounts={}", selectedCounts);
assertTrue("HasLots should send lots", selectedCounts.get("HasLots") > selectedCounts.get("HasMedium"));
assertTrue("HasMedium should send medium", selectedCounts.get("HasMedium") > selectedCounts.get("HasLittle"));
}
@Test
public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
final Set<PeerStatus> collection = new HashSet<>();
collection.add(new PeerStatus(new PeerDescription("HasLittle", 1111, true), 500, true));
collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 50000, true));
PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations);
logger.info("selectedCounts={}", selectedCounts);
assertTrue("HasLots should send lots", selectedCounts.get("HasLots") > selectedCounts.get("HasLittle"));
}
@Test
public void testFormulateDestinationListForInputPorts() throws IOException {
final Set<PeerStatus> collection = new HashSet<>();
collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 10240, true));
collection.add(new PeerStatus(new PeerDescription("HasLots", 3333, true), 1024, true));
collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096, true));
PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations);
logger.info("selectedCounts={}", selectedCounts);
assertTrue("HasLots should get little", selectedCounts.get("HasLots") < selectedCounts.get("HasMedium"));
assertTrue("HasMedium should get medium", selectedCounts.get("HasMedium") < selectedCounts.get("HasLittle"));
}
@Test
public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
final Set<PeerStatus> collection = new HashSet<>();
collection.add(new PeerStatus(new PeerDescription("HasLots", 1111, true), 500, true));
collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 50000, true));
PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations);
logger.info("selectedCounts={}", selectedCounts);
assertTrue("HasLots should get little", selectedCounts.get("HasLots") < selectedCounts.get("HasLittle"));
}
private static class UnitTestSystemTime extends PeerSelector.SystemTime {
private long offset = 0;
@Override
long currentTimeMillis() {
return super.currentTimeMillis() + offset;
}
}
/**
* This test simulates a failure scenario of a remote NiFi cluster. It confirms that:
* <ol>
* <li>PeerSelector uses the bootstrap node to fetch remote peer statuses at the initial attempt</li>
* <li>PeerSelector uses one of query-able nodes lastly fetched successfully</li>
* <li>PeerSelector can refresh remote peer statuses even if the bootstrap node is down</li>
* <li>PeerSelector returns null as next peer when there's no peer available</li>
* <li>PeerSelector always tries to fetch peer statuses at least from the bootstrap node, so that it can
* recover when the node gets back online</li>
* </ol>
*/
@Test
public void testFetchRemotePeerStatuses() throws IOException {
final Set<PeerStatus> peerStatuses = new HashSet<>();
final PeerDescription bootstrapNode = new PeerDescription("Node1", 1111, true);
final PeerDescription node2 = new PeerDescription("Node2", 2222, true);
final PeerStatus bootstrapNodeStatus = new PeerStatus(bootstrapNode, 10, true);
final PeerStatus node2Status = new PeerStatus(node2, 10, true);
peerStatuses.add(bootstrapNodeStatus);
peerStatuses.add(node2Status);
final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
final PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
final UnitTestSystemTime systemTime = new UnitTestSystemTime();
peerSelector.setSystemTime(systemTime);
doReturn(bootstrapNode).when(peerStatusProvider).getBootstrapPeerDescription();
doAnswer(invocation -> {
final PeerDescription peerFetchStatusesFrom = invocation.getArgument(0);
if (peerStatuses.stream().filter(ps -> ps.getPeerDescription().equals(peerFetchStatusesFrom)).collect(Collectors.toSet()).size() > 0) {
// If the remote peer is running, then return available peer statuses.
return peerStatuses;
}
throw new IOException("Connection refused. " + peerFetchStatusesFrom + " is not running.");
}).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class));
// 1st attempt. It uses the bootstrap node.
peerSelector.refreshPeers();
PeerStatus peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
assertNotNull(peerStatus);
// Proceed time so that peer selector refresh statuses.
peerStatuses.remove(bootstrapNodeStatus);
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
// 2nd attempt.
peerSelector.refreshPeers();
peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
assertNotNull(peerStatus);
assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription());
// Proceed time so that peer selector refresh statuses.
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
// 3rd attempt.
peerSelector.refreshPeers();
peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
assertNotNull(peerStatus);
assertEquals("Node2 should be returned since node 2 is the only available node.", node2, peerStatus.getPeerDescription());
// Remove node2 to simulate that it goes down. There's no available node at this point.
peerStatuses.remove(node2Status);
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
peerSelector.refreshPeers();
peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
assertNull("PeerSelector should return null as next peer status, since there's no available peer", peerStatus);
// Add node1 back. PeerSelector should be able to fetch peer statuses because it always tries to fetch at least from the bootstrap node.
peerStatuses.add(bootstrapNodeStatus);
systemTime.offset += TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES) + 1;
peerSelector.refreshPeers();
peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peerStatus.getPeerDescription());
}
@Test
public void testPeerStatusManagedCache() throws Exception {
final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
final StateManager stateManager = Mockito.mock(StateManager.class);
final StateMap stateMap = Mockito.mock(StateMap.class);
final Map<String, String> state = new HashMap<>();
state.put(StatePeerPersistence.STATE_KEY_PEERS, "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n");
state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
when(stateManager.getState(eq(Scope.LOCAL))).thenReturn(stateMap);
when(stateMap.get(anyString())).thenAnswer(invocation -> state.get(invocation.getArgument(0)));
doAnswer(invocation -> {
final Map<String, String> updatedMap = invocation.getArgument(0);
state.clear();
state.putAll(updatedMap);
return null;
}).when(stateManager).setState(any(), eq(Scope.LOCAL));
final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
.thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
// PeerSelector should restore peer statuses from managed cache.
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
peerSelector.refreshPeers();
assertEquals("Restored peers should be used",
"RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
// If the stored state is too old, PeerSelector refreshes peers.
state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis() - 120_000));
peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
peerSelector.refreshPeers();
assertEquals("Peers should be refreshed",
"RAW\nnifi0:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
}
@Test
public void testPeerStatusManagedCacheDifferentProtocol() throws Exception {
final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
final StateManager stateManager = Mockito.mock(StateManager.class);
final StateMap stateMap = Mockito.mock(StateMap.class);
final Map<String, String> state = new HashMap<>();
state.put(StatePeerPersistence.STATE_KEY_PEERS, "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n");
state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.HTTP);
when(stateManager.getState(eq(Scope.LOCAL))).thenReturn(stateMap);
when(stateMap.get(anyString())).thenAnswer(invocation -> state.get(invocation.getArgument(0)));
doAnswer(invocation -> {
final Map<String, String> updatedMap = invocation.getArgument(0);
state.clear();
state.putAll(updatedMap);
return null;
}).when(stateManager).setState(any(), eq(Scope.LOCAL));
final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
.thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
// PeerSelector should NOT restore peer statuses from managed cache because protocol changed.
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
peerSelector.refreshPeers();
assertEquals("Restored peers should NOT be used",
"HTTP\nnifi0:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
}
@Test
public void testPeerStatusFileCache() throws Exception {
final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
.thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
final File file = File.createTempFile("peers", "txt");
file.deleteOnExit();
try (final FileOutputStream fos = new FileOutputStream(file)) {
fos.write("RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n".getBytes(StandardCharsets.UTF_8));
}
final Supplier<String> readFile = () -> {
try (final FileInputStream fin = new FileInputStream(file);
final BufferedReader reader = new BufferedReader(new InputStreamReader(fin))) {
final StringBuilder lines = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
lines.append(line).append("\n");
}
return lines.toString();
} catch (IOException e) {
throw new RuntimeException(e);
}
};
// PeerSelector should restore peer statuses from managed cache.
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new FilePeerPersistence(file));
peerSelector.refreshPeers();
assertEquals("Restored peers should be used",
"RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n", readFile.get());
// If the stored state is too old, PeerSelector refreshes peers.
file.setLastModified(System.currentTimeMillis() - 120_000);
peerSelector = new PeerSelector(peerStatusProvider, new FilePeerPersistence(file));
peerSelector.refreshPeers();
assertEquals("Peers should be refreshed",
"RAW\nnifi0:8081:false:true\n", readFile.get());
}
}

View File

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%-4r [%t] %-5p %c{3} - %m%n</pattern>
</encoder>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>./target/log</file>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
</encoder>
</appender>
<logger name="org.apache.nifi" level="INFO"/>
<logger name="org.apache.nifi.remote.client" level="DEBUG"/>
<logger name="org.apache.nifi.remote.client.PeerSelectorTest" level="DEBUG"/>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>
</configuration>

View File

@ -16,6 +16,37 @@
*/
package org.apache.nifi.remote;
import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
@ -50,38 +81,6 @@ import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
/**
* Represents the Root Process Group of a remote NiFi Instance. Holds
* information about that remote instance, as well as Incoming Ports and
@ -919,7 +918,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
final List<String> inputPortString = dto.getInputPorts().stream()
.map(port -> "InputPort[name=" + port.getName() + ", targetId=" + port.getId() + "]")
.collect(Collectors.toList());
final List<String> outputPortString = dto.getInputPorts().stream()
final List<String> outputPortString = dto.getOutputPorts().stream()
.map(port -> "OutputPort[name=" + port.getName() + ", targetId=" + port.getId() + "]")
.collect(Collectors.toList());

View File

@ -30,7 +30,7 @@
<logger name="org.apache.nifi" level="INFO"/>
<logger name="org.apache.nifi.controller.tasks" level="DEBUG" />"
<logger name="org.apache.nifi.controller.tasks" level="DEBUG"/>
<logger name="org.apache.nifi.controller.service" level="DEBUG"/>
<logger name="org.apache.nifi.encrypt" level="DEBUG"/>
<logger name="org.apache.nifi.security.util.crypto" level="DEBUG"/>

View File

@ -17,11 +17,29 @@
package org.apache.nifi.web.api;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
@ -46,26 +64,6 @@ import org.apache.nifi.web.api.entity.PeersEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isEmpty;
/**
* RESTful endpoint for managing a SiteToSite connection.
*/
@ -131,10 +129,6 @@ public class SiteToSiteResource extends ApplicationResource {
authorizeSiteToSite();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// get the controller dto
final ControllerDTO controller = serviceFacade.getSiteToSiteDetails();