NIFI-6598 Storing peers into managed-state

- Fixed checkstyle errors.
- Added PeerPersistence interface.
- Expose RemoteProcessGroup state via REST API
- Made stateManager transient.

This closes #3677.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Koji Kawamura 2019-08-27 16:53:02 +09:00 committed by Bryan Bende
parent 2f3ed5c40c
commit 6541eac625
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
28 changed files with 674 additions and 118 deletions

View File

@ -77,7 +77,6 @@ public abstract class NiFiProperties {
public static final String REMOTE_CONTENTS_CACHE_EXPIRATION = "nifi.remote.contents.cache.expiration"; public static final String REMOTE_CONTENTS_CACHE_EXPIRATION = "nifi.remote.contents.cache.expiration";
public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory"; public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory";
public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration"; public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration";
public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory";
public static final String BORED_YIELD_DURATION = "nifi.bored.yield.duration"; public static final String BORED_YIELD_DURATION = "nifi.bored.yield.duration";
public static final String PROCESSOR_SCHEDULING_TIMEOUT = "nifi.processor.scheduling.timeout"; public static final String PROCESSOR_SCHEDULING_TIMEOUT = "nifi.processor.scheduling.timeout";
public static final String BACKPRESSURE_COUNT = "nifi.queue.backpressure.count"; public static final String BACKPRESSURE_COUNT = "nifi.queue.backpressure.count";
@ -272,7 +271,6 @@ public abstract class NiFiProperties {
public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L; public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L;
public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB"; public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB";
public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec"; public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec";
public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state";
public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins"; public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins";
public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis"; public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis";
public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs"; public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs";
@ -729,16 +727,6 @@ public abstract class NiFiProperties {
DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT); DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT);
} }
public File getPersistentStateDirectory() {
final String dirName = getProperty(PERSISTENT_STATE_DIRECTORY,
DEFAULT_PERSISTENT_STATE_DIRECTORY);
final File file = new File(dirName);
if (!file.exists()) {
file.mkdirs();
}
return file;
}
// getters for cluster node properties // // getters for cluster node properties //
public boolean isNode() { public boolean isNode() {
return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE)); return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE));

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.client;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.util.PeerStatusCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
public abstract class AbstractPeerPersistence implements PeerPersistence {
protected Logger logger = LoggerFactory.getLogger(getClass());
protected PeerStatusCache restorePeerStatuses(final BufferedReader reader,
long cachedTimestamp) throws IOException {
final SiteToSiteTransportProtocol transportProtocol;
try {
transportProtocol = SiteToSiteTransportProtocol.valueOf(reader.readLine());
} catch (IllegalArgumentException e) {
logger.info("Discard stored peer statuses in {} because transport protocol is not stored",
this.getClass().getSimpleName());
return null;
}
final Set<PeerStatus> restoredStatuses = readPeerStatuses(reader);
if (!restoredStatuses.isEmpty()) {
logger.info("Restored peer statuses from {} {}", this.getClass().getSimpleName(), restoredStatuses);
return new PeerStatusCache(restoredStatuses, cachedTimestamp, transportProtocol);
}
return null;
}
private Set<PeerStatus> readPeerStatuses(final BufferedReader reader) throws IOException {
final Set<PeerStatus> statuses = new HashSet<>();
String line;
while ((line = reader.readLine()) != null) {
final String[] splits = line.split(Pattern.quote(":"));
if (splits.length != 3 && splits.length != 4) {
continue;
}
final String hostname = splits[0];
final int port = Integer.parseInt(splits[1]);
final boolean secure = Boolean.parseBoolean(splits[2]);
final boolean supportQueryForPeer = splits.length == 4 && Boolean.parseBoolean(splits[3]);
statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1, supportQueryForPeer));
}
return statuses;
}
@FunctionalInterface
protected interface IOConsumer<T> {
void accept(T value) throws IOException;
}
protected void write(final PeerStatusCache peerStatusCache, final IOConsumer<String> consumer) throws IOException {
consumer.accept(peerStatusCache.getTransportProtocol().name() + "\n");
for (final PeerStatus status : peerStatusCache.getStatuses()) {
final PeerDescription description = status.getPeerDescription();
final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + ":" + status.isQueryForPeers() + "\n";
consumer.accept(line);
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.client;
import org.apache.nifi.remote.util.PeerStatusCache;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
public class FilePeerPersistence extends AbstractPeerPersistence {
private final File persistenceFile;
public FilePeerPersistence(File persistenceFile) {
this.persistenceFile = persistenceFile;
}
@Override
public void save(final PeerStatusCache peerStatusCache) throws IOException {
try (final OutputStream fos = new FileOutputStream(persistenceFile);
final OutputStream out = new BufferedOutputStream(fos)) {
write(peerStatusCache, line -> out.write(line.getBytes(StandardCharsets.UTF_8)));
}
}
@Override
public PeerStatusCache restore() throws IOException {
try (final InputStream fis = new FileInputStream(persistenceFile);
final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
return restorePeerStatuses(reader, persistenceFile.lastModified());
}
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.client;
import org.apache.nifi.remote.util.PeerStatusCache;
import java.io.IOException;
public interface PeerPersistence {
void save(final PeerStatusCache peerStatusCache) throws IOException;
PeerStatusCache restore() throws IOException;
}

View File

@ -21,20 +21,12 @@ import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription; import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.util.PeerStatusCache; import org.apache.nifi.remote.util.PeerStatusCache;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -48,7 +40,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.nifi.remote.util.EventReportUtil.error; import static org.apache.nifi.remote.util.EventReportUtil.error;
@ -67,7 +58,7 @@ public class PeerSelector {
private volatile long peerRefreshTime = 0L; private volatile long peerRefreshTime = 0L;
private final AtomicLong peerIndex = new AtomicLong(0L); private final AtomicLong peerIndex = new AtomicLong(0L);
private volatile PeerStatusCache peerStatusCache; private volatile PeerStatusCache peerStatusCache;
private final File persistenceFile; private final PeerPersistence peerPersistence;
private EventReporter eventReporter; private EventReporter eventReporter;
@ -90,72 +81,43 @@ public class PeerSelector {
this.systemTime = systemTime; this.systemTime = systemTime;
} }
public PeerSelector(final PeerStatusProvider peerStatusProvider, final File persistenceFile) { public PeerSelector(final PeerStatusProvider peerStatusProvider, final PeerPersistence peerPersistence) {
this.peerStatusProvider = peerStatusProvider; this.peerStatusProvider = peerStatusProvider;
this.persistenceFile = persistenceFile; this.peerPersistence = peerPersistence;
Set<PeerStatus> recoveredStatuses;
if (persistenceFile != null && persistenceFile.exists()) {
try { try {
recoveredStatuses = recoverPersistedPeerStatuses(persistenceFile); PeerStatusCache restoredPeerStatusCache = null;
this.peerStatusCache = new PeerStatusCache(recoveredStatuses, persistenceFile.lastModified()); if (peerPersistence != null) {
restoredPeerStatusCache = peerPersistence.restore();
if (restoredPeerStatusCache != null) {
final SiteToSiteTransportProtocol currentProtocol = peerStatusProvider.getTransportProtocol();
final SiteToSiteTransportProtocol cachedProtocol = restoredPeerStatusCache.getTransportProtocol();
if (!currentProtocol.equals(cachedProtocol)) {
logger.info("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
restoredPeerStatusCache = null;
}
}
}
this.peerStatusCache = restoredPeerStatusCache;
} catch (final IOException ioe) { } catch (final IOException ioe) {
logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe); logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file",
} peerPersistence.getClass().getSimpleName(), ioe);
} else {
peerStatusCache = null;
} }
} }
private void persistPeerStatuses(final Set<PeerStatus> statuses) { private void persistPeerStatuses() {
if (persistenceFile == null) { try {
return; peerPersistence.save(peerStatusCache);
}
try (final OutputStream fos = new FileOutputStream(persistenceFile);
final OutputStream out = new BufferedOutputStream(fos)) {
for (final PeerStatus status : statuses) {
final PeerDescription description = status.getPeerDescription();
final String line = description.getHostname() + ":" + description.getPort() + ":" + description.isSecure() + ":" + status.isQueryForPeers() + "\n";
out.write(line.getBytes(StandardCharsets.UTF_8));
}
} catch (final IOException e) { } catch (final IOException e) {
error(logger, eventReporter, "Failed to persist list of Peers due to {}; if restarted and peer's NCM is down," + error(logger, eventReporter, "Failed to persist list of Peers due to {}; if restarted" +
" may be unable to transfer data until communications with NCM are restored", e.toString()); " and the nodes specified at the RPG are down," +
" may be unable to transfer data until communications with those nodes are restored", e.toString());
logger.error("", e); logger.error("", e);
} }
} }
private static Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException {
if (!file.exists()) {
return null;
}
final Set<PeerStatus> statuses = new HashSet<>();
try (final InputStream fis = new FileInputStream(file);
final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
String line;
while ((line = reader.readLine()) != null) {
final String[] splits = line.split(Pattern.quote(":"));
if (splits.length != 3 && splits.length != 4) {
continue;
}
final String hostname = splits[0];
final int port = Integer.parseInt(splits[1]);
final boolean secure = Boolean.parseBoolean(splits[2]);
final boolean supportQueryForPeer = splits.length == 4 && Boolean.parseBoolean(splits[3]);
statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1, supportQueryForPeer));
}
}
return statuses;
}
List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses, final TransferDirection direction) { List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses, final TransferDirection direction) {
final int numDestinations = Math.max(128, statuses.size()); final int numDestinations = Math.max(128, statuses.size());
@ -340,8 +302,8 @@ public class PeerSelector {
try { try {
final Set<PeerStatus> statuses = fetchRemotePeerStatuses(); final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
persistPeerStatuses(statuses); peerStatusCache = new PeerStatusCache(statuses, System.currentTimeMillis(), peerStatusProvider.getTransportProtocol());
peerStatusCache = new PeerStatusCache(statuses); persistPeerStatuses();
logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size()); logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
} catch (Exception e) { } catch (Exception e) {
warn(logger, eventReporter, "{} Unable to refresh Remote Group's peers due to {}", this, e.getMessage()); warn(logger, eventReporter, "{} Unable to refresh Remote Group's peers due to {}", this, e.getMessage());

View File

@ -18,6 +18,7 @@ package org.apache.nifi.remote.client;
import org.apache.nifi.remote.PeerDescription; import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import java.io.IOException; import java.io.IOException;
import java.util.Set; import java.util.Set;
@ -57,4 +58,10 @@ public interface PeerStatusProvider {
*/ */
Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException; Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException;
/**
* Returns the transport protocol being used.
* @return the transport protocol
*/
SiteToSiteTransportProtocol getTransportProtocol();
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.remote.client; package org.apache.nifi.remote.client;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
@ -67,7 +68,7 @@ import java.util.concurrent.TimeUnit;
* interaction with the remote instance takes place. After data has been * interaction with the remote instance takes place. After data has been
* exchanged or it is determined that no data is available, the Transaction can * exchanged or it is determined that no data is available, the Transaction can
* then be canceled (via the {@link Transaction#cancel(String)} method) or can * then be canceled (via the {@link Transaction#cancel(String)} method) or can
* be completed (via the {@link Transaction#complete(boolean)} method). * be completed (via the {@link Transaction#complete()} method).
* </p> * </p>
* *
* <p> * <p>
@ -164,6 +165,7 @@ public interface SiteToSiteClient extends Closeable {
private KeystoreType truststoreType; private KeystoreType truststoreType;
private EventReporter eventReporter = EventReporter.NO_OP; private EventReporter eventReporter = EventReporter.NO_OP;
private File peerPersistenceFile; private File peerPersistenceFile;
private StateManager stateManager;
private boolean useCompression; private boolean useCompression;
private String portName; private String portName;
private String portIdentifier; private String portIdentifier;
@ -482,8 +484,8 @@ public interface SiteToSiteClient extends Closeable {
/** /**
* Specifies a file that the client can write to in order to persist the * Specifies a file that the client can write to in order to persist the
* list of nodes in the remote cluster and recover the list of nodes * list of nodes in the remote cluster and recover the list of nodes
* upon restart. This allows the client to function if the remote * upon restart. This allows the client to function if the remote nodes
* Cluster Manager is unavailable, even after a restart of the client * specified by the urls are unavailable, even after a restart of the client
* software. If not specified, the list of nodes will not be persisted * software. If not specified, the list of nodes will not be persisted
* and a failure of the Cluster Manager will result in not being able to * and a failure of the Cluster Manager will result in not being able to
* communicate with the remote instance if a new client is created. * communicate with the remote instance if a new client is created.
@ -496,6 +498,32 @@ public interface SiteToSiteClient extends Closeable {
return this; return this;
} }
/**
* <p>Specifies StateManager that the client can persist the
* list of nodes in the remote cluster and recover the list of nodes
* upon restart. This allows the client to function if the remote nodes
* specified by the urls are unavailable, even after a restart of the client
* software. If not specified, the list of nodes will not be persisted
* and a failure of the Cluster Manager will result in not being able to
* communicate with the remote instance if a new client is created.</p>
* <p>Using a StateManager is preferable over using a File to persist the list of nodes
* if the SiteToSiteClient is used by a NiFi component having access to a NiFi context.
* So that the list of nodes can be persisted in the same manner with other stateful information.</p>
* <p>Since StateManager is not serializable, the specified StateManager
* will not be serialized, and a de-serialized SiteToSiteClientConfig
* instance will not have StateManager even if the original config has one.
* Use {@link #peerPersistenceFile(File)} instead
* if the same SiteToSiteClientConfig needs to be distributed among multiple
* clients via serialization, and also persistent connectivity is required
* in case of having no available remote node specified by the urls when a client restarts.</p>
* @param stateManager state manager
* @return the builder
*/
public Builder stateManager(final StateManager stateManager) {
this.stateManager = stateManager;
return this;
}
/** /**
* Specifies whether or not data should be compressed before being * Specifies whether or not data should be compressed before being
* transferred to or from the remote instance. * transferred to or from the remote instance.
@ -748,6 +776,7 @@ public interface SiteToSiteClient extends Closeable {
private final KeystoreType truststoreType; private final KeystoreType truststoreType;
private final EventReporter eventReporter; private final EventReporter eventReporter;
private final File peerPersistenceFile; private final File peerPersistenceFile;
private final transient StateManager stateManager;
private final boolean useCompression; private final boolean useCompression;
private final SiteToSiteTransportProtocol transportProtocol; private final SiteToSiteTransportProtocol transportProtocol;
private final String portName; private final String portName;
@ -773,6 +802,7 @@ public interface SiteToSiteClient extends Closeable {
this.truststoreType = null; this.truststoreType = null;
this.eventReporter = null; this.eventReporter = null;
this.peerPersistenceFile = null; this.peerPersistenceFile = null;
this.stateManager = null;
this.useCompression = false; this.useCompression = false;
this.portName = null; this.portName = null;
this.portIdentifier = null; this.portIdentifier = null;
@ -801,6 +831,7 @@ public interface SiteToSiteClient extends Closeable {
this.truststoreType = builder.truststoreType; this.truststoreType = builder.truststoreType;
this.eventReporter = builder.eventReporter; this.eventReporter = builder.eventReporter;
this.peerPersistenceFile = builder.peerPersistenceFile; this.peerPersistenceFile = builder.peerPersistenceFile;
this.stateManager = builder.stateManager;
this.useCompression = builder.useCompression; this.useCompression = builder.useCompression;
this.portName = builder.portName; this.portName = builder.portName;
this.portIdentifier = builder.portIdentifier; this.portIdentifier = builder.portIdentifier;
@ -921,6 +952,23 @@ public interface SiteToSiteClient extends Closeable {
return peerPersistenceFile; return peerPersistenceFile;
} }
@Override
public StateManager getStateManager() {
return stateManager;
}
@Override
public PeerPersistence getPeerPersistence() {
if (stateManager != null) {
return new StatePeerPersistence(stateManager);
} else if (peerPersistenceFile != null) {
return new FilePeerPersistence(peerPersistenceFile);
}
return null;
}
@Override @Override
public EventReporter getEventReporter() { public EventReporter getEventReporter() {
return eventReporter; return eventReporter;

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
@ -110,6 +111,16 @@ public interface SiteToSiteClientConfig extends Serializable {
*/ */
File getPeerPersistenceFile(); File getPeerPersistenceFile();
/**
* @return the StateManager to be used for persisting the nodes of a remote
*/
StateManager getStateManager();
/**
* @return a PeerPersistence implementation based on configured persistent target
*/
PeerPersistence getPeerPersistence();
/** /**
* @return a boolean indicating whether or not compression will be used to * @return a boolean indicating whether or not compression will be used to
* transfer data to and from the remote instance * transfer data to and from the remote instance

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.client;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.remote.util.PeerStatusCache;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
public class StatePeerPersistence extends AbstractPeerPersistence {
static final String STATE_KEY_PEERS = "peers";
static final String STATE_KEY_TRANSPORT_PROTOCOL = "protocol";
static final String STATE_KEY_PEERS_TIMESTAMP = "peers.ts";
private final StateManager stateManager;
public StatePeerPersistence(StateManager stateManager) {
this.stateManager = stateManager;
}
@Override
public void save(final PeerStatusCache peerStatusCache) throws IOException {
final StateMap state = stateManager.getState(Scope.LOCAL);
final Map<String, String> stateMap = state.toMap();
final Map<String, String> updatedStateMap = new HashMap<>(stateMap);
final StringBuilder peers = new StringBuilder();
write(peerStatusCache, peers::append);
updatedStateMap.put(STATE_KEY_PEERS, peers.toString());
updatedStateMap.put(STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
stateManager.setState(updatedStateMap, Scope.LOCAL);
}
@Override
public PeerStatusCache restore() throws IOException {
final StateMap state = stateManager.getState(Scope.LOCAL);
final String storedPeers = state.get(STATE_KEY_PEERS);
if (storedPeers != null && !storedPeers.isEmpty()) {
try (final BufferedReader reader = new BufferedReader(new StringReader(storedPeers))) {
return restorePeerStatuses(reader, Long.parseLong(state.get(STATE_KEY_PEERS_TIMESTAMP)));
}
}
return null;
}
}

View File

@ -34,6 +34,7 @@ import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.http.HttpCommunicationsSession; import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpClientTransaction; import org.apache.nifi.remote.protocol.http.HttpClientTransaction;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient; import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.web.api.dto.remote.PeerDTO; import org.apache.nifi.web.api.dto.remote.PeerDTO;
@ -64,7 +65,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
public HttpClient(final SiteToSiteClientConfig config) { public HttpClient(final SiteToSiteClientConfig config) {
super(config); super(config);
peerSelector = new PeerSelector(this, config.getPeerPersistenceFile()); peerSelector = new PeerSelector(this, config.getPeerPersistence());
peerSelector.setEventReporter(config.getEventReporter()); peerSelector.setEventReporter(config.getEventReporter());
taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@ -246,4 +247,9 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
transaction.getCommunicant().getCommunicationsSession().interrupt(); transaction.getCommunicant().getCommunicationsSession().interrupt();
} }
} }
@Override
public SiteToSiteTransportProtocol getTransportProtocol() {
return SiteToSiteTransportProtocol.HTTP;
}
} }

View File

@ -23,6 +23,7 @@ import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination; import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator; import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.PeerPersistence;
import org.apache.nifi.remote.client.PeerSelector; import org.apache.nifi.remote.client.PeerSelector;
import org.apache.nifi.remote.client.PeerStatusProvider; import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteInfoProvider; import org.apache.nifi.remote.client.SiteInfoProvider;
@ -36,6 +37,7 @@ import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException; import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.io.socket.SocketCommunicationsSession; import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.security.util.CertificateUtils; import org.apache.nifi.security.util.CertificateUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -44,7 +46,6 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -91,7 +92,8 @@ public class EndpointConnectionPool implements PeerStatusProvider {
private final InetAddress localAddress; private final InetAddress localAddress;
public EndpointConnectionPool(final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis, public EndpointConnectionPool(final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis,
final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider, final SSLContext sslContext, final EventReporter eventReporter,
final PeerPersistence peerPersistence, final SiteInfoProvider siteInfoProvider,
final InetAddress localAddress) { final InetAddress localAddress) {
Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null"); Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
@ -104,7 +106,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
this.siteInfoProvider = siteInfoProvider; this.siteInfoProvider = siteInfoProvider;
peerSelector = new PeerSelector(this, persistenceFile); peerSelector = new PeerSelector(this, peerPersistence);
peerSelector.setEventReporter(eventReporter); peerSelector.setEventReporter(eventReporter);
// Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused
@ -563,5 +565,8 @@ public class EndpointConnectionPool implements PeerStatusProvider {
} }
} }
@Override
public SiteToSiteTransportProtocol getTransportProtocol() {
return SiteToSiteTransportProtocol.RAW;
}
} }

View File

@ -52,7 +52,7 @@ public class SocketClient extends AbstractSiteToSiteClient {
createRemoteDestination(config.getPortIdentifier(), config.getPortName()), createRemoteDestination(config.getPortIdentifier(), config.getPortName()),
commsTimeout, commsTimeout,
(int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS), (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile(), config.getSslContext(), config.getEventReporter(), config.getPeerPersistence(),
siteInfoProvider, config.getLocalAddress() siteInfoProvider, config.getLocalAddress()
); );

View File

@ -19,19 +19,19 @@ package org.apache.nifi.remote.util;
import java.util.Set; import java.util.Set;
import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
public class PeerStatusCache { public class PeerStatusCache {
private final Set<PeerStatus> statuses; private final Set<PeerStatus> statuses;
private final long timestamp; private final long timestamp;
private final SiteToSiteTransportProtocol transportProtocol;
public PeerStatusCache(final Set<PeerStatus> statuses) { public PeerStatusCache(final Set<PeerStatus> statuses, final long timestamp,
this(statuses, System.currentTimeMillis()); final SiteToSiteTransportProtocol transportProtocol) {
}
public PeerStatusCache(final Set<PeerStatus> statuses, final long timestamp) {
this.statuses = statuses; this.statuses = statuses;
this.timestamp = timestamp; this.timestamp = timestamp;
this.transportProtocol = transportProtocol;
} }
public Set<PeerStatus> getStatuses() { public Set<PeerStatus> getStatuses() {
@ -41,4 +41,8 @@ public class PeerStatusCache {
public long getTimestamp() { public long getTimestamp() {
return timestamp; return timestamp;
} }
public SiteToSiteTransportProtocol getTransportProtocol() {
return transportProtocol;
}
} }

View File

@ -16,21 +16,34 @@
*/ */
package org.apache.nifi.remote.client; package org.apache.nifi.remote.client;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.remote.PeerDescription; import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.groupingBy;
@ -40,8 +53,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
public class TestPeerSelector { public class TestPeerSelector {
@ -255,4 +271,116 @@ public class TestPeerSelector {
assert(!peers.isEmpty()); assert(!peers.isEmpty());
assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peers.get(0).getPeerDescription()); assertEquals("Node1 should be returned since node 1 is the only available node.", bootstrapNode, peers.get(0).getPeerDescription());
} }
@Test
public void testPeerStatusManagedCache() throws Exception {
final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
final StateManager stateManager = Mockito.mock(StateManager.class);
final StateMap stateMap = Mockito.mock(StateMap.class);
final Map<String, String> state = new HashMap<>();
state.put(StatePeerPersistence.STATE_KEY_PEERS, "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n");
state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
when(stateManager.getState(eq(Scope.LOCAL))).thenReturn(stateMap);
when(stateMap.get(anyString())).thenAnswer(invocation -> state.get(invocation.getArgument(0)));
doAnswer(invocation -> {
final Map<String, String> updatedMap = invocation.getArgument(0);
state.clear();
state.putAll(updatedMap);
return null;
}).when(stateManager).setState(any(), eq(Scope.LOCAL));
final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
.thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
// PeerSelector should restore peer statuses from managed cache.
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
peerSelector.refreshPeers();
assertEquals("Restored peers should be used",
"RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
// If the stored state is too old, PeerSelector refreshes peers.
state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis() - 120_000));
peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
peerSelector.refreshPeers();
assertEquals("Peers should be refreshed",
"RAW\nnifi0:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
}
@Test
public void testPeerStatusManagedCacheDifferentProtocol() throws Exception {
final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
final StateManager stateManager = Mockito.mock(StateManager.class);
final StateMap stateMap = Mockito.mock(StateMap.class);
final Map<String, String> state = new HashMap<>();
state.put(StatePeerPersistence.STATE_KEY_PEERS, "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n");
state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.HTTP);
when(stateManager.getState(eq(Scope.LOCAL))).thenReturn(stateMap);
when(stateMap.get(anyString())).thenAnswer(invocation -> state.get(invocation.getArgument(0)));
doAnswer(invocation -> {
final Map<String, String> updatedMap = invocation.getArgument(0);
state.clear();
state.putAll(updatedMap);
return null;
}).when(stateManager).setState(any(), eq(Scope.LOCAL));
final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
.thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
// PeerSelector should NOT restore peer statuses from managed cache because protocol changed.
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new StatePeerPersistence(stateManager));
peerSelector.refreshPeers();
assertEquals("Restored peers should NOT be used",
"HTTP\nnifi0:8081:false:true\n", stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
}
@Test
public void testPeerStatusFileCache() throws Exception {
final PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
final PeerDescription bootstrapPeer = new PeerDescription("nifi0", 8081, false);
when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
.thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1, true)));
final File file = File.createTempFile("peers", "txt");
file.deleteOnExit();
try (final FileOutputStream fos = new FileOutputStream(file)) {
fos.write("RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n".getBytes(StandardCharsets.UTF_8));
}
final Supplier<String> readFile = () -> {
try (final FileInputStream fin = new FileInputStream(file);
final BufferedReader reader = new BufferedReader(new InputStreamReader(fin))) {
final StringBuilder lines = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
lines.append(line).append("\n");
}
return lines.toString();
} catch (IOException e) {
throw new RuntimeException(e);
}
};
// PeerSelector should restore peer statuses from managed cache.
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new FilePeerPersistence(file));
peerSelector.refreshPeers();
assertEquals("Restored peers should be used",
"RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n", readFile.get());
// If the stored state is too old, PeerSelector refreshes peers.
file.setLastModified(System.currentTimeMillis() - 120_000);
peerSelector = new PeerSelector(peerStatusProvider, new FilePeerPersistence(file));
peerSelector.refreshPeers();
assertEquals("Peers should be refreshed",
"RAW\nnifi0:8081:false:true\n", readFile.get());
}
} }

View File

@ -19,19 +19,21 @@ package org.apache.nifi.remote.client.socket;
import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.io.Output;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.HashMap; import java.util.HashMap;
@ -136,6 +138,40 @@ public class TestSiteToSiteClient {
} }
} }
@Test
public void testSerializationWithStateManager() {
final StateManager stateManager = Mockito.mock(StateManager.class);
final SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("input")
.stateManager(stateManager)
.buildConfig();
final Kryo kryo = new Kryo();
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final Output output = new Output(out);
try {
kryo.writeObject(output, clientConfig);
} finally {
output.close();
}
final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
final Input input = new Input(in);
try {
SiteToSiteClientConfig clientConfig2 = kryo.readObject(input, SiteToSiteClient.StandardSiteToSiteClientConfig.class);
Assert.assertEquals(clientConfig.getUrls(), clientConfig2.getUrls());
// Serialization works, but the state manager is not serialized.
Assert.assertNotNull(clientConfig.getStateManager());
Assert.assertNull(clientConfig2.getStateManager());
} finally {
input.close();
}
}
@Test @Test
public void testGetUrlBackwardCompatibility() { public void testGetUrlBackwardCompatibility() {
final Set<String> urls = new LinkedHashSet<>(); final Set<String> urls = new LinkedHashSet<>();

View File

@ -19,6 +19,7 @@ package org.apache.nifi.groups;
import org.apache.nifi.authorization.resource.ComponentAuthorizable; import org.apache.nifi.authorization.resource.ComponentAuthorizable;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.VersionedComponent; import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Positionable; import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
@ -240,4 +241,6 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable,
void verifyCanStopTransmitting(); void verifyCanStopTransmitting();
void verifyCanUpdate(); void verifyCanUpdate();
StateManager getStateManager();
} }

View File

@ -211,7 +211,9 @@ public class StandardFlowManager implements FlowManager {
} }
public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) { public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) {
return new StandardRemoteProcessGroup(requireNonNull(id), uris, null, processScheduler, bulletinRepository, sslContext, nifiProperties); return new StandardRemoteProcessGroup(requireNonNull(id), uris, null,
processScheduler, bulletinRepository, sslContext, nifiProperties,
flowController.getStateManagerProvider().getStateManager(id));
} }
public void setRootGroup(final ProcessGroup rootGroup) { public void setRootGroup(final ProcessGroup rootGroup) {

View File

@ -853,6 +853,14 @@ public final class StandardProcessGroup implements ProcessGroup {
remoteGroup.getInputPorts().forEach(scheduler::onPortRemoved); remoteGroup.getInputPorts().forEach(scheduler::onPortRemoved);
remoteGroup.getOutputPorts().forEach(scheduler::onPortRemoved); remoteGroup.getOutputPorts().forEach(scheduler::onPortRemoved);
final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider();
scheduler.submitFrameworkTask(new Runnable() {
@Override
public void run() {
stateManagerProvider.onComponentRemoved(remoteGroup.getIdentifier());
}
});
remoteGroups.remove(remoteGroupId); remoteGroups.remove(remoteGroupId);
LOG.info("{} removed from flow", remoteProcessGroup); LOG.info("{} removed from flow", remoteProcessGroup);
} finally { } finally {

View File

@ -18,7 +18,6 @@ package org.apache.nifi.remote;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
@ -54,6 +53,7 @@ import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType; import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Port;
@ -100,6 +100,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final ProcessScheduler scheduler; private final ProcessScheduler scheduler;
private final EventReporter eventReporter; private final EventReporter eventReporter;
private final NiFiProperties nifiProperties; private final NiFiProperties nifiProperties;
private final StateManager stateManager;
private final long remoteContentsCacheExpiration; private final long remoteContentsCacheExpiration;
private volatile boolean initialized = false; private volatile boolean initialized = false;
@ -146,8 +147,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final ScheduledExecutorService backgroundThreadExecutor; private final ScheduledExecutorService backgroundThreadExecutor;
public StandardRemoteProcessGroup(final String id, final String targetUris, final ProcessGroup processGroup, final ProcessScheduler processScheduler, public StandardRemoteProcessGroup(final String id, final String targetUris, final ProcessGroup processGroup, final ProcessScheduler processScheduler,
final BulletinRepository bulletinRepository, final SSLContext sslContext, final NiFiProperties nifiProperties) { final BulletinRepository bulletinRepository, final SSLContext sslContext, final NiFiProperties nifiProperties,
final StateManager stateManager) {
this.nifiProperties = nifiProperties; this.nifiProperties = nifiProperties;
this.stateManager = stateManager;
this.id = requireNonNull(id); this.id = requireNonNull(id);
this.targetUris = targetUris; this.targetUris = targetUris;
@ -234,11 +237,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override @Override
public void onRemove() { public void onRemove() {
backgroundThreadExecutor.shutdown(); backgroundThreadExecutor.shutdown();
final File file = getPeerPersistenceFile();
if (file.exists() && !file.delete()) {
logger.warn("Failed to remove {}. This file should be removed manually.", file);
}
} }
@Override @Override
@ -1366,11 +1364,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
} }
} }
private File getPeerPersistenceFile() {
final File stateDir = nifiProperties.getPersistentStateDirectory();
return new File(stateDir, getIdentifier() + ".peers");
}
@Override @Override
public Optional<String> getVersionedComponentId() { public Optional<String> getVersionedComponentId() {
return Optional.ofNullable(versionedComponentId.get()); return Optional.ofNullable(versionedComponentId.get());
@ -1393,4 +1386,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
} }
} }
} }
@Override
public StateManager getStateManager() {
return stateManager;
}
} }

View File

@ -42,7 +42,6 @@ import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException; import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient; import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.remote.util.StandardDataPacket;
@ -56,7 +55,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
@ -125,11 +123,6 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
this.targetId = targetId; this.targetId = targetId;
} }
private static File getPeerPersistenceFile(final String portId, final NiFiProperties nifiProperties, final SiteToSiteTransportProtocol transportProtocol) {
final File stateDir = nifiProperties.getPersistentStateDirectory();
return new File(stateDir, String.format("%s_%s.peers", portId, transportProtocol.name()));
}
@Override @Override
public boolean isTargetRunning() { public boolean isTargetRunning() {
return targetRunning.get(); return targetRunning.get();
@ -181,7 +174,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
.sslContext(sslContext) .sslContext(sslContext)
.useCompression(isUseCompression()) .useCompression(isUseCompression())
.eventReporter(remoteGroup.getEventReporter()) .eventReporter(remoteGroup.getEventReporter())
.peerPersistenceFile(getPeerPersistenceFile(getIdentifier(), nifiProperties, remoteGroup.getTransportProtocol())) .stateManager(remoteGroup.getStateManager())
.nodePenalizationPeriod(penalizationMillis, TimeUnit.MILLISECONDS) .nodePenalizationPeriod(penalizationMillis, TimeUnit.MILLISECONDS)
.timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) .timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
.transportProtocol(remoteGroup.getTransportProtocol()) .transportProtocol(remoteGroup.getTransportProtocol())

View File

@ -1675,6 +1675,14 @@ public interface NiFiServiceFacade {
*/ */
void clearReportingTaskState(String reportingTaskId); void clearReportingTaskState(String reportingTaskId);
/**
* Gets the state for the specified RemoteProcessGroup.
*
* @param remoteProcessGroupId the RemoteProcessGroup id
* @return the component state
*/
ComponentStateDTO getRemoteProcessGroupState(String remoteProcessGroupId);
// ---------------------------------------- // ----------------------------------------
// Label methods // Label methods

View File

@ -1536,6 +1536,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
reportingTaskDAO.clearState(reportingTaskId); reportingTaskDAO.clearState(reportingTaskId);
} }
@Override
public ComponentStateDTO getRemoteProcessGroupState(String remoteProcessGroupId) {
final StateMap clusterState = isClustered() ? remoteProcessGroupDAO.getState(remoteProcessGroupId, Scope.CLUSTER) : null;
final StateMap localState = remoteProcessGroupDAO.getState(remoteProcessGroupId, Scope.LOCAL);
// processor will be non null as it was already found when getting the state
final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
return dtoFactory.createComponentStateDTO(remoteProcessGroupId, remoteProcessGroup.getClass(), localState, clusterState);
}
@Override @Override
public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) { public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) {
final Connection connection = connectionDAO.getConnection(connectionId); final Connection connection = connectionDAO.getConnection(connectionId);

View File

@ -30,10 +30,12 @@ import org.apache.nifi.authorization.resource.OperationAuthorizable;
import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision; import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.PositionDTO; import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.RemotePortRunStatusEntity; import org.apache.nifi.web.api.entity.RemotePortRunStatusEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
@ -827,6 +829,61 @@ public class RemoteProcessGroupResource extends ApplicationResource {
); );
} }
/**
* Gets the state for a RemoteProcessGroup.
*
* @param id The id of the RemoteProcessGroup
* @return a componentStateEntity
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("/{id}/state")
@ApiOperation(
value = "Gets the state for a RemoteProcessGroup",
response = ComponentStateEntity.class,
authorizations = {
@Authorization(value = "Write - /remote-process-groups/{uuid}")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getState(
@ApiParam(
value = "The processor id.",
required = true
)
@PathParam("id") final String id) throws InterruptedException {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable authorizable = lookup.getRemoteProcessGroup(id);
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
// get the component state
final ComponentStateDTO state = serviceFacade.getRemoteProcessGroupState(id);
// generate the response entity
final ComponentStateEntity entity = new ComponentStateEntity();
entity.setComponentState(state);
// generate the response
return generateOkResponse(entity).build();
}
private RemoteProcessGroupDTO createDTOWithDesiredRunStatus(final String id, final RemotePortRunStatusEntity entity) { private RemoteProcessGroupDTO createDTOWithDesiredRunStatus(final String id, final RemotePortRunStatusEntity entity) {
final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO(); final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
dto.setId(id); dto.setId(id);

View File

@ -21,6 +21,7 @@ import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.groups.RemoteProcessGroup;
public interface ComponentStateDAO { public interface ComponentStateDAO {
@ -71,4 +72,13 @@ public interface ComponentStateDAO {
* @param reportingTask reporting task * @param reportingTask reporting task
*/ */
void clearState(ReportingTaskNode reportingTask); void clearState(ReportingTaskNode reportingTask);
/**
* Gets the state map for the specified RemoteProcessGroup.
*
* @param remoteProcessGroup RemoteProcessGroup
* @param scope scope
* @return state map
*/
StateMap getState(RemoteProcessGroup remoteProcessGroup, Scope scope);
} }

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.nifi.web.dao; package org.apache.nifi.web.dao;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
@ -120,4 +122,12 @@ public interface RemoteProcessGroupDAO {
* @param remoteProcessGroupId The remote process group id * @param remoteProcessGroupId The remote process group id
*/ */
void deleteRemoteProcessGroup(String remoteProcessGroupId); void deleteRemoteProcessGroup(String remoteProcessGroupId);
/**
* Gets the specified RemoteProcessGroupId.
*
* @param remoteProcessGroupId RemoteProcessGroupId id
* @return state map
*/
StateMap getState(String remoteProcessGroupId, Scope scope);
} }

View File

@ -23,6 +23,7 @@ import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.dao.ComponentStateDAO; import org.apache.nifi.web.dao.ComponentStateDAO;
@ -90,6 +91,11 @@ public class StandardComponentStateDAO implements ComponentStateDAO {
clearState(reportingTask.getIdentifier()); clearState(reportingTask.getIdentifier());
} }
@Override
public StateMap getState(RemoteProcessGroup remoteProcessGroup, Scope scope) {
return getState(remoteProcessGroup.getIdentifier(), scope);
}
/* setters */ /* setters */
public void setStateManagerProvider(StateManagerProvider stateManagerProvider) { public void setStateManagerProvider(StateManagerProvider stateManagerProvider) {

View File

@ -17,6 +17,8 @@
package org.apache.nifi.web.dao.impl; package org.apache.nifi.web.dao.impl;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.exception.ValidationException; import org.apache.nifi.controller.exception.ValidationException;
@ -31,6 +33,7 @@ import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.DtoFactory; import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO; import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import java.util.ArrayList; import java.util.ArrayList;
@ -43,6 +46,7 @@ import static org.apache.nifi.util.StringUtils.isEmpty;
public class StandardRemoteProcessGroupDAO extends ComponentDAO implements RemoteProcessGroupDAO { public class StandardRemoteProcessGroupDAO extends ComponentDAO implements RemoteProcessGroupDAO {
private FlowController flowController; private FlowController flowController;
private ComponentStateDAO componentStateDAO;
private RemoteProcessGroup locateRemoteProcessGroup(final String remoteProcessGroupId) { private RemoteProcessGroup locateRemoteProcessGroup(final String remoteProcessGroupId) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup(); final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
@ -465,7 +469,17 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
remoteProcessGroup.getProcessGroup().removeRemoteProcessGroup(remoteProcessGroup); remoteProcessGroup.getProcessGroup().removeRemoteProcessGroup(remoteProcessGroup);
} }
@Override
public StateMap getState(String remoteProcessGroupId, Scope scope) {
final RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);
return componentStateDAO.getState(remoteProcessGroup, scope);
}
public void setFlowController(FlowController flowController) { public void setFlowController(FlowController flowController) {
this.flowController = flowController; this.flowController = flowController;
} }
public void setComponentStateDAO(ComponentStateDAO componentStateDAO) {
this.componentStateDAO = componentStateDAO;
}
} }

View File

@ -82,6 +82,7 @@
</bean> </bean>
<bean id="remoteProcessGroupDAO" class="org.apache.nifi.web.dao.impl.StandardRemoteProcessGroupDAO"> <bean id="remoteProcessGroupDAO" class="org.apache.nifi.web.dao.impl.StandardRemoteProcessGroupDAO">
<property name="flowController" ref="flowController"/> <property name="flowController" ref="flowController"/>
<property name="componentStateDAO" ref="componentStateDAO"/>
</bean> </bean>
<bean id="labelDAO" class="org.apache.nifi.web.dao.impl.StandardLabelDAO"> <bean id="labelDAO" class="org.apache.nifi.web.dao.impl.StandardLabelDAO">
<property name="flowController" ref="flowController"/> <property name="flowController" ref="flowController"/>