mirror of https://github.com/apache/nifi.git
NIFI-1992:
- Updated site-to-site client and server to support clustered nifi instances NIFI-2274: - Ensuring we use the correct URI when updating a connection. This closes #530
This commit is contained in:
parent
b67c9b6f76
commit
c81dc1959a
|
@ -20,10 +20,12 @@ public class PeerStatus {
|
|||
|
||||
private final PeerDescription description;
|
||||
private final int numFlowFiles;
|
||||
private final boolean queryForPeers;
|
||||
|
||||
public PeerStatus(final PeerDescription description, final int numFlowFiles) {
|
||||
public PeerStatus(final PeerDescription description, final int numFlowFiles, final boolean queryForPeers) {
|
||||
this.description = description;
|
||||
this.numFlowFiles = numFlowFiles;
|
||||
this.queryForPeers = queryForPeers;
|
||||
}
|
||||
|
||||
public PeerDescription getPeerDescription() {
|
||||
|
@ -34,6 +36,13 @@ public class PeerStatus {
|
|||
return numFlowFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return <code>true</code> if this node can be queried for its peers, <code>false</code> otherwise.
|
||||
*/
|
||||
public boolean isQueryForPeers() {
|
||||
return queryForPeers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort()
|
||||
|
|
|
@ -97,7 +97,7 @@ public class PeerSelector {
|
|||
|
||||
for (final PeerStatus status : statuses) {
|
||||
final PeerDescription description = status.getPeerDescription();
|
||||
final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + "\n";
|
||||
final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + ":" + status.isQueryForPeers() + "\n";
|
||||
out.write(line.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
|
@ -120,7 +120,7 @@ public class PeerSelector {
|
|||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
final String[] splits = line.split(Pattern.quote(":"));
|
||||
if (splits.length != 3) {
|
||||
if (splits.length != 3 && splits.length != 4) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -128,7 +128,9 @@ public class PeerSelector {
|
|||
final int port = Integer.parseInt(splits[1]);
|
||||
final boolean secure = Boolean.parseBoolean(splits[2]);
|
||||
|
||||
statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1));
|
||||
final boolean supportQueryForPeer = splits.length == 4 && Boolean.parseBoolean(splits[3]);
|
||||
|
||||
statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1, supportQueryForPeer));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -172,7 +174,7 @@ public class PeerSelector {
|
|||
final int index = n % destinations.size();
|
||||
PeerStatus status = destinations.get(index);
|
||||
if (status == null) {
|
||||
status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount());
|
||||
status = new PeerStatus(nodeInfo.getPeerDescription(), nodeInfo.getFlowFileCount(), nodeInfo.isQueryForPeers());
|
||||
destinations.set(index, status);
|
||||
break;
|
||||
} else {
|
||||
|
@ -306,7 +308,7 @@ public class PeerSelector {
|
|||
if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) {
|
||||
final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
|
||||
for (final PeerStatus status : cache.getStatuses()) {
|
||||
final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1);
|
||||
final PeerStatus equalizedStatus = new PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers());
|
||||
equalizedSet.add(equalizedStatus);
|
||||
}
|
||||
|
||||
|
|
|
@ -93,45 +93,44 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
|
|||
final URI clusterUrl;
|
||||
try {
|
||||
clusterUrl = new URI(config.getUrl());
|
||||
} catch (URISyntaxException e) {
|
||||
} catch (final URISyntaxException e) {
|
||||
throw new IllegalArgumentException("Specified clusterUrl was: " + config.getUrl(), e);
|
||||
}
|
||||
|
||||
try (
|
||||
SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())
|
||||
) {
|
||||
String clusterApiUrl = apiClient.resolveBaseUrl(scheme, clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort());
|
||||
try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())) {
|
||||
final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort());
|
||||
|
||||
int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
|
||||
final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
|
||||
apiClient.setConnectTimeoutMillis(timeoutMillis);
|
||||
apiClient.setReadTimeoutMillis(timeoutMillis);
|
||||
Collection<PeerDTO> peers = apiClient.getPeers();
|
||||
|
||||
final Collection<PeerDTO> peers = apiClient.getPeers();
|
||||
if(peers == null || peers.size() == 0){
|
||||
throw new IOException("Couldn't get any peer to communicate with. " + clusterApiUrl + " returned zero peers.");
|
||||
}
|
||||
|
||||
return peers.stream()
|
||||
.map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount()))
|
||||
// Convert the PeerDTO's to PeerStatus objects. Use 'true' for the query-peer-for-peers flag because Site-to-Site over HTTP
|
||||
// was added in NiFi 1.0.0, which means that peer-to-peer queries are always allowed.
|
||||
return peers.stream().map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount(), true))
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {
|
||||
|
||||
int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
|
||||
public Transaction createTransaction(final TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {
|
||||
final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
|
||||
|
||||
PeerStatus peerStatus;
|
||||
while ((peerStatus = peerSelector.getNextPeerStatus(direction)) != null) {
|
||||
logger.debug("peerStatus={}", peerStatus);
|
||||
|
||||
CommunicationsSession commSession = new HttpCommunicationsSession();
|
||||
String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription());
|
||||
final CommunicationsSession commSession = new HttpCommunicationsSession();
|
||||
final String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription());
|
||||
commSession.setUri(nodeApiUrl);
|
||||
String clusterUrl = config.getUrl();
|
||||
Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl);
|
||||
final String clusterUrl = config.getUrl();
|
||||
final Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl);
|
||||
|
||||
int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS);
|
||||
final int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS);
|
||||
String portId = config.getPortIdentifier();
|
||||
if (StringUtils.isEmpty(portId)) {
|
||||
portId = siteInfoProvider.getPortIdentifier(config.getPortName(), direction);
|
||||
|
@ -141,7 +140,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
|
|||
}
|
||||
}
|
||||
|
||||
SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy());
|
||||
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy());
|
||||
|
||||
apiClient.setBaseUrl(peer.getUrl());
|
||||
apiClient.setConnectTimeoutMillis(timeoutMillis);
|
||||
|
@ -157,7 +156,8 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
|
|||
try {
|
||||
transactionUrl = apiClient.initiateTransaction(direction, portId);
|
||||
commSession.setUserDn(apiClient.getTrustedPeerDn());
|
||||
} catch (Exception e) {
|
||||
} catch (final Exception e) {
|
||||
apiClient.close();
|
||||
logger.debug("Penalizing a peer due to {}", e.getMessage());
|
||||
peerSelector.penalize(peer, penaltyMillis);
|
||||
|
||||
|
@ -170,8 +170,8 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
|
|||
}
|
||||
|
||||
// We found a valid peer to communicate with.
|
||||
Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion();
|
||||
HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction,
|
||||
final Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion();
|
||||
final HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction,
|
||||
config.isUseCompression(), portId, penaltyMillis, config.getEventReporter());
|
||||
transaction.initialize(apiClient, transactionUrl);
|
||||
|
||||
|
@ -183,7 +183,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
|
|||
|
||||
}
|
||||
|
||||
private String resolveNodeApiUrl(PeerDescription description) {
|
||||
private String resolveNodeApiUrl(final PeerDescription description) {
|
||||
return (description.isSecure() ? "https" : "http") + "://" + description.getHostname() + ":" + description.getPort() + "/nifi-api";
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,35 @@
|
|||
*/
|
||||
package org.apache.nifi.remote.client.socket;
|
||||
|
||||
import static org.apache.nifi.remote.util.EventReportUtil.error;
|
||||
import static org.apache.nifi.remote.util.EventReportUtil.warn;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.apache.nifi.events.EventReporter;
|
||||
import org.apache.nifi.remote.Peer;
|
||||
import org.apache.nifi.remote.PeerDescription;
|
||||
|
@ -40,33 +69,6 @@ import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.nifi.remote.util.EventReportUtil.error;
|
||||
import static org.apache.nifi.remote.util.EventReportUtil.warn;
|
||||
|
||||
public class EndpointConnectionPool implements PeerStatusProvider {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
|
||||
|
@ -84,6 +86,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
|
|||
|
||||
private volatile int commsTimeout;
|
||||
private volatile boolean shutdown = false;
|
||||
private volatile Set<PeerStatus> lastFetchedQueryablePeers;
|
||||
|
||||
private final SiteInfoProvider siteInfoProvider;
|
||||
private final PeerSelector peerSelector;
|
||||
|
@ -145,8 +148,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
|
|||
return getEndpointConnection(direction, null);
|
||||
}
|
||||
|
||||
public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config)
|
||||
throws IOException {
|
||||
public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException {
|
||||
//
|
||||
// Attempt to get a connection state that already exists for this URL.
|
||||
//
|
||||
|
@ -358,15 +360,13 @@ public class EndpointConnectionPool implements PeerStatusProvider {
|
|||
}
|
||||
}
|
||||
|
||||
public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
|
||||
final String hostname = clusterUrl.getHost();
|
||||
final Integer port = siteInfoProvider.getSiteToSitePort();
|
||||
if (port == null) {
|
||||
throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications");
|
||||
}
|
||||
private Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException {
|
||||
final String hostname = peerDescription.getHostname();
|
||||
final int port = peerDescription.getPort();
|
||||
|
||||
final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://"));
|
||||
final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
|
||||
|
||||
final Peer peer = new Peer(clusterPeerDescription, commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
|
||||
final SocketClientProtocol clientProtocol = new SocketClientProtocol();
|
||||
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
|
||||
|
@ -414,6 +414,50 @@ public class EndpointConnectionPool implements PeerStatusProvider {
|
|||
return peerStatuses;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
|
||||
final Set<PeerDescription> peersToRequestClusterInfoFrom = new HashSet<>();
|
||||
|
||||
// Look at all of the peers that we fetched last time.
|
||||
final Set<PeerStatus> lastFetched = lastFetchedQueryablePeers;
|
||||
if (lastFetched != null && !lastFetched.isEmpty()) {
|
||||
lastFetched.stream().map(peer -> peer.getPeerDescription())
|
||||
.forEach(desc -> peersToRequestClusterInfoFrom.add(desc));
|
||||
}
|
||||
|
||||
// Always add the configured node info to the list of peers to communicate with
|
||||
final String hostname = clusterUrl.getHost();
|
||||
final Integer port = siteInfoProvider.getSiteToSitePort();
|
||||
if (port == null) {
|
||||
throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications");
|
||||
}
|
||||
|
||||
final boolean secure = siteInfoProvider.isSecure();
|
||||
peersToRequestClusterInfoFrom.add(new PeerDescription(hostname, port, secure));
|
||||
|
||||
Exception lastFailure = null;
|
||||
for (final PeerDescription peerDescription : peersToRequestClusterInfoFrom) {
|
||||
try {
|
||||
final Set<PeerStatus> statuses = fetchRemotePeerStatuses(peerDescription);
|
||||
lastFetchedQueryablePeers = statuses.stream()
|
||||
.filter(p -> p.isQueryForPeers())
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return statuses;
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster", peerDescription.getHostname(), peerDescription.getPort());
|
||||
lastFailure = e;
|
||||
}
|
||||
}
|
||||
|
||||
final IOException ioe = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster");
|
||||
if (lastFailure != null) {
|
||||
ioe.addSuppressed(lastFailure);
|
||||
}
|
||||
|
||||
throw ioe;
|
||||
}
|
||||
|
||||
private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
|
||||
final PeerDescription description = peerStatus.getPeerDescription();
|
||||
return establishSiteToSiteConnection(description.getHostname(), description.getPort());
|
||||
|
|
|
@ -32,11 +32,12 @@ public class HttpServerCommunicationsSession extends HttpCommunicationsSession {
|
|||
private Transaction.TransactionState status = Transaction.TransactionState.TRANSACTION_STARTED;
|
||||
private ResponseCode responseCode;
|
||||
|
||||
public HttpServerCommunicationsSession(InputStream inputStream, OutputStream outputStream, String transactionId){
|
||||
public HttpServerCommunicationsSession(final InputStream inputStream, final OutputStream outputStream, final String transactionId, final String userDn) {
|
||||
super();
|
||||
input.setInputStream(inputStream);
|
||||
output.setOutputStream(outputStream);
|
||||
this.transactionId = transactionId;
|
||||
setUserDn(userDn);
|
||||
}
|
||||
|
||||
// This status is only needed by HttpFlowFileServerProtocol, HttpClientTransaction has its own status.
|
||||
|
@ -46,7 +47,7 @@ public class HttpServerCommunicationsSession extends HttpCommunicationsSession {
|
|||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(Transaction.TransactionState status) {
|
||||
public void setStatus(final Transaction.TransactionState status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
|
@ -58,11 +59,11 @@ public class HttpServerCommunicationsSession extends HttpCommunicationsSession {
|
|||
return responseCode;
|
||||
}
|
||||
|
||||
public void setResponseCode(ResponseCode responseCode) {
|
||||
public void setResponseCode(final ResponseCode responseCode) {
|
||||
this.responseCode = responseCode;
|
||||
}
|
||||
|
||||
public void putHandshakeParam(HandshakeProperty key, String value) {
|
||||
public void putHandshakeParam(final HandshakeProperty key, final String value) {
|
||||
handshakeParams.put(key.name(), value);
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,8 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
public class SocketClientProtocol implements ClientProtocol {
|
||||
|
||||
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
|
||||
// Version 6 added to support Zero-Master Clustering, which was introduced in NiFi 1.0.0
|
||||
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1);
|
||||
|
||||
private RemoteDestination destination;
|
||||
private boolean useCompression = false;
|
||||
|
@ -217,6 +218,8 @@ public class SocketClientProtocol implements ClientProtocol {
|
|||
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
|
||||
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
|
||||
|
||||
final boolean queryPeersForOtherPeers = getVersionNegotiator().getVersion() >= 6;
|
||||
|
||||
RequestType.REQUEST_PEER_LIST.writeRequestType(dos);
|
||||
dos.flush();
|
||||
final int numPeers = dis.readInt();
|
||||
|
@ -226,7 +229,7 @@ public class SocketClientProtocol implements ClientProtocol {
|
|||
final int port = dis.readInt();
|
||||
final boolean secure = dis.readBoolean();
|
||||
final int flowFileCount = dis.readInt();
|
||||
peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount));
|
||||
peers.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount, queryPeersForOtherPeers));
|
||||
}
|
||||
|
||||
logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer);
|
||||
|
|
|
@ -125,11 +125,8 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
private static final int RESPONSE_CODE_OK = 200;
|
||||
private static final int RESPONSE_CODE_CREATED = 201;
|
||||
private static final int RESPONSE_CODE_ACCEPTED = 202;
|
||||
private static final int RESPONSE_CODE_SEE_OTHER = 303;
|
||||
private static final int RESPONSE_CODE_BAD_REQUEST = 400;
|
||||
private static final int RESPONSE_CODE_UNAUTHORIZED = 401;
|
||||
private static final int RESPONSE_CODE_NOT_FOUND = 404;
|
||||
private static final int RESPONSE_CODE_SERVICE_UNAVAILABLE = 503;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(SiteToSiteRestApiClient.class);
|
||||
|
||||
|
@ -161,6 +158,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
public SiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy) {
|
||||
this.sslContext = sslContext;
|
||||
this.proxy = proxy;
|
||||
|
||||
ttlExtendTaskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
|
||||
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
|
||||
|
||||
|
@ -168,6 +166,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
public Thread newThread(final Runnable r) {
|
||||
final Thread thread = defaultFactory.newThread(r);
|
||||
thread.setName(Thread.currentThread().getName() + " TTLExtend");
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
}
|
||||
});
|
||||
|
@ -210,9 +209,9 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
|
||||
private void setupRequestConfig() {
|
||||
final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
|
||||
.setConnectionRequestTimeout(connectTimeoutMillis)
|
||||
.setConnectTimeout(connectTimeoutMillis)
|
||||
.setSocketTimeout(readTimeoutMillis);
|
||||
.setConnectionRequestTimeout(connectTimeoutMillis)
|
||||
.setConnectTimeout(connectTimeoutMillis)
|
||||
.setSocketTimeout(readTimeoutMillis);
|
||||
|
||||
if (proxy != null) {
|
||||
requestConfigBuilder.setProxy(proxy.getHttpHost());
|
||||
|
@ -226,8 +225,8 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
if (proxy != null) {
|
||||
if (!isEmpty(proxy.getUsername()) && !isEmpty(proxy.getPassword())) {
|
||||
credentialsProvider.setCredentials(
|
||||
new AuthScope(proxy.getHttpHost()),
|
||||
new UsernamePasswordCredentials(proxy.getUsername(), proxy.getPassword()));
|
||||
new AuthScope(proxy.getHttpHost()),
|
||||
new UsernamePasswordCredentials(proxy.getUsername(), proxy.getPassword()));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -242,7 +241,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
}
|
||||
|
||||
httpClient = clientBuilder
|
||||
.setDefaultCredentialsProvider(getCredentialsProvider()).build();
|
||||
.setDefaultCredentialsProvider(getCredentialsProvider()).build();
|
||||
}
|
||||
|
||||
private void setupAsyncClient() {
|
||||
|
@ -268,9 +267,9 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
|
||||
final SSLSession sslSession;
|
||||
if (conn instanceof ManagedHttpClientConnection) {
|
||||
sslSession = ((ManagedHttpClientConnection)conn).getSSLSession();
|
||||
sslSession = ((ManagedHttpClientConnection) conn).getSSLSession();
|
||||
} else if (conn instanceof ManagedNHttpClientConnection) {
|
||||
sslSession = ((ManagedNHttpClientConnection)conn).getSSLSession();
|
||||
sslSession = ((ManagedNHttpClientConnection) conn).getSSLSession();
|
||||
} else {
|
||||
throw new RuntimeException("Unexpected connection type was used, " + conn);
|
||||
}
|
||||
|
@ -285,7 +284,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
try {
|
||||
final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]);
|
||||
trustedPeerDn = cert.getSubjectDN().getName().trim();
|
||||
} catch (CertificateException e) {
|
||||
} catch (final CertificateException e) {
|
||||
final String msg = "Could not extract subject DN from SSL session peer certificate";
|
||||
logger.warn(msg);
|
||||
throw new SSLPeerUnverifiedException(msg);
|
||||
|
@ -296,14 +295,14 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
|
||||
public ControllerDTO getController() throws IOException {
|
||||
try {
|
||||
HttpGet get = createGet("/site-to-site");
|
||||
final HttpGet get = createGet("/site-to-site");
|
||||
get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
|
||||
return execute(get, ControllerEntity.class).getController();
|
||||
|
||||
} catch (HttpGetFailedException e) {
|
||||
} catch (final HttpGetFailedException e) {
|
||||
if (RESPONSE_CODE_NOT_FOUND == e.getResponseCode()) {
|
||||
logger.debug("getController received NOT_FOUND, trying to access the old NiFi version resource url...");
|
||||
HttpGet get = createGet("/controller");
|
||||
final HttpGet get = createGet("/controller");
|
||||
return execute(get, ControllerEntity.class).getController();
|
||||
}
|
||||
throw e;
|
||||
|
@ -311,12 +310,12 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
}
|
||||
|
||||
public Collection<PeerDTO> getPeers() throws IOException {
|
||||
HttpGet get = createGet("/site-to-site/peers");
|
||||
final HttpGet get = createGet("/site-to-site/peers");
|
||||
get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
|
||||
return execute(get, PeersEntity.class).getPeers();
|
||||
}
|
||||
|
||||
public String initiateTransaction(TransferDirection direction, String portId) throws IOException {
|
||||
public String initiateTransaction(final TransferDirection direction, final String portId) throws IOException {
|
||||
if (TransferDirection.RECEIVE.equals(direction)) {
|
||||
return initiateTransaction("output-ports", portId);
|
||||
} else {
|
||||
|
@ -324,10 +323,9 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private String initiateTransaction(String portType, String portId) throws IOException {
|
||||
private String initiateTransaction(final String portType, final String portId) throws IOException {
|
||||
logger.debug("initiateTransaction handshaking portType={}, portId={}", portType, portId);
|
||||
HttpPost post = createPost("/data-transfer/" + portType + "/" + portId + "/transactions");
|
||||
|
||||
final HttpPost post = createPost("/data-transfer/" + portType + "/" + portId + "/transactions");
|
||||
|
||||
post.setHeader("Accept", "application/json");
|
||||
post.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
|
||||
|
@ -335,27 +333,27 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
setHandshakeProperties(post);
|
||||
|
||||
try (CloseableHttpResponse response = getHttpClient().execute(post)) {
|
||||
int responseCode = response.getStatusLine().getStatusCode();
|
||||
final int responseCode = response.getStatusLine().getStatusCode();
|
||||
logger.debug("initiateTransaction responseCode={}", responseCode);
|
||||
|
||||
String transactionUrl;
|
||||
switch (responseCode) {
|
||||
case RESPONSE_CODE_CREATED :
|
||||
case RESPONSE_CODE_CREATED:
|
||||
EntityUtils.consume(response.getEntity());
|
||||
|
||||
transactionUrl = readTransactionUrl(response);
|
||||
if (isEmpty(transactionUrl)) {
|
||||
throw new ProtocolException("Server returned RESPONSE_CODE_CREATED without Location header");
|
||||
}
|
||||
Header transportProtocolVersionHeader = response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION);
|
||||
final Header transportProtocolVersionHeader = response.getFirstHeader(HttpHeaders.PROTOCOL_VERSION);
|
||||
if (transportProtocolVersionHeader == null) {
|
||||
throw new ProtocolException("Server didn't return confirmed protocol version");
|
||||
}
|
||||
Integer protocolVersionConfirmedByServer = Integer.valueOf(transportProtocolVersionHeader.getValue());
|
||||
final Integer protocolVersionConfirmedByServer = Integer.valueOf(transportProtocolVersionHeader.getValue());
|
||||
logger.debug("Finished version negotiation, protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer);
|
||||
transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer);
|
||||
|
||||
Header serverTransactionTtlHeader = response.getFirstHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
|
||||
final Header serverTransactionTtlHeader = response.getFirstHeader(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
|
||||
if (serverTransactionTtlHeader == null) {
|
||||
throw new ProtocolException("Server didn't return " + HttpHeaders.SERVER_SIDE_TRANSACTION_TTL);
|
||||
}
|
||||
|
@ -373,33 +371,36 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
|
||||
}
|
||||
|
||||
public boolean openConnectionForReceive(String transactionUrl, CommunicationsSession commSession) throws IOException {
|
||||
public boolean openConnectionForReceive(final String transactionUrl, final CommunicationsSession commSession) throws IOException {
|
||||
|
||||
HttpGet get = createGet(transactionUrl + "/flow-files");
|
||||
final HttpGet get = createGet(transactionUrl + "/flow-files");
|
||||
get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
|
||||
|
||||
setHandshakeProperties(get);
|
||||
|
||||
CloseableHttpResponse response = getHttpClient().execute(get);
|
||||
int responseCode = response.getStatusLine().getStatusCode();
|
||||
final CloseableHttpResponse response = getHttpClient().execute(get);
|
||||
final int responseCode = response.getStatusLine().getStatusCode();
|
||||
logger.debug("responseCode={}", responseCode);
|
||||
|
||||
boolean keepItOpen = false;
|
||||
try {
|
||||
switch (responseCode) {
|
||||
case RESPONSE_CODE_OK :
|
||||
case RESPONSE_CODE_OK:
|
||||
logger.debug("Server returned RESPONSE_CODE_OK, indicating there was no data.");
|
||||
EntityUtils.consume(response.getEntity());
|
||||
return false;
|
||||
|
||||
case RESPONSE_CODE_ACCEPTED :
|
||||
InputStream httpIn = response.getEntity().getContent();
|
||||
InputStream streamCapture = new InputStream() {
|
||||
case RESPONSE_CODE_ACCEPTED:
|
||||
final InputStream httpIn = response.getEntity().getContent();
|
||||
final InputStream streamCapture = new InputStream() {
|
||||
boolean closed = false;
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
if(closed) return -1;
|
||||
int r = httpIn.read();
|
||||
if (closed) {
|
||||
return -1;
|
||||
}
|
||||
final int r = httpIn.read();
|
||||
if (r < 0) {
|
||||
closed = true;
|
||||
logger.debug("Reached to end of input stream. Closing resources...");
|
||||
|
@ -410,7 +411,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
return r;
|
||||
}
|
||||
};
|
||||
((HttpInput)commSession.getInput()).setInputStream(streamCapture);
|
||||
((HttpInput) commSession.getInput()).setInputStream(streamCapture);
|
||||
|
||||
startExtendingTtl(transactionUrl, httpIn, response);
|
||||
keepItOpen = true;
|
||||
|
@ -431,10 +432,11 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
private final int DATA_PACKET_CHANNEL_READ_BUFFER_SIZE = 16384;
|
||||
private Future<HttpResponse> postResult;
|
||||
private CountDownLatch transferDataLatch = new CountDownLatch(1);
|
||||
public void openConnectionForSend(String transactionUrl, CommunicationsSession commSession) throws IOException {
|
||||
|
||||
public void openConnectionForSend(final String transactionUrl, final CommunicationsSession commSession) throws IOException {
|
||||
|
||||
final String flowFilesPath = transactionUrl + "/flow-files";
|
||||
HttpPost post = createPost(flowFilesPath);
|
||||
final HttpPost post = createPost(flowFilesPath);
|
||||
|
||||
post.setHeader("Content-Type", "application/octet-stream");
|
||||
post.setHeader("Accept", "text/plain");
|
||||
|
@ -442,7 +444,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
|
||||
setHandshakeProperties(post);
|
||||
|
||||
CountDownLatch initConnectionLatch = new CountDownLatch(1);
|
||||
final CountDownLatch initConnectionLatch = new CountDownLatch(1);
|
||||
|
||||
final URI requestUri = post.getURI();
|
||||
final PipedOutputStream outputStream = new PipedOutputStream();
|
||||
|
@ -463,7 +465,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
// Pass the output stream so that Site-to-Site client thread can send
|
||||
// data packet through this connection.
|
||||
logger.debug("sending data to {} has started...", flowFilesPath);
|
||||
((HttpOutput)commSession.getOutput()).setOutputStream(outputStream);
|
||||
((HttpOutput) commSession.getOutput()).setOutputStream(outputStream);
|
||||
initConnectionLatch.countDown();
|
||||
|
||||
final BasicHttpEntity entity = new BasicHttpEntity();
|
||||
|
@ -474,7 +476,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void produceContent(ContentEncoder encoder, IOControl ioControl) throws IOException {
|
||||
public void produceContent(final ContentEncoder encoder, final IOControl ioControl) throws IOException {
|
||||
|
||||
int totalRead = 0;
|
||||
int totalProduced = 0;
|
||||
|
@ -501,7 +503,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
|
||||
final long totalWritten = commSession.getOutput().getBytesWritten();
|
||||
logger.debug("sending data to {} has reached to its end. produced {} bytes by reading {} bytes from channel. {} bytes written in this transaction.",
|
||||
flowFilesPath, totalProduced, totalRead, totalWritten);
|
||||
flowFilesPath, totalProduced, totalRead, totalWritten);
|
||||
if (totalRead != totalWritten || totalProduced != totalWritten) {
|
||||
final String msg = "Sending data to %s has reached to its end, but produced : read : wrote byte sizes (%d : $d : %d) were not equal. Something went wrong.";
|
||||
throw new RuntimeException(String.format(msg, flowFilesPath, totalProduced, totalRead, totalWritten));
|
||||
|
@ -513,12 +515,12 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void requestCompleted(HttpContext context) {
|
||||
public void requestCompleted(final HttpContext context) {
|
||||
logger.debug("Sending data to {} completed.", flowFilesPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Exception ex) {
|
||||
public void failed(final Exception ex) {
|
||||
logger.error("Sending data to {} has failed", flowFilesPath, ex);
|
||||
}
|
||||
|
||||
|
@ -554,13 +556,13 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
transferDataLatch = new CountDownLatch(1);
|
||||
startExtendingTtl(transactionUrl, dataPacketChannel, null);
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
} catch (final InterruptedException e) {
|
||||
throw new IOException("Awaiting initConnectionLatch has been interrupted.", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void finishTransferFlowFiles(CommunicationsSession commSession) throws IOException {
|
||||
public void finishTransferFlowFiles(final CommunicationsSession commSession) throws IOException {
|
||||
|
||||
if (postResult == null) {
|
||||
new IllegalStateException("Data transfer has not started yet.");
|
||||
|
@ -576,7 +578,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
if (!transferDataLatch.await(requestExpirationMillis, TimeUnit.MILLISECONDS)) {
|
||||
throw new IOException("Awaiting transferDataLatch has been timeout.");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
} catch (final InterruptedException e) {
|
||||
throw new IOException("Awaiting transferDataLatch has been interrupted.", e);
|
||||
}
|
||||
|
||||
|
@ -585,24 +587,24 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
final HttpResponse response;
|
||||
try {
|
||||
response = postResult.get(readTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||
} catch (ExecutionException e) {
|
||||
} catch (final ExecutionException e) {
|
||||
logger.debug("Something has happened at sending thread. {}", e.getMessage());
|
||||
Throwable cause = e.getCause();
|
||||
final Throwable cause = e.getCause();
|
||||
if (cause instanceof IOException) {
|
||||
throw (IOException) cause;
|
||||
} else {
|
||||
throw new IOException(cause);
|
||||
}
|
||||
} catch (TimeoutException|InterruptedException e) {
|
||||
} catch (TimeoutException | InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
int responseCode = response.getStatusLine().getStatusCode();
|
||||
final int responseCode = response.getStatusLine().getStatusCode();
|
||||
switch (responseCode) {
|
||||
case RESPONSE_CODE_ACCEPTED :
|
||||
String receivedChecksum = EntityUtils.toString(response.getEntity());
|
||||
((HttpInput)commSession.getInput()).setInputStream(new ByteArrayInputStream(receivedChecksum.getBytes()));
|
||||
((HttpCommunicationsSession)commSession).setChecksum(receivedChecksum);
|
||||
case RESPONSE_CODE_ACCEPTED:
|
||||
final String receivedChecksum = EntityUtils.toString(response.getEntity());
|
||||
((HttpInput) commSession.getInput()).setInputStream(new ByteArrayInputStream(receivedChecksum.getBytes()));
|
||||
((HttpCommunicationsSession) commSession).setChecksum(receivedChecksum);
|
||||
logger.debug("receivedChecksum={}", receivedChecksum);
|
||||
break;
|
||||
|
||||
|
@ -623,17 +625,17 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator;
|
||||
extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis;
|
||||
extendingApiClient.readTimeoutMillis = this.readTimeoutMillis;
|
||||
int extendFrequency = serverTransactionTtl / 2;
|
||||
final int extendFrequency = serverTransactionTtl / 2;
|
||||
ttlExtendingThread = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> {
|
||||
try {
|
||||
extendingApiClient.extendTransaction(transactionUrl);
|
||||
} catch (Exception e) {
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Failed to extend transaction ttl", e);
|
||||
try {
|
||||
// Without disconnecting, Site-to-Site client keep reading data packet,
|
||||
// while server has already rollback.
|
||||
this.close();
|
||||
} catch (IOException ec) {
|
||||
} catch (final IOException ec) {
|
||||
logger.warn("Failed to close", e);
|
||||
}
|
||||
}
|
||||
|
@ -645,7 +647,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
if (closeable != null) {
|
||||
closeable.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
} catch (final IOException e) {
|
||||
logger.warn("Got an exception during closing {}: {}", closeable, e.getMessage());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.warn("", e);
|
||||
|
@ -653,7 +655,7 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
public TransactionResultEntity extendTransaction(String transactionUrl) throws IOException {
|
||||
public TransactionResultEntity extendTransaction(final String transactionUrl) throws IOException {
|
||||
logger.debug("Sending extendTransaction request to transactionUrl: {}", transactionUrl);
|
||||
|
||||
final HttpPut put = createPut(transactionUrl);
|
||||
|
@ -663,15 +665,14 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
|
||||
setHandshakeProperties(put);
|
||||
|
||||
try (CloseableHttpResponse response = getHttpClient().execute(put)) {
|
||||
int responseCode = response.getStatusLine().getStatusCode();
|
||||
try (final CloseableHttpResponse response = getHttpClient().execute(put)) {
|
||||
final int responseCode = response.getStatusLine().getStatusCode();
|
||||
logger.debug("extendTransaction responseCode={}", responseCode);
|
||||
|
||||
try (InputStream content = response.getEntity().getContent()) {
|
||||
try (final InputStream content = response.getEntity().getContent()) {
|
||||
switch (responseCode) {
|
||||
case RESPONSE_CODE_OK :
|
||||
case RESPONSE_CODE_OK:
|
||||
return readResponse(content);
|
||||
|
||||
default:
|
||||
throw handleErrResponse(responseCode, content);
|
||||
}
|
||||
|
@ -694,39 +695,41 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
}
|
||||
|
||||
private IOException handleErrResponse(final int responseCode, final InputStream in) throws IOException {
|
||||
if(in == null) {
|
||||
if (in == null) {
|
||||
return new IOException("Unexpected response code: " + responseCode);
|
||||
}
|
||||
TransactionResultEntity errEntity = readResponse(in);
|
||||
ResponseCode errCode = ResponseCode.fromCode(errEntity.getResponseCode());
|
||||
|
||||
final TransactionResultEntity errEntity = readResponse(in);
|
||||
final ResponseCode errCode = ResponseCode.fromCode(errEntity.getResponseCode());
|
||||
|
||||
switch (errCode) {
|
||||
case UNKNOWN_PORT:
|
||||
return new UnknownPortException(errEntity.getMessage());
|
||||
case PORT_NOT_IN_VALID_STATE:
|
||||
return new PortNotRunningException(errEntity.getMessage());
|
||||
default:
|
||||
return new IOException("Unexpected response code: " + responseCode
|
||||
+ " errCode:" + errCode + " errMessage:" + errEntity.getMessage());
|
||||
return new IOException("Unexpected response code: " + responseCode + " errCode:" + errCode + " errMessage:" + errEntity.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private TransactionResultEntity readResponse(InputStream inputStream) throws IOException {
|
||||
private TransactionResultEntity readResponse(final InputStream inputStream) throws IOException {
|
||||
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
StreamUtils.copy(inputStream, bos);
|
||||
String responseMessage = null;
|
||||
|
||||
try {
|
||||
responseMessage = new String(bos.toByteArray(), "UTF-8");
|
||||
logger.debug("readResponse responseMessage={}", responseMessage);
|
||||
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
return mapper.readValue(responseMessage, TransactionResultEntity.class);
|
||||
|
||||
} catch (JsonParseException | JsonMappingException e) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Failed to parse JSON.", e);
|
||||
}
|
||||
TransactionResultEntity entity = new TransactionResultEntity();
|
||||
|
||||
final TransactionResultEntity entity = new TransactionResultEntity();
|
||||
entity.setResponseCode(ResponseCode.ABORT.getCode());
|
||||
entity.setMessage(responseMessage);
|
||||
return entity;
|
||||
|
@ -736,90 +739,109 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
private String readTransactionUrl(final CloseableHttpResponse response) {
|
||||
final Header locationUriIntentHeader = response.getFirstHeader(LOCATION_URI_INTENT_NAME);
|
||||
logger.debug("locationUriIntentHeader={}", locationUriIntentHeader);
|
||||
if (locationUriIntentHeader != null) {
|
||||
if (LOCATION_URI_INTENT_VALUE.equals(locationUriIntentHeader.getValue())) {
|
||||
Header transactionUrl = response.getFirstHeader(LOCATION_HEADER_NAME);
|
||||
logger.debug("transactionUrl={}", transactionUrl);
|
||||
if (transactionUrl != null) {
|
||||
return transactionUrl.getValue();
|
||||
}
|
||||
|
||||
if (locationUriIntentHeader != null && LOCATION_URI_INTENT_VALUE.equals(locationUriIntentHeader.getValue())) {
|
||||
final Header transactionUrl = response.getFirstHeader(LOCATION_HEADER_NAME);
|
||||
logger.debug("transactionUrl={}", transactionUrl);
|
||||
|
||||
if (transactionUrl != null) {
|
||||
return transactionUrl.getValue();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private void setHandshakeProperties(final HttpRequestBase httpRequest) {
|
||||
if(compress) httpRequest.setHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION, "true");
|
||||
if(requestExpirationMillis > 0) httpRequest.setHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION, String.valueOf(requestExpirationMillis));
|
||||
if(batchCount > 0) httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_COUNT, String.valueOf(batchCount));
|
||||
if(batchSize > 0) httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_SIZE, String.valueOf(batchSize));
|
||||
if(batchDurationMillis > 0) httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_DURATION, String.valueOf(batchDurationMillis));
|
||||
if (compress) {
|
||||
httpRequest.setHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION, "true");
|
||||
}
|
||||
|
||||
if (requestExpirationMillis > 0) {
|
||||
httpRequest.setHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION, String.valueOf(requestExpirationMillis));
|
||||
}
|
||||
|
||||
if (batchCount > 0) {
|
||||
httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_COUNT, String.valueOf(batchCount));
|
||||
}
|
||||
|
||||
if (batchSize > 0) {
|
||||
httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_SIZE, String.valueOf(batchSize));
|
||||
}
|
||||
|
||||
if (batchDurationMillis > 0) {
|
||||
httpRequest.setHeader(HANDSHAKE_PROPERTY_BATCH_DURATION, String.valueOf(batchDurationMillis));
|
||||
}
|
||||
}
|
||||
|
||||
private HttpGet createGet(final String path) {
|
||||
final URI url = getUri(path);
|
||||
HttpGet get = new HttpGet(url);
|
||||
get.setConfig(getRequestConfig());
|
||||
return get;
|
||||
}
|
||||
|
||||
private URI getUri(String path) {
|
||||
private URI getUri(final String path) {
|
||||
final URI url;
|
||||
try {
|
||||
if(HTTP_ABS_URL.matcher(path).find()){
|
||||
if (HTTP_ABS_URL.matcher(path).find()) {
|
||||
url = new URI(path);
|
||||
} else {
|
||||
if(StringUtils.isEmpty(getBaseUrl())){
|
||||
if (StringUtils.isEmpty(getBaseUrl())) {
|
||||
throw new IllegalStateException("API baseUrl is not resolved yet, call setBaseUrl or resolveBaseUrl before sending requests with relative path.");
|
||||
}
|
||||
url = new URI(baseUrl + path);
|
||||
}
|
||||
} catch (URISyntaxException e) {
|
||||
} catch (final URISyntaxException e) {
|
||||
throw new IllegalArgumentException(e.getMessage());
|
||||
}
|
||||
return url;
|
||||
}
|
||||
|
||||
|
||||
private HttpGet createGet(final String path) {
|
||||
final URI url = getUri(path);
|
||||
final HttpGet get = new HttpGet(url);
|
||||
get.setConfig(getRequestConfig());
|
||||
return get;
|
||||
}
|
||||
|
||||
private HttpPost createPost(final String path) {
|
||||
final URI url = getUri(path);
|
||||
HttpPost post = new HttpPost(url);
|
||||
final HttpPost post = new HttpPost(url);
|
||||
post.setConfig(getRequestConfig());
|
||||
return post;
|
||||
}
|
||||
|
||||
private HttpPut createPut(final String path) {
|
||||
final URI url = getUri(path);
|
||||
HttpPut put = new HttpPut(url);
|
||||
final HttpPut put = new HttpPut(url);
|
||||
put.setConfig(getRequestConfig());
|
||||
return put;
|
||||
}
|
||||
|
||||
private HttpDelete createDelete(final String path) {
|
||||
final URI url = getUri(path);
|
||||
HttpDelete delete = new HttpDelete(url);
|
||||
final HttpDelete delete = new HttpDelete(url);
|
||||
delete.setConfig(getRequestConfig());
|
||||
return delete;
|
||||
}
|
||||
|
||||
private String execute(final HttpGet get) throws IOException {
|
||||
final CloseableHttpClient httpClient = getHttpClient();
|
||||
|
||||
CloseableHttpClient httpClient = getHttpClient();
|
||||
try (CloseableHttpResponse response = httpClient.execute(get)) {
|
||||
StatusLine statusLine = response.getStatusLine();
|
||||
int statusCode = statusLine.getStatusCode();
|
||||
try (final CloseableHttpResponse response = httpClient.execute(get)) {
|
||||
final StatusLine statusLine = response.getStatusLine();
|
||||
final int statusCode = statusLine.getStatusCode();
|
||||
if (RESPONSE_CODE_OK != statusCode) {
|
||||
throw new HttpGetFailedException(statusCode, statusLine.getReasonPhrase(), null);
|
||||
}
|
||||
HttpEntity entity = response.getEntity();
|
||||
String responseMessage = EntityUtils.toString(entity);
|
||||
final HttpEntity entity = response.getEntity();
|
||||
final String responseMessage = EntityUtils.toString(entity);
|
||||
return responseMessage;
|
||||
}
|
||||
}
|
||||
|
||||
public class HttpGetFailedException extends IOException {
|
||||
private static final long serialVersionUID = 7920714957269466946L;
|
||||
|
||||
private final int responseCode;
|
||||
private final String responseMessage;
|
||||
private final String explanation;
|
||||
|
||||
public HttpGetFailedException(final int responseCode, final String responseMessage, final String explanation) {
|
||||
super("response code " + responseCode + ":" + responseMessage + " with explanation: " + explanation);
|
||||
this.responseCode = responseCode;
|
||||
|
@ -854,25 +876,25 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
this.baseUrl = baseUrl;
|
||||
}
|
||||
|
||||
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
|
||||
public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
|
||||
this.connectTimeoutMillis = connectTimeoutMillis;
|
||||
}
|
||||
|
||||
public void setReadTimeoutMillis(int readTimeoutMillis) {
|
||||
public void setReadTimeoutMillis(final int readTimeoutMillis) {
|
||||
this.readTimeoutMillis = readTimeoutMillis;
|
||||
}
|
||||
|
||||
public String resolveBaseUrl(String clusterUrl) {
|
||||
public String resolveBaseUrl(final String clusterUrl) {
|
||||
URI clusterUri;
|
||||
try {
|
||||
clusterUri = new URI(clusterUrl);
|
||||
} catch (URISyntaxException e) {
|
||||
} catch (final URISyntaxException e) {
|
||||
throw new IllegalArgumentException("Specified clusterUrl was: " + clusterUrl, e);
|
||||
}
|
||||
return this.resolveBaseUrl(clusterUri);
|
||||
}
|
||||
|
||||
public String resolveBaseUrl(URI clusterUrl) {
|
||||
public String resolveBaseUrl(final URI clusterUrl) {
|
||||
String urlPath = clusterUrl.getPath();
|
||||
if (urlPath.endsWith("/")) {
|
||||
urlPath = urlPath.substring(0, urlPath.length() - 1);
|
||||
|
@ -884,33 +906,41 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
return resolveBaseUrl(scheme, host, port, "/nifi-api");
|
||||
}
|
||||
|
||||
public String resolveBaseUrl(final String scheme, final String host, final int port, String path) {
|
||||
String baseUri = scheme + "://" + host + ":" + port + path;
|
||||
public String resolveBaseUrl(final String scheme, final String host, final int port, final String path) {
|
||||
final String baseUri = scheme + "://" + host + ":" + port + path;
|
||||
this.setBaseUrl(baseUri);
|
||||
return baseUri;
|
||||
}
|
||||
|
||||
public void setCompress(boolean compress) {
|
||||
public void setCompress(final boolean compress) {
|
||||
this.compress = compress;
|
||||
}
|
||||
|
||||
public void setRequestExpirationMillis(long requestExpirationMillis) {
|
||||
if(requestExpirationMillis < 0) throw new IllegalArgumentException("requestExpirationMillis can't be a negative value.");
|
||||
public void setRequestExpirationMillis(final long requestExpirationMillis) {
|
||||
if (requestExpirationMillis < 0) {
|
||||
throw new IllegalArgumentException("requestExpirationMillis can't be a negative value.");
|
||||
}
|
||||
this.requestExpirationMillis = requestExpirationMillis;
|
||||
}
|
||||
|
||||
public void setBatchCount(int batchCount) {
|
||||
if(batchCount < 0) throw new IllegalArgumentException("batchCount can't be a negative value.");
|
||||
public void setBatchCount(final int batchCount) {
|
||||
if (batchCount < 0) {
|
||||
throw new IllegalArgumentException("batchCount can't be a negative value.");
|
||||
}
|
||||
this.batchCount = batchCount;
|
||||
}
|
||||
|
||||
public void setBatchSize(long batchSize) {
|
||||
if(batchSize < 0) throw new IllegalArgumentException("batchSize can't be a negative value.");
|
||||
public void setBatchSize(final long batchSize) {
|
||||
if (batchSize < 0) {
|
||||
throw new IllegalArgumentException("batchSize can't be a negative value.");
|
||||
}
|
||||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
public void setBatchDurationMillis(long batchDurationMillis) {
|
||||
if(batchDurationMillis < 0) throw new IllegalArgumentException("batchDurationMillis can't be a negative value.");
|
||||
public void setBatchDurationMillis(final long batchDurationMillis) {
|
||||
if (batchDurationMillis < 0) {
|
||||
throw new IllegalArgumentException("batchDurationMillis can't be a negative value.");
|
||||
}
|
||||
this.batchDurationMillis = batchDurationMillis;
|
||||
}
|
||||
|
||||
|
@ -922,33 +952,33 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
return this.trustedPeerDn;
|
||||
}
|
||||
|
||||
public TransactionResultEntity commitReceivingFlowFiles(String transactionUrl, ResponseCode clientResponse, String checksum) throws IOException {
|
||||
public TransactionResultEntity commitReceivingFlowFiles(final String transactionUrl, final ResponseCode clientResponse, final String checksum) throws IOException {
|
||||
logger.debug("Sending commitReceivingFlowFiles request to transactionUrl: {}, clientResponse={}, checksum={}",
|
||||
transactionUrl, clientResponse, checksum);
|
||||
transactionUrl, clientResponse, checksum);
|
||||
|
||||
stopExtendingTtl();
|
||||
|
||||
StringBuilder urlBuilder = new StringBuilder(transactionUrl).append("?responseCode=").append(clientResponse.getCode());
|
||||
final StringBuilder urlBuilder = new StringBuilder(transactionUrl).append("?responseCode=").append(clientResponse.getCode());
|
||||
if (ResponseCode.CONFIRM_TRANSACTION.equals(clientResponse)) {
|
||||
urlBuilder.append("&checksum=").append(checksum);
|
||||
}
|
||||
|
||||
HttpDelete delete = createDelete(urlBuilder.toString());
|
||||
final HttpDelete delete = createDelete(urlBuilder.toString());
|
||||
delete.setHeader("Accept", "application/json");
|
||||
delete.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
|
||||
|
||||
setHandshakeProperties(delete);
|
||||
|
||||
try (CloseableHttpResponse response = getHttpClient().execute(delete)) {
|
||||
int responseCode = response.getStatusLine().getStatusCode();
|
||||
final int responseCode = response.getStatusLine().getStatusCode();
|
||||
logger.debug("commitReceivingFlowFiles responseCode={}", responseCode);
|
||||
|
||||
try (InputStream content = response.getEntity().getContent()) {
|
||||
switch (responseCode) {
|
||||
case RESPONSE_CODE_OK :
|
||||
case RESPONSE_CODE_OK:
|
||||
return readResponse(content);
|
||||
|
||||
case RESPONSE_CODE_BAD_REQUEST :
|
||||
case RESPONSE_CODE_BAD_REQUEST:
|
||||
return readResponse(content);
|
||||
|
||||
default:
|
||||
|
@ -959,26 +989,26 @@ public class SiteToSiteRestApiClient implements Closeable {
|
|||
|
||||
}
|
||||
|
||||
public TransactionResultEntity commitTransferFlowFiles(String transactionUrl, ResponseCode clientResponse) throws IOException {
|
||||
String requestUrl = transactionUrl + "?responseCode=" + clientResponse.getCode();
|
||||
public TransactionResultEntity commitTransferFlowFiles(final String transactionUrl, final ResponseCode clientResponse) throws IOException {
|
||||
final String requestUrl = transactionUrl + "?responseCode=" + clientResponse.getCode();
|
||||
logger.debug("Sending commitTransferFlowFiles request to transactionUrl: {}", requestUrl);
|
||||
|
||||
HttpDelete delete = createDelete(requestUrl);
|
||||
final HttpDelete delete = createDelete(requestUrl);
|
||||
delete.setHeader("Accept", "application/json");
|
||||
delete.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion()));
|
||||
|
||||
setHandshakeProperties(delete);
|
||||
|
||||
try (CloseableHttpResponse response = getHttpClient().execute(delete)) {
|
||||
int responseCode = response.getStatusLine().getStatusCode();
|
||||
final int responseCode = response.getStatusLine().getStatusCode();
|
||||
logger.debug("commitTransferFlowFiles responseCode={}", responseCode);
|
||||
|
||||
try (InputStream content = response.getEntity().getContent()) {
|
||||
switch (responseCode) {
|
||||
case RESPONSE_CODE_OK :
|
||||
case RESPONSE_CODE_OK:
|
||||
return readResponse(content);
|
||||
|
||||
case RESPONSE_CODE_BAD_REQUEST :
|
||||
case RESPONSE_CODE_BAD_REQUEST:
|
||||
return readResponse(content);
|
||||
|
||||
default:
|
||||
|
|
|
@ -54,11 +54,11 @@ public class TestPeerSelector {
|
|||
@Test
|
||||
public void testFormulateDestinationListForOutput() throws IOException {
|
||||
final Set<PeerStatus> collection = new HashSet<>();
|
||||
collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 10240));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLittle", 3333, true), 1024));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096, true));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 10240, true));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLittle", 3333, true), 1024, true));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096, true));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096, true));
|
||||
|
||||
PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
|
||||
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
|
||||
|
@ -74,8 +74,8 @@ public class TestPeerSelector {
|
|||
@Test
|
||||
public void testFormulateDestinationListForOutputHugeDifference() throws IOException {
|
||||
final Set<PeerStatus> collection = new HashSet<>();
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLittle", 1111, true), 500));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 50000));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLittle", 1111, true), 500, true));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLots", 2222, true), 50000, true));
|
||||
|
||||
PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
|
||||
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
|
||||
|
@ -90,11 +90,11 @@ public class TestPeerSelector {
|
|||
@Test
|
||||
public void testFormulateDestinationListForInputPorts() throws IOException {
|
||||
final Set<PeerStatus> collection = new HashSet<>();
|
||||
collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 10240));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLots", 3333, true), 1024));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasMedium", 1111, true), 4096, true));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 10240, true));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLots", 3333, true), 1024, true));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasMedium", 4444, true), 4096, true));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasMedium", 5555, true), 4096, true));
|
||||
|
||||
PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
|
||||
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
|
||||
|
@ -110,8 +110,8 @@ public class TestPeerSelector {
|
|||
@Test
|
||||
public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException {
|
||||
final Set<PeerStatus> collection = new HashSet<>();
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLots", 1111, true), 500));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 50000));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLots", 1111, true), 500, true));
|
||||
collection.add(new PeerStatus(new PeerDescription("HasLittle", 2222, true), 50000, true));
|
||||
|
||||
PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
|
||||
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
|
||||
|
|
|
@ -39,9 +39,6 @@ public class ConnectionResponse {
|
|||
private final int tryLaterSeconds;
|
||||
private final NodeIdentifier nodeIdentifier;
|
||||
private final DataFlow dataFlow;
|
||||
private final Integer managerRemoteInputPort;
|
||||
private final Integer managerRemoteInputHttpPort;
|
||||
private final Boolean managerRemoteCommsSecure;
|
||||
private final String instanceId;
|
||||
private final List<NodeConnectionStatus> nodeStatuses;
|
||||
private final List<ComponentRevision> componentRevisions;
|
||||
|
@ -49,8 +46,7 @@ public class ConnectionResponse {
|
|||
private volatile String coordinatorDN;
|
||||
|
||||
public ConnectionResponse(final NodeIdentifier nodeIdentifier, final DataFlow dataFlow,
|
||||
final Integer managerRemoteInputPort, final Integer managerRemoteInputHttpPort, final Boolean managerRemoteCommsSecure, final String instanceId,
|
||||
final List<NodeConnectionStatus> nodeStatuses, final List<ComponentRevision> componentRevisions) {
|
||||
final String instanceId, final List<NodeConnectionStatus> nodeStatuses, final List<ComponentRevision> componentRevisions) {
|
||||
|
||||
if (nodeIdentifier == null) {
|
||||
throw new IllegalArgumentException("Node identifier may not be empty or null.");
|
||||
|
@ -61,9 +57,6 @@ public class ConnectionResponse {
|
|||
this.dataFlow = dataFlow;
|
||||
this.tryLaterSeconds = 0;
|
||||
this.rejectionReason = null;
|
||||
this.managerRemoteInputPort = managerRemoteInputPort;
|
||||
this.managerRemoteInputHttpPort = managerRemoteInputHttpPort;
|
||||
this.managerRemoteCommsSecure = managerRemoteCommsSecure;
|
||||
this.instanceId = instanceId;
|
||||
this.nodeStatuses = Collections.unmodifiableList(new ArrayList<>(nodeStatuses));
|
||||
this.componentRevisions = componentRevisions == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(componentRevisions));
|
||||
|
@ -77,9 +70,6 @@ public class ConnectionResponse {
|
|||
this.nodeIdentifier = null;
|
||||
this.tryLaterSeconds = tryLaterSeconds;
|
||||
this.rejectionReason = null;
|
||||
this.managerRemoteInputPort = null;
|
||||
this.managerRemoteInputHttpPort = null;
|
||||
this.managerRemoteCommsSecure = null;
|
||||
this.instanceId = null;
|
||||
this.nodeStatuses = null;
|
||||
this.componentRevisions = null;
|
||||
|
@ -90,9 +80,6 @@ public class ConnectionResponse {
|
|||
this.nodeIdentifier = null;
|
||||
this.tryLaterSeconds = 0;
|
||||
this.rejectionReason = rejectionReason;
|
||||
this.managerRemoteInputPort = null;
|
||||
this.managerRemoteInputHttpPort = null;
|
||||
this.managerRemoteCommsSecure = null;
|
||||
this.instanceId = null;
|
||||
this.nodeStatuses = null;
|
||||
this.componentRevisions = null;
|
||||
|
@ -130,18 +117,6 @@ public class ConnectionResponse {
|
|||
return nodeIdentifier;
|
||||
}
|
||||
|
||||
public Integer getManagerRemoteInputPort() {
|
||||
return managerRemoteInputPort;
|
||||
}
|
||||
|
||||
public Integer getManagerRemoteInputHttpPort() {
|
||||
return managerRemoteInputHttpPort;
|
||||
}
|
||||
|
||||
public Boolean isManagerRemoteCommsSecure() {
|
||||
return managerRemoteCommsSecure;
|
||||
}
|
||||
|
||||
public String getInstanceId() {
|
||||
return instanceId;
|
||||
}
|
||||
|
|
|
@ -33,9 +33,6 @@ public class AdaptedConnectionResponse {
|
|||
private NodeIdentifier nodeIdentifier;
|
||||
private String rejectionReason;
|
||||
private int tryLaterSeconds;
|
||||
private Integer managerRemoteInputPort;
|
||||
private Integer managerRemoteInputHttpPort;
|
||||
private Boolean managerRemoteCommsSecure;
|
||||
private String instanceId;
|
||||
private List<NodeConnectionStatus> nodeStatuses;
|
||||
private List<ComponentRevision> componentRevisions;
|
||||
|
@ -81,30 +78,6 @@ public class AdaptedConnectionResponse {
|
|||
return tryLaterSeconds > 0;
|
||||
}
|
||||
|
||||
public void setManagerRemoteInputPort(Integer managerRemoteInputPort) {
|
||||
this.managerRemoteInputPort = managerRemoteInputPort;
|
||||
}
|
||||
|
||||
public Integer getManagerRemoteInputPort() {
|
||||
return managerRemoteInputPort;
|
||||
}
|
||||
|
||||
public void setManagerRemoteInputHttpPort(Integer managerRemoteInputHttpPort) {
|
||||
this.managerRemoteInputHttpPort = managerRemoteInputHttpPort;
|
||||
}
|
||||
|
||||
public Integer getManagerRemoteInputHttpPort() {
|
||||
return managerRemoteInputHttpPort;
|
||||
}
|
||||
|
||||
public void setManagerRemoteCommsSecure(Boolean secure) {
|
||||
this.managerRemoteCommsSecure = secure;
|
||||
}
|
||||
|
||||
public Boolean isManagerRemoteCommsSecure() {
|
||||
return managerRemoteCommsSecure;
|
||||
}
|
||||
|
||||
public void setInstanceId(String instanceId) {
|
||||
this.instanceId = instanceId;
|
||||
}
|
||||
|
|
|
@ -31,9 +31,6 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo
|
|||
aCr.setNodeIdentifier(cr.getNodeIdentifier());
|
||||
aCr.setTryLaterSeconds(cr.getTryLaterSeconds());
|
||||
aCr.setRejectionReason(cr.getRejectionReason());
|
||||
aCr.setManagerRemoteInputPort(cr.getManagerRemoteInputPort());
|
||||
aCr.setManagerRemoteInputHttpPort(cr.getManagerRemoteInputHttpPort());
|
||||
aCr.setManagerRemoteCommsSecure(cr.isManagerRemoteCommsSecure());
|
||||
aCr.setInstanceId(cr.getInstanceId());
|
||||
aCr.setNodeConnectionStatuses(cr.getNodeConnectionStatuses());
|
||||
aCr.setComponentRevisions(cr.getComponentRevisions());
|
||||
|
@ -49,7 +46,6 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo
|
|||
return ConnectionResponse.createRejectionResponse(aCr.getRejectionReason());
|
||||
} else {
|
||||
return new ConnectionResponse(aCr.getNodeIdentifier(), aCr.getDataFlow(),
|
||||
aCr.getManagerRemoteInputPort(), aCr.getManagerRemoteInputHttpPort(), aCr.isManagerRemoteCommsSecure(),
|
||||
aCr.getInstanceId(), aCr.getNodeConnectionStatuses(), aCr.getComponentRevisions());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,16 +23,12 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
|
|||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.cluster.protocol.jaxb.message.NodeConnectionStatusAdapter;
|
||||
import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Message to indicate that the status of a node in the cluster has changed
|
||||
*/
|
||||
@XmlRootElement(name = "nodeStatusChange")
|
||||
public class NodeStatusChangeMessage extends ProtocolMessage {
|
||||
private static final Logger logger = LoggerFactory.getLogger(NodeStatusChangeMessage.class);
|
||||
|
||||
private NodeConnectionStatus connectionStatus;
|
||||
private NodeIdentifier nodeId;
|
||||
|
||||
|
|
|
@ -35,9 +35,6 @@ public class ReconnectionRequestMessage extends ProtocolMessage {
|
|||
private NodeIdentifier nodeId;
|
||||
private StandardDataFlow dataFlow;
|
||||
private boolean primary;
|
||||
private Integer managerRemoteSiteListeningPort;
|
||||
private Integer managerRemoteSiteListeningHttpPort;
|
||||
private Boolean managerRemoteSiteCommsSecure;
|
||||
private String instanceId;
|
||||
private List<NodeConnectionStatus> nodeStatuses;
|
||||
private List<ComponentRevision> componentRevisions;
|
||||
|
@ -75,30 +72,6 @@ public class ReconnectionRequestMessage extends ProtocolMessage {
|
|||
return MessageType.RECONNECTION_REQUEST;
|
||||
}
|
||||
|
||||
public void setManagerRemoteSiteListeningPort(final Integer listeningPort) {
|
||||
this.managerRemoteSiteListeningPort = listeningPort;
|
||||
}
|
||||
|
||||
public Integer getManagerRemoteSiteListeningPort() {
|
||||
return managerRemoteSiteListeningPort;
|
||||
}
|
||||
|
||||
public void setManagerRemoteSiteListeningHttpPort(Integer managerRemoteSiteListeningHttpPort) {
|
||||
this.managerRemoteSiteListeningHttpPort = managerRemoteSiteListeningHttpPort;
|
||||
}
|
||||
|
||||
public Integer getManagerRemoteSiteListeningHttpPort() {
|
||||
return managerRemoteSiteListeningHttpPort;
|
||||
}
|
||||
|
||||
public void setManagerRemoteSiteCommsSecure(final Boolean remoteSiteCommsSecure) {
|
||||
this.managerRemoteSiteCommsSecure = remoteSiteCommsSecure;
|
||||
}
|
||||
|
||||
public Boolean isManagerRemoteSiteCommsSecure() {
|
||||
return managerRemoteSiteCommsSecure;
|
||||
}
|
||||
|
||||
public void setInstanceId(final String instanceId) {
|
||||
this.instanceId = instanceId;
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ public class TestJaxbProtocolUtils {
|
|||
final DataFlow dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]);
|
||||
final List<NodeConnectionStatus> nodeStatuses = Collections.singletonList(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
|
||||
final List<ComponentRevision> componentRevisions = Collections.singletonList(ComponentRevision.fromRevision(new Revision(8L, "client-1", "component-1")));
|
||||
msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, 9990, 8080, false, "instance-1", nodeStatuses, componentRevisions));
|
||||
msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, "instance-1", nodeStatuses, componentRevisions));
|
||||
|
||||
JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
|
||||
final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray()));
|
||||
|
|
|
@ -766,10 +766,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
|
|||
return new ConnectionResponse(tryAgainSeconds);
|
||||
}
|
||||
|
||||
// TODO: Remove the 'null' values here from the ConnectionResponse all together. These
|
||||
// will no longer be needed for site-to-site once the NCM is gone.
|
||||
return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, null, null, null, instanceId,
|
||||
new ArrayList<>(nodeStatuses.values()),
|
||||
return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, new ArrayList<>(nodeStatuses.values()),
|
||||
revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.nifi.remote.protocol;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.nifi.groups.ProcessGroup;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
|
@ -24,6 +25,7 @@ import org.apache.nifi.processor.ProcessSession;
|
|||
import org.apache.nifi.remote.Peer;
|
||||
import org.apache.nifi.remote.RootGroupPort;
|
||||
import org.apache.nifi.remote.VersionedRemoteResource;
|
||||
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
|
||||
import org.apache.nifi.remote.cluster.NodeInformant;
|
||||
import org.apache.nifi.remote.codec.FlowFileCodec;
|
||||
import org.apache.nifi.remote.exception.HandshakeException;
|
||||
|
@ -131,9 +133,11 @@ public interface ServerProtocol extends VersionedRemoteResource {
|
|||
* a cluster, sends info about itself
|
||||
*
|
||||
* @param peer peer
|
||||
* @param clusterNodeInfo the cluster information
|
||||
*
|
||||
* @throws java.io.IOException ioe
|
||||
*/
|
||||
void sendPeerList(Peer peer) throws IOException;
|
||||
void sendPeerList(Peer peer, Optional<ClusterNodeInformation> clusterNodeInfo) throws IOException;
|
||||
|
||||
void shutdown(Peer peer);
|
||||
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.controller;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
|
||||
import org.apache.nifi.remote.cluster.NodeInformant;
|
||||
import org.apache.nifi.remote.cluster.NodeInformation;
|
||||
|
||||
public class ClusterCoordinatorNodeInformant implements NodeInformant {
|
||||
private final ClusterCoordinator clusterCoordinator;
|
||||
|
||||
public ClusterCoordinatorNodeInformant(final ClusterCoordinator coordinator) {
|
||||
this.clusterCoordinator = coordinator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterNodeInformation getNodeInformation() {
|
||||
final List<NodeInformation> nodeInfoCollection = new ArrayList<>();
|
||||
final Set<NodeIdentifier> nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
|
||||
|
||||
// TODO: Get total number of FlowFiles for each node
|
||||
for (final NodeIdentifier nodeId : nodeIds) {
|
||||
final NodeInformation nodeInfo = new NodeInformation(nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(),
|
||||
nodeId.getSiteToSiteHttpApiPort(), nodeId.getApiPort(), nodeId.isSiteToSiteSecure(), 0);
|
||||
nodeInfoCollection.add(nodeInfo);
|
||||
}
|
||||
|
||||
final ClusterNodeInformation nodeInfo = new ClusterNodeInformation();
|
||||
nodeInfo.setNodeInformation(nodeInfoCollection);
|
||||
return nodeInfo;
|
||||
}
|
||||
|
||||
}
|
|
@ -198,6 +198,7 @@ import org.apache.nifi.remote.StandardRemoteProcessGroup;
|
|||
import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
|
||||
import org.apache.nifi.remote.StandardRootGroupPort;
|
||||
import org.apache.nifi.remote.TransferDirection;
|
||||
import org.apache.nifi.remote.cluster.NodeInformant;
|
||||
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
|
||||
import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
|
||||
import org.apache.nifi.reporting.Bulletin;
|
||||
|
@ -298,9 +299,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
private final Integer remoteInputSocketPort;
|
||||
private final Integer remoteInputHttpPort;
|
||||
private final Boolean isSiteToSiteSecure;
|
||||
private Integer clusterManagerRemoteSitePort = null;
|
||||
private Integer clusterManagerRemoteSiteHttpPort = null;
|
||||
private Boolean clusterManagerRemoteSiteCommsSecure = null;
|
||||
|
||||
private ProcessGroup rootGroup;
|
||||
private final List<Connectable> startConnectablesAfterInitialization;
|
||||
|
@ -411,8 +409,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
clusterCoordinator,
|
||||
heartbeatMonitor);
|
||||
|
||||
flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure());
|
||||
|
||||
return flowController;
|
||||
}
|
||||
|
||||
|
@ -525,11 +521,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
} else {
|
||||
// Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol
|
||||
RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class);
|
||||
externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null));
|
||||
|
||||
final NodeInformant nodeInformant = configuredForClustering ? new ClusterCoordinatorNodeInformant(clusterCoordinator) : null;
|
||||
externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null, nodeInformant));
|
||||
}
|
||||
|
||||
if (remoteInputHttpPort == null) {
|
||||
LOG.info("Not enabling HTTP(S) Site-to-Site functionality because nifi.remote.input.html.enabled is not true");
|
||||
LOG.info("Not enabling HTTP(S) Site-to-Site functionality because the '" + NiFiProperties.SITE_TO_SITE_HTTP_ENABLED + "' property is not true");
|
||||
} else {
|
||||
externalSiteListeners.add(HttpRemoteSiteListener.getInstance());
|
||||
}
|
||||
|
@ -3895,45 +3893,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
return new ArrayList<>(history.getActions());
|
||||
}
|
||||
|
||||
public void setClusterManagerRemoteSiteInfo(final Integer managerListeningPort, final Integer managerListeningHttpPort, final Boolean commsSecure) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
clusterManagerRemoteSitePort = managerListeningPort;
|
||||
clusterManagerRemoteSiteHttpPort = managerListeningHttpPort;
|
||||
clusterManagerRemoteSiteCommsSecure = commsSecure;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Integer getClusterManagerRemoteSiteListeningPort() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return clusterManagerRemoteSitePort;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public Integer getClusterManagerRemoteSiteListeningHttpPort() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return clusterManagerRemoteSiteHttpPort;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Boolean isClusterManagerRemoteSiteCommsSecure() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return clusterManagerRemoteSiteCommsSecure;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Integer getRemoteSiteListeningPort() {
|
||||
return remoteInputSocketPort;
|
||||
}
|
||||
|
|
|
@ -467,7 +467,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
|
|||
controller.setClustered(true, null);
|
||||
clusterCoordinator.setConnected(false);
|
||||
|
||||
controller.setClusterManagerRemoteSiteInfo(null, null, null);
|
||||
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
|
||||
|
||||
/*
|
||||
|
@ -586,9 +585,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
|
|||
|
||||
// reconnect
|
||||
final ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(),
|
||||
request.getManagerRemoteSiteListeningPort(), request.getManagerRemoteSiteListeningHttpPort(),
|
||||
request.isManagerRemoteSiteCommsSecure(), request.getInstanceId(),
|
||||
request.getNodeConnectionStatuses(), request.getComponentRevisions());
|
||||
request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions());
|
||||
|
||||
connectionResponse.setCoordinatorDN(request.getRequestorDN());
|
||||
loadFromConnectionResponse(connectionResponse);
|
||||
|
@ -853,7 +850,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
|
|||
|
||||
// mark the node as clustered
|
||||
controller.setClustered(true, response.getInstanceId(), response.getCoordinatorDN());
|
||||
controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(), response.getManagerRemoteInputHttpPort(), response.isManagerRemoteCommsSecure());
|
||||
|
||||
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
|
||||
final Set<String> roles = status == null ? Collections.emptySet() : status.getRoles();
|
||||
|
|
|
@ -1138,9 +1138,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
try (
|
||||
final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient();
|
||||
){
|
||||
try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()){
|
||||
try {
|
||||
final ControllerDTO dto = apiClient.getController();
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
|
|||
private AuditService auditService;
|
||||
private StringEncryptor encryptor;
|
||||
private BulletinRepository bulletinRepository;
|
||||
private ClusterCoordinator clusterCoordinator;
|
||||
|
||||
@Override
|
||||
public Object getObject() throws Exception {
|
||||
|
@ -53,7 +54,6 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
|
|||
if (properties.isNode()) {
|
||||
final NodeProtocolSender nodeProtocolSender = applicationContext.getBean("nodeProtocolSender", NodeProtocolSender.class);
|
||||
final HeartbeatMonitor heartbeatMonitor = applicationContext.getBean("heartbeatMonitor", HeartbeatMonitor.class);
|
||||
final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);
|
||||
flowController = FlowController.createClusteredInstance(
|
||||
flowFileEventRepository,
|
||||
properties,
|
||||
|
@ -114,4 +114,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
|
|||
this.bulletinRepository = bulletinRepository;
|
||||
}
|
||||
|
||||
public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) {
|
||||
this.clusterCoordinator = clusterCoordinator;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
<property name="auditService" ref="auditService" />
|
||||
<property name="encryptor" ref="stringEncryptor" />
|
||||
<property name="bulletinRepository" ref="bulletinRepository" />
|
||||
<property name="clusterCoordinator" ref="clusterCoordinator" />
|
||||
</bean>
|
||||
|
||||
<!-- flow service -->
|
||||
|
|
|
@ -47,7 +47,6 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
|
|||
|
||||
private final Map<String, TransactionWrapper> transactions = new ConcurrentHashMap<>();
|
||||
private final ScheduledExecutorService taskExecutor;
|
||||
private final int httpListenPort;
|
||||
private ProcessGroup rootGroup;
|
||||
private ScheduledFuture<?> transactionMaintenanceTask;
|
||||
|
||||
|
@ -76,9 +75,6 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
|
|||
SITE_TO_SITE_HTTP_TRANSACTION_TTL, e.getMessage(), txTtlSec);
|
||||
}
|
||||
transactionTtlSec = txTtlSec;
|
||||
|
||||
httpListenPort = properties.getRemoteInputHttpPort() != null ? properties.getRemoteInputHttpPort() : 0;
|
||||
|
||||
}
|
||||
|
||||
public static HttpRemoteSiteListener getInstance() {
|
||||
|
@ -130,9 +126,7 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
|
|||
try {
|
||||
Set<String> transactionIds = transactions.keySet().stream().collect(Collectors.toSet());
|
||||
transactionIds.stream().filter(tid -> !isTransactionActive(tid))
|
||||
.forEach(tid -> {
|
||||
cancelTransaction(tid);
|
||||
});
|
||||
.forEach(tid -> cancelTransaction(tid));
|
||||
} catch (Exception e) {
|
||||
// Swallow exception so that this thread can keep working.
|
||||
logger.error("An exception occurred while maintaining transactions", e);
|
||||
|
@ -161,10 +155,6 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort() {
|
||||
return httpListenPort;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
@ -225,7 +215,7 @@ public class HttpRemoteSiteListener implements RemoteSiteListener {
|
|||
return transaction.transaction;
|
||||
}
|
||||
|
||||
public void extendsTransaction(final String transactionId) throws IllegalStateException {
|
||||
public void extendTransaction(final String transactionId) throws IllegalStateException {
|
||||
if (!isTransactionActive(transactionId)){
|
||||
throw new IllegalStateException("Transaction was not found or not active anymore. transactionId=" + transactionId);
|
||||
}
|
||||
|
|
|
@ -26,8 +26,5 @@ public interface RemoteSiteListener {
|
|||
|
||||
void start() throws IOException;
|
||||
|
||||
int getPort();
|
||||
|
||||
void stop();
|
||||
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ import java.net.SocketTimeoutException;
|
|||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -266,7 +267,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
|
|||
protocol.getPort().receiveFlowFiles(peer, protocol);
|
||||
break;
|
||||
case REQUEST_PEER_LIST:
|
||||
protocol.sendPeerList(peer);
|
||||
protocol.sendPeerList(peer, nodeInformant == null ? Optional.empty() : Optional.of(nodeInformant.getNodeInformation()));
|
||||
break;
|
||||
case SHUTDOWN:
|
||||
protocol.shutdown(peer);
|
||||
|
@ -321,8 +322,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
|
|||
listenerThread.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort() {
|
||||
private int getPort() {
|
||||
return socketPort;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.nifi.remote.HttpRemoteSiteListener;
|
|||
import org.apache.nifi.remote.Peer;
|
||||
import org.apache.nifi.remote.Transaction;
|
||||
import org.apache.nifi.remote.VersionNegotiator;
|
||||
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
|
||||
import org.apache.nifi.remote.codec.FlowFileCodec;
|
||||
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
|
||||
import org.apache.nifi.remote.exception.HandshakeException;
|
||||
|
@ -37,8 +38,9 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
|
|||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
|
||||
public class HttpFlowFileServerProtocolImpl extends AbstractFlowFileServerProtocol implements HttpFlowFileServerProtocol {
|
||||
public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerProtocol implements HttpFlowFileServerProtocol {
|
||||
|
||||
public static final String RESOURCE_NAME = "HttpFlowFileProtocol";
|
||||
|
||||
|
@ -46,7 +48,7 @@ public class HttpFlowFileServerProtocolImpl extends AbstractFlowFileServerProtoc
|
|||
private final VersionNegotiator versionNegotiator;
|
||||
private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance();
|
||||
|
||||
public HttpFlowFileServerProtocolImpl(VersionNegotiator versionNegotiator) {
|
||||
public StandardHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) {
|
||||
super();
|
||||
this.versionNegotiator = versionNegotiator;
|
||||
}
|
||||
|
@ -176,6 +178,7 @@ public class HttpFlowFileServerProtocolImpl extends AbstractFlowFileServerProtoc
|
|||
return holdTransaction(peer, transaction);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int commitTransferTransaction(Peer peer, String clientChecksum) throws IOException, IllegalStateException {
|
||||
logger.debug("{} Committing the transfer transaction. peer={} clientChecksum={}", this, peer, clientChecksum);
|
||||
HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
|
||||
|
@ -191,6 +194,7 @@ public class HttpFlowFileServerProtocolImpl extends AbstractFlowFileServerProtoc
|
|||
return holdTransaction(peer, transaction);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int commitReceiveTransaction(Peer peer) throws IOException, IllegalStateException {
|
||||
logger.debug("{} Committing the receive transaction. peer={}", this, peer);
|
||||
HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
|
||||
|
@ -211,13 +215,11 @@ public class HttpFlowFileServerProtocolImpl extends AbstractFlowFileServerProtoc
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendPeerList(final Peer peer) throws IOException {
|
||||
public void sendPeerList(final Peer peer, final Optional<ClusterNodeInformation> clusterNodeInformation) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getResourceName() {
|
||||
return RESOURCE_NAME;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,209 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.remote.protocol.socket;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.nifi.groups.ProcessGroup;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.remote.Peer;
|
||||
import org.apache.nifi.remote.RootGroupPort;
|
||||
import org.apache.nifi.remote.StandardVersionNegotiator;
|
||||
import org.apache.nifi.remote.VersionNegotiator;
|
||||
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
|
||||
import org.apache.nifi.remote.cluster.NodeInformant;
|
||||
import org.apache.nifi.remote.cluster.NodeInformation;
|
||||
import org.apache.nifi.remote.codec.FlowFileCodec;
|
||||
import org.apache.nifi.remote.exception.HandshakeException;
|
||||
import org.apache.nifi.remote.protocol.CommunicationsSession;
|
||||
import org.apache.nifi.remote.protocol.HandshakeProperty;
|
||||
import org.apache.nifi.remote.protocol.RequestType;
|
||||
import org.apache.nifi.remote.protocol.ResponseCode;
|
||||
import org.apache.nifi.remote.protocol.ServerProtocol;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ClusterManagerServerProtocol implements ServerProtocol {
|
||||
|
||||
public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
|
||||
|
||||
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
|
||||
private final Logger logger = LoggerFactory.getLogger(ClusterManagerServerProtocol.class);
|
||||
private NodeInformant nodeInformant;
|
||||
|
||||
private String commsIdentifier;
|
||||
private boolean shutdown = false;
|
||||
private boolean handshakeCompleted = false;
|
||||
private long requestExpirationMillis = 30000L;
|
||||
|
||||
public ClusterManagerServerProtocol() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNodeInformant(final NodeInformant nodeInformant) {
|
||||
this.nodeInformant = nodeInformant;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handshake(final Peer peer) throws IOException, HandshakeException {
|
||||
if (handshakeCompleted) {
|
||||
throw new IllegalStateException("Handshake has already been completed");
|
||||
}
|
||||
if (shutdown) {
|
||||
throw new IllegalStateException("Protocol is shutdown");
|
||||
}
|
||||
|
||||
final CommunicationsSession commsSession = peer.getCommunicationsSession();
|
||||
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
|
||||
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
|
||||
|
||||
// read communications identifier
|
||||
commsIdentifier = dis.readUTF();
|
||||
|
||||
// read all of the properties. we don't really care what the properties are.
|
||||
final int numProperties = dis.readInt();
|
||||
for (int i = 0; i < numProperties; i++) {
|
||||
final String propertyName = dis.readUTF();
|
||||
final String propertyValue = dis.readUTF();
|
||||
|
||||
final HandshakeProperty property;
|
||||
try {
|
||||
property = HandshakeProperty.valueOf(propertyName);
|
||||
if (HandshakeProperty.REQUEST_EXPIRATION_MILLIS.equals(property)) {
|
||||
requestExpirationMillis = Long.parseLong(propertyValue);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
}
|
||||
}
|
||||
|
||||
// send "OK" response
|
||||
ResponseCode.PROPERTIES_OK.writeResponse(dos);
|
||||
|
||||
logger.debug("Successfully completed handshake with {}; CommsID={}", peer, commsIdentifier);
|
||||
handshakeCompleted = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isHandshakeSuccessful() {
|
||||
return handshakeCompleted;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendPeerList(final Peer peer) throws IOException {
|
||||
if (!handshakeCompleted) {
|
||||
throw new IllegalStateException("Handshake has not been completed");
|
||||
}
|
||||
if (shutdown) {
|
||||
throw new IllegalStateException("Protocol is shutdown");
|
||||
}
|
||||
|
||||
final CommunicationsSession commsSession = peer.getCommunicationsSession();
|
||||
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
|
||||
|
||||
final ClusterNodeInformation clusterNodeInfo = nodeInformant.getNodeInformation();
|
||||
final Collection<NodeInformation> nodeInfos = clusterNodeInfo.getNodeInformation();
|
||||
|
||||
// determine how many nodes have Site-to-site enabled
|
||||
int numPeers = 0;
|
||||
for (final NodeInformation nodeInfo : nodeInfos) {
|
||||
if (nodeInfo.getSiteToSitePort() != null) {
|
||||
numPeers++;
|
||||
}
|
||||
}
|
||||
|
||||
dos.writeInt(numPeers);
|
||||
for (final NodeInformation nodeInfo : nodeInfos) {
|
||||
if (nodeInfo.getSiteToSitePort() == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
dos.writeUTF(nodeInfo.getSiteToSiteHostname());
|
||||
dos.writeInt(nodeInfo.getSiteToSitePort());
|
||||
dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
|
||||
dos.writeInt(nodeInfo.getTotalFlowFiles());
|
||||
}
|
||||
|
||||
logger.info("Redirected {} to {} nodes", peer, numPeers);
|
||||
|
||||
dos.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown(final Peer peer) {
|
||||
shutdown = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isShutdown() {
|
||||
return shutdown;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlowFileCodec negotiateCodec(Peer peer) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlowFileCodec getPreNegotiatedCodec() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RequestType getRequestType(final Peer peer) throws IOException {
|
||||
final CommunicationsSession commsSession = peer.getCommunicationsSession();
|
||||
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
|
||||
return RequestType.readRequestType(dis);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public VersionNegotiator getVersionNegotiator() {
|
||||
return versionNegotiator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getResourceName() {
|
||||
return RESOURCE_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRootProcessGroup(final ProcessGroup rootGroup) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RootGroupPort getPort() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRequestExpiration() {
|
||||
return requestExpirationMillis;
|
||||
}
|
||||
}
|
|
@ -20,6 +20,8 @@ import org.apache.nifi.remote.Peer;
|
|||
import org.apache.nifi.remote.RemoteResourceFactory;
|
||||
import org.apache.nifi.remote.StandardVersionNegotiator;
|
||||
import org.apache.nifi.remote.VersionNegotiator;
|
||||
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
|
||||
import org.apache.nifi.remote.cluster.NodeInformation;
|
||||
import org.apache.nifi.remote.codec.FlowFileCodec;
|
||||
import org.apache.nifi.remote.exception.HandshakeException;
|
||||
import org.apache.nifi.remote.exception.ProtocolException;
|
||||
|
@ -34,14 +36,19 @@ import java.io.DataInputStream;
|
|||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol {
|
||||
|
||||
public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
|
||||
|
||||
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
|
||||
// Version 6 added to support Zero-Master Clustering, which was introduced in NiFi 1.0.0
|
||||
private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1);
|
||||
|
||||
@Override
|
||||
protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException {
|
||||
|
@ -147,7 +154,7 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendPeerList(final Peer peer) throws IOException {
|
||||
public void sendPeerList(final Peer peer, final Optional<ClusterNodeInformation> clusterNodeInfo) throws IOException {
|
||||
if (!handshakeCompleted) {
|
||||
throw new IllegalStateException("Handshake has not been completed");
|
||||
}
|
||||
|
@ -167,12 +174,36 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
|
|||
}
|
||||
logger.debug("{} Advertising Remote Input host name {}", this, peer);
|
||||
|
||||
// we have only 1 peer: ourselves.
|
||||
dos.writeInt(1);
|
||||
dos.writeUTF(remoteInputHost);
|
||||
dos.writeInt(properties.getRemoteInputPort());
|
||||
dos.writeBoolean(properties.isSiteToSiteSecure());
|
||||
dos.writeInt(0); // doesn't matter how many FlowFiles we have, because we're the only host.
|
||||
List<NodeInformation> nodeInfos;
|
||||
if (clusterNodeInfo.isPresent()) {
|
||||
nodeInfos = new ArrayList<>(clusterNodeInfo.get().getNodeInformation());
|
||||
} else {
|
||||
final NodeInformation self = new NodeInformation(remoteInputHost, properties.getRemoteInputPort(), properties.getRemoteInputHttpPort(), properties.getRemoteInputHttpPort(),
|
||||
properties.isSiteToSiteSecure(), 0);
|
||||
nodeInfos = Collections.singletonList(self);
|
||||
}
|
||||
|
||||
// determine how many nodes have Site-to-site enabled
|
||||
int numPeers = 0;
|
||||
for (final NodeInformation nodeInfo : nodeInfos) {
|
||||
if (nodeInfo.getSiteToSitePort() != null) {
|
||||
numPeers++;
|
||||
}
|
||||
}
|
||||
|
||||
dos.writeInt(numPeers);
|
||||
for (final NodeInformation nodeInfo : nodeInfos) {
|
||||
if (nodeInfo.getSiteToSitePort() == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
dos.writeUTF(nodeInfo.getSiteToSiteHostname());
|
||||
dos.writeInt(nodeInfo.getSiteToSitePort());
|
||||
dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
|
||||
dos.writeInt(nodeInfo.getTotalFlowFiles());
|
||||
}
|
||||
|
||||
logger.info("Sending list of {} peers back to client {}", numPeers, peer);
|
||||
dos.flush();
|
||||
}
|
||||
|
||||
|
|
|
@ -12,5 +12,5 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol
|
||||
org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol
|
||||
|
||||
org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol
|
|
@ -80,7 +80,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
final PeerDescription description = new PeerDescription("peer-host", 8080, false);
|
||||
final InputStream inputStream = new ByteArrayInputStream(new byte[]{});
|
||||
final OutputStream outputStream = new ByteArrayOutputStream();
|
||||
final HttpServerCommunicationsSession commsSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId);
|
||||
final HttpServerCommunicationsSession commsSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId, "user");
|
||||
commsSession.putHandshakeParam(HandshakeProperty.GZIP, "false");
|
||||
commsSession.putHandshakeParam(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, "1234");
|
||||
final String peerUrl = "http://peer-host:8080/";
|
||||
|
@ -90,7 +90,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
|
||||
private HttpFlowFileServerProtocol getDefaultHttpFlowFileServerProtocol() {
|
||||
final StandardVersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
|
||||
return new HttpFlowFileServerProtocolImpl(versionNegotiator);
|
||||
return new StandardHttpFlowFileServerProtocol(versionNegotiator);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -101,7 +101,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
try {
|
||||
serverProtocol.handshake(peer);
|
||||
fail();
|
||||
} catch (HandshakeException e) {
|
||||
} catch (final HandshakeException e) {
|
||||
assertEquals(ResponseCode.MISSING_PROPERTY, e.getResponseCode());
|
||||
}
|
||||
|
||||
|
@ -122,7 +122,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
try {
|
||||
serverProtocol.handshake(peer);
|
||||
fail();
|
||||
} catch (HandshakeException e) {
|
||||
} catch (final HandshakeException e) {
|
||||
assertEquals(ResponseCode.UNKNOWN_PORT, e.getResponseCode());
|
||||
}
|
||||
|
||||
|
@ -147,7 +147,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
try {
|
||||
serverProtocol.handshake(peer);
|
||||
fail();
|
||||
} catch (HandshakeException e) {
|
||||
} catch (final HandshakeException e) {
|
||||
assertEquals(ResponseCode.UNAUTHORIZED, e.getResponseCode());
|
||||
}
|
||||
|
||||
|
@ -173,7 +173,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
try {
|
||||
serverProtocol.handshake(peer);
|
||||
fail();
|
||||
} catch (HandshakeException e) {
|
||||
} catch (final HandshakeException e) {
|
||||
assertEquals(ResponseCode.PORT_NOT_IN_VALID_STATE, e.getResponseCode());
|
||||
}
|
||||
|
||||
|
@ -196,7 +196,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
doReturn(true).when(authResult).isAuthorized();
|
||||
doReturn(true).when(port).isValid();
|
||||
doReturn(true).when(port).isRunning();
|
||||
Set<Connection> connections = new HashSet<>();
|
||||
final Set<Connection> connections = new HashSet<>();
|
||||
final Connection connection = mock(Connection.class);
|
||||
connections.add(connection);
|
||||
doReturn(connections).when(port).getConnections();
|
||||
|
@ -208,7 +208,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
try {
|
||||
serverProtocol.handshake(peer);
|
||||
fail();
|
||||
} catch (HandshakeException e) {
|
||||
} catch (final HandshakeException e) {
|
||||
assertEquals(ResponseCode.PORTS_DESTINATION_FULL, e.getResponseCode());
|
||||
}
|
||||
|
||||
|
@ -237,13 +237,13 @@ public class TestHttpFlowFileServerProtocol {
|
|||
try {
|
||||
serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
|
||||
fail("transferFlowFiles should fail since it's already shutdown.");
|
||||
} catch (IllegalStateException e) {
|
||||
} catch (final IllegalStateException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
|
||||
fail("receiveFlowFiles should fail since it's already shutdown.");
|
||||
} catch (IllegalStateException e) {
|
||||
} catch (final IllegalStateException e) {
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -288,12 +288,12 @@ public class TestHttpFlowFileServerProtocol {
|
|||
try {
|
||||
serverProtocol.commitTransferTransaction(peer, "client-sent-wrong-checksum");
|
||||
fail();
|
||||
} catch (IOException e) {
|
||||
} catch (final IOException e) {
|
||||
assertTrue(e.getMessage().contains("CRC32 Checksum"));
|
||||
}
|
||||
}
|
||||
|
||||
private Peer transferOneFile(HttpFlowFileServerProtocol serverProtocol, String transactionId) throws IOException {
|
||||
private Peer transferOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId) throws IOException {
|
||||
final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance();
|
||||
final Peer peer = getDefaultPeer(transactionId);
|
||||
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
|
||||
|
@ -312,21 +312,21 @@ public class TestHttpFlowFileServerProtocol {
|
|||
doReturn(flowFile).when(processSession).get();
|
||||
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
|
||||
doAnswer(invocation -> {
|
||||
String peerUrl = (String)invocation.getArguments()[1];
|
||||
String detail = (String)invocation.getArguments()[2];
|
||||
final String peerUrl = (String)invocation.getArguments()[1];
|
||||
final String detail = (String)invocation.getArguments()[2];
|
||||
assertEquals("http://peer-host:8080/", peerUrl);
|
||||
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
|
||||
return null;
|
||||
}).when(provenanceReporter).send(eq(flowFile), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
|
||||
|
||||
doAnswer(invocation -> {
|
||||
InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
|
||||
final InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
|
||||
callback.process(new java.io.ByteArrayInputStream("Server content".getBytes()));
|
||||
return null;
|
||||
}).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class));
|
||||
|
||||
// Execute test using mock
|
||||
int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
|
||||
final int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
|
||||
assertEquals(1, flowFileSent);
|
||||
|
||||
assertTrue(remoteSiteListener.isTransactionActive(transactionId));
|
||||
|
@ -360,8 +360,8 @@ public class TestHttpFlowFileServerProtocol {
|
|||
|
||||
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
|
||||
doAnswer(invocation -> {
|
||||
String peerUrl = (String)invocation.getArguments()[1];
|
||||
String detail = (String)invocation.getArguments()[2];
|
||||
final String peerUrl = (String)invocation.getArguments()[1];
|
||||
final String detail = (String)invocation.getArguments()[2];
|
||||
assertEquals("http://peer-host:8080/", peerUrl);
|
||||
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
|
||||
return null;
|
||||
|
@ -369,15 +369,15 @@ public class TestHttpFlowFileServerProtocol {
|
|||
|
||||
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
|
||||
doAnswer(invocation -> {
|
||||
String peerUrl = (String)invocation.getArguments()[1];
|
||||
String detail = (String)invocation.getArguments()[2];
|
||||
final String peerUrl = (String)invocation.getArguments()[1];
|
||||
final String detail = (String)invocation.getArguments()[2];
|
||||
assertEquals("http://peer-host:8080/", peerUrl);
|
||||
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
|
||||
return null;
|
||||
}).when(provenanceReporter).send(eq(flowFile2), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
|
||||
|
||||
doAnswer(invocation -> {
|
||||
InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
|
||||
final InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
|
||||
callback.process(new java.io.ByteArrayInputStream("Server content".getBytes()));
|
||||
return null;
|
||||
}).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class));
|
||||
|
@ -397,7 +397,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
final String contents = "Content from client.";
|
||||
final byte[] bytes = contents.getBytes();
|
||||
final InputStream in = new ByteArrayInputStream(bytes);
|
||||
Map<String, String> attributes = new HashMap<>();
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("client-attr-1", "client-attr-1-value");
|
||||
attributes.put("client-attr-2", "client-attr-2-value");
|
||||
return new StandardDataPacket(attributes, in, bytes.length);
|
||||
|
@ -458,12 +458,12 @@ public class TestHttpFlowFileServerProtocol {
|
|||
try {
|
||||
serverProtocol.commitReceiveTransaction(peer);
|
||||
fail();
|
||||
} catch (IOException e) {
|
||||
} catch (final IOException e) {
|
||||
assertTrue(e.getMessage().contains("Received a BadChecksum response"));
|
||||
}
|
||||
}
|
||||
|
||||
private void receiveOneFile(HttpFlowFileServerProtocol serverProtocol, String transactionId, Peer peer) throws IOException {
|
||||
private void receiveOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId, final Peer peer) throws IOException {
|
||||
final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance();
|
||||
final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
|
||||
commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
|
||||
|
@ -479,7 +479,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class);
|
||||
final FlowFile flowFile = mock(FlowFile.class);
|
||||
|
||||
DataPacket dataPacket = createClientDataPacket();
|
||||
final DataPacket dataPacket = createClientDataPacket();
|
||||
|
||||
final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream();
|
||||
negotiatedCoded.encode(dataPacket, testDataOs);
|
||||
|
@ -488,7 +488,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
|
||||
|
||||
doAnswer(invocation -> {
|
||||
InputStream is = (InputStream) invocation.getArguments()[0];
|
||||
final InputStream is = (InputStream) invocation.getArguments()[0];
|
||||
for (int b; (b = is.read()) >= 0;) {
|
||||
// consume stream.
|
||||
}
|
||||
|
@ -499,21 +499,21 @@ public class TestHttpFlowFileServerProtocol {
|
|||
doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
|
||||
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
|
||||
doAnswer(invocation -> {
|
||||
String peerUrl = (String)invocation.getArguments()[1];
|
||||
String detail = (String)invocation.getArguments()[3];
|
||||
final String peerUrl = (String)invocation.getArguments()[1];
|
||||
final String detail = (String)invocation.getArguments()[3];
|
||||
assertEquals("http://peer-host:8080/", peerUrl);
|
||||
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
|
||||
return null;
|
||||
}).when(provenanceReporter)
|
||||
.receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class));
|
||||
|
||||
Set<Relationship> relations = new HashSet<>();
|
||||
final Set<Relationship> relations = new HashSet<>();
|
||||
final Relationship relationship = new Relationship.Builder().build();
|
||||
relations.add(relationship);
|
||||
doReturn(relations).when(context).getAvailableRelationships();
|
||||
|
||||
// Execute test using mock
|
||||
int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
|
||||
final int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
|
||||
assertEquals(1, flowFileReceived);
|
||||
|
||||
assertTrue(remoteSiteListener.isTransactionActive(transactionId));
|
||||
|
@ -549,7 +549,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
|
||||
|
||||
doAnswer(invocation -> {
|
||||
InputStream is = (InputStream) invocation.getArguments()[0];
|
||||
final InputStream is = (InputStream) invocation.getArguments()[0];
|
||||
for (int b; (b = is.read()) >= 0;) {
|
||||
// consume stream.
|
||||
}
|
||||
|
@ -562,15 +562,15 @@ public class TestHttpFlowFileServerProtocol {
|
|||
.when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
|
||||
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
|
||||
doAnswer(invocation -> {
|
||||
String peerUrl = (String)invocation.getArguments()[1];
|
||||
String detail = (String)invocation.getArguments()[3];
|
||||
final String peerUrl = (String)invocation.getArguments()[1];
|
||||
final String detail = (String)invocation.getArguments()[3];
|
||||
assertEquals("http://peer-host:8080/", peerUrl);
|
||||
assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
|
||||
return null;
|
||||
}).when(provenanceReporter)
|
||||
.receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class));
|
||||
|
||||
Set<Relationship> relations = new HashSet<>();
|
||||
final Set<Relationship> relations = new HashSet<>();
|
||||
doReturn(relations).when(context).getAvailableRelationships();
|
||||
|
||||
// Execute test using mock
|
||||
|
|
|
@ -16,7 +16,28 @@
|
|||
*/
|
||||
package org.apache.nifi.web;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.ws.rs.WebApplicationException;
|
||||
import javax.ws.rs.core.Response;
|
||||
|
||||
import org.apache.nifi.action.Action;
|
||||
import org.apache.nifi.action.Component;
|
||||
import org.apache.nifi.action.FlowChangeAction;
|
||||
|
@ -194,26 +215,7 @@ import org.apache.nifi.web.util.SnippetUtils;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.ws.rs.WebApplicationException;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
/**
|
||||
* Implementation of NiFiServiceFacade that performs revision checking.
|
||||
|
@ -2157,15 +2159,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
controllerDTO.setDisabledCount(counts.getDisabledCount());
|
||||
|
||||
// determine the site to site configuration
|
||||
if (isClustered()) {
|
||||
controllerDTO.setRemoteSiteListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningPort());
|
||||
controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningHttpPort());
|
||||
controllerDTO.setSiteToSiteSecure(controllerFacade.isClusterManagerRemoteSiteCommsSecure());
|
||||
} else {
|
||||
controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
|
||||
controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort());
|
||||
controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
|
||||
}
|
||||
controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
|
||||
controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort());
|
||||
controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
|
||||
|
||||
return controllerDTO;
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
|
|||
import org.apache.nifi.remote.protocol.HandshakeProperty;
|
||||
import org.apache.nifi.remote.protocol.ResponseCode;
|
||||
import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
|
||||
import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocolImpl;
|
||||
import org.apache.nifi.remote.protocol.http.StandardHttpFlowFileServerProtocol;
|
||||
import org.apache.nifi.stream.io.ByteArrayOutputStream;
|
||||
import org.apache.nifi.web.api.entity.TransactionResultEntity;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -305,16 +305,18 @@ public class DataTransferResource extends ApplicationResource {
|
|||
}
|
||||
|
||||
HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) {
|
||||
return new HttpFlowFileServerProtocolImpl(versionNegotiator);
|
||||
return new StandardHttpFlowFileServerProtocol(versionNegotiator);
|
||||
}
|
||||
|
||||
private Peer constructPeer(HttpServletRequest req, InputStream inputStream, OutputStream outputStream, String portId, String transactionId) {
|
||||
String clientHostName = req.getRemoteHost();
|
||||
int clientPort = req.getRemotePort();
|
||||
final String clientHostName = req.getRemoteHost();
|
||||
final int clientPort = req.getRemotePort();
|
||||
|
||||
PeerDescription peerDescription = new PeerDescription(clientHostName, clientPort, req.isSecure());
|
||||
final PeerDescription peerDescription = new PeerDescription(clientHostName, clientPort, req.isSecure());
|
||||
|
||||
HttpServerCommunicationsSession commSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId);
|
||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||
final String userDn = user == null ? null : user.getIdentity();
|
||||
final HttpServerCommunicationsSession commSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId, userDn);
|
||||
|
||||
boolean useCompression = false;
|
||||
final String useCompressionStr = req.getHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION);
|
||||
|
@ -330,20 +332,28 @@ public class DataTransferResource extends ApplicationResource {
|
|||
commSession.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, portId);
|
||||
commSession.putHandshakeParam(HandshakeProperty.GZIP, String.valueOf(useCompression));
|
||||
|
||||
if (!isEmpty(requestExpiration)) commSession.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, requestExpiration);
|
||||
if (!isEmpty(batchCount)) commSession.putHandshakeParam(BATCH_COUNT, batchCount);
|
||||
if (!isEmpty(batchSize)) commSession.putHandshakeParam(BATCH_SIZE, batchSize);
|
||||
if (!isEmpty(batchDuration)) commSession.putHandshakeParam(BATCH_DURATION, batchDuration);
|
||||
if (!isEmpty(requestExpiration)) {
|
||||
commSession.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, requestExpiration);
|
||||
}
|
||||
if (!isEmpty(batchCount)) {
|
||||
commSession.putHandshakeParam(BATCH_COUNT, batchCount);
|
||||
}
|
||||
if (!isEmpty(batchSize)) {
|
||||
commSession.putHandshakeParam(BATCH_SIZE, batchSize);
|
||||
}
|
||||
if (!isEmpty(batchDuration)) {
|
||||
commSession.putHandshakeParam(BATCH_DURATION, batchDuration);
|
||||
}
|
||||
|
||||
if(peerDescription.isSecure()){
|
||||
NiFiUser nifiUser = NiFiUserUtils.getNiFiUser();
|
||||
final NiFiUser nifiUser = NiFiUserUtils.getNiFiUser();
|
||||
logger.debug("initiating peer, nifiUser={}", nifiUser);
|
||||
commSession.setUserDn(nifiUser.getIdentity());
|
||||
}
|
||||
|
||||
// TODO: Followed how SocketRemoteSiteListener define peerUrl and clusterUrl, but it can be more meaningful values, especially for clusterUrl.
|
||||
String peerUrl = "nifi://" + clientHostName + ":" + clientPort;
|
||||
String clusterUrl = "nifi://localhost:" + req.getLocalPort();
|
||||
final String peerUrl = "nifi://" + clientHostName + ":" + clientPort;
|
||||
final String clusterUrl = "nifi://localhost:" + req.getLocalPort();
|
||||
return new Peer(peerDescription, commSession, peerUrl, clusterUrl);
|
||||
}
|
||||
|
||||
|
@ -771,7 +781,7 @@ public class DataTransferResource extends ApplicationResource {
|
|||
try {
|
||||
// Do handshake
|
||||
initiateServerProtocol(peer, transportProtocolVersion);
|
||||
transactionManager.extendsTransaction(transactionId);
|
||||
transactionManager.extendTransaction(transactionId);
|
||||
|
||||
final TransactionResultEntity entity = new TransactionResultEntity();
|
||||
entity.setResponseCode(ResponseCode.CONTINUE_TRANSACTION.getCode());
|
||||
|
|
|
@ -16,11 +16,22 @@
|
|||
*/
|
||||
package org.apache.nifi.web.api;
|
||||
|
||||
import com.wordnik.swagger.annotations.Api;
|
||||
import com.wordnik.swagger.annotations.ApiOperation;
|
||||
import com.wordnik.swagger.annotations.ApiResponse;
|
||||
import com.wordnik.swagger.annotations.ApiResponses;
|
||||
import com.wordnik.swagger.annotations.Authorization;
|
||||
import static org.apache.commons.lang3.StringUtils.isEmpty;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.HttpMethod;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.authorization.AccessDeniedException;
|
||||
import org.apache.nifi.authorization.AuthorizationRequest;
|
||||
|
@ -31,6 +42,9 @@ import org.apache.nifi.authorization.RequestAction;
|
|||
import org.apache.nifi.authorization.resource.ResourceFactory;
|
||||
import org.apache.nifi.authorization.user.NiFiUser;
|
||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.remote.HttpRemoteSiteListener;
|
||||
import org.apache.nifi.remote.VersionNegotiator;
|
||||
import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
|
||||
|
@ -44,18 +58,11 @@ import org.apache.nifi.web.api.entity.PeersEntity;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.HttpMethod;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isEmpty;
|
||||
import com.wordnik.swagger.annotations.Api;
|
||||
import com.wordnik.swagger.annotations.ApiOperation;
|
||||
import com.wordnik.swagger.annotations.ApiResponse;
|
||||
import com.wordnik.swagger.annotations.ApiResponses;
|
||||
import com.wordnik.swagger.annotations.Authorization;
|
||||
|
||||
/**
|
||||
* RESTful endpoint for managing a SiteToSite connection.
|
||||
|
@ -70,7 +77,11 @@ public class SiteToSiteResource extends ApplicationResource {
|
|||
private static final Logger logger = LoggerFactory.getLogger(SiteToSiteResource.class);
|
||||
|
||||
private NiFiServiceFacade serviceFacade;
|
||||
private ClusterCoordinator clusterCoordinator;
|
||||
private Authorizer authorizer;
|
||||
public static final String CHECK_SUM = "checksum";
|
||||
public static final String RESPONSE_CODE = "responseCode";
|
||||
|
||||
private final ResponseCreator responseCreator = new ResponseCreator();
|
||||
private final VersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
|
||||
private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance();
|
||||
|
@ -147,6 +158,7 @@ public class SiteToSiteResource extends ApplicationResource {
|
|||
return clusterContext(noCache(Response.ok(entity))).build();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the available Peers and its status of this NiFi.
|
||||
*
|
||||
|
@ -185,52 +197,42 @@ public class SiteToSiteResource extends ApplicationResource {
|
|||
return responseCreator.badRequestResponse(e);
|
||||
}
|
||||
|
||||
ArrayList<PeerDTO> peers;
|
||||
|
||||
final List<PeerDTO> peers = new ArrayList<>();
|
||||
if (properties.isNode()) {
|
||||
return responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " is only accessible on NCM or Standalone NiFi instance.");
|
||||
// TODO: NCM no longer exists.
|
||||
/*
|
||||
} else if (properties.isClusterManager()) {
|
||||
ClusterNodeInformation clusterNodeInfo = clusterManager.getNodeInformation();
|
||||
final Collection<NodeInformation> nodeInfos = clusterNodeInfo.getNodeInformation();
|
||||
peers = new ArrayList<>(nodeInfos.size());
|
||||
for (NodeInformation nodeInfo : nodeInfos) {
|
||||
if (nodeInfo.getSiteToSiteHttpApiPort() == null) {
|
||||
continue;
|
||||
}
|
||||
PeerDTO peer = new PeerDTO();
|
||||
peer.setHostname(nodeInfo.getSiteToSiteHostname());
|
||||
peer.setPort(nodeInfo.getSiteToSiteHttpApiPort());
|
||||
peer.setSecure(nodeInfo.isSiteToSiteSecure());
|
||||
peer.setFlowFileCount(nodeInfo.getTotalFlowFiles());
|
||||
final Set<NodeIdentifier> nodeIds = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
|
||||
|
||||
// TODO: Get total number of FlowFiles for each node
|
||||
for (final NodeIdentifier nodeId : nodeIds) {
|
||||
final PeerDTO peer = new PeerDTO();
|
||||
final String siteToSiteAddress = nodeId.getSiteToSiteAddress();
|
||||
peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress);
|
||||
peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort());
|
||||
peer.setSecure(nodeId.isSiteToSiteSecure());
|
||||
peer.setFlowFileCount(0);
|
||||
peers.add(peer);
|
||||
}
|
||||
*/
|
||||
} else {
|
||||
// Standalone mode.
|
||||
PeerDTO peer = new PeerDTO();
|
||||
final PeerDTO peer = new PeerDTO();
|
||||
// req.getLocalName returns private IP address, that can't be accessed from client in some environments.
|
||||
// So, use the value defined in nifi.properties instead when it is defined.
|
||||
String remoteInputHost = properties.getRemoteInputHost();
|
||||
final String remoteInputHost = properties.getRemoteInputHost();
|
||||
peer.setHostname(isEmpty(remoteInputHost) ? req.getLocalName() : remoteInputHost);
|
||||
peer.setPort(properties.getRemoteInputHttpPort());
|
||||
peer.setSecure(properties.isSiteToSiteSecure());
|
||||
peer.setFlowFileCount(0); // doesn't matter how many FlowFiles we have, because we're the only host.
|
||||
|
||||
peers = new ArrayList<>(1);
|
||||
peers.add(peer);
|
||||
|
||||
}
|
||||
|
||||
PeersEntity entity = new PeersEntity();
|
||||
final PeersEntity entity = new PeersEntity();
|
||||
entity.setPeers(peers);
|
||||
|
||||
return clusterContext(noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion, transactionManager))).build();
|
||||
}
|
||||
|
||||
// setters
|
||||
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
|
||||
public void setServiceFacade(final NiFiServiceFacade serviceFacade) {
|
||||
this.serviceFacade = serviceFacade;
|
||||
}
|
||||
|
||||
|
@ -238,4 +240,9 @@ public class SiteToSiteResource extends ApplicationResource {
|
|||
this.authorizer = authorizer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) {
|
||||
super.setClusterCoordinator(clusterCoordinator);
|
||||
this.clusterCoordinator = clusterCoordinator;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -677,39 +677,6 @@ public class ControllerFacade implements Authorizable {
|
|||
flowService.saveFlowChanges(TimeUnit.SECONDS, writeDelaySeconds);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the socket port that the Cluster Manager is listening on for
|
||||
* Site-to-Site communications
|
||||
*
|
||||
* @return the socket port that the Cluster Manager is listening on for
|
||||
* Site-to-Site communications
|
||||
*/
|
||||
public Integer getClusterManagerRemoteSiteListeningPort() {
|
||||
return flowController.getClusterManagerRemoteSiteListeningPort();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the http(s) port that the Cluster Manager is listening on for
|
||||
* Site-to-Site communications
|
||||
*
|
||||
* @return the socket port that the Cluster Manager is listening on for
|
||||
* Site-to-Site communications
|
||||
*/
|
||||
public Integer getClusterManagerRemoteSiteListeningHttpPort() {
|
||||
return flowController.getClusterManagerRemoteSiteListeningHttpPort();
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates whether or not Site-to-Site communications with the Cluster
|
||||
* Manager are secure
|
||||
*
|
||||
* @return whether or not Site-to-Site communications with the Cluster
|
||||
* Manager are secure
|
||||
*/
|
||||
public Boolean isClusterManagerRemoteSiteCommsSecure() {
|
||||
return flowController.isClusterManagerRemoteSiteCommsSecure();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the socket port that the local instance is listening on for
|
||||
* Site-to-Site communications
|
||||
|
|
|
@ -1334,7 +1334,7 @@ nf.ConnectionConfiguration = (function () {
|
|||
});
|
||||
|
||||
// store the connection details
|
||||
$('#connection-uri').val(connection.uri);
|
||||
$('#connection-uri').val(connectionEntry.uri);
|
||||
|
||||
// configure the button model
|
||||
$('#connection-configuration').modal('setButtonModel', [{
|
||||
|
|
Loading…
Reference in New Issue