mirror of https://github.com/apache/nifi.git
NIFI-11746: Refactored instance caching for system tests
Also changed some properties around so that we can have both a clustered and a standalone instance running at the same time. This closes #7427 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
40014486a4
commit
5d3443c191
|
@ -134,12 +134,6 @@ public class AggregateNiFiInstance implements NiFiInstance {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFlowXmlGz(final File flowXmlGz) throws IOException {
|
||||
for (final NiFiInstance instance : instances) {
|
||||
instance.setFlowXmlGz(flowXmlGz);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProperties(final Map<String, String> properties) throws IOException {
|
||||
|
@ -156,4 +150,15 @@ public class AggregateNiFiInstance implements NiFiInstance {
|
|||
instance.quarantineTroubleshootingInfo(nodeDirectory, cause);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAccessible() {
|
||||
for (final NiFiInstance instance : instances) {
|
||||
if (!instance.isAccessible()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.File;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
public class InstanceConfiguration {
|
||||
private final File bootstrapConfigFile;
|
||||
|
@ -68,6 +69,27 @@ public class InstanceConfiguration {
|
|||
return nifiPropertiesOverrides;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object other) {
|
||||
if (this == other) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (other == null || getClass() != other.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final InstanceConfiguration that = (InstanceConfiguration) other;
|
||||
return autoStart == that.autoStart && unpackPythonExtensions == that.unpackPythonExtensions && Objects.equals(bootstrapConfigFile, that.bootstrapConfigFile)
|
||||
&& Objects.equals(instanceDirectory, that.instanceDirectory) && Objects.equals(flowXmlGz, that.flowXmlGz)
|
||||
&& Objects.equals(stateDirectory, that.stateDirectory) && Objects.equals(nifiPropertiesOverrides, that.nifiPropertiesOverrides);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(bootstrapConfigFile, instanceDirectory, flowXmlGz, stateDirectory, autoStart, nifiPropertiesOverrides, unpackPythonExtensions);
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private File bootstrapConfigFile;
|
||||
private File instanceDirectory;
|
||||
|
|
|
@ -1289,9 +1289,9 @@ public class NiFiClientUtil {
|
|||
return requestEntity;
|
||||
}
|
||||
|
||||
public RemoteProcessGroupEntity createRPG(final String parentGroupId, final SiteToSiteTransportProtocol transportProtocol) throws NiFiClientException, IOException {
|
||||
public RemoteProcessGroupEntity createRPG(final String parentGroupId, final int httpPort, final SiteToSiteTransportProtocol transportProtocol) throws NiFiClientException, IOException {
|
||||
final RemoteProcessGroupDTO component = new RemoteProcessGroupDTO();
|
||||
component.setTargetUri("http://localhost:5671");
|
||||
component.setTargetUri("http://localhost:" + httpPort);
|
||||
component.setName(component.getTargetUri());
|
||||
component.setTransportProtocol(transportProtocol.name());
|
||||
|
||||
|
@ -1300,7 +1300,7 @@ public class NiFiClientUtil {
|
|||
entity.setRevision(createNewRevision());
|
||||
|
||||
final RemoteProcessGroupEntity rpg = nifiClient.getRemoteProcessGroupClient().createRemoteProcessGroup(parentGroupId, entity);
|
||||
logger.info("Created Remote Process Group [id={}, protocol={}, url={}, parentGroupId={}] for Test [{}]", rpg.getId(), transportProtocol, parentGroupId, testName);
|
||||
logger.info("Created Remote Process Group [id={}, protocol={}, url={}, parentGroupId={}] for Test [{}]", rpg.getId(), transportProtocol, component.getTargetUri(), parentGroupId, testName);
|
||||
return rpg;
|
||||
}
|
||||
|
||||
|
|
|
@ -97,12 +97,6 @@ public interface NiFiInstance {
|
|||
*/
|
||||
void setProperty(String propertyName, String propertyValue) throws IOException;
|
||||
|
||||
/**
|
||||
* Change the value of the flow that should be loaded on startup
|
||||
* @param flowXmlGz the file that contains the flow that should be loaded on startup
|
||||
*/
|
||||
void setFlowXmlGz(final File flowXmlGz) throws IOException;
|
||||
|
||||
/**
|
||||
* Change the values of the given properties in nifi.properties. Any property that is not present in the given map will remain unchanged. If the node is already running, this change will not take
|
||||
* effect until the instance is stopped and started again.
|
||||
|
@ -118,4 +112,11 @@ public interface NiFiInstance {
|
|||
* @throws IOException if unable to write the information
|
||||
*/
|
||||
void quarantineTroubleshootingInfo(final File directory, final Throwable failureCause) throws IOException;
|
||||
|
||||
/**
|
||||
* Checks if able to communicate with the instance
|
||||
*
|
||||
* @return <code>true</code> if the instance is started and the REST API can be accessed, false otherwise
|
||||
*/
|
||||
boolean isAccessible();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,256 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.tests.system;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
|
||||
public class NiFiInstanceCache {
|
||||
private static final Logger logger = LoggerFactory.getLogger(NiFiInstanceCache.class);
|
||||
|
||||
private final CachedInstance standaloneInstance = new CachedInstance();
|
||||
private final CachedInstance clusteredInstance = new CachedInstance();
|
||||
|
||||
public NiFiInstance createInstance(final NiFiInstanceFactory instanceFactory, final String testName, final boolean allowReuse) throws IOException {
|
||||
final CachedInstance cachedInstance = instanceFactory.isClusteredInstance() ? clusteredInstance : standaloneInstance;
|
||||
|
||||
if (!allowReuse) {
|
||||
logger.info("Will create new NiFi instance for {} because the test does not allow reuse of the created instance", testName);
|
||||
cachedInstance.shutdown();
|
||||
cachedInstance.setFactory(null, testName);
|
||||
|
||||
return instanceFactory.createInstance();
|
||||
}
|
||||
|
||||
if (cachedInstance.getFactory() == null) {
|
||||
cachedInstance.setFactory(instanceFactory, testName);
|
||||
return cachedInstance.getOrCreateInstance();
|
||||
}
|
||||
|
||||
if (Objects.equals(cachedInstance.getFactory(), instanceFactory)) {
|
||||
logger.info("Will use cached Nifi instance that was created for test {} in order to run test {} if there is one", cachedInstance.getTestName(), testName);
|
||||
return cachedInstance.getOrCreateInstance();
|
||||
}
|
||||
|
||||
logger.info("Cached NiFi Instance created for test {} differs in configuration from what is necessary for test {}. Will shutdown cached instance.", cachedInstance.getTestName(), testName);
|
||||
cachedInstance.shutdown();
|
||||
cachedInstance.setFactory(instanceFactory, testName);
|
||||
return cachedInstance.getOrCreateInstance();
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
standaloneInstance.shutdown();
|
||||
clusteredInstance.shutdown();
|
||||
}
|
||||
|
||||
public void stopOrRecycle(final NiFiInstance nifiInstance) {
|
||||
if (nifiInstance instanceof CachedNiFiInstance) {
|
||||
((CachedNiFiInstance) nifiInstance).ensureProperState();
|
||||
return;
|
||||
}
|
||||
|
||||
nifiInstance.stop();
|
||||
}
|
||||
|
||||
public void poison(final NiFiInstance nifiInstance) {
|
||||
if (nifiInstance == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
nifiInstance.stop();
|
||||
|
||||
standaloneInstance.poison(nifiInstance);
|
||||
clusteredInstance.poison(nifiInstance);
|
||||
}
|
||||
|
||||
private static class CachedInstance {
|
||||
private NiFiInstanceFactory factory;
|
||||
private CachedNiFiInstance instance;
|
||||
private String testName;
|
||||
|
||||
public void setFactory(final NiFiInstanceFactory factory, final String testName) {
|
||||
this.factory = factory;
|
||||
this.testName = testName;
|
||||
}
|
||||
|
||||
public String getTestName() {
|
||||
return testName;
|
||||
}
|
||||
|
||||
public NiFiInstanceFactory getFactory() {
|
||||
return factory;
|
||||
}
|
||||
|
||||
public void poison(final NiFiInstance toPoison) {
|
||||
final NiFiInstance rawInstance = this.instance.getRawInstance();
|
||||
|
||||
if (Objects.equals(rawInstance, toPoison)) {
|
||||
logger.info("{} has been poisoned. Will not reuse this NiFi instance", rawInstance);
|
||||
this.instance = null;
|
||||
}
|
||||
}
|
||||
|
||||
public NiFiInstance getOrCreateInstance() throws IOException {
|
||||
if (instance != null) {
|
||||
return instance;
|
||||
}
|
||||
|
||||
final NiFiInstance rawInstance = factory.createInstance();
|
||||
this.instance = new CachedNiFiInstance(rawInstance);
|
||||
return this.instance;
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
if (instance != null) {
|
||||
instance.stop();
|
||||
}
|
||||
|
||||
instance = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class CachedNiFiInstance implements NiFiInstance {
|
||||
private final NiFiInstance rawInstance;
|
||||
private boolean envCreated = false;
|
||||
private boolean started = false;
|
||||
private boolean requireRestart;
|
||||
|
||||
public CachedNiFiInstance(final NiFiInstance rawInstance) {
|
||||
this.rawInstance = rawInstance;
|
||||
}
|
||||
|
||||
public NiFiInstance getRawInstance() {
|
||||
return this.rawInstance;
|
||||
}
|
||||
|
||||
public void ensureProperState() {
|
||||
if (rawInstance.getNumberOfNodes() == 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!rawInstance.isAccessible()) {
|
||||
logger.info("NiFi Instance {} is not accessible so will stop the instance to ensure proper state for the next test", rawInstance);
|
||||
stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAccessible() {
|
||||
return rawInstance.isAccessible();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createEnvironment() throws IOException {
|
||||
if (envCreated) {
|
||||
return;
|
||||
}
|
||||
|
||||
rawInstance.createEnvironment();
|
||||
envCreated = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(final boolean waitForCompletion) {
|
||||
if (started && requireRestart) {
|
||||
logger.info("Must restart NiFi Instance {} before use", rawInstance);
|
||||
|
||||
rawInstance.stop();
|
||||
started = false;
|
||||
requireRestart = false;
|
||||
}
|
||||
|
||||
if (started) {
|
||||
logger.info("NiFi Instance {} is already started", rawInstance);
|
||||
return;
|
||||
}
|
||||
|
||||
rawInstance.start(waitForCompletion);
|
||||
started = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
logger.info("Stopping NiFi Instance {}", rawInstance);
|
||||
started = false;
|
||||
rawInstance.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClustered() {
|
||||
return rawInstance.isClustered();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumberOfNodes() {
|
||||
return rawInstance.getNumberOfNodes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumberOfNodes(final boolean includeOnlyAutoStartInstances) {
|
||||
return rawInstance.getNumberOfNodes(includeOnlyAutoStartInstances);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NiFiInstance getNodeInstance(final int nodeIndex) {
|
||||
return rawInstance.getNodeInstance(nodeIndex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Properties getProperties() throws IOException {
|
||||
return rawInstance.getProperties();
|
||||
}
|
||||
|
||||
@Override
|
||||
public File getInstanceDirectory() {
|
||||
return rawInstance.getInstanceDirectory();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoStart() {
|
||||
return rawInstance.isAutoStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProperty(final String propertyName, final String propertyValue) throws IOException {
|
||||
rawInstance.setProperty(propertyName, propertyValue);
|
||||
requireRestart = true;
|
||||
|
||||
logger.info("Setting property {} on NiFi Instance {}. This will require that the instance be restarted.", propertyName, rawInstance);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProperties(final Map<String, String> properties) throws IOException {
|
||||
rawInstance.setProperties(properties);
|
||||
requireRestart = true;
|
||||
|
||||
logger.info("Setting multiple properties on NiFi Instance {}. This will require that the instance be restarted.", rawInstance);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void quarantineTroubleshootingInfo(final File directory, final Throwable failureCause) throws IOException {
|
||||
rawInstance.quarantineTroubleshootingInfo(directory, failureCause);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,4 +20,6 @@ import java.io.IOException;
|
|||
|
||||
public interface NiFiInstanceFactory {
|
||||
NiFiInstance createInstance() throws IOException;
|
||||
|
||||
boolean isClusteredInstance();
|
||||
}
|
||||
|
|
|
@ -66,8 +66,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
|
|||
|
||||
public static final RequestConfig DO_NOT_REPLICATE = () -> Collections.singletonMap("X-Request-Replicated", "value");
|
||||
|
||||
public static final int CLIENT_API_PORT = 5671;
|
||||
public static final int CLIENT_API_BASE_PORT = 5670;
|
||||
public static final int CLUSTERED_CLIENT_API_BASE_PORT = 5671;
|
||||
public static final int STANDALONE_CLIENT_API_BASE_PORT = 5670;
|
||||
public static final String NIFI_GROUP_ID = "org.apache.nifi";
|
||||
public static final String TEST_EXTENSIONS_ARTIFACT_ID = "nifi-system-test-extensions-nar";
|
||||
public static final String TEST_PYTHON_EXTENSIONS_ARTIFACT_ID = "python-extensions";
|
||||
|
@ -83,6 +83,11 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
|
|||
private NiFiClient nifiClient;
|
||||
private NiFiClientUtil clientUtil;
|
||||
private static final AtomicReference<NiFiInstance> nifiRef = new AtomicReference<>();
|
||||
private static final NiFiInstanceCache instanceCache = new NiFiInstanceCache();
|
||||
|
||||
static {
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> instanceCache.shutdown()));
|
||||
}
|
||||
|
||||
private TestInfo testInfo;
|
||||
|
||||
|
@ -90,25 +95,31 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
|
|||
public void setup(final TestInfo testInfo) throws IOException {
|
||||
this.testInfo = testInfo;
|
||||
final String testClassName = testInfo.getTestClass().map(Class::getSimpleName).orElse("<Unknown Test Class>");
|
||||
logger.info("Beginning Test {}:{}", testClassName, testInfo.getDisplayName());
|
||||
final String friendlyTestName = testClassName + ":" + testInfo.getDisplayName();
|
||||
logger.info("Beginning Test {}", friendlyTestName);
|
||||
|
||||
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
|
||||
|
||||
final NiFiInstanceFactory instanceFactory = getInstanceFactory();
|
||||
final NiFiInstance instance = instanceCache.createInstance(instanceFactory, friendlyTestName, isAllowFactoryReuse());
|
||||
nifiRef.set(instance);
|
||||
|
||||
instance.createEnvironment();
|
||||
instance.start();
|
||||
|
||||
setupClient();
|
||||
|
||||
if (nifiRef.get() == null) {
|
||||
final NiFiInstance instance = getInstanceFactory().createInstance();
|
||||
nifiRef.set(instance);
|
||||
instance.createEnvironment();
|
||||
instance.start();
|
||||
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
|
||||
|
||||
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
|
||||
|
||||
if (instance.isClustered()) {
|
||||
waitForAllNodesConnected();
|
||||
}
|
||||
if (instance.isClustered()) {
|
||||
waitForAllNodesConnected();
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean isAllowFactoryReuse() {
|
||||
return true;
|
||||
}
|
||||
|
||||
protected TestInfo getTestInfo() {
|
||||
return testInfo;
|
||||
}
|
||||
|
@ -118,7 +129,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
|
|||
final NiFiInstance nifi = nifiRef.get();
|
||||
nifiRef.set(null);
|
||||
if (nifi != null) {
|
||||
nifi.stop();
|
||||
instanceCache.stopOrRecycle(nifi);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -137,12 +148,14 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
|
|||
}
|
||||
|
||||
if (isDestroyEnvironmentAfterEachTest()) {
|
||||
instanceCache.poison(nifiRef.get());
|
||||
cleanup();
|
||||
} else if (destroyFlowFailure != null) {
|
||||
// If unable to destroy the flow, we need to shutdown the instance and delete the flow and completely recreate the environment.
|
||||
// Otherwise, we will be left in an unknown state for the next test, and that can cause cascading failures that are very difficult
|
||||
// to understand and troubleshoot.
|
||||
logger.info("Because there was a failure when destroying the flow, will completely tear down the environments and start with a clean environment for the next test.");
|
||||
instanceCache.poison(nifiRef.get());
|
||||
cleanup();
|
||||
}
|
||||
|
||||
|
@ -217,6 +230,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
|
|||
|
||||
final NiFiClient client = getNifiClient();
|
||||
|
||||
int attemptedNodeIndex = 0;
|
||||
final long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60);
|
||||
while (true) {
|
||||
int connectedNodeCount = -1;
|
||||
|
@ -230,7 +244,9 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
|
|||
|
||||
logEverySecond("Waiting for {} nodes to connect but currently only {} nodes are connected", expectedNumberOfNodes, connectedNodeCount);
|
||||
} catch (final Exception e) {
|
||||
e.printStackTrace();
|
||||
logger.error("Failed to determine how many nodes are currently connected", e);
|
||||
final int nodeIndexToAttempt = attemptedNodeIndex++ % expectedNumberOfNodes;
|
||||
setupClient(CLUSTERED_CLIENT_API_BASE_PORT + nodeIndexToAttempt);
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() > maxTime) {
|
||||
|
@ -247,7 +263,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
|
|||
}
|
||||
|
||||
protected void switchClientToNode(final int nodeIndex) {
|
||||
setupClient(CLIENT_API_BASE_PORT + nodeIndex);
|
||||
setupClient(CLUSTERED_CLIENT_API_BASE_PORT + nodeIndex - 1);
|
||||
}
|
||||
|
||||
protected void setupClient() {
|
||||
|
@ -276,7 +292,12 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
|
|||
}
|
||||
|
||||
protected int getClientApiPort() {
|
||||
return CLIENT_API_PORT;
|
||||
NiFiInstance nifiInstance = nifiRef.get();
|
||||
if (nifiInstance.getNumberOfNodes() > 1) {
|
||||
return CLUSTERED_CLIENT_API_BASE_PORT;
|
||||
}
|
||||
|
||||
return STANDALONE_CLIENT_API_BASE_PORT;
|
||||
}
|
||||
|
||||
protected NiFiClient getNifiClient() {
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.tests.system;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class SpawnedClusterNiFiInstanceFactory implements NiFiInstanceFactory {
|
||||
private final List<InstanceConfiguration> instanceConfigs = new ArrayList<>();
|
||||
|
@ -63,4 +64,28 @@ public class SpawnedClusterNiFiInstanceFactory implements NiFiInstanceFactory {
|
|||
|
||||
return new AggregateNiFiInstance(instances);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClusteredInstance() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object other) {
|
||||
if (this == other) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (other == null || getClass() != other.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final SpawnedClusterNiFiInstanceFactory that = (SpawnedClusterNiFiInstanceFactory) other;
|
||||
return Objects.equals(instanceConfigs, that.instanceConfigs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(instanceConfigs);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ import java.io.PrintWriter;
|
|||
import java.nio.file.Files;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
@ -52,6 +53,29 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
|
|||
return new RunNiFiInstance(instanceConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClusteredInstance() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object other) {
|
||||
if (this == other) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (other == null || getClass() != other.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final SpawnedStandaloneNiFiInstanceFactory that = (SpawnedStandaloneNiFiInstanceFactory) other;
|
||||
return Objects.equals(instanceConfig, that.instanceConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(instanceConfig);
|
||||
}
|
||||
|
||||
private static class RunNiFiInstance implements NiFiInstance {
|
||||
private final File instanceDirectory;
|
||||
|
@ -196,27 +220,32 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFlowXmlGz(final File flowXmlGz) throws IOException {
|
||||
final File destinationConf = new File(instanceDirectory, "conf");
|
||||
final File destinationFlowXmlGz = new File(destinationConf, "flow.xml.gz");
|
||||
destinationFlowXmlGz.delete();
|
||||
Files.copy(flowXmlGz.toPath(), destinationFlowXmlGz.toPath());
|
||||
public boolean isAccessible() {
|
||||
if (runNiFi == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try (final NiFiClient client = createClient()) {
|
||||
client.getFlowClient().getRootGroupId();
|
||||
return true;
|
||||
} catch (final Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForStartup() throws IOException {
|
||||
final NiFiClient client = createClient();
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
client.getFlowClient().getRootGroupId();
|
||||
logger.info("NiFi Startup Completed [{}]", instanceDirectory.getName());
|
||||
return;
|
||||
} catch (final Exception e) {
|
||||
try (final NiFiClient client = createClient()) {
|
||||
while (true) {
|
||||
try {
|
||||
Thread.sleep(1000L);
|
||||
} catch (InterruptedException ex) {
|
||||
logger.debug("NiFi Startup sleep interrupted", ex);
|
||||
client.getFlowClient().getRootGroupId();
|
||||
logger.info("NiFi Startup Completed [{}]", instanceDirectory.getName());
|
||||
return;
|
||||
} catch (final Exception e) {
|
||||
try {
|
||||
Thread.sleep(1000L);
|
||||
} catch (InterruptedException ex) {
|
||||
logger.debug("NiFi Startup sleep interrupted", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,11 +25,9 @@ import org.apache.nifi.controller.queue.LoadBalanceCompression;
|
|||
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
|
||||
import org.apache.nifi.controller.service.ControllerServiceState;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.apache.nifi.tests.system.InstanceConfiguration;
|
||||
import org.apache.nifi.tests.system.NiFiInstance;
|
||||
import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.NiFiSystemIT;
|
||||
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
|
||||
|
@ -84,16 +82,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
|||
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
new InstanceConfiguration.Builder()
|
||||
.bootstrapConfig("src/test/resources/conf/clustered/node1/bootstrap.conf")
|
||||
.instanceDirectory("target/node1")
|
||||
.build(),
|
||||
new InstanceConfiguration.Builder()
|
||||
.bootstrapConfig("src/test/resources/conf/clustered/node2/bootstrap.conf")
|
||||
.instanceDirectory("target/node2")
|
||||
.build()
|
||||
);
|
||||
return createTwoNodeInstanceFactory();
|
||||
}
|
||||
|
||||
|
||||
|
@ -979,7 +968,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
|||
|
||||
private NodeEntity getNodeEntity(final int nodeIndex) throws NiFiClientException, IOException {
|
||||
final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes();
|
||||
final int expectedPort = CLIENT_API_BASE_PORT + nodeIndex;
|
||||
final int expectedPort = CLUSTERED_CLIENT_API_BASE_PORT + nodeIndex - 1;
|
||||
|
||||
for (final NodeDTO nodeDto : clusterEntity.getCluster().getNodes()) {
|
||||
if (nodeDto.getApiPort() == expectedPort) {
|
||||
|
|
|
@ -35,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
public class JoinClusterAdjustStateIT extends NiFiSystemIT {
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
// Create a factory explicitly because we want the second instance not to be auto-started
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
new InstanceConfiguration.Builder()
|
||||
.bootstrapConfig("src/test/resources/conf/clustered/node1/bootstrap.conf")
|
||||
|
@ -48,6 +49,11 @@ public class JoinClusterAdjustStateIT extends NiFiSystemIT {
|
|||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isAllowFactoryReuse() {
|
||||
return false; // Do not allow reuse because this test requires that Node 1 be auto-started and Node 2 not be.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProcessorsStartWhenAble() throws NiFiClientException, IOException, InterruptedException {
|
||||
final ProcessorEntity countProcessor = getClientUtil().createProcessor(TEST_PROCESSORS_PACKAGE + ".CountEvents", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
|
||||
|
|
|
@ -58,6 +58,12 @@ public class JoinClusterWithMissingConnectionWithData extends NiFiSystemIT {
|
|||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isAllowFactoryReuse() {
|
||||
// Do not allow reuse of the factory because we expect the node to be in a disconnected state at the end.
|
||||
return false;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailsToJoinWithMissingConnectionThatHasData() throws NiFiClientException, IOException, InterruptedException {
|
||||
// Create the flow
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.nifi.tests.system.clustering;
|
|||
import org.apache.nifi.tests.system.NiFiInstance;
|
||||
import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.NiFiSystemIT;
|
||||
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
import org.apache.nifi.web.api.dto.NodeDTO;
|
||||
import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
|
||||
|
@ -29,17 +28,15 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class NodeRestartWithNewNodeIdIT extends NiFiSystemIT {
|
||||
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
"src/test/resources/conf/clustered/node1/bootstrap.conf",
|
||||
"src/test/resources/conf/clustered/node2/bootstrap.conf");
|
||||
return createTwoNodeInstanceFactory();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.nifi.tests.system.clustering;
|
|||
|
||||
import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.NiFiSystemIT;
|
||||
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
import org.apache.nifi.web.api.dto.NodeDTO;
|
||||
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
|
||||
|
@ -37,9 +36,7 @@ public class OffloadIT extends NiFiSystemIT {
|
|||
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
"src/test/resources/conf/clustered/node1/bootstrap.conf",
|
||||
"src/test/resources/conf/clustered/node2/bootstrap.conf");
|
||||
return createTwoNodeInstanceFactory();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.nifi.tests.system.clustering;
|
|||
import org.apache.nifi.tests.system.NiFiInstance;
|
||||
import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.NiFiSystemIT;
|
||||
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
import org.apache.nifi.web.api.dto.NodeDTO;
|
||||
import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
|
||||
|
@ -37,9 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
public class RestartWithDifferentPort extends NiFiSystemIT {
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
"src/test/resources/conf/clustered/node1/bootstrap.conf",
|
||||
"src/test/resources/conf/clustered/node2/bootstrap.conf");
|
||||
return createTwoNodeInstanceFactory();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.tests.system.clustering;
|
||||
|
||||
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.NiFiInstance;
|
||||
import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.NiFiSystemIT;
|
||||
|
@ -32,9 +31,7 @@ public class SimpleNodeRestartIT extends NiFiSystemIT {
|
|||
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
"src/test/resources/conf/clustered/node1/bootstrap.conf",
|
||||
"src/test/resources/conf/clustered/node2/bootstrap.conf");
|
||||
return createTwoNodeInstanceFactory();
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -22,7 +22,6 @@ import org.apache.nifi.scheduling.ExecutionNode;
|
|||
import org.apache.nifi.tests.system.NiFiInstance;
|
||||
import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.NiFiSystemIT;
|
||||
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
import org.apache.nifi.web.api.dto.ConnectionDTO;
|
||||
import org.apache.nifi.web.api.dto.NodeDTO;
|
||||
|
@ -55,9 +54,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
|
|||
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
"src/test/resources/conf/clustered/node1/bootstrap.conf",
|
||||
"src/test/resources/conf/clustered/node2/bootstrap.conf");
|
||||
return createTwoNodeInstanceFactory();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.tests.system.parameters;
|
||||
|
||||
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
||||
|
||||
/**
|
||||
|
@ -25,8 +24,6 @@ import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
|||
public class ClusteredParameterContextIT extends ParameterContextIT {
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
"src/test/resources/conf/clustered/node1/bootstrap.conf",
|
||||
"src/test/resources/conf/clustered/node2/bootstrap.conf");
|
||||
return createTwoNodeInstanceFactory();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,14 +17,11 @@
|
|||
package org.apache.nifi.tests.system.rpg;
|
||||
|
||||
import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
|
||||
|
||||
public class ClusteredRemoteProcessGroupIT extends RemoteProcessGroupIT {
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
"src/test/resources/conf/clustered/node1/bootstrap.conf",
|
||||
"src/test/resources/conf/clustered/node2/bootstrap.conf");
|
||||
return createTwoNodeInstanceFactory();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ public class RemoteProcessGroupIT extends NiFiSystemIT {
|
|||
|
||||
// Create a flow that is GenerateFlowFile -> RPG, connected to the input port
|
||||
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
|
||||
RemoteProcessGroupEntity rpg = getClientUtil().createRPG("root", protocol);
|
||||
RemoteProcessGroupEntity rpg = getClientUtil().createRPG("root", getClientApiPort(), protocol);
|
||||
|
||||
util.updateProcessorProperties(generateFlowFile, Collections.singletonMap("File Size", "1 KB"));
|
||||
util.updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "3"));
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.nifi.tests.system.verification;
|
|||
|
||||
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
|
||||
import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
|
||||
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
||||
|
@ -35,9 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
|||
public class ClusteredVerifiableControllerServiceSystemIT extends VerifiableControllerServiceSystemIT {
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
"src/test/resources/conf/clustered/node1/bootstrap.conf",
|
||||
"src/test/resources/conf/clustered/node2/bootstrap.conf");
|
||||
return createTwoNodeInstanceFactory();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.nifi.tests.system.verification;
|
|||
|
||||
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
|
||||
import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
|
||||
import org.apache.nifi.web.api.entity.ParameterProviderEntity;
|
||||
|
@ -35,9 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
|||
public class ClusteredVerifiableParameterProviderSystemIT extends VerifiableParameterProviderSystemIT {
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
"src/test/resources/conf/clustered/node1/bootstrap.conf",
|
||||
"src/test/resources/conf/clustered/node2/bootstrap.conf");
|
||||
return createTwoNodeInstanceFactory();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.nifi.tests.system.verification;
|
|||
|
||||
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
|
||||
import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
|
||||
import org.apache.nifi.web.api.entity.ProcessorEntity;
|
||||
|
@ -36,9 +35,7 @@ public class ClusteredVerifiableProcessorSystemIT extends VerifiableProcessorSys
|
|||
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
"src/test/resources/conf/clustered/node1/bootstrap.conf",
|
||||
"src/test/resources/conf/clustered/node2/bootstrap.conf");
|
||||
return createTwoNodeInstanceFactory();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.nifi.tests.system.verification;
|
|||
|
||||
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
|
||||
import org.apache.nifi.tests.system.NiFiInstanceFactory;
|
||||
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
|
||||
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
|
||||
|
@ -36,9 +35,7 @@ public class ClusteredVerifiableReportingTaskSystemIT extends VerifiableReportin
|
|||
|
||||
@Override
|
||||
public NiFiInstanceFactory getInstanceFactory() {
|
||||
return new SpawnedClusterNiFiInstanceFactory(
|
||||
"src/test/resources/conf/clustered/node1/bootstrap.conf",
|
||||
"src/test/resources/conf/clustered/node2/bootstrap.conf");
|
||||
return createTwoNodeInstanceFactory();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -134,7 +134,7 @@ nifi.components.status.snapshot.frequency=1 min
|
|||
# Site to Site properties
|
||||
nifi.remote.input.host=
|
||||
nifi.remote.input.secure=false
|
||||
nifi.remote.input.socket.port=7781
|
||||
nifi.remote.input.socket.port=7780
|
||||
nifi.remote.input.http.enabled=true
|
||||
nifi.remote.input.http.transaction.ttl=30 sec
|
||||
nifi.remote.contents.cache.expiration=30 secs
|
||||
|
@ -142,7 +142,7 @@ nifi.remote.contents.cache.expiration=30 secs
|
|||
# web properties #
|
||||
nifi.web.war.directory=./lib
|
||||
nifi.web.http.host=
|
||||
nifi.web.http.port=5671
|
||||
nifi.web.http.port=5670
|
||||
nifi.web.http.network.interface.default=
|
||||
nifi.web.https.host=
|
||||
nifi.web.https.port=
|
||||
|
|
Loading…
Reference in New Issue