From 6541eac625ff1d721867152d728c53483eab822a Mon Sep 17 00:00:00 2001
From: Koji Kawamura
Date: Tue, 27 Aug 2019 16:53:02 +0900
Subject: [PATCH] NIFI-6598 Storing peers into managed-state - Fixed checkstyle
errors. - Added PeerPersistence interface. - Expose RemoteProcessGroup state
via REST API - Made stateManager transient.
This closes #3677.
Signed-off-by: Bryan Bende
---
.../org/apache/nifi/util/NiFiProperties.java | 12 --
.../client/AbstractPeerPersistence.java | 93 +++++++++++++
.../remote/client/FilePeerPersistence.java | 55 ++++++++
.../nifi/remote/client/PeerPersistence.java | 28 ++++
.../nifi/remote/client/PeerSelector.java | 100 +++++---------
.../remote/client/PeerStatusProvider.java | 7 +
.../nifi/remote/client/SiteToSiteClient.java | 54 +++++++-
.../remote/client/SiteToSiteClientConfig.java | 11 ++
.../remote/client/StatePeerPersistence.java | 65 +++++++++
.../nifi/remote/client/http/HttpClient.java | 8 +-
.../client/socket/EndpointConnectionPool.java | 15 +-
.../remote/client/socket/SocketClient.java | 2 +-
.../nifi/remote/util/PeerStatusCache.java | 14 +-
.../nifi/remote/client/TestPeerSelector.java | 128 ++++++++++++++++++
.../client/socket/TestSiteToSiteClient.java | 38 +++++-
.../nifi/groups/RemoteProcessGroup.java | 3 +
.../controller/flow/StandardFlowManager.java | 4 +-
.../nifi/groups/StandardProcessGroup.java | 8 ++
.../remote/StandardRemoteProcessGroup.java | 22 ++-
.../nifi/remote/StandardRemoteGroupPort.java | 9 +-
.../apache/nifi/web/NiFiServiceFacade.java | 8 ++
.../nifi/web/StandardNiFiServiceFacade.java | 10 ++
.../web/api/RemoteProcessGroupResource.java | 57 ++++++++
.../nifi/web/dao/ComponentStateDAO.java | 10 ++
.../nifi/web/dao/RemoteProcessGroupDAO.java | 10 ++
.../dao/impl/StandardComponentStateDAO.java | 6 +
.../impl/StandardRemoteProcessGroupDAO.java | 14 ++
.../main/resources/nifi-web-api-context.xml | 1 +
28 files changed, 674 insertions(+), 118 deletions(-)
create mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractPeerPersistence.java
create mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/FilePeerPersistence.java
create mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerPersistence.java
create mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/StatePeerPersistence.java
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index afcd2682b8..0315fdfaf7 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -77,7 +77,6 @@ public abstract class NiFiProperties {
public static final String REMOTE_CONTENTS_CACHE_EXPIRATION = "nifi.remote.contents.cache.expiration";
public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory";
public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration";
- public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory";
public static final String BORED_YIELD_DURATION = "nifi.bored.yield.duration";
public static final String PROCESSOR_SCHEDULING_TIMEOUT = "nifi.processor.scheduling.timeout";
public static final String BACKPRESSURE_COUNT = "nifi.queue.backpressure.count";
@@ -272,7 +271,6 @@ public abstract class NiFiProperties {
public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L;
public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB";
public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec";
- public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state";
public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins";
public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis";
public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs";
@@ -729,16 +727,6 @@ public abstract class NiFiProperties {
DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT);
}
- public File getPersistentStateDirectory() {
- final String dirName = getProperty(PERSISTENT_STATE_DIRECTORY,
- DEFAULT_PERSISTENT_STATE_DIRECTORY);
- final File file = new File(dirName);
- if (!file.exists()) {
- file.mkdirs();
- }
- return file;
- }
-
// getters for cluster node properties //
public boolean isNode() {
return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE));
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractPeerPersistence.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractPeerPersistence.java
new file mode 100644
index 0000000000..758b4255be
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractPeerPersistence.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.util.PeerStatusCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public abstract class AbstractPeerPersistence implements PeerPersistence {
+
+ protected Logger logger = LoggerFactory.getLogger(getClass());
+
+ protected PeerStatusCache restorePeerStatuses(final BufferedReader reader,
+ long cachedTimestamp) throws IOException {
+ final SiteToSiteTransportProtocol transportProtocol;
+ try {
+ transportProtocol = SiteToSiteTransportProtocol.valueOf(reader.readLine());
+ } catch (IllegalArgumentException e) {
+ logger.info("Discard stored peer statuses in {} because transport protocol is not stored",
+ this.getClass().getSimpleName());
+ return null;
+ }
+
+ final Set restoredStatuses = readPeerStatuses(reader);
+
+ if (!restoredStatuses.isEmpty()) {
+ logger.info("Restored peer statuses from {} {}", this.getClass().getSimpleName(), restoredStatuses);
+ return new PeerStatusCache(restoredStatuses, cachedTimestamp, transportProtocol);
+ }
+
+ return null;
+ }
+
+ private Set readPeerStatuses(final BufferedReader reader) throws IOException {
+ final Set statuses = new HashSet<>();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ final String[] splits = line.split(Pattern.quote(":"));
+ if (splits.length != 3 && splits.length != 4) {
+ continue;
+ }
+
+ final String hostname = splits[0];
+ final int port = Integer.parseInt(splits[1]);
+ final boolean secure = Boolean.parseBoolean(splits[2]);
+
+ final boolean supportQueryForPeer = splits.length == 4 && Boolean.parseBoolean(splits[3]);
+
+ statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1, supportQueryForPeer));
+ }
+
+ return statuses;
+ }
+
+
+ @FunctionalInterface
+ protected interface IOConsumer {
+ void accept(T value) throws IOException;
+ }
+
+ protected void write(final PeerStatusCache peerStatusCache, final IOConsumer consumer) throws IOException {
+ consumer.accept(peerStatusCache.getTransportProtocol().name() + "\n");
+ for (final PeerStatus status : peerStatusCache.getStatuses()) {
+ final PeerDescription description = status.getPeerDescription();
+ final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + ":" + status.isQueryForPeers() + "\n";
+ consumer.accept(line);
+ }
+ }
+
+}
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/FilePeerPersistence.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/FilePeerPersistence.java
new file mode 100644
index 0000000000..8fa167dca5
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/FilePeerPersistence.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.remote.util.PeerStatusCache;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+public class FilePeerPersistence extends AbstractPeerPersistence {
+
+ private final File persistenceFile;
+
+ public FilePeerPersistence(File persistenceFile) {
+ this.persistenceFile = persistenceFile;
+ }
+
+ @Override
+ public void save(final PeerStatusCache peerStatusCache) throws IOException {
+ try (final OutputStream fos = new FileOutputStream(persistenceFile);
+ final OutputStream out = new BufferedOutputStream(fos)) {
+ write(peerStatusCache, line -> out.write(line.getBytes(StandardCharsets.UTF_8)));
+ }
+ }
+
+ @Override
+ public PeerStatusCache restore() throws IOException {
+ try (final InputStream fis = new FileInputStream(persistenceFile);
+ final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
+ return restorePeerStatuses(reader, persistenceFile.lastModified());
+ }
+ }
+}
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerPersistence.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerPersistence.java
new file mode 100644
index 0000000000..56982471c0
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerPersistence.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.remote.util.PeerStatusCache;
+
+import java.io.IOException;
+
+public interface PeerPersistence {
+
+ void save(final PeerStatusCache peerStatusCache) throws IOException;
+
+ PeerStatusCache restore() throws IOException;
+}
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
index 14c163bace..8235a38a7a 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
@@ -21,20 +21,12 @@ import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.util.PeerStatusCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -48,7 +40,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.nifi.remote.util.EventReportUtil.error;
@@ -67,7 +58,7 @@ public class PeerSelector {
private volatile long peerRefreshTime = 0L;
private final AtomicLong peerIndex = new AtomicLong(0L);
private volatile PeerStatusCache peerStatusCache;
- private final File persistenceFile;
+ private final PeerPersistence peerPersistence;
private EventReporter eventReporter;
@@ -90,72 +81,43 @@ public class PeerSelector {
this.systemTime = systemTime;
}
- public PeerSelector(final PeerStatusProvider peerStatusProvider, final File persistenceFile) {
+ public PeerSelector(final PeerStatusProvider peerStatusProvider, final PeerPersistence peerPersistence) {
this.peerStatusProvider = peerStatusProvider;
- this.persistenceFile = persistenceFile;
- Set recoveredStatuses;
- if (persistenceFile != null && persistenceFile.exists()) {
- try {
- recoveredStatuses = recoverPersistedPeerStatuses(persistenceFile);
- this.peerStatusCache = new PeerStatusCache(recoveredStatuses, persistenceFile.lastModified());
- } catch (final IOException ioe) {
- logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe);
+ this.peerPersistence = peerPersistence;
+
+ try {
+ PeerStatusCache restoredPeerStatusCache = null;
+ if (peerPersistence != null) {
+ restoredPeerStatusCache = peerPersistence.restore();
+ if (restoredPeerStatusCache != null) {
+ final SiteToSiteTransportProtocol currentProtocol = peerStatusProvider.getTransportProtocol();
+ final SiteToSiteTransportProtocol cachedProtocol = restoredPeerStatusCache.getTransportProtocol();
+ if (!currentProtocol.equals(cachedProtocol)) {
+ logger.info("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
+ peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
+ restoredPeerStatusCache = null;
+ }
+ }
}
- } else {
- peerStatusCache = null;
+ this.peerStatusCache = restoredPeerStatusCache;
+
+ } catch (final IOException ioe) {
+ logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file",
+ peerPersistence.getClass().getSimpleName(), ioe);
}
}
- private void persistPeerStatuses(final Set statuses) {
- if (persistenceFile == null) {
- return;
- }
-
- try (final OutputStream fos = new FileOutputStream(persistenceFile);
- final OutputStream out = new BufferedOutputStream(fos)) {
-
- for (final PeerStatus status : statuses) {
- final PeerDescription description = status.getPeerDescription();
- final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + ":" + status.isQueryForPeers() + "\n";
- out.write(line.getBytes(StandardCharsets.UTF_8));
- }
-
+ private void persistPeerStatuses() {
+ try {
+ peerPersistence.save(peerStatusCache);
} catch (final IOException e) {
- error(logger, eventReporter, "Failed to persist list of Peers due to {}; if restarted and peer's NCM is down," +
- " may be unable to transfer data until communications with NCM are restored", e.toString());
+ error(logger, eventReporter, "Failed to persist list of Peers due to {}; if restarted" +
+ " and the nodes specified at the RPG are down," +
+ " may be unable to transfer data until communications with those nodes are restored", e.toString());
logger.error("", e);
}
}
- private static Set recoverPersistedPeerStatuses(final File file) throws IOException {
- if (!file.exists()) {
- return null;
- }
-
- final Set statuses = new HashSet<>();
- try (final InputStream fis = new FileInputStream(file);
- final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
-
- String line;
- while ((line = reader.readLine()) != null) {
- final String[] splits = line.split(Pattern.quote(":"));
- if (splits.length != 3 && splits.length != 4) {
- continue;
- }
-
- final String hostname = splits[0];
- final int port = Integer.parseInt(splits[1]);
- final boolean secure = Boolean.parseBoolean(splits[2]);
-
- final boolean supportQueryForPeer = splits.length == 4 && Boolean.parseBoolean(splits[3]);
-
- statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1, supportQueryForPeer));
- }
- }
-
- return statuses;
- }
-
List formulateDestinationList(final Set statuses, final TransferDirection direction) {
final int numDestinations = Math.max(128, statuses.size());
@@ -340,8 +302,8 @@ public class PeerSelector {
try {
final Set statuses = fetchRemotePeerStatuses();
- persistPeerStatuses(statuses);
- peerStatusCache = new PeerStatusCache(statuses);
+ peerStatusCache = new PeerStatusCache(statuses, System.currentTimeMillis(), peerStatusProvider.getTransportProtocol());
+ persistPeerStatuses();
logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
} catch (Exception e) {
warn(logger, eventReporter, "{} Unable to refresh Remote Group's peers due to {}", this, e.getMessage());
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
index 817bccf071..64fd161382 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote.client;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import java.io.IOException;
import java.util.Set;
@@ -57,4 +58,10 @@ public interface PeerStatusProvider {
*/
Set fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException;
+ /**
+ * Returns the transport protocol being used.
+ * @return the transport protocol
+ */
+ SiteToSiteTransportProtocol getTransportProtocol();
+
}
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 29eb465c85..b805d03e7c 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.remote.client;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
@@ -67,7 +68,7 @@ import java.util.concurrent.TimeUnit;
* interaction with the remote instance takes place. After data has been
* exchanged or it is determined that no data is available, the Transaction can
* then be canceled (via the {@link Transaction#cancel(String)} method) or can
- * be completed (via the {@link Transaction#complete(boolean)} method).
+ * be completed (via the {@link Transaction#complete()} method).
*
*
*
@@ -164,6 +165,7 @@ public interface SiteToSiteClient extends Closeable {
private KeystoreType truststoreType;
private EventReporter eventReporter = EventReporter.NO_OP;
private File peerPersistenceFile;
+ private StateManager stateManager;
private boolean useCompression;
private String portName;
private String portIdentifier;
@@ -482,8 +484,8 @@ public interface SiteToSiteClient extends Closeable {
/**
* Specifies a file that the client can write to in order to persist the
* list of nodes in the remote cluster and recover the list of nodes
- * upon restart. This allows the client to function if the remote
- * Cluster Manager is unavailable, even after a restart of the client
+ * upon restart. This allows the client to function if the remote nodes
+ * specified by the urls are unavailable, even after a restart of the client
* software. If not specified, the list of nodes will not be persisted
* and a failure of the Cluster Manager will result in not being able to
* communicate with the remote instance if a new client is created.
@@ -496,6 +498,32 @@ public interface SiteToSiteClient extends Closeable {
return this;
}
+ /**
+ *
Specifies StateManager that the client can persist the
+ * list of nodes in the remote cluster and recover the list of nodes
+ * upon restart. This allows the client to function if the remote nodes
+ * specified by the urls are unavailable, even after a restart of the client
+ * software. If not specified, the list of nodes will not be persisted
+ * and a failure of the Cluster Manager will result in not being able to
+ * communicate with the remote instance if a new client is created.
+ * Using a StateManager is preferable over using a File to persist the list of nodes
+ * if the SiteToSiteClient is used by a NiFi component having access to a NiFi context.
+ * So that the list of nodes can be persisted in the same manner with other stateful information.
+ * Since StateManager is not serializable, the specified StateManager
+ * will not be serialized, and a de-serialized SiteToSiteClientConfig
+ * instance will not have StateManager even if the original config has one.
+ * Use {@link #peerPersistenceFile(File)} instead
+ * if the same SiteToSiteClientConfig needs to be distributed among multiple
+ * clients via serialization, and also persistent connectivity is required
+ * in case of having no available remote node specified by the urls when a client restarts.
+ * @param stateManager state manager
+ * @return the builder
+ */
+ public Builder stateManager(final StateManager stateManager) {
+ this.stateManager = stateManager;
+ return this;
+ }
+
/**
* Specifies whether or not data should be compressed before being
* transferred to or from the remote instance.
@@ -748,6 +776,7 @@ public interface SiteToSiteClient extends Closeable {
private final KeystoreType truststoreType;
private final EventReporter eventReporter;
private final File peerPersistenceFile;
+ private final transient StateManager stateManager;
private final boolean useCompression;
private final SiteToSiteTransportProtocol transportProtocol;
private final String portName;
@@ -773,6 +802,7 @@ public interface SiteToSiteClient extends Closeable {
this.truststoreType = null;
this.eventReporter = null;
this.peerPersistenceFile = null;
+ this.stateManager = null;
this.useCompression = false;
this.portName = null;
this.portIdentifier = null;
@@ -801,6 +831,7 @@ public interface SiteToSiteClient extends Closeable {
this.truststoreType = builder.truststoreType;
this.eventReporter = builder.eventReporter;
this.peerPersistenceFile = builder.peerPersistenceFile;
+ this.stateManager = builder.stateManager;
this.useCompression = builder.useCompression;
this.portName = builder.portName;
this.portIdentifier = builder.portIdentifier;
@@ -921,6 +952,23 @@ public interface SiteToSiteClient extends Closeable {
return peerPersistenceFile;
}
+ @Override
+ public StateManager getStateManager() {
+ return stateManager;
+ }
+
+ @Override
+ public PeerPersistence getPeerPersistence() {
+ if (stateManager != null) {
+ return new StatePeerPersistence(stateManager);
+
+ } else if (peerPersistenceFile != null) {
+ return new FilePeerPersistence(peerPersistenceFile);
+ }
+
+ return null;
+ }
+
@Override
public EventReporter getEventReporter() {
return eventReporter;
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 8da5e706b1..604e078f8e 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
@@ -110,6 +111,16 @@ public interface SiteToSiteClientConfig extends Serializable {
*/
File getPeerPersistenceFile();
+ /**
+ * @return the StateManager to be used for persisting the nodes of a remote
+ */
+ StateManager getStateManager();
+
+ /**
+ * @return a PeerPersistence implementation based on configured persistent target
+ */
+ PeerPersistence getPeerPersistence();
+
/**
* @return a boolean indicating whether or not compression will be used to
* transfer data to and from the remote instance
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/StatePeerPersistence.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/StatePeerPersistence.java
new file mode 100644
index 0000000000..185e8fd78e
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/StatePeerPersistence.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.remote.util.PeerStatusCache;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+
+public class StatePeerPersistence extends AbstractPeerPersistence {
+
+ static final String STATE_KEY_PEERS = "peers";
+ static final String STATE_KEY_TRANSPORT_PROTOCOL = "protocol";
+ static final String STATE_KEY_PEERS_TIMESTAMP = "peers.ts";
+
+ private final StateManager stateManager;
+
+ public StatePeerPersistence(StateManager stateManager) {
+ this.stateManager = stateManager;
+ }
+
+ @Override
+ public void save(final PeerStatusCache peerStatusCache) throws IOException {
+ final StateMap state = stateManager.getState(Scope.LOCAL);
+ final Map stateMap = state.toMap();
+ final Map updatedStateMap = new HashMap<>(stateMap);
+ final StringBuilder peers = new StringBuilder();
+ write(peerStatusCache, peers::append);
+ updatedStateMap.put(STATE_KEY_PEERS, peers.toString());
+ updatedStateMap.put(STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+ stateManager.setState(updatedStateMap, Scope.LOCAL);
+ }
+
+ @Override
+ public PeerStatusCache restore() throws IOException {
+ final StateMap state = stateManager.getState(Scope.LOCAL);
+ final String storedPeers = state.get(STATE_KEY_PEERS);
+ if (storedPeers != null && !storedPeers.isEmpty()) {
+ try (final BufferedReader reader = new BufferedReader(new StringReader(storedPeers))) {
+ return restorePeerStatuses(reader, Long.parseLong(state.get(STATE_KEY_PEERS_TIMESTAMP)));
+ }
+ }
+ return null;
+ }
+}
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
index e1516d2923..690cdfd6a9 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
@@ -34,6 +34,7 @@ import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpClientTransaction;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.web.api.dto.remote.PeerDTO;
@@ -64,7 +65,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
public HttpClient(final SiteToSiteClientConfig config) {
super(config);
- peerSelector = new PeerSelector(this, config.getPeerPersistenceFile());
+ peerSelector = new PeerSelector(this, config.getPeerPersistence());
peerSelector.setEventReporter(config.getEventReporter());
taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@@ -246,4 +247,9 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
transaction.getCommunicant().getCommunicationsSession().interrupt();
}
}
+
+ @Override
+ public SiteToSiteTransportProtocol getTransportProtocol() {
+ return SiteToSiteTransportProtocol.HTTP;
+ }
}
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 17d66dab3f..53bd963e18 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -23,6 +23,7 @@ import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.PeerPersistence;
import org.apache.nifi.remote.client.PeerSelector;
import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteInfoProvider;
@@ -36,6 +37,7 @@ import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.security.util.CertificateUtils;
import org.slf4j.Logger;
@@ -44,7 +46,6 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -91,8 +92,9 @@ public class EndpointConnectionPool implements PeerStatusProvider {
private final InetAddress localAddress;
public EndpointConnectionPool(final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis,
- final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider,
- final InetAddress localAddress) {
+ final SSLContext sslContext, final EventReporter eventReporter,
+ final PeerPersistence peerPersistence, final SiteInfoProvider siteInfoProvider,
+ final InetAddress localAddress) {
Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
this.remoteDestination = remoteDestination;
@@ -104,7 +106,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
this.siteInfoProvider = siteInfoProvider;
- peerSelector = new PeerSelector(this, persistenceFile);
+ peerSelector = new PeerSelector(this, peerPersistence);
peerSelector.setEventReporter(eventReporter);
// Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused
@@ -563,5 +565,8 @@ public class EndpointConnectionPool implements PeerStatusProvider {
}
}
-
+ @Override
+ public SiteToSiteTransportProtocol getTransportProtocol() {
+ return SiteToSiteTransportProtocol.RAW;
+ }
}
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 0999d57fb4..ff8e0d6e4c 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -52,7 +52,7 @@ public class SocketClient extends AbstractSiteToSiteClient {
createRemoteDestination(config.getPortIdentifier(), config.getPortName()),
commsTimeout,
(int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
- config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile(),
+ config.getSslContext(), config.getEventReporter(), config.getPeerPersistence(),
siteInfoProvider, config.getLocalAddress()
);
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
index c52b4b7a80..acca34eb96 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
@@ -19,19 +19,19 @@ package org.apache.nifi.remote.util;
import java.util.Set;
import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
public class PeerStatusCache {
private final Set statuses;
private final long timestamp;
+ private final SiteToSiteTransportProtocol transportProtocol;
- public PeerStatusCache(final Set statuses) {
- this(statuses, System.currentTimeMillis());
- }
-
- public PeerStatusCache(final Set statuses, final long timestamp) {
+ public PeerStatusCache(final Set statuses, final long timestamp,
+ final SiteToSiteTransportProtocol transportProtocol) {
this.statuses = statuses;
this.timestamp = timestamp;
+ this.transportProtocol = transportProtocol;
}
public Set getStatuses() {
@@ -41,4 +41,8 @@ public class PeerStatusCache {
public long getTimestamp() {
return timestamp;
}
+
+ public SiteToSiteTransportProtocol getTransportProtocol() {
+ return transportProtocol;
+ }
}
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
index e29efd85de..72dd9a6448 100644
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
@@ -16,21 +16,34 @@
*/
package org.apache.nifi.remote.client;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.groupingBy;
@@ -40,8 +53,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.when;
public class TestPeerSelector {
@@ -255,4 +271,116 @@ public class TestPeerSelector {
assert(!peers.isEmpty());
assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peers.get(0).getPeerDescription());
}
+
+ @Test
+ public void testPeerStatusManagedCache() throws Exception {
+ final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
+ final StateManager stateManager = Mockito.mock(StateManager.class);
+ final StateMap stateMap = Mockito.mock(StateMap.class);
+ final Map state = new HashMap<>();
+ state.put(StatePeerPersistence.STATE_KEY_PEERS, "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n");
+ state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+ when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
+ when(stateManager.getState(eq(Scope.LOCAL))).thenReturn(stateMap);
+ when(stateMap.get(anyString())).thenAnswer(invocation -> state.get(invocation.getArgument(0)));
+ doAnswer(invocation -> {
+ final Map updatedMap = invocation.getArgument(0);
+ state.clear();
+ state.putAll(updatedMap);
+ return null;
+ }).when(stateManager).setState(any(), eq(Scope.LOCAL));
+
+ final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
+ when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
+ when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
+ .thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
+
+ // PeerSelector should restore peer statuses from managed cache.
+ PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
+ peerSelector.refreshPeers();
+ assertEquals("Restored peers should be used",
+ "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
+
+ // If the stored state is too old, PeerSelector refreshes peers.
+ state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis() - 120_000));
+ peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
+ peerSelector.refreshPeers();
+ assertEquals("Peers should be refreshed",
+ "RAW\nnifi0:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
+ }
+
+ @Test
+ public void testPeerStatusManagedCacheDifferentProtocol() throws Exception {
+ final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
+ final StateManager stateManager = Mockito.mock(StateManager.class);
+ final StateMap stateMap = Mockito.mock(StateMap.class);
+ final Map state = new HashMap<>();
+ state.put(StatePeerPersistence.STATE_KEY_PEERS, "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n");
+ state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+ when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.HTTP);
+ when(stateManager.getState(eq(Scope.LOCAL))).thenReturn(stateMap);
+ when(stateMap.get(anyString())).thenAnswer(invocation -> state.get(invocation.getArgument(0)));
+ doAnswer(invocation -> {
+ final Map updatedMap = invocation.getArgument(0);
+ state.clear();
+ state.putAll(updatedMap);
+ return null;
+ }).when(stateManager).setState(any(), eq(Scope.LOCAL));
+
+ final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
+ when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
+ when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
+ .thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
+
+ // PeerSelector should NOT restore peer statuses from managed cache because protocol changed.
+ PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
+ peerSelector.refreshPeers();
+ assertEquals("Restored peers should NOT be used",
+ "HTTP\nnifi0:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
+ }
+
+ @Test
+ public void testPeerStatusFileCache() throws Exception {
+ final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
+
+ final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
+ when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
+ when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
+ when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
+ .thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
+
+ final File file = File.createTempFile("peers", "txt");
+ file.deleteOnExit();
+
+ try (final FileOutputStream fos = new FileOutputStream(file)) {
+ fos.write("RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n".getBytes(StandardCharsets.UTF_8));
+ }
+
+ final Supplier readFile = () -> {
+ try (final FileInputStream fin = new FileInputStream(file);
+ final BufferedReader reader = new BufferedReader(new InputStreamReader(fin))) {
+ final StringBuilder lines = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ lines.append(line).append("\n");
+ }
+ return lines.toString();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ // PeerSelector should restore peer statuses from managed cache.
+ PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new FilePeerPersistence(file));
+ peerSelector.refreshPeers();
+ assertEquals("Restored peers should be used",
+ "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n", readFile.get());
+
+ // If the stored state is too old, PeerSelector refreshes peers.
+ file.setLastModified(System.currentTimeMillis() - 120_000);
+ peerSelector = new PeerSelector(peerStatusProvider, new FilePeerPersistence(file));
+ peerSelector.refreshPeers();
+ assertEquals("Peers should be refreshed",
+ "RAW\nnifi0:8081:false:true\n", readFile.get());
+ }
}
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index c0b5e83f5a..7c70a7a3d1 100644
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -19,19 +19,21 @@ package org.apache.nifi.remote.client.socket;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.util.StandardDataPacket;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
@@ -136,6 +138,40 @@ public class TestSiteToSiteClient {
}
}
+ @Test
+ public void testSerializationWithStateManager() {
+ final StateManager stateManager = Mockito.mock(StateManager.class);
+ final SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+ .url("http://localhost:8080/nifi")
+ .portName("input")
+ .stateManager(stateManager)
+ .buildConfig();
+
+ final Kryo kryo = new Kryo();
+
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final Output output = new Output(out);
+
+ try {
+ kryo.writeObject(output, clientConfig);
+ } finally {
+ output.close();
+ }
+
+ final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ final Input input = new Input(in);
+
+ try {
+ SiteToSiteClientConfig clientConfig2 = kryo.readObject(input, SiteToSiteClient.StandardSiteToSiteClientConfig.class);
+ Assert.assertEquals(clientConfig.getUrls(), clientConfig2.getUrls());
+ // Serialization works, but the state manager is not serialized.
+ Assert.assertNotNull(clientConfig.getStateManager());
+ Assert.assertNull(clientConfig2.getStateManager());
+ } finally {
+ input.close();
+ }
+ }
+
@Test
public void testGetUrlBackwardCompatibility() {
final Set urls = new LinkedHashSet<>();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 39be045bd3..f9c1021098 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -19,6 +19,7 @@ package org.apache.nifi.groups;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.VersionedComponent;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.events.EventReporter;
@@ -240,4 +241,6 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable,
void verifyCanStopTransmitting();
void verifyCanUpdate();
+
+ StateManager getStateManager();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index a4fe16a447..fd74d54a63 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -211,7 +211,9 @@ public class StandardFlowManager implements FlowManager {
}
public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) {
- return new StandardRemoteProcessGroup(requireNonNull(id), uris, null, processScheduler, bulletinRepository, sslContext, nifiProperties);
+ return new StandardRemoteProcessGroup(requireNonNull(id), uris, null,
+ processScheduler, bulletinRepository, sslContext, nifiProperties,
+ flowController.getStateManagerProvider().getStateManager(id));
}
public void setRootGroup(final ProcessGroup rootGroup) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 52565521f8..82c51ab384 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -853,6 +853,14 @@ public final class StandardProcessGroup implements ProcessGroup {
remoteGroup.getInputPorts().forEach(scheduler::onPortRemoved);
remoteGroup.getOutputPorts().forEach(scheduler::onPortRemoved);
+ final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider();
+ scheduler.submitFrameworkTask(new Runnable() {
+ @Override
+ public void run() {
+ stateManagerProvider.onComponentRemoved(remoteGroup.getIdentifier());
+ }
+ });
+
remoteGroups.remove(remoteGroupId);
LOG.info("{} removed from flow", remoteProcessGroup);
} finally {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 48e21271c2..8514b5d06a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -18,7 +18,6 @@ package org.apache.nifi.remote;
import static java.util.Objects.requireNonNull;
-import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
@@ -54,6 +53,7 @@ import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
@@ -100,6 +100,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final ProcessScheduler scheduler;
private final EventReporter eventReporter;
private final NiFiProperties nifiProperties;
+ private final StateManager stateManager;
private final long remoteContentsCacheExpiration;
private volatile boolean initialized = false;
@@ -146,8 +147,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final ScheduledExecutorService backgroundThreadExecutor;
public StandardRemoteProcessGroup(final String id, final String targetUris, final ProcessGroup processGroup, final ProcessScheduler processScheduler,
- final BulletinRepository bulletinRepository, final SSLContext sslContext, final NiFiProperties nifiProperties) {
+ final BulletinRepository bulletinRepository, final SSLContext sslContext, final NiFiProperties nifiProperties,
+ final StateManager stateManager) {
this.nifiProperties = nifiProperties;
+ this.stateManager = stateManager;
this.id = requireNonNull(id);
this.targetUris = targetUris;
@@ -234,11 +237,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public void onRemove() {
backgroundThreadExecutor.shutdown();
-
- final File file = getPeerPersistenceFile();
- if (file.exists() && !file.delete()) {
- logger.warn("Failed to remove {}. This file should be removed manually.", file);
- }
}
@Override
@@ -1366,11 +1364,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
- private File getPeerPersistenceFile() {
- final File stateDir = nifiProperties.getPersistentStateDirectory();
- return new File(stateDir, getIdentifier() + ".peers");
- }
-
@Override
public Optional getVersionedComponentId() {
return Optional.ofNullable(versionedComponentId.get());
@@ -1393,4 +1386,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
}
+
+ @Override
+ public StateManager getStateManager() {
+ return stateManager;
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 3b4d6308d1..bd2687e476 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -42,7 +42,6 @@ import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.protocol.DataPacket;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.remote.util.StandardDataPacket;
@@ -56,7 +55,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -125,11 +123,6 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
this.targetId = targetId;
}
- private static File getPeerPersistenceFile(final String portId, final NiFiProperties nifiProperties, final SiteToSiteTransportProtocol transportProtocol) {
- final File stateDir = nifiProperties.getPersistentStateDirectory();
- return new File(stateDir, String.format("%s_%s.peers", portId, transportProtocol.name()));
- }
-
@Override
public boolean isTargetRunning() {
return targetRunning.get();
@@ -181,7 +174,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
.sslContext(sslContext)
.useCompression(isUseCompression())
.eventReporter(remoteGroup.getEventReporter())
- .peerPersistenceFile(getPeerPersistenceFile(getIdentifier(), nifiProperties, remoteGroup.getTransportProtocol()))
+ .stateManager(remoteGroup.getStateManager())
.nodePenalizationPeriod(penalizationMillis, TimeUnit.MILLISECONDS)
.timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
.transportProtocol(remoteGroup.getTransportProtocol())
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 46fcd80cbd..86ec526e67 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1675,6 +1675,14 @@ public interface NiFiServiceFacade {
*/
void clearReportingTaskState(String reportingTaskId);
+ /**
+ * Gets the state for the specified RemoteProcessGroup.
+ *
+ * @param remoteProcessGroupId the RemoteProcessGroup id
+ * @return the component state
+ */
+ ComponentStateDTO getRemoteProcessGroupState(String remoteProcessGroupId);
+
// ----------------------------------------
// Label methods
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 6adc662f82..9c96cc5cc2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -1536,6 +1536,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
reportingTaskDAO.clearState(reportingTaskId);
}
+ @Override
+ public ComponentStateDTO getRemoteProcessGroupState(String remoteProcessGroupId) {
+ final StateMap clusterState = isClustered() ? remoteProcessGroupDAO.getState(remoteProcessGroupId, Scope.CLUSTER) : null;
+ final StateMap localState = remoteProcessGroupDAO.getState(remoteProcessGroupId, Scope.LOCAL);
+
+ // processor will be non null as it was already found when getting the state
+ final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
+ return dtoFactory.createComponentStateDTO(remoteProcessGroupId, remoteProcessGroup.getClass(), localState, clusterState);
+ }
+
@Override
public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) {
final Connection connection = connectionDAO.getConnection(connectionId);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
index f034082e58..575e01be06 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
@@ -30,10 +30,12 @@ import org.apache.nifi.authorization.resource.OperationAuthorizable;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.RemotePortRunStatusEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
@@ -827,6 +829,61 @@ public class RemoteProcessGroupResource extends ApplicationResource {
);
}
+ /**
+ * Gets the state for a RemoteProcessGroup.
+ *
+ * @param id The id of the RemoteProcessGroup
+ * @return a componentStateEntity
+ * @throws InterruptedException if interrupted
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/state")
+ @ApiOperation(
+ value = "Gets the state for a RemoteProcessGroup",
+ response = ComponentStateEntity.class,
+ authorizations = {
+ @Authorization(value = "Write - /remote-process-groups/{uuid}")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response getState(
+ @ApiParam(
+ value = "The processor id.",
+ required = true
+ )
+ @PathParam("id") final String id) throws InterruptedException {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ // authorize access
+ serviceFacade.authorizeAccess(lookup -> {
+ final Authorizable authorizable = lookup.getRemoteProcessGroup(id);
+ authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+ });
+
+ // get the component state
+ final ComponentStateDTO state = serviceFacade.getRemoteProcessGroupState(id);
+
+ // generate the response entity
+ final ComponentStateEntity entity = new ComponentStateEntity();
+ entity.setComponentState(state);
+
+ // generate the response
+ return generateOkResponse(entity).build();
+ }
+
private RemoteProcessGroupDTO createDTOWithDesiredRunStatus(final String id, final RemotePortRunStatusEntity entity) {
final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
dto.setId(id);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
index 636addb02f..adb50af3dc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
@@ -21,6 +21,7 @@ import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.groups.RemoteProcessGroup;
public interface ComponentStateDAO {
@@ -71,4 +72,13 @@ public interface ComponentStateDAO {
* @param reportingTask reporting task
*/
void clearState(ReportingTaskNode reportingTask);
+
+ /**
+ * Gets the state map for the specified RemoteProcessGroup.
+ *
+ * @param remoteProcessGroup RemoteProcessGroup
+ * @param scope scope
+ * @return state map
+ */
+ StateMap getState(RemoteProcessGroup remoteProcessGroup, Scope scope);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RemoteProcessGroupDAO.java
index 25421855f5..7446a34f34 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RemoteProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RemoteProcessGroupDAO.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.web.dao;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
@@ -120,4 +122,12 @@ public interface RemoteProcessGroupDAO {
* @param remoteProcessGroupId The remote process group id
*/
void deleteRemoteProcessGroup(String remoteProcessGroupId);
+
+ /**
+ * Gets the specified RemoteProcessGroupId.
+ *
+ * @param remoteProcessGroupId RemoteProcessGroupId id
+ * @return state map
+ */
+ StateMap getState(String remoteProcessGroupId, Scope scope);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
index f0a9094daf..c48186bf51 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
@@ -23,6 +23,7 @@ import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.dao.ComponentStateDAO;
@@ -90,6 +91,11 @@ public class StandardComponentStateDAO implements ComponentStateDAO {
clearState(reportingTask.getIdentifier());
}
+ @Override
+ public StateMap getState(RemoteProcessGroup remoteProcessGroup, Scope scope) {
+ return getState(remoteProcessGroup.getIdentifier(), scope);
+ }
+
/* setters */
public void setStateManagerProvider(StateManagerProvider stateManagerProvider) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
index b8399f7f30..274b5e45b6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
@@ -17,6 +17,8 @@
package org.apache.nifi.web.dao.impl;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.exception.ValidationException;
@@ -31,6 +33,7 @@ import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import java.util.ArrayList;
@@ -43,6 +46,7 @@ import static org.apache.nifi.util.StringUtils.isEmpty;
public class StandardRemoteProcessGroupDAO extends ComponentDAO implements RemoteProcessGroupDAO {
private FlowController flowController;
+ private ComponentStateDAO componentStateDAO;
private RemoteProcessGroup locateRemoteProcessGroup(final String remoteProcessGroupId) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
@@ -465,7 +469,17 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
remoteProcessGroup.getProcessGroup().removeRemoteProcessGroup(remoteProcessGroup);
}
+ @Override
+ public StateMap getState(String remoteProcessGroupId, Scope scope) {
+ final RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);
+ return componentStateDAO.getState(remoteProcessGroup, scope);
+ }
+
public void setFlowController(FlowController flowController) {
this.flowController = flowController;
}
+
+ public void setComponentStateDAO(ComponentStateDAO componentStateDAO) {
+ this.componentStateDAO = componentStateDAO;
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 72ffc2ca33..56b8d4d1c1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -82,6 +82,7 @@
+