From 321e979673139310b7f92dfccd926092a2b6c1c9 Mon Sep 17 00:00:00 2001 From: markap14 Date: Tue, 23 Mar 2021 14:17:54 -0400 Subject: [PATCH] NIFI-8353: When receiving data via load-balanced connection, throw an Exception (resulting in an ABORT_TRANSACTION status code) when attempting to add received FlowFiles to the FlowFile queue, if the node is not currently connected to cluster. In this case, ensure that we remove the received FlowFiles from the FlowFile Repository and emit a DROP event to the repository (with appropriate details) to coincide with the already-committed Provenance events. Also ensure that when a node is disconnected that we don't keep sending to that node until it reconnects. While testing the fixes via LoadBalancedQueueIT, also noticed that unit test logging was not working properly in nifi-framework-core because of the (erroneous) dependency on logback-class; removed this dependency and updated unit tests that were created that depended on its existence. (#4924) --- .../queue/IllegalClusterStateException.java | 24 +++++ .../queue/LoadBalancedFlowFileQueue.java | 4 +- .../nifi-framework-core/pom.xml | 4 - .../nifi/controller/FlowController.java | 2 +- .../SocketLoadBalancedFlowFileQueue.java | 10 +- .../async/nio/NioAsyncLoadBalanceClient.java | 22 +++- .../nio/NioAsyncLoadBalanceClientFactory.java | 8 +- .../server/StandardLoadBalanceProtocol.java | 72 ++++++++++++- ...edSequentialAccessWriteAheadLogTest.groovy | 37 +------ .../queue/clustered/LoadBalancedQueueIT.java | 77 ++++++------- .../TestStandardLoadBalanceProtocol.java | 7 +- .../repository/TestFileSystemRepository.java | 102 +++++------------- 12 files changed, 208 insertions(+), 161 deletions(-) create mode 100644 nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/IllegalClusterStateException.java diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/IllegalClusterStateException.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/IllegalClusterStateException.java new file mode 100644 index 0000000000..35aa4e70be --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/IllegalClusterStateException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +public class IllegalClusterStateException extends Exception { + public IllegalClusterStateException(final String message) { + super(message); + } +} diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java index b9f6951960..d1008324ee 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java @@ -25,8 +25,10 @@ public interface LoadBalancedFlowFileQueue extends FlowFileQueue { /** * Adds the given FlowFiles to this queue, as they have been received from another node in the cluster * @param flowFiles the FlowFiles received from the peer + * + * @throws IllegalClusterStateException if the node is not currently in a state in which it can receive data from other nodes */ - void receiveFromPeer(Collection flowFiles); + void receiveFromPeer(Collection flowFiles) throws IllegalClusterStateException; /** * Distributes the given FlowFiles to the appropriate partitions. Unlike the {@link #putAll(Collection)} method, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index 076f1ec8bd..0838e36b5b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -76,10 +76,6 @@ org.apache.nifi nifi-site-to-site - - ch.qos.logback - logback-classic - org.apache.nifi nifi-framework-cluster-protocol diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 46610abc36..2cb5e28a60 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -763,7 +763,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node final int connectionsPerNode = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_CONNECTIONS_PER_NODE, NiFiProperties.DEFAULT_LOAD_BALANCE_CONNECTIONS_PER_NODE); final NioAsyncLoadBalanceClientFactory asyncClientFactory = new NioAsyncLoadBalanceClientFactory(sslContext, timeoutMillis, new ContentRepositoryFlowFileAccess(contentRepository), - eventReporter, new StandardLoadBalanceFlowFileCodec()); + eventReporter, new StandardLoadBalanceFlowFileCodec(), clusterCoordinator); loadBalanceClientRegistry = new NioAsyncLoadBalanceClientRegistry(asyncClientFactory, connectionsPerNode); final int loadBalanceClientThreadCount = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 06c86d1fbf..30638bd1c4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -28,6 +28,7 @@ import org.apache.nifi.controller.queue.ConnectionEventListener; import org.apache.nifi.controller.queue.DropFlowFileRequest; import org.apache.nifi.controller.queue.DropFlowFileState; import org.apache.nifi.controller.queue.FlowFileQueueContents; +import org.apache.nifi.controller.queue.IllegalClusterStateException; import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics; @@ -753,9 +754,16 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple return partition; } - public void receiveFromPeer(final Collection flowFiles) { + public void receiveFromPeer(final Collection flowFiles) throws IllegalClusterStateException { partitionReadLock.lock(); try { + if (offloaded) { + throw new IllegalClusterStateException("Node cannot accept data from load-balanced connection because it is in the process of offloading"); + } + if (!clusterCoordinator.isConnected()) { + throw new IllegalClusterStateException("Node cannot accept data from load-balanced connection because it is not connected to cluster"); + } + if (partitioner.isRebalanceOnClusterResize()) { logger.debug("Received the following FlowFiles from Peer: {}. Will re-partition FlowFiles to ensure proper balancing across the cluster.", flowFiles); putAll(flowFiles); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java index ac2a561cee..f3c4df226d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java @@ -17,6 +17,9 @@ package org.apache.nifi.controller.queue.clustered.client.async.nio; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess; @@ -66,6 +69,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { private final FlowFileContentAccess flowFileContentAccess; private final LoadBalanceFlowFileCodec flowFileCodec; private final EventReporter eventReporter; + private final ClusterCoordinator clusterCoordinator; private volatile boolean running = false; private final AtomicLong penalizationEnd = new AtomicLong(0L); @@ -88,13 +92,14 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { public NioAsyncLoadBalanceClient(final NodeIdentifier nodeIdentifier, final SSLContext sslContext, final int timeoutMillis, final FlowFileContentAccess flowFileContentAccess, - final LoadBalanceFlowFileCodec flowFileCodec, final EventReporter eventReporter) { + final LoadBalanceFlowFileCodec flowFileCodec, final EventReporter eventReporter, final ClusterCoordinator clusterCoordinator) { this.nodeIdentifier = nodeIdentifier; this.sslContext = sslContext; this.timeoutMillis = timeoutMillis; this.flowFileContentAccess = flowFileContentAccess; this.flowFileCodec = flowFileCodec; this.eventReporter = eventReporter; + this.clusterCoordinator = clusterCoordinator; } @Override @@ -347,7 +352,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { try { RegisteredPartition partition; while ((partition = partitionQueue.poll()) != null) { - if (partition.isEmpty() || partition.isPenalized() || !filter.test(partition)) { + if (partition.isEmpty() || partition.isPenalized() || !checkNodeConnected() || !filter.test(partition)) { polledPartitions.add(partition); continue; } @@ -361,6 +366,19 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { } } + private synchronized boolean checkNodeConnected() { + final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeIdentifier); + final boolean connected = status != null && status.getState() == NodeConnectionState.CONNECTED; + + // If not connected but the last known state is connected, we know that the node has just transitioned to disconnected. + // In this case we need to call #nodeDisconnected in order to allow for failover to take place + if (!connected) { + nodeDisconnected(); + } + + return connected; + } + private synchronized LoadBalanceSession getActiveTransaction(final RegisteredPartition proposedPartition) { if (loadBalanceSession != null && !loadBalanceSession.isComplete()) { return loadBalanceSession; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientFactory.java index 79fe4be572..73546aba8f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientFactory.java @@ -17,10 +17,10 @@ package org.apache.nifi.controller.queue.clustered.client.async.nio; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess; import org.apache.nifi.controller.queue.clustered.client.LoadBalanceFlowFileCodec; -import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec; import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientFactory; import org.apache.nifi.events.EventReporter; @@ -32,19 +32,21 @@ public class NioAsyncLoadBalanceClientFactory implements AsyncLoadBalanceClientF private final FlowFileContentAccess flowFileContentAccess; private final EventReporter eventReporter; private final LoadBalanceFlowFileCodec flowFileCodec; + private final ClusterCoordinator clusterCoordinator; public NioAsyncLoadBalanceClientFactory(final SSLContext sslContext, final int timeoutMillis, final FlowFileContentAccess flowFileContentAccess, final EventReporter eventReporter, - final LoadBalanceFlowFileCodec loadBalanceFlowFileCodec) { + final LoadBalanceFlowFileCodec loadBalanceFlowFileCodec, final ClusterCoordinator clusterCoordinator) { this.sslContext = sslContext; this.timeoutMillis = timeoutMillis; this.flowFileContentAccess = flowFileContentAccess; this.eventReporter = eventReporter; this.flowFileCodec = loadBalanceFlowFileCodec; + this.clusterCoordinator = clusterCoordinator; } @Override public NioAsyncLoadBalanceClient createClient(final NodeIdentifier nodeIdentifier) { - return new NioAsyncLoadBalanceClient(nodeIdentifier, sslContext, timeoutMillis, flowFileContentAccess, new StandardLoadBalanceFlowFileCodec(), eventReporter); + return new NioAsyncLoadBalanceClient(nodeIdentifier, sslContext, timeoutMillis, flowFileContentAccess, flowFileCodec, eventReporter, clusterCoordinator); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java index c29a9ec94a..1e35bc71ac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java @@ -20,6 +20,7 @@ package org.apache.nifi.controller.queue.clustered.server; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.IllegalClusterStateException; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; import org.apache.nifi.controller.repository.ContentRepository; @@ -314,12 +315,79 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { logger.debug("Received Complete Transaction indicator from Peer {}", peerDescription); registerReceiveProvenanceEvents(flowFilesReceived, peerDescription, connectionId, startTimestamp); updateFlowFileRepository(flowFilesReceived, flowFileQueue); - transferFlowFilesToQueue(flowFilesReceived, flowFileQueue); + + try { + transferFlowFilesToQueue(flowFilesReceived, flowFileQueue); + } catch (final IllegalClusterStateException e) { + logger.error("Failed to transferred received data into FlowFile Queue {}", flowFileQueue, e); + out.write(ABORT_TRANSACTION); + out.flush(); + + try { + cleanupRepositoriesOnTransferFailure(flowFilesReceived, flowFileQueue, "Rejected transfer due to " + e.getMessage()); + } catch (final Exception e1) { + logger.error("Failed to update FlowFile/Provenance Repositories to denote that the data that could not be received should no longer be present on this node", e1); + } + + // We log the error here and cleanup. We do not throw an Exception. If we did throw an Exception, + // the caller of this method would catch the Exception and decrement the Content Claims, etc. However, + // since we have already updated the FlowFile Repository to DROP the data, that would decrement the claims + // twice, which could lead to data loss. + return; + } out.write(CONFIRM_COMPLETE_TRANSACTION); out.flush(); } + private void cleanupRepositoriesOnTransferFailure(final List flowFilesReceived, final FlowFileQueue flowFileQueue, final String details) throws IOException { + dropFlowFilesFromRepository(flowFilesReceived, flowFileQueue); + reportDropEvents(flowFilesReceived, flowFileQueue.getIdentifier(), details); + } + + private void dropFlowFilesFromRepository(final List flowFiles, final FlowFileQueue flowFileQueue) throws IOException { + final List repoRecords = flowFiles.stream() + .map(remoteFlowFile -> { + final StandardRepositoryRecord record = new StandardRepositoryRecord(flowFileQueue, remoteFlowFile.getFlowFile()); + record.setDestination(flowFileQueue); + record.markForDelete(); + return record; + }) + .collect(Collectors.toList()); + + flowFileRepository.updateRepository(repoRecords); + logger.debug("Updated FlowFile Repository to note that {} FlowFiles were dropped from the system because the data received from the other node could not be transferred to the FlowFile Queue", + repoRecords); + } + + private void reportDropEvents(final List flowFilesReceived, final String connectionId, final String details) { + final List events = new ArrayList<>(flowFilesReceived.size()); + for (final RemoteFlowFileRecord remoteFlowFile : flowFilesReceived) { + final FlowFileRecord flowFileRecord = remoteFlowFile.getFlowFile(); + + final ProvenanceEventBuilder provenanceEventBuilder = new StandardProvenanceEventRecord.Builder() + .fromFlowFile(flowFileRecord) + .setEventType(ProvenanceEventType.DROP) + .setComponentId(connectionId) + .setComponentType("Load Balanced Connection") + .setDetails(details); + + final ContentClaim contentClaim = flowFileRecord.getContentClaim(); + if (contentClaim != null) { + final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); + provenanceEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), + contentClaim.getOffset() + flowFileRecord.getContentClaimOffset(), flowFileRecord.getSize()); + } + + final ProvenanceEventRecord provenanceEvent = provenanceEventBuilder.build(); + events.add(provenanceEvent); + } + + logger.debug("Updated Provenance Repository to note that {} FlowFiles were dropped from the system because the data received from the other node could not be transferred to the FlowFile " + + "Queue", events.size()); + provenanceRepository.registerEvents(events); + } + private void registerReceiveProvenanceEvents(final List flowFiles, final String nodeName, final String connectionId, final long startTimestamp) { final long duration = System.currentTimeMillis() - startTimestamp; @@ -361,7 +429,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { flowFileRepository.updateRepository(repoRecords); } - private void transferFlowFilesToQueue(final List remoteFlowFiles, final LoadBalancedFlowFileQueue flowFileQueue) { + private void transferFlowFilesToQueue(final List remoteFlowFiles, final LoadBalancedFlowFileQueue flowFileQueue) throws IllegalClusterStateException { final List flowFiles = remoteFlowFiles.stream().map(RemoteFlowFileRecord::getFlowFile).collect(Collectors.toList()); flowFileQueue.receiveFromPeer(flowFiles); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy index 03faec699e..61d0b7a69a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy @@ -17,30 +17,17 @@ package org.apache.nifi.wali -import ch.qos.logback.classic.Level + import org.apache.commons.lang3.SystemUtils import org.apache.nifi.controller.queue.FlowFileQueue -import org.apache.nifi.controller.repository.EncryptedSchemaRepositoryRecordSerde -import org.apache.nifi.controller.repository.LiveSerializedRepositoryRecord -import org.apache.nifi.controller.repository.RepositoryRecord -import org.apache.nifi.controller.repository.RepositoryRecordType -import org.apache.nifi.controller.repository.SchemaRepositoryRecordSerde -import org.apache.nifi.controller.repository.SerializedRepositoryRecord -import org.apache.nifi.controller.repository.StandardFlowFileRecord -import org.apache.nifi.controller.repository.StandardRepositoryRecord -import org.apache.nifi.controller.repository.StandardRepositoryRecordSerdeFactory +import org.apache.nifi.controller.repository.* import org.apache.nifi.controller.repository.claim.ResourceClaimManager import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager import org.apache.nifi.repository.schema.NoOpFieldCache import org.apache.nifi.security.kms.CryptoUtils import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration import org.bouncycastle.jce.provider.BouncyCastleProvider -import org.junit.After -import org.junit.Assume -import org.junit.Before -import org.junit.BeforeClass -import org.junit.Rule -import org.junit.Test +import org.junit.* import org.junit.rules.TestName import org.junit.runner.RunWith import org.junit.runners.JUnit4 @@ -53,14 +40,11 @@ import org.wali.SingletonSerDeFactory import java.security.Security import static org.apache.nifi.security.kms.CryptoUtils.STATIC_KEY_PROVIDER_CLASS_NAME -import static org.junit.Assert.assertNotNull @RunWith(JUnit4.class) class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase { private static final Logger logger = LoggerFactory.getLogger(EncryptedSequentialAccessWriteAheadLogTest.class) - private static Level ORIGINAL_REPO_LOG_LEVEL - private static Level ORIGINAL_TEST_LOG_LEVEL private static final String REPO_LOG_PACKAGE = "org.apache.nifi.security.repository" public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier" @@ -182,16 +166,6 @@ class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase { final SequentialAccessWriteAheadLog repo = createWriteRepo(encryptedSerde) - // Turn off debugging because of the high volume - logger.debug("Temporarily turning off DEBUG logging") - def encryptorLogger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(REPO_LOG_PACKAGE) - ORIGINAL_REPO_LOG_LEVEL = encryptorLogger.getLevel() - encryptorLogger.setLevel(Level.INFO) - - def testLogger = (ch.qos.logback.classic.Logger) logger - ORIGINAL_TEST_LOG_LEVEL = testLogger.getLevel() - testLogger.setLevel(Level.INFO) - final List records = new ArrayList<>() 10_000.times { int i -> def attributes = [name: "User ${i}" as String, age: "${i}" as String] @@ -210,11 +184,6 @@ class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase { // Ensure that the same records (except now UPDATE instead of CREATE) are returned (order is not guaranteed) assert recovered.size() == records.size() assert recovered.every { it.type == RepositoryRecordType.CREATE } - - // Reset log level - encryptorLogger.setLevel(ORIGINAL_REPO_LOG_LEVEL) - testLogger.setLevel(ORIGINAL_TEST_LOG_LEVEL) - logger.debug("Re-enabled DEBUG logging") } private EncryptedSchemaRepositoryRecordSerde buildEncryptedSerDe(FlowFileRepositoryEncryptionConfiguration ffrec = flowFileREC) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java index a168b65537..abca9e7fe2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java @@ -17,42 +17,6 @@ package org.apache.nifi.controller.queue.clustered; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyCollection; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import javax.net.ssl.SSLContext; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; @@ -103,6 +67,43 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import javax.net.ssl.SSLContext; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class LoadBalancedQueueIT { private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = (sslSocket) -> sslSocket == null ? null : "authorized.mydomain.com"; private final LoadBalanceAuthorizer NEVER_AUTHORIZED = (sslSocket) -> { @@ -148,6 +149,8 @@ public class LoadBalancedQueueIT { clusterCoordinator = mock(ClusterCoordinator.class); when(clusterCoordinator.getNodeIdentifiers()).thenAnswer(invocation -> new HashSet<>(nodeIdentifiers)); when(clusterCoordinator.getLocalNodeIdentifier()).thenAnswer(invocation -> localNodeId); + when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenAnswer(invocation -> + new NodeConnectionStatus(invocation.getArgument(0, NodeIdentifier.class), NodeConnectionState.CONNECTED)); clusterEventListeners.clear(); doAnswer(new Answer() { @@ -217,7 +220,7 @@ public class LoadBalancedQueueIT { private NioAsyncLoadBalanceClientFactory createClientFactory(final SSLContext sslContext) { final FlowFileContentAccess flowFileContentAccess = flowFile -> clientContentRepo.read(flowFile.getContentClaim()); - return new NioAsyncLoadBalanceClientFactory(sslContext, 30000, flowFileContentAccess, eventReporter, new StandardLoadBalanceFlowFileCodec()); + return new NioAsyncLoadBalanceClientFactory(sslContext, 30000, flowFileContentAccess, eventReporter, new StandardLoadBalanceFlowFileCodec(), clusterCoordinator); } @Test(timeout = 20_000) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java index 57fc7d934f..bfd6f36c98 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java @@ -20,6 +20,7 @@ package org.apache.nifi.controller.queue.clustered.server; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.queue.IllegalClusterStateException; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; import org.apache.nifi.controller.repository.ContentRepository; @@ -94,7 +95,7 @@ public class TestStandardLoadBalanceProtocol { @Before - public void setup() throws IOException { + public void setup() throws IOException, IllegalClusterStateException { flowFileQueuePutRecords = new ArrayList<>(); flowFileQueueReceiveRecords = new ArrayList<>(); flowFileRepoUpdateRecords = new ArrayList<>(); @@ -177,7 +178,7 @@ public class TestStandardLoadBalanceProtocol { @Test - public void testSimpleFlowFileTransaction() throws IOException { + public void testSimpleFlowFileTransaction() throws IOException, IllegalClusterStateException { final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); final PipedInputStream serverInput = new PipedInputStream(); @@ -571,7 +572,7 @@ public class TestStandardLoadBalanceProtocol { } @Test - public void testFlowFileNoContent() throws IOException { + public void testFlowFileNoContent() throws IOException, IllegalClusterStateException { final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); final PipedInputStream serverInput = new PipedInputStream(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index e47dd517d4..dad2e10e6a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -16,40 +16,6 @@ */ package org.apache.nifi.controller.repository; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; - -import ch.qos.logback.classic.Level; -import ch.qos.logback.classic.Logger; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.read.ListAppender; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.Method; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.nio.file.StandardOpenOption; -import java.text.NumberFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SystemUtils; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; @@ -67,7 +33,35 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; public class TestFileSystemRepository { @@ -134,44 +128,6 @@ public class TestFileSystemRepository { + NumberFormat.getNumberInstance(Locale.US).format(bytesToWrite) + " bytes) for a write rate of " + mbps + " MB/s"); } - @Test - public void testMinimalArchiveCleanupIntervalHonoredAndLogged() throws Exception { - // We are going to construct our own repository using different properties, so - // we need to shutdown the existing one. - shutdown(); - - Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); - ListAppender testAppender = new ListAppender<>(); - testAppender.setName("Test"); - testAppender.start(); - root.addAppender(testAppender); - final Map addProps = new HashMap<>(); - addProps.put(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 millis"); - final NiFiProperties localProps = NiFiProperties.createBasicNiFiProperties(TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile(), addProps); - repository = new FileSystemRepository(localProps); - repository.initialize(new StandardResourceClaimManager()); - repository.purge(); - - boolean messageFound = false; - String message = "The value of nifi.content.repository.archive.cleanup.frequency property " - + "is set to '1 millis' which is below the allowed minimum of 1 second (1000 milliseconds). " - + "Minimum value of 1 sec will be used as scheduling interval for archive cleanup task."; - - // Must synchronize on testAppender, because the call to append() is synchronized and this synchronize - // keyword guards testAppender.list. Since we are accessing testAppender.list, we must do so in a thread-safe manner. - synchronized (testAppender) { - for (ILoggingEvent event : testAppender.list) { - String actualMessage = event.getFormattedMessage(); - if (actualMessage.equals(message)) { - assertEquals(event.getLevel(), Level.WARN); - messageFound = true; - break; - } - } - } - - assertTrue(messageFound); - } @Test public void testContentNotFoundExceptionThrownIfResourceClaimTooShort() throws IOException {