diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java index 6c8a4ecebd..3113076857 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java @@ -20,10 +20,12 @@ public class PeerStatus { private final PeerDescription description; private final int numFlowFiles; + private final boolean queryForPeers; - public PeerStatus(final PeerDescription description, final int numFlowFiles) { + public PeerStatus(final PeerDescription description, final int numFlowFiles, final boolean queryForPeers) { this.description = description; this.numFlowFiles = numFlowFiles; + this.queryForPeers = queryForPeers; } public PeerDescription getPeerDescription() { @@ -34,6 +36,13 @@ public class PeerStatus { return numFlowFiles; } + /** + * @return true if this node can be queried for its peers, false otherwise. + */ + public boolean isQueryForPeers() { + return queryForPeers; + } + @Override public String toString() { return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort() diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java index b67e014a9f..e452b0fa12 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java @@ -97,7 +97,7 @@ public class PeerSelector { for (final PeerStatus status : statuses) { final PeerDescription description = status.getPeerDescription(); - final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + "\n"; + final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + ":" + status.isQueryForPeers() + "\n"; out.write(line.getBytes(StandardCharsets.UTF_8)); } @@ -120,7 +120,7 @@ public class PeerSelector { String line; while ((line = reader.readLine()) != null) { final String[] splits = line.split(Pattern.quote(":")); - if (splits.length != 3) { + if (splits.length != 3 && splits.length != 4) { continue; } @@ -128,7 +128,9 @@ public class PeerSelector { final int port = Integer.parseInt(splits[1]); final boolean secure = Boolean.parseBoolean(splits[2]); - statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1)); + final boolean supportQueryForPeer = splits.length == 4 && Boolean.parseBoolean(splits[3]); + + statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1, supportQueryForPeer)); } } @@ -172,7 +174,7 @@ public class PeerSelector { final int index = n % destinations.size(); PeerStatus status = destinations.get(index); if (status == null) { - status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount()); + status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount(), nodeInfo.isQueryForPeers()); destinations.set(index, status); break; } else { @@ -306,7 +308,7 @@ public class PeerSelector { if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) { final Set equalizedSet = new HashSet<>(cache.getStatuses().size()); for (final PeerStatus status : cache.getStatuses()) { - final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1); + final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers()); equalizedSet.add(equalizedStatus); } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java index 3312e88193..4cc794b3eb 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java @@ -93,45 +93,44 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr final URI clusterUrl; try { clusterUrl = new URI(config.getUrl()); - } catch (URISyntaxException e) { + } catch (final URISyntaxException e) { throw new IllegalArgumentException("Specified clusterUrl was: " + config.getUrl(), e); } - try ( - SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy()) - ) { - String clusterApiUrl = apiClient.resolveBaseUrl(scheme, clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort()); + try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())) { + final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort()); - int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); + final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); apiClient.setConnectTimeoutMillis(timeoutMillis); apiClient.setReadTimeoutMillis(timeoutMillis); - Collection peers = apiClient.getPeers(); + + final Collection peers = apiClient.getPeers(); if(peers == null || peers.size() == 0){ throw new IOException("Couldn't get any peer to communicate with. " + clusterApiUrl + " returned zero peers."); } - return peers.stream() - .map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount())) + // Convert the PeerDTO's to PeerStatus objects. Use 'true' for the query-peer-for-peers flag because Site-to-Site over HTTP + // was added in NiFi 1.0.0, which means that peer-to-peer queries are always allowed. + return peers.stream().map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount(), true)) .collect(Collectors.toSet()); } } @Override - public Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException { - - int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); + public Transaction createTransaction(final TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException { + final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); PeerStatus peerStatus; while ((peerStatus = peerSelector.getNextPeerStatus(direction)) != null) { logger.debug("peerStatus={}", peerStatus); - CommunicationsSession commSession = new HttpCommunicationsSession(); - String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription()); + final CommunicationsSession commSession = new HttpCommunicationsSession(); + final String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription()); commSession.setUri(nodeApiUrl); - String clusterUrl = config.getUrl(); - Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl); + final String clusterUrl = config.getUrl(); + final Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl); - int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS); + final int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS); String portId = config.getPortIdentifier(); if (StringUtils.isEmpty(portId)) { portId = siteInfoProvider.getPortIdentifier(config.getPortName(), direction); @@ -141,7 +140,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr } } - SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy()); + final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy()); apiClient.setBaseUrl(peer.getUrl()); apiClient.setConnectTimeoutMillis(timeoutMillis); @@ -157,7 +156,8 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr try { transactionUrl = apiClient.initiateTransaction(direction, portId); commSession.setUserDn(apiClient.getTrustedPeerDn()); - } catch (Exception e) { + } catch (final Exception e) { + apiClient.close(); logger.debug("Penalizing a peer due to {}", e.getMessage()); peerSelector.penalize(peer, penaltyMillis); @@ -170,8 +170,8 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr } // We found a valid peer to communicate with. - Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion(); - HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction, + final Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion(); + final HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction, config.isUseCompression(), portId, penaltyMillis, config.getEventReporter()); transaction.initialize(apiClient, transactionUrl); @@ -183,7 +183,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr } - private String resolveNodeApiUrl(PeerDescription description) { + private String resolveNodeApiUrl(final PeerDescription description) { return (description.isSecure() ? "https" : "http") + "://" + description.getHostname() + ":" + description.getPort() + "/nifi-api"; } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 8a6a91f795..6869e4ba06 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -16,6 +16,35 @@ */ package org.apache.nifi.remote.client.socket; +import static org.apache.nifi.remote.util.EventReportUtil.error; +import static org.apache.nifi.remote.util.EventReportUtil.warn; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.channels.SocketChannel; +import java.security.cert.CertificateException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import javax.net.ssl.SSLContext; + import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.PeerDescription; @@ -40,33 +69,6 @@ import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URI; -import java.nio.channels.SocketChannel; -import java.security.cert.CertificateException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import static org.apache.nifi.remote.util.EventReportUtil.error; -import static org.apache.nifi.remote.util.EventReportUtil.warn; - public class EndpointConnectionPool implements PeerStatusProvider { private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class); @@ -84,6 +86,7 @@ public class EndpointConnectionPool implements PeerStatusProvider { private volatile int commsTimeout; private volatile boolean shutdown = false; + private volatile Set lastFetchedQueryablePeers; private final SiteInfoProvider siteInfoProvider; private final PeerSelector peerSelector; @@ -145,8 +148,7 @@ public class EndpointConnectionPool implements PeerStatusProvider { return getEndpointConnection(direction, null); } - public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) - throws IOException { + public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException { // // Attempt to get a connection state that already exists for this URL. // @@ -358,15 +360,13 @@ public class EndpointConnectionPool implements PeerStatusProvider { } } - public Set fetchRemotePeerStatuses() throws IOException { - final String hostname = clusterUrl.getHost(); - final Integer port = siteInfoProvider.getSiteToSitePort(); - if (port == null) { - throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications"); - } + private Set fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException { + final String hostname = peerDescription.getHostname(); + final int port = peerDescription.getPort(); final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://")); final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port); + final Peer peer = new Peer(clusterPeerDescription, commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString()); final SocketClientProtocol clientProtocol = new SocketClientProtocol(); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); @@ -414,6 +414,50 @@ public class EndpointConnectionPool implements PeerStatusProvider { return peerStatuses; } + @Override + public Set fetchRemotePeerStatuses() throws IOException { + final Set peersToRequestClusterInfoFrom = new HashSet<>(); + + // Look at all of the peers that we fetched last time. + final Set lastFetched = lastFetchedQueryablePeers; + if (lastFetched != null && !lastFetched.isEmpty()) { + lastFetched.stream().map(peer -> peer.getPeerDescription()) + .forEach(desc -> peersToRequestClusterInfoFrom.add(desc)); + } + + // Always add the configured node info to the list of peers to communicate with + final String hostname = clusterUrl.getHost(); + final Integer port = siteInfoProvider.getSiteToSitePort(); + if (port == null) { + throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications"); + } + + final boolean secure = siteInfoProvider.isSecure(); + peersToRequestClusterInfoFrom.add(new PeerDescription(hostname, port, secure)); + + Exception lastFailure = null; + for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) { + try { + final Set statuses = fetchRemotePeerStatuses(peerDescription); + lastFetchedQueryablePeers = statuses.stream() + .filter(p -> p.isQueryForPeers()) + .collect(Collectors.toSet()); + + return statuses; + } catch (final Exception e) { + logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster", peerDescription.getHostname(), peerDescription.getPort()); + lastFailure = e; + } + } + + final IOException ioe = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster"); + if (lastFailure != null) { + ioe.addSuppressed(lastFailure); + } + + throw ioe; + } + private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException { final PeerDescription description = peerStatus.getPeerDescription(); return establishSiteToSiteConnection(description.getHostname(), description.getPort()); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java index ae12c67061..8148bf2bc0 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java @@ -32,11 +32,12 @@ public class HttpServerCommunicationsSession extends HttpCommunicationsSession { private Transaction.TransactionState status = Transaction.TransactionState.TRANSACTION_STARTED; private ResponseCode responseCode; - public HttpServerCommunicationsSession(InputStream inputStream, OutputStream outputStream, String transactionId){ + public HttpServerCommunicationsSession(final InputStream inputStream, final OutputStream outputStream, final String transactionId, final String userDn) { super(); input.setInputStream(inputStream); output.setOutputStream(outputStream); this.transactionId = transactionId; + setUserDn(userDn); } // This status is only needed by HttpFlowFileServerProtocol, HttpClientTransaction has its own status. @@ -46,7 +47,7 @@ public class HttpServerCommunicationsSession extends HttpCommunicationsSession { return status; } - public void setStatus(Transaction.TransactionState status) { + public void setStatus(final Transaction.TransactionState status) { this.status = status; } @@ -58,11 +59,11 @@ public class HttpServerCommunicationsSession extends HttpCommunicationsSession { return responseCode; } - public void setResponseCode(ResponseCode responseCode) { + public void setResponseCode(final ResponseCode responseCode) { this.responseCode = responseCode; } - public void putHandshakeParam(HandshakeProperty key, String value) { + public void putHandshakeParam(final HandshakeProperty key, final String value) { handshakeParams.put(key.name(), value); } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index 25e78cb6d2..477c52d581 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -51,7 +51,8 @@ import java.util.concurrent.TimeUnit; public class SocketClientProtocol implements ClientProtocol { - private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); + // Version 6 added to support Zero-Master Clustering, which was introduced in NiFi 1.0.0 + private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1); private RemoteDestination destination; private boolean useCompression = false; @@ -217,6 +218,8 @@ public class SocketClientProtocol implements ClientProtocol { final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + final boolean queryPeersForOtherPeers = getVersionNegotiator().getVersion() >= 6; + RequestType.REQUEST_PEER_LIST.writeRequestType(dos); dos.flush(); final int numPeers = dis.readInt(); @@ -226,7 +229,7 @@ public class SocketClientProtocol implements ClientProtocol { final int port = dis.readInt(); final boolean secure = dis.readBoolean(); final int flowFileCount = dis.readInt(); - peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount)); + peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount, queryPeersForOtherPeers)); } logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index 8910598443..cb3a55a15c 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -125,11 +125,8 @@ public class SiteToSiteRestApiClient implements Closeable { private static final int RESPONSE_CODE_OK = 200; private static final int RESPONSE_CODE_CREATED = 201; private static final int RESPONSE_CODE_ACCEPTED = 202; - private static final int RESPONSE_CODE_SEE_OTHER = 303; private static final int RESPONSE_CODE_BAD_REQUEST = 400; - private static final int RESPONSE_CODE_UNAUTHORIZED = 401; private static final int RESPONSE_CODE_NOT_FOUND = 404; - private static final int RESPONSE_CODE_SERVICE_UNAVAILABLE = 503; private static final Logger logger = LoggerFactory.getLogger(SiteToSiteRestApiClient.class); @@ -161,6 +158,7 @@ public class SiteToSiteRestApiClient implements Closeable { public SiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy) { this.sslContext = sslContext; this.proxy = proxy; + ttlExtendTaskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); @@ -168,6 +166,7 @@ public class SiteToSiteRestApiClient implements Closeable { public Thread newThread(final Runnable r) { final Thread thread = defaultFactory.newThread(r); thread.setName(Thread.currentThread().getName() + " TTLExtend"); + thread.setDaemon(true); return thread; } }); @@ -210,9 +209,9 @@ public class SiteToSiteRestApiClient implements Closeable { private void setupRequestConfig() { final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom() - .setConnectionRequestTimeout(connectTimeoutMillis) - .setConnectTimeout(connectTimeoutMillis) - .setSocketTimeout(readTimeoutMillis); + .setConnectionRequestTimeout(connectTimeoutMillis) + .setConnectTimeout(connectTimeoutMillis) + .setSocketTimeout(readTimeoutMillis); if (proxy != null) { requestConfigBuilder.setProxy(proxy.getHttpHost()); @@ -226,8 +225,8 @@ public class SiteToSiteRestApiClient implements Closeable { if (proxy != null) { if (!isEmpty(proxy.getUsername()) && !isEmpty(proxy.getPassword())) { credentialsProvider.setCredentials( - new AuthScope(proxy.getHttpHost()), - new UsernamePasswordCredentials(proxy.getUsername(), proxy.getPassword())); + new AuthScope(proxy.getHttpHost()), + new UsernamePasswordCredentials(proxy.getUsername(), proxy.getPassword())); } } @@ -242,7 +241,7 @@ public class SiteToSiteRestApiClient implements Closeable { } httpClient = clientBuilder - .setDefaultCredentialsProvider(getCredentialsProvider()).build(); + .setDefaultCredentialsProvider(getCredentialsProvider()).build(); } private void setupAsyncClient() { @@ -268,9 +267,9 @@ public class SiteToSiteRestApiClient implements Closeable { final SSLSession sslSession; if (conn instanceof ManagedHttpClientConnection) { - sslSession = ((ManagedHttpClientConnection)conn).getSSLSession(); + sslSession = ((ManagedHttpClientConnection) conn).getSSLSession(); } else if (conn instanceof ManagedNHttpClientConnection) { - sslSession = ((ManagedNHttpClientConnection)conn).getSSLSession(); + sslSession = ((ManagedNHttpClientConnection) conn).getSSLSession(); } else { throw new RuntimeException("Unexpected connection type was used, " + conn); } @@ -285,7 +284,7 @@ public class SiteToSiteRestApiClient implements Closeable { try { final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]); trustedPeerDn = cert.getSubjectDN().getName().trim(); - } catch (CertificateException e) { + } catch (final CertificateException e) { final String msg = "Could not extract subject DN from SSL session peer certificate"; logger.warn(msg); throw new SSLPeerUnverifiedException(msg); @@ -296,14 +295,14 @@ public class SiteToSiteRestApiClient implements Closeable { public ControllerDTO getController() throws IOException { try { - HttpGet get = createGet("/site-to-site"); + final HttpGet get = createGet("/site-to-site"); get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion())); return execute(get, ControllerEntity.class).getController(); - } catch (HttpGetFailedException e) { + } catch (final HttpGetFailedException e) { if (RESPONSE_CODE_NOT_FOUND == e.getResponseCode()) { logger.debug("getController received NOT_FOUND, trying to access the old NiFi version resource url..."); - HttpGet get = createGet("/controller"); + final HttpGet get = createGet("/controller"); return execute(get, ControllerEntity.class).getController(); } throw e; @@ -311,12 +310,12 @@ public class SiteToSiteRestApiClient implements Closeable { } public Collection getPeers() throws IOException { - HttpGet get = createGet("/site-to-site/peers"); + final HttpGet get = createGet("/site-to-site/peers"); get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion())); return execute(get, PeersEntity.class).getPeers(); } - public String initiateTransaction(TransferDirection direction, String portId) throws IOException { + public String initiateTransaction(final TransferDirection direction, final String portId) throws IOException { if (TransferDirection.RECEIVE.equals(direction)) { return initiateTransaction("output-ports", portId); } else { @@ -324,10 +323,9 @@ public class SiteToSiteRestApiClient implements Closeable { } } - private String initiateTransaction(String portType, String portId) throws IOException { + private String initiateTransaction(final String portType, final String portId) throws IOException { logger.debug("initiateTransaction handshaking portType={}, portId={}", portType, portId); - HttpPost post = createPost("/data-transfer/" + portType + "/" + portId + "/transactions"); - + final HttpPost post = createPost("/data-transfer/" + portType + "/" + portId + "/transactions"); post.setHeader("Accept", "application/json"); post.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion())); @@ -335,27 +333,27 @@ public class SiteToSiteRestApiClient implements Closeable { setHandshakeProperties(post); try (CloseableHttpResponse response = getHttpClient().execute(post)) { - int responseCode = response.getStatusLine().getStatusCode(); + final int responseCode = response.getStatusLine().getStatusCode(); logger.debug("initiateTransaction responseCode={}", responseCode); String transactionUrl; switch (responseCode) { - case RESPONSE_CODE_CREATED : + case RESPONSE_CODE_CREATED: EntityUtils.consume(response.getEntity()); transactionUrl = readTransactionUrl(response); if (isEmpty(transactionUrl)) { throw new ProtocolException("Server returned RESPONSE_CODE_CREATED without Location header"); } - Header transportProtocolVersionHeader = response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION); + final Header transportProtocolVersionHeader = response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION); if (transportProtocolVersionHeader == null) { throw new ProtocolException("Server didn't return confirmed protocol version"); } - Integer protocolVersionConfirmedByServer = Integer.valueOf(transportProtocolVersionHeader.getValue()); + final Integer protocolVersionConfirmedByServer = Integer.valueOf(transportProtocolVersionHeader.getValue()); logger.debug("Finished version negotiation, protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer); transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer); - Header serverTransactionTtlHeader = response.getFirstHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL); + final Header serverTransactionTtlHeader = response.getFirstHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL); if (serverTransactionTtlHeader == null) { throw new ProtocolException("Server didn't return " + HttpHeaders.SERVER_SIDE_TRANSACTION_TTL); } @@ -373,33 +371,36 @@ public class SiteToSiteRestApiClient implements Closeable { } - public boolean openConnectionForReceive(String transactionUrl, CommunicationsSession commSession) throws IOException { + public boolean openConnectionForReceive(final String transactionUrl, final CommunicationsSession commSession) throws IOException { - HttpGet get = createGet(transactionUrl + "/flow-files"); + final HttpGet get = createGet(transactionUrl + "/flow-files"); get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion())); setHandshakeProperties(get); - CloseableHttpResponse response = getHttpClient().execute(get); - int responseCode = response.getStatusLine().getStatusCode(); + final CloseableHttpResponse response = getHttpClient().execute(get); + final int responseCode = response.getStatusLine().getStatusCode(); logger.debug("responseCode={}", responseCode); boolean keepItOpen = false; try { switch (responseCode) { - case RESPONSE_CODE_OK : + case RESPONSE_CODE_OK: logger.debug("Server returned RESPONSE_CODE_OK, indicating there was no data."); EntityUtils.consume(response.getEntity()); return false; - case RESPONSE_CODE_ACCEPTED : - InputStream httpIn = response.getEntity().getContent(); - InputStream streamCapture = new InputStream() { + case RESPONSE_CODE_ACCEPTED: + final InputStream httpIn = response.getEntity().getContent(); + final InputStream streamCapture = new InputStream() { boolean closed = false; + @Override public int read() throws IOException { - if(closed) return -1; - int r = httpIn.read(); + if (closed) { + return -1; + } + final int r = httpIn.read(); if (r < 0) { closed = true; logger.debug("Reached to end of input stream. Closing resources..."); @@ -410,7 +411,7 @@ public class SiteToSiteRestApiClient implements Closeable { return r; } }; - ((HttpInput)commSession.getInput()).setInputStream(streamCapture); + ((HttpInput) commSession.getInput()).setInputStream(streamCapture); startExtendingTtl(transactionUrl, httpIn, response); keepItOpen = true; @@ -431,10 +432,11 @@ public class SiteToSiteRestApiClient implements Closeable { private final int DATA_PACKET_CHANNEL_READ_BUFFER_SIZE = 16384; private Future postResult; private CountDownLatch transferDataLatch = new CountDownLatch(1); - public void openConnectionForSend(String transactionUrl, CommunicationsSession commSession) throws IOException { + + public void openConnectionForSend(final String transactionUrl, final CommunicationsSession commSession) throws IOException { final String flowFilesPath = transactionUrl + "/flow-files"; - HttpPost post = createPost(flowFilesPath); + final HttpPost post = createPost(flowFilesPath); post.setHeader("Content-Type", "application/octet-stream"); post.setHeader("Accept", "text/plain"); @@ -442,7 +444,7 @@ public class SiteToSiteRestApiClient implements Closeable { setHandshakeProperties(post); - CountDownLatch initConnectionLatch = new CountDownLatch(1); + final CountDownLatch initConnectionLatch = new CountDownLatch(1); final URI requestUri = post.getURI(); final PipedOutputStream outputStream = new PipedOutputStream(); @@ -463,7 +465,7 @@ public class SiteToSiteRestApiClient implements Closeable { // Pass the output stream so that Site-to-Site client thread can send // data packet through this connection. logger.debug("sending data to {} has started...", flowFilesPath); - ((HttpOutput)commSession.getOutput()).setOutputStream(outputStream); + ((HttpOutput) commSession.getOutput()).setOutputStream(outputStream); initConnectionLatch.countDown(); final BasicHttpEntity entity = new BasicHttpEntity(); @@ -474,7 +476,7 @@ public class SiteToSiteRestApiClient implements Closeable { } @Override - public void produceContent(ContentEncoder encoder, IOControl ioControl) throws IOException { + public void produceContent(final ContentEncoder encoder, final IOControl ioControl) throws IOException { int totalRead = 0; int totalProduced = 0; @@ -501,7 +503,7 @@ public class SiteToSiteRestApiClient implements Closeable { final long totalWritten = commSession.getOutput().getBytesWritten(); logger.debug("sending data to {} has reached to its end. produced {} bytes by reading {} bytes from channel. {} bytes written in this transaction.", - flowFilesPath, totalProduced, totalRead, totalWritten); + flowFilesPath, totalProduced, totalRead, totalWritten); if (totalRead != totalWritten || totalProduced != totalWritten) { final String msg = "Sending data to %s has reached to its end, but produced : read : wrote byte sizes (%d : $d : %d) were not equal. Something went wrong."; throw new RuntimeException(String.format(msg, flowFilesPath, totalProduced, totalRead, totalWritten)); @@ -513,12 +515,12 @@ public class SiteToSiteRestApiClient implements Closeable { } @Override - public void requestCompleted(HttpContext context) { + public void requestCompleted(final HttpContext context) { logger.debug("Sending data to {} completed.", flowFilesPath); } @Override - public void failed(Exception ex) { + public void failed(final Exception ex) { logger.error("Sending data to {} has failed", flowFilesPath, ex); } @@ -554,13 +556,13 @@ public class SiteToSiteRestApiClient implements Closeable { transferDataLatch = new CountDownLatch(1); startExtendingTtl(transactionUrl, dataPacketChannel, null); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new IOException("Awaiting initConnectionLatch has been interrupted.", e); } } - public void finishTransferFlowFiles(CommunicationsSession commSession) throws IOException { + public void finishTransferFlowFiles(final CommunicationsSession commSession) throws IOException { if (postResult == null) { new IllegalStateException("Data transfer has not started yet."); @@ -576,7 +578,7 @@ public class SiteToSiteRestApiClient implements Closeable { if (!transferDataLatch.await(requestExpirationMillis, TimeUnit.MILLISECONDS)) { throw new IOException("Awaiting transferDataLatch has been timeout."); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new IOException("Awaiting transferDataLatch has been interrupted.", e); } @@ -585,24 +587,24 @@ public class SiteToSiteRestApiClient implements Closeable { final HttpResponse response; try { response = postResult.get(readTimeoutMillis, TimeUnit.MILLISECONDS); - } catch (ExecutionException e) { + } catch (final ExecutionException e) { logger.debug("Something has happened at sending thread. {}", e.getMessage()); - Throwable cause = e.getCause(); + final Throwable cause = e.getCause(); if (cause instanceof IOException) { throw (IOException) cause; } else { throw new IOException(cause); } - } catch (TimeoutException|InterruptedException e) { + } catch (TimeoutException | InterruptedException e) { throw new IOException(e); } - int responseCode = response.getStatusLine().getStatusCode(); + final int responseCode = response.getStatusLine().getStatusCode(); switch (responseCode) { - case RESPONSE_CODE_ACCEPTED : - String receivedChecksum = EntityUtils.toString(response.getEntity()); - ((HttpInput)commSession.getInput()).setInputStream(new ByteArrayInputStream(receivedChecksum.getBytes())); - ((HttpCommunicationsSession)commSession).setChecksum(receivedChecksum); + case RESPONSE_CODE_ACCEPTED: + final String receivedChecksum = EntityUtils.toString(response.getEntity()); + ((HttpInput) commSession.getInput()).setInputStream(new ByteArrayInputStream(receivedChecksum.getBytes())); + ((HttpCommunicationsSession) commSession).setChecksum(receivedChecksum); logger.debug("receivedChecksum={}", receivedChecksum); break; @@ -623,17 +625,17 @@ public class SiteToSiteRestApiClient implements Closeable { extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator; extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis; extendingApiClient.readTimeoutMillis = this.readTimeoutMillis; - int extendFrequency = serverTransactionTtl / 2; + final int extendFrequency = serverTransactionTtl / 2; ttlExtendingThread = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> { try { extendingApiClient.extendTransaction(transactionUrl); - } catch (Exception e) { + } catch (final Exception e) { logger.warn("Failed to extend transaction ttl", e); try { // Without disconnecting, Site-to-Site client keep reading data packet, // while server has already rollback. this.close(); - } catch (IOException ec) { + } catch (final IOException ec) { logger.warn("Failed to close", e); } } @@ -645,7 +647,7 @@ public class SiteToSiteRestApiClient implements Closeable { if (closeable != null) { closeable.close(); } - } catch (IOException e) { + } catch (final IOException e) { logger.warn("Got an exception during closing {}: {}", closeable, e.getMessage()); if (logger.isDebugEnabled()) { logger.warn("", e); @@ -653,7 +655,7 @@ public class SiteToSiteRestApiClient implements Closeable { } } - public TransactionResultEntity extendTransaction(String transactionUrl) throws IOException { + public TransactionResultEntity extendTransaction(final String transactionUrl) throws IOException { logger.debug("Sending extendTransaction request to transactionUrl: {}", transactionUrl); final HttpPut put = createPut(transactionUrl); @@ -663,15 +665,14 @@ public class SiteToSiteRestApiClient implements Closeable { setHandshakeProperties(put); - try (CloseableHttpResponse response = getHttpClient().execute(put)) { - int responseCode = response.getStatusLine().getStatusCode(); + try (final CloseableHttpResponse response = getHttpClient().execute(put)) { + final int responseCode = response.getStatusLine().getStatusCode(); logger.debug("extendTransaction responseCode={}", responseCode); - try (InputStream content = response.getEntity().getContent()) { + try (final InputStream content = response.getEntity().getContent()) { switch (responseCode) { - case RESPONSE_CODE_OK : + case RESPONSE_CODE_OK: return readResponse(content); - default: throw handleErrResponse(responseCode, content); } @@ -694,39 +695,41 @@ public class SiteToSiteRestApiClient implements Closeable { } private IOException handleErrResponse(final int responseCode, final InputStream in) throws IOException { - if(in == null) { + if (in == null) { return new IOException("Unexpected response code: " + responseCode); } - TransactionResultEntity errEntity = readResponse(in); - ResponseCode errCode = ResponseCode.fromCode(errEntity.getResponseCode()); + + final TransactionResultEntity errEntity = readResponse(in); + final ResponseCode errCode = ResponseCode.fromCode(errEntity.getResponseCode()); + switch (errCode) { case UNKNOWN_PORT: return new UnknownPortException(errEntity.getMessage()); case PORT_NOT_IN_VALID_STATE: return new PortNotRunningException(errEntity.getMessage()); default: - return new IOException("Unexpected response code: " + responseCode - + " errCode:" + errCode + " errMessage:" + errEntity.getMessage()); + return new IOException("Unexpected response code: " + responseCode + " errCode:" + errCode + " errMessage:" + errEntity.getMessage()); } } - private TransactionResultEntity readResponse(InputStream inputStream) throws IOException { + private TransactionResultEntity readResponse(final InputStream inputStream) throws IOException { + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); StreamUtils.copy(inputStream, bos); String responseMessage = null; + try { responseMessage = new String(bos.toByteArray(), "UTF-8"); logger.debug("readResponse responseMessage={}", responseMessage); final ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(responseMessage, TransactionResultEntity.class); - } catch (JsonParseException | JsonMappingException e) { if (logger.isDebugEnabled()) { logger.debug("Failed to parse JSON.", e); } - TransactionResultEntity entity = new TransactionResultEntity(); + + final TransactionResultEntity entity = new TransactionResultEntity(); entity.setResponseCode(ResponseCode.ABORT.getCode()); entity.setMessage(responseMessage); return entity; @@ -736,90 +739,109 @@ public class SiteToSiteRestApiClient implements Closeable { private String readTransactionUrl(final CloseableHttpResponse response) { final Header locationUriIntentHeader = response.getFirstHeader(LOCATION_URI_INTENT_NAME); logger.debug("locationUriIntentHeader={}", locationUriIntentHeader); - if (locationUriIntentHeader != null) { - if (LOCATION_URI_INTENT_VALUE.equals(locationUriIntentHeader.getValue())) { - Header transactionUrl = response.getFirstHeader(LOCATION_HEADER_NAME); - logger.debug("transactionUrl={}", transactionUrl); - if (transactionUrl != null) { - return transactionUrl.getValue(); - } + + if (locationUriIntentHeader != null && LOCATION_URI_INTENT_VALUE.equals(locationUriIntentHeader.getValue())) { + final Header transactionUrl = response.getFirstHeader(LOCATION_HEADER_NAME); + logger.debug("transactionUrl={}", transactionUrl); + + if (transactionUrl != null) { + return transactionUrl.getValue(); } } + return null; } private void setHandshakeProperties(final HttpRequestBase httpRequest) { - if(compress) httpRequest.setHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION, "true"); - if(requestExpirationMillis > 0) httpRequest.setHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION, String.valueOf(requestExpirationMillis)); - if(batchCount > 0) httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_COUNT, String.valueOf(batchCount)); - if(batchSize > 0) httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_SIZE, String.valueOf(batchSize)); - if(batchDurationMillis > 0) httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_DURATION, String.valueOf(batchDurationMillis)); + if (compress) { + httpRequest.setHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION, "true"); + } + + if (requestExpirationMillis > 0) { + httpRequest.setHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION, String.valueOf(requestExpirationMillis)); + } + + if (batchCount > 0) { + httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_COUNT, String.valueOf(batchCount)); + } + + if (batchSize > 0) { + httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_SIZE, String.valueOf(batchSize)); + } + + if (batchDurationMillis > 0) { + httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_DURATION, String.valueOf(batchDurationMillis)); + } } - private HttpGet createGet(final String path) { - final URI url = getUri(path); - HttpGet get = new HttpGet(url); - get.setConfig(getRequestConfig()); - return get; - } - - private URI getUri(String path) { + private URI getUri(final String path) { final URI url; try { - if(HTTP_ABS_URL.matcher(path).find()){ + if (HTTP_ABS_URL.matcher(path).find()) { url = new URI(path); } else { - if(StringUtils.isEmpty(getBaseUrl())){ + if (StringUtils.isEmpty(getBaseUrl())) { throw new IllegalStateException("API baseUrl is not resolved yet, call setBaseUrl or resolveBaseUrl before sending requests with relative path."); } url = new URI(baseUrl + path); } - } catch (URISyntaxException e) { + } catch (final URISyntaxException e) { throw new IllegalArgumentException(e.getMessage()); } return url; } + + private HttpGet createGet(final String path) { + final URI url = getUri(path); + final HttpGet get = new HttpGet(url); + get.setConfig(getRequestConfig()); + return get; + } + private HttpPost createPost(final String path) { final URI url = getUri(path); - HttpPost post = new HttpPost(url); + final HttpPost post = new HttpPost(url); post.setConfig(getRequestConfig()); return post; } private HttpPut createPut(final String path) { final URI url = getUri(path); - HttpPut put = new HttpPut(url); + final HttpPut put = new HttpPut(url); put.setConfig(getRequestConfig()); return put; } private HttpDelete createDelete(final String path) { final URI url = getUri(path); - HttpDelete delete = new HttpDelete(url); + final HttpDelete delete = new HttpDelete(url); delete.setConfig(getRequestConfig()); return delete; } private String execute(final HttpGet get) throws IOException { + final CloseableHttpClient httpClient = getHttpClient(); - CloseableHttpClient httpClient = getHttpClient(); - try (CloseableHttpResponse response = httpClient.execute(get)) { - StatusLine statusLine = response.getStatusLine(); - int statusCode = statusLine.getStatusCode(); + try (final CloseableHttpResponse response = httpClient.execute(get)) { + final StatusLine statusLine = response.getStatusLine(); + final int statusCode = statusLine.getStatusCode(); if (RESPONSE_CODE_OK != statusCode) { throw new HttpGetFailedException(statusCode, statusLine.getReasonPhrase(), null); } - HttpEntity entity = response.getEntity(); - String responseMessage = EntityUtils.toString(entity); + final HttpEntity entity = response.getEntity(); + final String responseMessage = EntityUtils.toString(entity); return responseMessage; } } public class HttpGetFailedException extends IOException { + private static final long serialVersionUID = 7920714957269466946L; + private final int responseCode; private final String responseMessage; private final String explanation; + public HttpGetFailedException(final int responseCode, final String responseMessage, final String explanation) { super("response code " + responseCode + ":" + responseMessage + " with explanation: " + explanation); this.responseCode = responseCode; @@ -854,25 +876,25 @@ public class SiteToSiteRestApiClient implements Closeable { this.baseUrl = baseUrl; } - public void setConnectTimeoutMillis(int connectTimeoutMillis) { + public void setConnectTimeoutMillis(final int connectTimeoutMillis) { this.connectTimeoutMillis = connectTimeoutMillis; } - public void setReadTimeoutMillis(int readTimeoutMillis) { + public void setReadTimeoutMillis(final int readTimeoutMillis) { this.readTimeoutMillis = readTimeoutMillis; } - public String resolveBaseUrl(String clusterUrl) { + public String resolveBaseUrl(final String clusterUrl) { URI clusterUri; try { clusterUri = new URI(clusterUrl); - } catch (URISyntaxException e) { + } catch (final URISyntaxException e) { throw new IllegalArgumentException("Specified clusterUrl was: " + clusterUrl, e); } return this.resolveBaseUrl(clusterUri); } - public String resolveBaseUrl(URI clusterUrl) { + public String resolveBaseUrl(final URI clusterUrl) { String urlPath = clusterUrl.getPath(); if (urlPath.endsWith("/")) { urlPath = urlPath.substring(0, urlPath.length() - 1); @@ -884,33 +906,41 @@ public class SiteToSiteRestApiClient implements Closeable { return resolveBaseUrl(scheme, host, port, "/nifi-api"); } - public String resolveBaseUrl(final String scheme, final String host, final int port, String path) { - String baseUri = scheme + "://" + host + ":" + port + path; + public String resolveBaseUrl(final String scheme, final String host, final int port, final String path) { + final String baseUri = scheme + "://" + host + ":" + port + path; this.setBaseUrl(baseUri); return baseUri; } - public void setCompress(boolean compress) { + public void setCompress(final boolean compress) { this.compress = compress; } - public void setRequestExpirationMillis(long requestExpirationMillis) { - if(requestExpirationMillis < 0) throw new IllegalArgumentException("requestExpirationMillis can't be a negative value."); + public void setRequestExpirationMillis(final long requestExpirationMillis) { + if (requestExpirationMillis < 0) { + throw new IllegalArgumentException("requestExpirationMillis can't be a negative value."); + } this.requestExpirationMillis = requestExpirationMillis; } - public void setBatchCount(int batchCount) { - if(batchCount < 0) throw new IllegalArgumentException("batchCount can't be a negative value."); + public void setBatchCount(final int batchCount) { + if (batchCount < 0) { + throw new IllegalArgumentException("batchCount can't be a negative value."); + } this.batchCount = batchCount; } - public void setBatchSize(long batchSize) { - if(batchSize < 0) throw new IllegalArgumentException("batchSize can't be a negative value."); + public void setBatchSize(final long batchSize) { + if (batchSize < 0) { + throw new IllegalArgumentException("batchSize can't be a negative value."); + } this.batchSize = batchSize; } - public void setBatchDurationMillis(long batchDurationMillis) { - if(batchDurationMillis < 0) throw new IllegalArgumentException("batchDurationMillis can't be a negative value."); + public void setBatchDurationMillis(final long batchDurationMillis) { + if (batchDurationMillis < 0) { + throw new IllegalArgumentException("batchDurationMillis can't be a negative value."); + } this.batchDurationMillis = batchDurationMillis; } @@ -922,33 +952,33 @@ public class SiteToSiteRestApiClient implements Closeable { return this.trustedPeerDn; } - public TransactionResultEntity commitReceivingFlowFiles(String transactionUrl, ResponseCode clientResponse, String checksum) throws IOException { + public TransactionResultEntity commitReceivingFlowFiles(final String transactionUrl, final ResponseCode clientResponse, final String checksum) throws IOException { logger.debug("Sending commitReceivingFlowFiles request to transactionUrl: {}, clientResponse={}, checksum={}", - transactionUrl, clientResponse, checksum); + transactionUrl, clientResponse, checksum); stopExtendingTtl(); - StringBuilder urlBuilder = new StringBuilder(transactionUrl).append("?responseCode=").append(clientResponse.getCode()); + final StringBuilder urlBuilder = new StringBuilder(transactionUrl).append("?responseCode=").append(clientResponse.getCode()); if (ResponseCode.CONFIRM_TRANSACTION.equals(clientResponse)) { urlBuilder.append("&checksum=").append(checksum); } - HttpDelete delete = createDelete(urlBuilder.toString()); + final HttpDelete delete = createDelete(urlBuilder.toString()); delete.setHeader("Accept", "application/json"); delete.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion())); setHandshakeProperties(delete); try (CloseableHttpResponse response = getHttpClient().execute(delete)) { - int responseCode = response.getStatusLine().getStatusCode(); + final int responseCode = response.getStatusLine().getStatusCode(); logger.debug("commitReceivingFlowFiles responseCode={}", responseCode); try (InputStream content = response.getEntity().getContent()) { switch (responseCode) { - case RESPONSE_CODE_OK : + case RESPONSE_CODE_OK: return readResponse(content); - case RESPONSE_CODE_BAD_REQUEST : + case RESPONSE_CODE_BAD_REQUEST: return readResponse(content); default: @@ -959,26 +989,26 @@ public class SiteToSiteRestApiClient implements Closeable { } - public TransactionResultEntity commitTransferFlowFiles(String transactionUrl, ResponseCode clientResponse) throws IOException { - String requestUrl = transactionUrl + "?responseCode=" + clientResponse.getCode(); + public TransactionResultEntity commitTransferFlowFiles(final String transactionUrl, final ResponseCode clientResponse) throws IOException { + final String requestUrl = transactionUrl + "?responseCode=" + clientResponse.getCode(); logger.debug("Sending commitTransferFlowFiles request to transactionUrl: {}", requestUrl); - HttpDelete delete = createDelete(requestUrl); + final HttpDelete delete = createDelete(requestUrl); delete.setHeader("Accept", "application/json"); delete.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion())); setHandshakeProperties(delete); try (CloseableHttpResponse response = getHttpClient().execute(delete)) { - int responseCode = response.getStatusLine().getStatusCode(); + final int responseCode = response.getStatusLine().getStatusCode(); logger.debug("commitTransferFlowFiles responseCode={}", responseCode); try (InputStream content = response.getEntity().getContent()) { switch (responseCode) { - case RESPONSE_CODE_OK : + case RESPONSE_CODE_OK: return readResponse(content); - case RESPONSE_CODE_BAD_REQUEST : + case RESPONSE_CODE_BAD_REQUEST: return readResponse(content); default: diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java index ca820f84b1..4c0f0d6074 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java @@ -54,11 +54,11 @@ public class TestPeerSelector { @Test public void testFormulateDestinationListForOutput() throws IOException { final Set collection = new HashSet<>(); - collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096)); - collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 10240)); - collection.add(new PeerStatus(new PeerDescription("HasLittle", 3333, true), 1024)); - collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096)); - collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096)); + collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 10240, true)); + collection.add(new PeerStatus(new PeerDescription("HasLittle", 3333, true), 1024, true)); + collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096, true)); PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class); PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null); @@ -74,8 +74,8 @@ public class TestPeerSelector { @Test public void testFormulateDestinationListForOutputHugeDifference() throws IOException { final Set collection = new HashSet<>(); - collection.add(new PeerStatus(new PeerDescription("HasLittle", 1111, true), 500)); - collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 50000)); + collection.add(new PeerStatus(new PeerDescription("HasLittle", 1111, true), 500, true)); + collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 50000, true)); PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class); PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null); @@ -90,11 +90,11 @@ public class TestPeerSelector { @Test public void testFormulateDestinationListForInputPorts() throws IOException { final Set collection = new HashSet<>(); - collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096)); - collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 10240)); - collection.add(new PeerStatus(new PeerDescription("HasLots", 3333, true), 1024)); - collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096)); - collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096)); + collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 10240, true)); + collection.add(new PeerStatus(new PeerDescription("HasLots", 3333, true), 1024, true)); + collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096, true)); PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class); PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null); @@ -110,8 +110,8 @@ public class TestPeerSelector { @Test public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException { final Set collection = new HashSet<>(); - collection.add(new PeerStatus(new PeerDescription("HasLots", 1111, true), 500)); - collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 50000)); + collection.add(new PeerStatus(new PeerDescription("HasLots", 1111, true), 500, true)); + collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 50000, true)); PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class); PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java index e31832fbba..2a9b87c445 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java @@ -39,9 +39,6 @@ public class ConnectionResponse { private final int tryLaterSeconds; private final NodeIdentifier nodeIdentifier; private final DataFlow dataFlow; - private final Integer managerRemoteInputPort; - private final Integer managerRemoteInputHttpPort; - private final Boolean managerRemoteCommsSecure; private final String instanceId; private final List nodeStatuses; private final List componentRevisions; @@ -49,8 +46,7 @@ public class ConnectionResponse { private volatile String coordinatorDN; public ConnectionResponse(final NodeIdentifier nodeIdentifier, final DataFlow dataFlow, - final Integer managerRemoteInputPort, final Integer managerRemoteInputHttpPort, final Boolean managerRemoteCommsSecure, final String instanceId, - final List nodeStatuses, final List componentRevisions) { + final String instanceId, final List nodeStatuses, final List componentRevisions) { if (nodeIdentifier == null) { throw new IllegalArgumentException("Node identifier may not be empty or null."); @@ -61,9 +57,6 @@ public class ConnectionResponse { this.dataFlow = dataFlow; this.tryLaterSeconds = 0; this.rejectionReason = null; - this.managerRemoteInputPort = managerRemoteInputPort; - this.managerRemoteInputHttpPort = managerRemoteInputHttpPort; - this.managerRemoteCommsSecure = managerRemoteCommsSecure; this.instanceId = instanceId; this.nodeStatuses = Collections.unmodifiableList(new ArrayList<>(nodeStatuses)); this.componentRevisions = componentRevisions == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(componentRevisions)); @@ -77,9 +70,6 @@ public class ConnectionResponse { this.nodeIdentifier = null; this.tryLaterSeconds = tryLaterSeconds; this.rejectionReason = null; - this.managerRemoteInputPort = null; - this.managerRemoteInputHttpPort = null; - this.managerRemoteCommsSecure = null; this.instanceId = null; this.nodeStatuses = null; this.componentRevisions = null; @@ -90,9 +80,6 @@ public class ConnectionResponse { this.nodeIdentifier = null; this.tryLaterSeconds = 0; this.rejectionReason = rejectionReason; - this.managerRemoteInputPort = null; - this.managerRemoteInputHttpPort = null; - this.managerRemoteCommsSecure = null; this.instanceId = null; this.nodeStatuses = null; this.componentRevisions = null; @@ -130,18 +117,6 @@ public class ConnectionResponse { return nodeIdentifier; } - public Integer getManagerRemoteInputPort() { - return managerRemoteInputPort; - } - - public Integer getManagerRemoteInputHttpPort() { - return managerRemoteInputHttpPort; - } - - public Boolean isManagerRemoteCommsSecure() { - return managerRemoteCommsSecure; - } - public String getInstanceId() { return instanceId; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java index 9a53a7222d..513818b09c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java @@ -33,9 +33,6 @@ public class AdaptedConnectionResponse { private NodeIdentifier nodeIdentifier; private String rejectionReason; private int tryLaterSeconds; - private Integer managerRemoteInputPort; - private Integer managerRemoteInputHttpPort; - private Boolean managerRemoteCommsSecure; private String instanceId; private List nodeStatuses; private List componentRevisions; @@ -81,30 +78,6 @@ public class AdaptedConnectionResponse { return tryLaterSeconds > 0; } - public void setManagerRemoteInputPort(Integer managerRemoteInputPort) { - this.managerRemoteInputPort = managerRemoteInputPort; - } - - public Integer getManagerRemoteInputPort() { - return managerRemoteInputPort; - } - - public void setManagerRemoteInputHttpPort(Integer managerRemoteInputHttpPort) { - this.managerRemoteInputHttpPort = managerRemoteInputHttpPort; - } - - public Integer getManagerRemoteInputHttpPort() { - return managerRemoteInputHttpPort; - } - - public void setManagerRemoteCommsSecure(Boolean secure) { - this.managerRemoteCommsSecure = secure; - } - - public Boolean isManagerRemoteCommsSecure() { - return managerRemoteCommsSecure; - } - public void setInstanceId(String instanceId) { this.instanceId = instanceId; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java index cf64e71a56..470843e4bf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java @@ -31,9 +31,6 @@ public class ConnectionResponseAdapter extends XmlAdapter nodeStatuses; private List componentRevisions; @@ -75,30 +72,6 @@ public class ReconnectionRequestMessage extends ProtocolMessage { return MessageType.RECONNECTION_REQUEST; } - public void setManagerRemoteSiteListeningPort(final Integer listeningPort) { - this.managerRemoteSiteListeningPort = listeningPort; - } - - public Integer getManagerRemoteSiteListeningPort() { - return managerRemoteSiteListeningPort; - } - - public void setManagerRemoteSiteListeningHttpPort(Integer managerRemoteSiteListeningHttpPort) { - this.managerRemoteSiteListeningHttpPort = managerRemoteSiteListeningHttpPort; - } - - public Integer getManagerRemoteSiteListeningHttpPort() { - return managerRemoteSiteListeningHttpPort; - } - - public void setManagerRemoteSiteCommsSecure(final Boolean remoteSiteCommsSecure) { - this.managerRemoteSiteCommsSecure = remoteSiteCommsSecure; - } - - public Boolean isManagerRemoteSiteCommsSecure() { - return managerRemoteSiteCommsSecure; - } - public void setInstanceId(final String instanceId) { this.instanceId = instanceId; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java index cb51edabb3..955df170db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java @@ -49,7 +49,7 @@ public class TestJaxbProtocolUtils { final DataFlow dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]); final List nodeStatuses = Collections.singletonList(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED)); final List componentRevisions = Collections.singletonList(ComponentRevision.fromRevision(new Revision(8L, "client-1", "component-1"))); - msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, 9990, 8080, false, "instance-1", nodeStatuses, componentRevisions)); + msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, "instance-1", nodeStatuses, componentRevisions)); JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos); final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray())); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index 1b410d6287..d336558c8a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -766,10 +766,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl return new ConnectionResponse(tryAgainSeconds); } - // TODO: Remove the 'null' values here from the ConnectionResponse all together. These - // will no longer be needed for site-to-site once the NCM is gone. - return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, null, null, null, instanceId, - new ArrayList<>(nodeStatuses.values()), + return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, new ArrayList<>(nodeStatuses.values()), revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList())); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java index 6fbb88c58d..4f860016c5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java @@ -17,6 +17,7 @@ package org.apache.nifi.remote.protocol; import java.io.IOException; +import java.util.Optional; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.ProcessContext; @@ -24,6 +25,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.VersionedRemoteResource; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; @@ -131,9 +133,11 @@ public interface ServerProtocol extends VersionedRemoteResource { * a cluster, sends info about itself * * @param peer peer + * @param clusterNodeInfo the cluster information + * * @throws java.io.IOException ioe */ - void sendPeerList(Peer peer) throws IOException; + void sendPeerList(Peer peer, Optional clusterNodeInfo) throws IOException; void shutdown(Peer peer); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java new file mode 100644 index 0000000000..9f8439cb33 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ClusterCoordinatorNodeInformant.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformant; +import org.apache.nifi.remote.cluster.NodeInformation; + +public class ClusterCoordinatorNodeInformant implements NodeInformant { + private final ClusterCoordinator clusterCoordinator; + + public ClusterCoordinatorNodeInformant(final ClusterCoordinator coordinator) { + this.clusterCoordinator = coordinator; + } + + @Override + public ClusterNodeInformation getNodeInformation() { + final List nodeInfoCollection = new ArrayList<>(); + final Set nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED); + + // TODO: Get total number of FlowFiles for each node + for (final NodeIdentifier nodeId : nodeIds) { + final NodeInformation nodeInfo = new NodeInformation(nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(), + nodeId.getSiteToSiteHttpApiPort(), nodeId.getApiPort(), nodeId.isSiteToSiteSecure(), 0); + nodeInfoCollection.add(nodeInfo); + } + + final ClusterNodeInformation nodeInfo = new ClusterNodeInformation(); + nodeInfo.setNodeInformation(nodeInfoCollection); + return nodeInfo; + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index fd2f588967..ac004e1fda 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -198,6 +198,7 @@ import org.apache.nifi.remote.StandardRemoteProcessGroup; import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; import org.apache.nifi.remote.StandardRootGroupPort; import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol; import org.apache.nifi.reporting.Bulletin; @@ -298,9 +299,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final Integer remoteInputSocketPort; private final Integer remoteInputHttpPort; private final Boolean isSiteToSiteSecure; - private Integer clusterManagerRemoteSitePort = null; - private Integer clusterManagerRemoteSiteHttpPort = null; - private Boolean clusterManagerRemoteSiteCommsSecure = null; private ProcessGroup rootGroup; private final List startConnectablesAfterInitialization; @@ -411,8 +409,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R clusterCoordinator, heartbeatMonitor); - flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure()); - return flowController; } @@ -525,11 +521,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } else { // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class); - externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null)); + + final NodeInformant nodeInformant = configuredForClustering ? new ClusterCoordinatorNodeInformant(clusterCoordinator) : null; + externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null, nodeInformant)); } if (remoteInputHttpPort == null) { - LOG.info("Not enabling HTTP(S) Site-to-Site functionality because nifi.remote.input.html.enabled is not true"); + LOG.info("Not enabling HTTP(S) Site-to-Site functionality because the '" + NiFiProperties.SITE_TO_SITE_HTTP_ENABLED + "' property is not true"); } else { externalSiteListeners.add(HttpRemoteSiteListener.getInstance()); } @@ -3895,45 +3893,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return new ArrayList<>(history.getActions()); } - public void setClusterManagerRemoteSiteInfo(final Integer managerListeningPort, final Integer managerListeningHttpPort, final Boolean commsSecure) { - writeLock.lock(); - try { - clusterManagerRemoteSitePort = managerListeningPort; - clusterManagerRemoteSiteHttpPort = managerListeningHttpPort; - clusterManagerRemoteSiteCommsSecure = commsSecure; - } finally { - writeLock.unlock(); - } - } - - public Integer getClusterManagerRemoteSiteListeningPort() { - readLock.lock(); - try { - return clusterManagerRemoteSitePort; - } finally { - readLock.unlock(); - } - } - - - public Integer getClusterManagerRemoteSiteListeningHttpPort() { - readLock.lock(); - try { - return clusterManagerRemoteSiteHttpPort; - } finally { - readLock.unlock(); - } - } - - public Boolean isClusterManagerRemoteSiteCommsSecure() { - readLock.lock(); - try { - return clusterManagerRemoteSiteCommsSecure; - } finally { - readLock.unlock(); - } - } - public Integer getRemoteSiteListeningPort() { return remoteInputSocketPort; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 801a4e2ae1..49f32c7150 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -467,7 +467,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { controller.setClustered(true, null); clusterCoordinator.setConnected(false); - controller.setClusterManagerRemoteSiteInfo(null, null, null); controller.setConnectionStatus(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED)); /* @@ -586,9 +585,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // reconnect final ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(), - request.getManagerRemoteSiteListeningPort(), request.getManagerRemoteSiteListeningHttpPort(), - request.isManagerRemoteSiteCommsSecure(), request.getInstanceId(), - request.getNodeConnectionStatuses(), request.getComponentRevisions()); + request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions()); connectionResponse.setCoordinatorDN(request.getRequestorDN()); loadFromConnectionResponse(connectionResponse); @@ -853,7 +850,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // mark the node as clustered controller.setClustered(true, response.getInstanceId(), response.getCoordinatorDN()); - controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(), response.getManagerRemoteInputHttpPort(), response.isManagerRemoteCommsSecure()); final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); final Set roles = status == null ? Collections.emptySet() : status.getRoles(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index fb9da32f76..a5f66ce57c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -1138,9 +1138,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { @Override public void run() { - try ( - final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient(); - ){ + try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()){ try { final ControllerDTO dto = apiClient.getController(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java index 7de36c8b77..fb0ce7cf25 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java @@ -44,6 +44,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex private AuditService auditService; private StringEncryptor encryptor; private BulletinRepository bulletinRepository; + private ClusterCoordinator clusterCoordinator; @Override public Object getObject() throws Exception { @@ -53,7 +54,6 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex if (properties.isNode()) { final NodeProtocolSender nodeProtocolSender = applicationContext.getBean("nodeProtocolSender", NodeProtocolSender.class); final HeartbeatMonitor heartbeatMonitor = applicationContext.getBean("heartbeatMonitor", HeartbeatMonitor.class); - final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class); flowController = FlowController.createClusteredInstance( flowFileEventRepository, properties, @@ -114,4 +114,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex this.bulletinRepository = bulletinRepository; } + public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) { + this.clusterCoordinator = clusterCoordinator; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml index 1004baf40a..3cd51596d4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml @@ -40,6 +40,7 @@ + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java index acf7fc5717..08fb188389 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java @@ -47,7 +47,6 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { private final Map transactions = new ConcurrentHashMap<>(); private final ScheduledExecutorService taskExecutor; - private final int httpListenPort; private ProcessGroup rootGroup; private ScheduledFuture transactionMaintenanceTask; @@ -76,9 +75,6 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { SITE_TO_SITE_HTTP_TRANSACTION_TTL, e.getMessage(), txTtlSec); } transactionTtlSec = txTtlSec; - - httpListenPort = properties.getRemoteInputHttpPort() != null ? properties.getRemoteInputHttpPort() : 0; - } public static HttpRemoteSiteListener getInstance() { @@ -130,9 +126,7 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { try { Set transactionIds = transactions.keySet().stream().collect(Collectors.toSet()); transactionIds.stream().filter(tid -> !isTransactionActive(tid)) - .forEach(tid -> { - cancelTransaction(tid); - }); + .forEach(tid -> cancelTransaction(tid)); } catch (Exception e) { // Swallow exception so that this thread can keep working. logger.error("An exception occurred while maintaining transactions", e); @@ -161,10 +155,6 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { } } - @Override - public int getPort() { - return httpListenPort; - } @Override public void stop() { @@ -225,7 +215,7 @@ public class HttpRemoteSiteListener implements RemoteSiteListener { return transaction.transaction; } - public void extendsTransaction(final String transactionId) throws IllegalStateException { + public void extendTransaction(final String transactionId) throws IllegalStateException { if (!isTransactionActive(transactionId)){ throw new IllegalStateException("Transaction was not found or not active anymore. transactionId=" + transactionId); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java index 6f7b977059..1183fc5527 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java @@ -26,8 +26,5 @@ public interface RemoteSiteListener { void start() throws IOException; - int getPort(); - void stop(); - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java index a5d4bbe588..814d0e6fdf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java @@ -43,6 +43,7 @@ import java.net.SocketTimeoutException; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Arrays; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -266,7 +267,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { protocol.getPort().receiveFlowFiles(peer, protocol); break; case REQUEST_PEER_LIST: - protocol.sendPeerList(peer); + protocol.sendPeerList(peer, nodeInformant == null ? Optional.empty() : Optional.of(nodeInformant.getNodeInformation())); break; case SHUTDOWN: protocol.shutdown(peer); @@ -321,8 +322,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { listenerThread.start(); } - @Override - public int getPort() { + private int getPort() { return socketPort; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java similarity index 95% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java index f187625d27..c4f1f5c851 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java @@ -20,6 +20,7 @@ import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; @@ -37,8 +38,9 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Optional; -public class HttpFlowFileServerProtocolImpl extends AbstractFlowFileServerProtocol implements HttpFlowFileServerProtocol { +public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerProtocol implements HttpFlowFileServerProtocol { public static final String RESOURCE_NAME = "HttpFlowFileProtocol"; @@ -46,7 +48,7 @@ public class HttpFlowFileServerProtocolImpl extends AbstractFlowFileServerProtoc private final VersionNegotiator versionNegotiator; private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(); - public HttpFlowFileServerProtocolImpl(VersionNegotiator versionNegotiator) { + public StandardHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) { super(); this.versionNegotiator = versionNegotiator; } @@ -176,6 +178,7 @@ public class HttpFlowFileServerProtocolImpl extends AbstractFlowFileServerProtoc return holdTransaction(peer, transaction); } + @Override public int commitTransferTransaction(Peer peer, String clientChecksum) throws IOException, IllegalStateException { logger.debug("{} Committing the transfer transaction. peer={} clientChecksum={}", this, peer, clientChecksum); HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); @@ -191,6 +194,7 @@ public class HttpFlowFileServerProtocolImpl extends AbstractFlowFileServerProtoc return holdTransaction(peer, transaction); } + @Override public int commitReceiveTransaction(Peer peer) throws IOException, IllegalStateException { logger.debug("{} Committing the receive transaction. peer={}", this, peer); HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); @@ -211,13 +215,11 @@ public class HttpFlowFileServerProtocolImpl extends AbstractFlowFileServerProtoc } @Override - public void sendPeerList(final Peer peer) throws IOException { + public void sendPeerList(final Peer peer, final Optional clusterNodeInformation) throws IOException { } @Override public String getResourceName() { return RESOURCE_NAME; } - - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java deleted file mode 100644 index af6860baa5..0000000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol.socket; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Collection; - -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.remote.Peer; -import org.apache.nifi.remote.RootGroupPort; -import org.apache.nifi.remote.StandardVersionNegotiator; -import org.apache.nifi.remote.VersionNegotiator; -import org.apache.nifi.remote.cluster.ClusterNodeInformation; -import org.apache.nifi.remote.cluster.NodeInformant; -import org.apache.nifi.remote.cluster.NodeInformation; -import org.apache.nifi.remote.codec.FlowFileCodec; -import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.protocol.CommunicationsSession; -import org.apache.nifi.remote.protocol.HandshakeProperty; -import org.apache.nifi.remote.protocol.RequestType; -import org.apache.nifi.remote.protocol.ResponseCode; -import org.apache.nifi.remote.protocol.ServerProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ClusterManagerServerProtocol implements ServerProtocol { - - public static final String RESOURCE_NAME = "SocketFlowFileProtocol"; - - private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1); - private final Logger logger = LoggerFactory.getLogger(ClusterManagerServerProtocol.class); - private NodeInformant nodeInformant; - - private String commsIdentifier; - private boolean shutdown = false; - private boolean handshakeCompleted = false; - private long requestExpirationMillis = 30000L; - - public ClusterManagerServerProtocol() { - } - - @Override - public void setNodeInformant(final NodeInformant nodeInformant) { - this.nodeInformant = nodeInformant; - } - - @Override - public void handshake(final Peer peer) throws IOException, HandshakeException { - if (handshakeCompleted) { - throw new IllegalStateException("Handshake has already been completed"); - } - if (shutdown) { - throw new IllegalStateException("Protocol is shutdown"); - } - - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - - // read communications identifier - commsIdentifier = dis.readUTF(); - - // read all of the properties. we don't really care what the properties are. - final int numProperties = dis.readInt(); - for (int i = 0; i < numProperties; i++) { - final String propertyName = dis.readUTF(); - final String propertyValue = dis.readUTF(); - - final HandshakeProperty property; - try { - property = HandshakeProperty.valueOf(propertyName); - if (HandshakeProperty.REQUEST_EXPIRATION_MILLIS.equals(property)) { - requestExpirationMillis = Long.parseLong(propertyValue); - } - } catch (final Exception e) { - } - } - - // send "OK" response - ResponseCode.PROPERTIES_OK.writeResponse(dos); - - logger.debug("Successfully completed handshake with {}; CommsID={}", peer, commsIdentifier); - handshakeCompleted = true; - } - - @Override - public boolean isHandshakeSuccessful() { - return handshakeCompleted; - } - - @Override - public void sendPeerList(final Peer peer) throws IOException { - if (!handshakeCompleted) { - throw new IllegalStateException("Handshake has not been completed"); - } - if (shutdown) { - throw new IllegalStateException("Protocol is shutdown"); - } - - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - - final ClusterNodeInformation clusterNodeInfo = nodeInformant.getNodeInformation(); - final Collection nodeInfos = clusterNodeInfo.getNodeInformation(); - - // determine how many nodes have Site-to-site enabled - int numPeers = 0; - for (final NodeInformation nodeInfo : nodeInfos) { - if (nodeInfo.getSiteToSitePort() != null) { - numPeers++; - } - } - - dos.writeInt(numPeers); - for (final NodeInformation nodeInfo : nodeInfos) { - if (nodeInfo.getSiteToSitePort() == null) { - continue; - } - - dos.writeUTF(nodeInfo.getSiteToSiteHostname()); - dos.writeInt(nodeInfo.getSiteToSitePort()); - dos.writeBoolean(nodeInfo.isSiteToSiteSecure()); - dos.writeInt(nodeInfo.getTotalFlowFiles()); - } - - logger.info("Redirected {} to {} nodes", peer, numPeers); - - dos.flush(); - } - - @Override - public void shutdown(final Peer peer) { - shutdown = true; - } - - @Override - public boolean isShutdown() { - return shutdown; - } - - @Override - public FlowFileCodec negotiateCodec(Peer peer) { - throw new UnsupportedOperationException(); - } - - @Override - public FlowFileCodec getPreNegotiatedCodec() { - return null; - } - - @Override - public RequestType getRequestType(final Peer peer) throws IOException { - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - return RequestType.readRequestType(dis); - } - - @Override - public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public VersionNegotiator getVersionNegotiator() { - return versionNegotiator; - } - - @Override - public String getResourceName() { - return RESOURCE_NAME; - } - - @Override - public void setRootProcessGroup(final ProcessGroup rootGroup) { - } - - @Override - public RootGroupPort getPort() { - return null; - } - - @Override - public long getRequestExpiration() { - return requestExpirationMillis; - } -} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index a2a72234da..fe7d16319e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -20,6 +20,8 @@ import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.RemoteResourceFactory; import org.apache.nifi.remote.StandardVersionNegotiator; import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.ProtocolException; @@ -34,14 +36,19 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol { public static final String RESOURCE_NAME = "SocketFlowFileProtocol"; - private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); + // Version 6 added to support Zero-Master Clustering, which was introduced in NiFi 1.0.0 + private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1); @Override protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException { @@ -147,7 +154,7 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol } @Override - public void sendPeerList(final Peer peer) throws IOException { + public void sendPeerList(final Peer peer, final Optional clusterNodeInfo) throws IOException { if (!handshakeCompleted) { throw new IllegalStateException("Handshake has not been completed"); } @@ -167,12 +174,36 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol } logger.debug("{} Advertising Remote Input host name {}", this, peer); - // we have only 1 peer: ourselves. - dos.writeInt(1); - dos.writeUTF(remoteInputHost); - dos.writeInt(properties.getRemoteInputPort()); - dos.writeBoolean(properties.isSiteToSiteSecure()); - dos.writeInt(0); // doesn't matter how many FlowFiles we have, because we're the only host. + List nodeInfos; + if (clusterNodeInfo.isPresent()) { + nodeInfos = new ArrayList<>(clusterNodeInfo.get().getNodeInformation()); + } else { + final NodeInformation self = new NodeInformation(remoteInputHost, properties.getRemoteInputPort(), properties.getRemoteInputHttpPort(), properties.getRemoteInputHttpPort(), + properties.isSiteToSiteSecure(), 0); + nodeInfos = Collections.singletonList(self); + } + + // determine how many nodes have Site-to-site enabled + int numPeers = 0; + for (final NodeInformation nodeInfo : nodeInfos) { + if (nodeInfo.getSiteToSitePort() != null) { + numPeers++; + } + } + + dos.writeInt(numPeers); + for (final NodeInformation nodeInfo : nodeInfos) { + if (nodeInfo.getSiteToSitePort() == null) { + continue; + } + + dos.writeUTF(nodeInfo.getSiteToSiteHostname()); + dos.writeInt(nodeInfo.getSiteToSitePort()); + dos.writeBoolean(nodeInfo.isSiteToSiteSecure()); + dos.writeInt(nodeInfo.getTotalFlowFiles()); + } + + logger.info("Sending list of {} peers back to client {}", numPeers, peer); dos.flush(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol index fe2182fe5e..67a7a9c7e4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/resources/META-INF/services/org.apache.nifi.remote.protocol.ServerProtocol @@ -12,5 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol -org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol \ No newline at end of file + +org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java index a8900c9458..4519ddd254 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java @@ -80,7 +80,7 @@ public class TestHttpFlowFileServerProtocol { final PeerDescription description = new PeerDescription("peer-host", 8080, false); final InputStream inputStream = new ByteArrayInputStream(new byte[]{}); final OutputStream outputStream = new ByteArrayOutputStream(); - final HttpServerCommunicationsSession commsSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId); + final HttpServerCommunicationsSession commsSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId, "user"); commsSession.putHandshakeParam(HandshakeProperty.GZIP, "false"); commsSession.putHandshakeParam(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, "1234"); final String peerUrl = "http://peer-host:8080/"; @@ -90,7 +90,7 @@ public class TestHttpFlowFileServerProtocol { private HttpFlowFileServerProtocol getDefaultHttpFlowFileServerProtocol() { final StandardVersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); - return new HttpFlowFileServerProtocolImpl(versionNegotiator); + return new StandardHttpFlowFileServerProtocol(versionNegotiator); } @Test @@ -101,7 +101,7 @@ public class TestHttpFlowFileServerProtocol { try { serverProtocol.handshake(peer); fail(); - } catch (HandshakeException e) { + } catch (final HandshakeException e) { assertEquals(ResponseCode.MISSING_PROPERTY, e.getResponseCode()); } @@ -122,7 +122,7 @@ public class TestHttpFlowFileServerProtocol { try { serverProtocol.handshake(peer); fail(); - } catch (HandshakeException e) { + } catch (final HandshakeException e) { assertEquals(ResponseCode.UNKNOWN_PORT, e.getResponseCode()); } @@ -147,7 +147,7 @@ public class TestHttpFlowFileServerProtocol { try { serverProtocol.handshake(peer); fail(); - } catch (HandshakeException e) { + } catch (final HandshakeException e) { assertEquals(ResponseCode.UNAUTHORIZED, e.getResponseCode()); } @@ -173,7 +173,7 @@ public class TestHttpFlowFileServerProtocol { try { serverProtocol.handshake(peer); fail(); - } catch (HandshakeException e) { + } catch (final HandshakeException e) { assertEquals(ResponseCode.PORT_NOT_IN_VALID_STATE, e.getResponseCode()); } @@ -196,7 +196,7 @@ public class TestHttpFlowFileServerProtocol { doReturn(true).when(authResult).isAuthorized(); doReturn(true).when(port).isValid(); doReturn(true).when(port).isRunning(); - Set connections = new HashSet<>(); + final Set connections = new HashSet<>(); final Connection connection = mock(Connection.class); connections.add(connection); doReturn(connections).when(port).getConnections(); @@ -208,7 +208,7 @@ public class TestHttpFlowFileServerProtocol { try { serverProtocol.handshake(peer); fail(); - } catch (HandshakeException e) { + } catch (final HandshakeException e) { assertEquals(ResponseCode.PORTS_DESTINATION_FULL, e.getResponseCode()); } @@ -237,13 +237,13 @@ public class TestHttpFlowFileServerProtocol { try { serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded); fail("transferFlowFiles should fail since it's already shutdown."); - } catch (IllegalStateException e) { + } catch (final IllegalStateException e) { } try { serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded); fail("receiveFlowFiles should fail since it's already shutdown."); - } catch (IllegalStateException e) { + } catch (final IllegalStateException e) { } } @@ -288,12 +288,12 @@ public class TestHttpFlowFileServerProtocol { try { serverProtocol.commitTransferTransaction(peer, "client-sent-wrong-checksum"); fail(); - } catch (IOException e) { + } catch (final IOException e) { assertTrue(e.getMessage().contains("CRC32 Checksum")); } } - private Peer transferOneFile(HttpFlowFileServerProtocol serverProtocol, String transactionId) throws IOException { + private Peer transferOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId) throws IOException { final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(); final Peer peer = getDefaultPeer(transactionId); final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); @@ -312,21 +312,21 @@ public class TestHttpFlowFileServerProtocol { doReturn(flowFile).when(processSession).get(); doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doAnswer(invocation -> { - String peerUrl = (String)invocation.getArguments()[1]; - String detail = (String)invocation.getArguments()[2]; + final String peerUrl = (String)invocation.getArguments()[1]; + final String detail = (String)invocation.getArguments()[2]; assertEquals("http://peer-host:8080/", peerUrl); assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); return null; }).when(provenanceReporter).send(eq(flowFile), any(String.class), any(String.class), any(Long.class), any(Boolean.class)); doAnswer(invocation -> { - InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1]; + final InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1]; callback.process(new java.io.ByteArrayInputStream("Server content".getBytes())); return null; }).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class)); // Execute test using mock - int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded); + final int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded); assertEquals(1, flowFileSent); assertTrue(remoteSiteListener.isTransactionActive(transactionId)); @@ -360,8 +360,8 @@ public class TestHttpFlowFileServerProtocol { doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doAnswer(invocation -> { - String peerUrl = (String)invocation.getArguments()[1]; - String detail = (String)invocation.getArguments()[2]; + final String peerUrl = (String)invocation.getArguments()[1]; + final String detail = (String)invocation.getArguments()[2]; assertEquals("http://peer-host:8080/", peerUrl); assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); return null; @@ -369,15 +369,15 @@ public class TestHttpFlowFileServerProtocol { doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doAnswer(invocation -> { - String peerUrl = (String)invocation.getArguments()[1]; - String detail = (String)invocation.getArguments()[2]; + final String peerUrl = (String)invocation.getArguments()[1]; + final String detail = (String)invocation.getArguments()[2]; assertEquals("http://peer-host:8080/", peerUrl); assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); return null; }).when(provenanceReporter).send(eq(flowFile2), any(String.class), any(String.class), any(Long.class), any(Boolean.class)); doAnswer(invocation -> { - InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1]; + final InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1]; callback.process(new java.io.ByteArrayInputStream("Server content".getBytes())); return null; }).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class)); @@ -397,7 +397,7 @@ public class TestHttpFlowFileServerProtocol { final String contents = "Content from client."; final byte[] bytes = contents.getBytes(); final InputStream in = new ByteArrayInputStream(bytes); - Map attributes = new HashMap<>(); + final Map attributes = new HashMap<>(); attributes.put("client-attr-1", "client-attr-1-value"); attributes.put("client-attr-2", "client-attr-2-value"); return new StandardDataPacket(attributes, in, bytes.length); @@ -458,12 +458,12 @@ public class TestHttpFlowFileServerProtocol { try { serverProtocol.commitReceiveTransaction(peer); fail(); - } catch (IOException e) { + } catch (final IOException e) { assertTrue(e.getMessage().contains("Received a BadChecksum response")); } } - private void receiveOneFile(HttpFlowFileServerProtocol serverProtocol, String transactionId, Peer peer) throws IOException { + private void receiveOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId, final Peer peer) throws IOException { final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(); final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1"); @@ -479,7 +479,7 @@ public class TestHttpFlowFileServerProtocol { final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class); final FlowFile flowFile = mock(FlowFile.class); - DataPacket dataPacket = createClientDataPacket(); + final DataPacket dataPacket = createClientDataPacket(); final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream(); negotiatedCoded.encode(dataPacket, testDataOs); @@ -488,7 +488,7 @@ public class TestHttpFlowFileServerProtocol { ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream); doAnswer(invocation -> { - InputStream is = (InputStream) invocation.getArguments()[0]; + final InputStream is = (InputStream) invocation.getArguments()[0]; for (int b; (b = is.read()) >= 0;) { // consume stream. } @@ -499,21 +499,21 @@ public class TestHttpFlowFileServerProtocol { doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class)); doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doAnswer(invocation -> { - String peerUrl = (String)invocation.getArguments()[1]; - String detail = (String)invocation.getArguments()[3]; + final String peerUrl = (String)invocation.getArguments()[1]; + final String detail = (String)invocation.getArguments()[3]; assertEquals("http://peer-host:8080/", peerUrl); assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); return null; }).when(provenanceReporter) .receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class)); - Set relations = new HashSet<>(); + final Set relations = new HashSet<>(); final Relationship relationship = new Relationship.Builder().build(); relations.add(relationship); doReturn(relations).when(context).getAvailableRelationships(); // Execute test using mock - int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded); + final int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded); assertEquals(1, flowFileReceived); assertTrue(remoteSiteListener.isTransactionActive(transactionId)); @@ -549,7 +549,7 @@ public class TestHttpFlowFileServerProtocol { ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream); doAnswer(invocation -> { - InputStream is = (InputStream) invocation.getArguments()[0]; + final InputStream is = (InputStream) invocation.getArguments()[0]; for (int b; (b = is.read()) >= 0;) { // consume stream. } @@ -562,15 +562,15 @@ public class TestHttpFlowFileServerProtocol { .when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class)); doReturn(provenanceReporter).when(processSession).getProvenanceReporter(); doAnswer(invocation -> { - String peerUrl = (String)invocation.getArguments()[1]; - String detail = (String)invocation.getArguments()[3]; + final String peerUrl = (String)invocation.getArguments()[1]; + final String detail = (String)invocation.getArguments()[3]; assertEquals("http://peer-host:8080/", peerUrl); assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail); return null; }).when(provenanceReporter) .receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class)); - Set relations = new HashSet<>(); + final Set relations = new HashSet<>(); doReturn(relations).when(context).getAvailableRelationships(); // Execute test using mock diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 31087c9308..892718e069 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,7 +16,28 @@ */ package org.apache.nifi.web; -import com.google.common.collect.Sets; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; import org.apache.nifi.action.FlowChangeAction; @@ -194,26 +215,7 @@ import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; +import com.google.common.collect.Sets; /** * Implementation of NiFiServiceFacade that performs revision checking. @@ -2157,15 +2159,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerDTO.setDisabledCount(counts.getDisabledCount()); // determine the site to site configuration - if (isClustered()) { - controllerDTO.setRemoteSiteListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningPort()); - controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningHttpPort()); - controllerDTO.setSiteToSiteSecure(controllerFacade.isClusterManagerRemoteSiteCommsSecure()); - } else { - controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort()); - controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort()); - controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure()); - } + controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort()); + controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort()); + controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure()); return controllerDTO; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java index aad8b4a3ed..e77d76998d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java @@ -49,7 +49,7 @@ import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession; import org.apache.nifi.remote.protocol.HandshakeProperty; import org.apache.nifi.remote.protocol.ResponseCode; import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol; -import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocolImpl; +import org.apache.nifi.remote.protocol.http.StandardHttpFlowFileServerProtocol; import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.web.api.entity.TransactionResultEntity; import org.slf4j.Logger; @@ -305,16 +305,18 @@ public class DataTransferResource extends ApplicationResource { } HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) { - return new HttpFlowFileServerProtocolImpl(versionNegotiator); + return new StandardHttpFlowFileServerProtocol(versionNegotiator); } private Peer constructPeer(HttpServletRequest req, InputStream inputStream, OutputStream outputStream, String portId, String transactionId) { - String clientHostName = req.getRemoteHost(); - int clientPort = req.getRemotePort(); + final String clientHostName = req.getRemoteHost(); + final int clientPort = req.getRemotePort(); - PeerDescription peerDescription = new PeerDescription(clientHostName, clientPort, req.isSecure()); + final PeerDescription peerDescription = new PeerDescription(clientHostName, clientPort, req.isSecure()); - HttpServerCommunicationsSession commSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final String userDn = user == null ? null : user.getIdentity(); + final HttpServerCommunicationsSession commSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId, userDn); boolean useCompression = false; final String useCompressionStr = req.getHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION); @@ -330,20 +332,28 @@ public class DataTransferResource extends ApplicationResource { commSession.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, portId); commSession.putHandshakeParam(HandshakeProperty.GZIP, String.valueOf(useCompression)); - if (!isEmpty(requestExpiration)) commSession.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, requestExpiration); - if (!isEmpty(batchCount)) commSession.putHandshakeParam(BATCH_COUNT, batchCount); - if (!isEmpty(batchSize)) commSession.putHandshakeParam(BATCH_SIZE, batchSize); - if (!isEmpty(batchDuration)) commSession.putHandshakeParam(BATCH_DURATION, batchDuration); + if (!isEmpty(requestExpiration)) { + commSession.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, requestExpiration); + } + if (!isEmpty(batchCount)) { + commSession.putHandshakeParam(BATCH_COUNT, batchCount); + } + if (!isEmpty(batchSize)) { + commSession.putHandshakeParam(BATCH_SIZE, batchSize); + } + if (!isEmpty(batchDuration)) { + commSession.putHandshakeParam(BATCH_DURATION, batchDuration); + } if(peerDescription.isSecure()){ - NiFiUser nifiUser = NiFiUserUtils.getNiFiUser(); + final NiFiUser nifiUser = NiFiUserUtils.getNiFiUser(); logger.debug("initiating peer, nifiUser={}", nifiUser); commSession.setUserDn(nifiUser.getIdentity()); } // TODO: Followed how SocketRemoteSiteListener define peerUrl and clusterUrl, but it can be more meaningful values, especially for clusterUrl. - String peerUrl = "nifi://" + clientHostName + ":" + clientPort; - String clusterUrl = "nifi://localhost:" + req.getLocalPort(); + final String peerUrl = "nifi://" + clientHostName + ":" + clientPort; + final String clusterUrl = "nifi://localhost:" + req.getLocalPort(); return new Peer(peerDescription, commSession, peerUrl, clusterUrl); } @@ -771,7 +781,7 @@ public class DataTransferResource extends ApplicationResource { try { // Do handshake initiateServerProtocol(peer, transportProtocolVersion); - transactionManager.extendsTransaction(transactionId); + transactionManager.extendTransaction(transactionId); final TransactionResultEntity entity = new TransactionResultEntity(); entity.setResponseCode(ResponseCode.CONTINUE_TRANSACTION.getCode()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java index 0565f94730..d16c6269cc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java @@ -16,11 +16,22 @@ */ package org.apache.nifi.web.api; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import static org.apache.commons.lang3.StringUtils.isEmpty; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.AuthorizationRequest; @@ -31,6 +42,9 @@ import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.VersionNegotiator; import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator; @@ -44,18 +58,11 @@ import org.apache.nifi.web.api.entity.PeersEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import java.util.ArrayList; - -import static org.apache.commons.lang3.StringUtils.isEmpty; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; /** * RESTful endpoint for managing a SiteToSite connection. @@ -70,7 +77,11 @@ public class SiteToSiteResource extends ApplicationResource { private static final Logger logger = LoggerFactory.getLogger(SiteToSiteResource.class); private NiFiServiceFacade serviceFacade; + private ClusterCoordinator clusterCoordinator; private Authorizer authorizer; + public static final String CHECK_SUM = "checksum"; + public static final String RESPONSE_CODE = "responseCode"; + private final ResponseCreator responseCreator = new ResponseCreator(); private final VersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1); private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(); @@ -147,6 +158,7 @@ public class SiteToSiteResource extends ApplicationResource { return clusterContext(noCache(Response.ok(entity))).build(); } + /** * Returns the available Peers and its status of this NiFi. * @@ -185,52 +197,42 @@ public class SiteToSiteResource extends ApplicationResource { return responseCreator.badRequestResponse(e); } - ArrayList peers; - + final List peers = new ArrayList<>(); if (properties.isNode()) { - return responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " is only accessible on NCM or Standalone NiFi instance."); - // TODO: NCM no longer exists. - /* - } else if (properties.isClusterManager()) { - ClusterNodeInformation clusterNodeInfo = clusterManager.getNodeInformation(); - final Collection nodeInfos = clusterNodeInfo.getNodeInformation(); - peers = new ArrayList<>(nodeInfos.size()); - for (NodeInformation nodeInfo : nodeInfos) { - if (nodeInfo.getSiteToSiteHttpApiPort() == null) { - continue; - } - PeerDTO peer = new PeerDTO(); - peer.setHostname(nodeInfo.getSiteToSiteHostname()); - peer.setPort(nodeInfo.getSiteToSiteHttpApiPort()); - peer.setSecure(nodeInfo.isSiteToSiteSecure()); - peer.setFlowFileCount(nodeInfo.getTotalFlowFiles()); + final Set nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED); + + // TODO: Get total number of FlowFiles for each node + for (final NodeIdentifier nodeId : nodeIds) { + final PeerDTO peer = new PeerDTO(); + final String siteToSiteAddress = nodeId.getSiteToSiteAddress(); + peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress); + peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort()); + peer.setSecure(nodeId.isSiteToSiteSecure()); + peer.setFlowFileCount(0); peers.add(peer); } - */ } else { // Standalone mode. - PeerDTO peer = new PeerDTO(); + final PeerDTO peer = new PeerDTO(); // req.getLocalName returns private IP address, that can't be accessed from client in some environments. // So, use the value defined in nifi.properties instead when it is defined. - String remoteInputHost = properties.getRemoteInputHost(); + final String remoteInputHost = properties.getRemoteInputHost(); peer.setHostname(isEmpty(remoteInputHost) ? req.getLocalName() : remoteInputHost); peer.setPort(properties.getRemoteInputHttpPort()); peer.setSecure(properties.isSiteToSiteSecure()); peer.setFlowFileCount(0); // doesn't matter how many FlowFiles we have, because we're the only host. - peers = new ArrayList<>(1); peers.add(peer); - } - PeersEntity entity = new PeersEntity(); + final PeersEntity entity = new PeersEntity(); entity.setPeers(peers); return clusterContext(noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion, transactionManager))).build(); } // setters - public void setServiceFacade(NiFiServiceFacade serviceFacade) { + public void setServiceFacade(final NiFiServiceFacade serviceFacade) { this.serviceFacade = serviceFacade; } @@ -238,4 +240,9 @@ public class SiteToSiteResource extends ApplicationResource { this.authorizer = authorizer; } + @Override + public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) { + super.setClusterCoordinator(clusterCoordinator); + this.clusterCoordinator = clusterCoordinator; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 62eca51fb3..4161657a94 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -677,39 +677,6 @@ public class ControllerFacade implements Authorizable { flowService.saveFlowChanges(TimeUnit.SECONDS, writeDelaySeconds); } - /** - * Returns the socket port that the Cluster Manager is listening on for - * Site-to-Site communications - * - * @return the socket port that the Cluster Manager is listening on for - * Site-to-Site communications - */ - public Integer getClusterManagerRemoteSiteListeningPort() { - return flowController.getClusterManagerRemoteSiteListeningPort(); - } - - /** - * Returns the http(s) port that the Cluster Manager is listening on for - * Site-to-Site communications - * - * @return the socket port that the Cluster Manager is listening on for - * Site-to-Site communications - */ - public Integer getClusterManagerRemoteSiteListeningHttpPort() { - return flowController.getClusterManagerRemoteSiteListeningHttpPort(); - } - - /** - * Indicates whether or not Site-to-Site communications with the Cluster - * Manager are secure - * - * @return whether or not Site-to-Site communications with the Cluster - * Manager are secure - */ - public Boolean isClusterManagerRemoteSiteCommsSecure() { - return flowController.isClusterManagerRemoteSiteCommsSecure(); - } - /** * Returns the socket port that the local instance is listening on for * Site-to-Site communications diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection-configuration.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection-configuration.js index c1fc9561ad..8522d5afa6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection-configuration.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection-configuration.js @@ -1334,7 +1334,7 @@ nf.ConnectionConfiguration = (function () { }); // store the connection details - $('#connection-uri').val(connection.uri); + $('#connection-uri').val(connectionEntry.uri); // configure the button model $('#connection-configuration').modal('setButtonModel', [{