diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index a23fd0e4b1..0e47331b60 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -96,6 +96,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -152,7 +153,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private SchedulingStrategy schedulingStrategy; // guarded by synchronized keyword private ExecutionNode executionNode; - private final Map activeThreads = new HashMap<>(48); + private final Map activeThreads = new ConcurrentHashMap<>(48); private final int hashCode; private volatile boolean hasActiveThreads = false; diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java index 23e3a6fa3d..d94529da0b 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java @@ -273,7 +273,7 @@ public abstract class NiFiSystemIT { waitForQueueCountToMatch(connectionId, size -> size > 0, "greater than 0"); - logger.info("Waiting for Queue on Connection {} is not empty", connectionId); + logger.info("Queue on Connection {} is not empty", connectionId); } protected void waitForQueueCount(final String connectionId, final int queueSize) throws InterruptedException { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java new file mode 100644 index 0000000000..1e66fc338a --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.clustering; + +import org.apache.nifi.tests.system.NiFiInstanceFactory; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.entity.ClusterEntity; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; + +public class OffloadIT extends NiFiSystemIT { + private static final Logger logger = LoggerFactory.getLogger(OffloadIT.class); + + @Override + protected NiFiInstanceFactory getInstanceFactory() { + return new SpawnedClusterNiFiInstanceFactory( + "src/test/resources/conf/clustered/node1/bootstrap.conf", + "src/test/resources/conf/clustered/node2/bootstrap.conf"); + } + + @Test + public void testOffload() throws InterruptedException, IOException, NiFiClientException { + for (int i=0; i < 5; i++) { + logger.info("Running iteration {}", i); + testIteration(); + logger.info("Node reconnected to cluster"); + destroyFlow(); + } + } + + private void testIteration() throws NiFiClientException, IOException, InterruptedException { + ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + ProcessorEntity sleep = getClientUtil().createProcessor("Sleep"); + ConnectionEntity connectionEntity = getClientUtil().createConnection(generate, sleep, "success"); + + getClientUtil().setAutoTerminatedRelationships(sleep, "success"); + generate = getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("File Size", "1 KB")); + final ProcessorConfigDTO configDto = generate.getComponent().getConfig(); + configDto.setSchedulingPeriod("0 sec"); + getClientUtil().updateProcessorConfig(generate, configDto); + + getClientUtil().updateProcessorProperties(sleep, Collections.singletonMap("onTrigger Sleep Time", "100 ms")); + + + getClientUtil().startProcessGroupComponents("root"); + + waitForQueueNotEmpty(connectionEntity.getId()); + + final NodeDTO node2Dto = getNodeDTO(5672); + + disconnectNode(node2Dto); + + final String nodeId = node2Dto.getNodeId(); + getClientUtil().offloadNode(nodeId); + waitFor(this::isNodeOffloaded); + + getClientUtil().connectNode(nodeId); + waitForAllNodesConnected(); + } + + private boolean isNodeOffloaded() { + final ClusterEntity clusterEntity; + try { + clusterEntity = getNifiClient().getControllerClient().getNodes(); + } catch (final Exception e) { + logger.error("Failed to determine if node is offloaded", e); + return false; + } + + final Collection nodeDtos = clusterEntity.getCluster().getNodes(); + + for (final NodeDTO dto : nodeDtos) { + final String status = dto.getStatus(); + if (status.equalsIgnoreCase("OFFLOADED")) { + return true; + } + } + + return false; + } + + private NodeDTO getNodeDTO(final int apiPort) throws NiFiClientException, IOException { + final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes(); + final NodeDTO node2Dto = clusterEntity.getCluster().getNodes().stream() + .filter(nodeDto -> nodeDto.getApiPort() == apiPort) + .findAny() + .orElseThrow(() -> new RuntimeException("Could not locate Node 2")); + + return node2Dto; + } + + + private void disconnectNode(final NodeDTO nodeDto) throws NiFiClientException, IOException, InterruptedException { + getClientUtil().disconnectNode(nodeDto.getNodeId()); + + final Integer apiPort = nodeDto.getApiPort(); + waitFor(() -> { + try { + final NodeDTO dto = getNodeDTO(apiPort); + final String status = dto.getStatus(); + return "DISCONNECTED".equals(status); + } catch (final Exception e) { + logger.error("Failed to determine if node is disconnected", e); + } + + return false; + }); + } + +}