diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/IllegalClusterStateException.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/IllegalClusterStateException.java new file mode 100644 index 0000000000..35aa4e70be --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/IllegalClusterStateException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +public class IllegalClusterStateException extends Exception { + public IllegalClusterStateException(final String message) { + super(message); + } +} diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java index b9f6951960..d1008324ee 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java @@ -25,8 +25,10 @@ public interface LoadBalancedFlowFileQueue extends FlowFileQueue { /** * Adds the given FlowFiles to this queue, as they have been received from another node in the cluster * @param flowFiles the FlowFiles received from the peer + * + * @throws IllegalClusterStateException if the node is not currently in a state in which it can receive data from other nodes */ - void receiveFromPeer(Collection flowFiles); + void receiveFromPeer(Collection flowFiles) throws IllegalClusterStateException; /** * Distributes the given FlowFiles to the appropriate partitions. Unlike the {@link #putAll(Collection)} method, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index 076f1ec8bd..0838e36b5b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -76,10 +76,6 @@ org.apache.nifi nifi-site-to-site - - ch.qos.logback - logback-classic - org.apache.nifi nifi-framework-cluster-protocol diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 46610abc36..2cb5e28a60 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -763,7 +763,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node final int connectionsPerNode = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_CONNECTIONS_PER_NODE, NiFiProperties.DEFAULT_LOAD_BALANCE_CONNECTIONS_PER_NODE); final NioAsyncLoadBalanceClientFactory asyncClientFactory = new NioAsyncLoadBalanceClientFactory(sslContext, timeoutMillis, new ContentRepositoryFlowFileAccess(contentRepository), - eventReporter, new StandardLoadBalanceFlowFileCodec()); + eventReporter, new StandardLoadBalanceFlowFileCodec(), clusterCoordinator); loadBalanceClientRegistry = new NioAsyncLoadBalanceClientRegistry(asyncClientFactory, connectionsPerNode); final int loadBalanceClientThreadCount = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 06c86d1fbf..30638bd1c4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -28,6 +28,7 @@ import org.apache.nifi.controller.queue.ConnectionEventListener; import org.apache.nifi.controller.queue.DropFlowFileRequest; import org.apache.nifi.controller.queue.DropFlowFileState; import org.apache.nifi.controller.queue.FlowFileQueueContents; +import org.apache.nifi.controller.queue.IllegalClusterStateException; import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics; @@ -753,9 +754,16 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple return partition; } - public void receiveFromPeer(final Collection flowFiles) { + public void receiveFromPeer(final Collection flowFiles) throws IllegalClusterStateException { partitionReadLock.lock(); try { + if (offloaded) { + throw new IllegalClusterStateException("Node cannot accept data from load-balanced connection because it is in the process of offloading"); + } + if (!clusterCoordinator.isConnected()) { + throw new IllegalClusterStateException("Node cannot accept data from load-balanced connection because it is not connected to cluster"); + } + if (partitioner.isRebalanceOnClusterResize()) { logger.debug("Received the following FlowFiles from Peer: {}. Will re-partition FlowFiles to ensure proper balancing across the cluster.", flowFiles); putAll(flowFiles); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java index ac2a561cee..f3c4df226d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java @@ -17,6 +17,9 @@ package org.apache.nifi.controller.queue.clustered.client.async.nio; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess; @@ -66,6 +69,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { private final FlowFileContentAccess flowFileContentAccess; private final LoadBalanceFlowFileCodec flowFileCodec; private final EventReporter eventReporter; + private final ClusterCoordinator clusterCoordinator; private volatile boolean running = false; private final AtomicLong penalizationEnd = new AtomicLong(0L); @@ -88,13 +92,14 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { public NioAsyncLoadBalanceClient(final NodeIdentifier nodeIdentifier, final SSLContext sslContext, final int timeoutMillis, final FlowFileContentAccess flowFileContentAccess, - final LoadBalanceFlowFileCodec flowFileCodec, final EventReporter eventReporter) { + final LoadBalanceFlowFileCodec flowFileCodec, final EventReporter eventReporter, final ClusterCoordinator clusterCoordinator) { this.nodeIdentifier = nodeIdentifier; this.sslContext = sslContext; this.timeoutMillis = timeoutMillis; this.flowFileContentAccess = flowFileContentAccess; this.flowFileCodec = flowFileCodec; this.eventReporter = eventReporter; + this.clusterCoordinator = clusterCoordinator; } @Override @@ -347,7 +352,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { try { RegisteredPartition partition; while ((partition = partitionQueue.poll()) != null) { - if (partition.isEmpty() || partition.isPenalized() || !filter.test(partition)) { + if (partition.isEmpty() || partition.isPenalized() || !checkNodeConnected() || !filter.test(partition)) { polledPartitions.add(partition); continue; } @@ -361,6 +366,19 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { } } + private synchronized boolean checkNodeConnected() { + final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeIdentifier); + final boolean connected = status != null && status.getState() == NodeConnectionState.CONNECTED; + + // If not connected but the last known state is connected, we know that the node has just transitioned to disconnected. + // In this case we need to call #nodeDisconnected in order to allow for failover to take place + if (!connected) { + nodeDisconnected(); + } + + return connected; + } + private synchronized LoadBalanceSession getActiveTransaction(final RegisteredPartition proposedPartition) { if (loadBalanceSession != null && !loadBalanceSession.isComplete()) { return loadBalanceSession; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientFactory.java index 79fe4be572..73546aba8f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientFactory.java @@ -17,10 +17,10 @@ package org.apache.nifi.controller.queue.clustered.client.async.nio; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess; import org.apache.nifi.controller.queue.clustered.client.LoadBalanceFlowFileCodec; -import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec; import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientFactory; import org.apache.nifi.events.EventReporter; @@ -32,19 +32,21 @@ public class NioAsyncLoadBalanceClientFactory implements AsyncLoadBalanceClientF private final FlowFileContentAccess flowFileContentAccess; private final EventReporter eventReporter; private final LoadBalanceFlowFileCodec flowFileCodec; + private final ClusterCoordinator clusterCoordinator; public NioAsyncLoadBalanceClientFactory(final SSLContext sslContext, final int timeoutMillis, final FlowFileContentAccess flowFileContentAccess, final EventReporter eventReporter, - final LoadBalanceFlowFileCodec loadBalanceFlowFileCodec) { + final LoadBalanceFlowFileCodec loadBalanceFlowFileCodec, final ClusterCoordinator clusterCoordinator) { this.sslContext = sslContext; this.timeoutMillis = timeoutMillis; this.flowFileContentAccess = flowFileContentAccess; this.eventReporter = eventReporter; this.flowFileCodec = loadBalanceFlowFileCodec; + this.clusterCoordinator = clusterCoordinator; } @Override public NioAsyncLoadBalanceClient createClient(final NodeIdentifier nodeIdentifier) { - return new NioAsyncLoadBalanceClient(nodeIdentifier, sslContext, timeoutMillis, flowFileContentAccess, new StandardLoadBalanceFlowFileCodec(), eventReporter); + return new NioAsyncLoadBalanceClient(nodeIdentifier, sslContext, timeoutMillis, flowFileContentAccess, flowFileCodec, eventReporter, clusterCoordinator); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java index c29a9ec94a..1e35bc71ac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java @@ -20,6 +20,7 @@ package org.apache.nifi.controller.queue.clustered.server; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.IllegalClusterStateException; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; import org.apache.nifi.controller.repository.ContentRepository; @@ -314,12 +315,79 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { logger.debug("Received Complete Transaction indicator from Peer {}", peerDescription); registerReceiveProvenanceEvents(flowFilesReceived, peerDescription, connectionId, startTimestamp); updateFlowFileRepository(flowFilesReceived, flowFileQueue); - transferFlowFilesToQueue(flowFilesReceived, flowFileQueue); + + try { + transferFlowFilesToQueue(flowFilesReceived, flowFileQueue); + } catch (final IllegalClusterStateException e) { + logger.error("Failed to transferred received data into FlowFile Queue {}", flowFileQueue, e); + out.write(ABORT_TRANSACTION); + out.flush(); + + try { + cleanupRepositoriesOnTransferFailure(flowFilesReceived, flowFileQueue, "Rejected transfer due to " + e.getMessage()); + } catch (final Exception e1) { + logger.error("Failed to update FlowFile/Provenance Repositories to denote that the data that could not be received should no longer be present on this node", e1); + } + + // We log the error here and cleanup. We do not throw an Exception. If we did throw an Exception, + // the caller of this method would catch the Exception and decrement the Content Claims, etc. However, + // since we have already updated the FlowFile Repository to DROP the data, that would decrement the claims + // twice, which could lead to data loss. + return; + } out.write(CONFIRM_COMPLETE_TRANSACTION); out.flush(); } + private void cleanupRepositoriesOnTransferFailure(final List flowFilesReceived, final FlowFileQueue flowFileQueue, final String details) throws IOException { + dropFlowFilesFromRepository(flowFilesReceived, flowFileQueue); + reportDropEvents(flowFilesReceived, flowFileQueue.getIdentifier(), details); + } + + private void dropFlowFilesFromRepository(final List flowFiles, final FlowFileQueue flowFileQueue) throws IOException { + final List repoRecords = flowFiles.stream() + .map(remoteFlowFile -> { + final StandardRepositoryRecord record = new StandardRepositoryRecord(flowFileQueue, remoteFlowFile.getFlowFile()); + record.setDestination(flowFileQueue); + record.markForDelete(); + return record; + }) + .collect(Collectors.toList()); + + flowFileRepository.updateRepository(repoRecords); + logger.debug("Updated FlowFile Repository to note that {} FlowFiles were dropped from the system because the data received from the other node could not be transferred to the FlowFile Queue", + repoRecords); + } + + private void reportDropEvents(final List flowFilesReceived, final String connectionId, final String details) { + final List events = new ArrayList<>(flowFilesReceived.size()); + for (final RemoteFlowFileRecord remoteFlowFile : flowFilesReceived) { + final FlowFileRecord flowFileRecord = remoteFlowFile.getFlowFile(); + + final ProvenanceEventBuilder provenanceEventBuilder = new StandardProvenanceEventRecord.Builder() + .fromFlowFile(flowFileRecord) + .setEventType(ProvenanceEventType.DROP) + .setComponentId(connectionId) + .setComponentType("Load Balanced Connection") + .setDetails(details); + + final ContentClaim contentClaim = flowFileRecord.getContentClaim(); + if (contentClaim != null) { + final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); + provenanceEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), + contentClaim.getOffset() + flowFileRecord.getContentClaimOffset(), flowFileRecord.getSize()); + } + + final ProvenanceEventRecord provenanceEvent = provenanceEventBuilder.build(); + events.add(provenanceEvent); + } + + logger.debug("Updated Provenance Repository to note that {} FlowFiles were dropped from the system because the data received from the other node could not be transferred to the FlowFile " + + "Queue", events.size()); + provenanceRepository.registerEvents(events); + } + private void registerReceiveProvenanceEvents(final List flowFiles, final String nodeName, final String connectionId, final long startTimestamp) { final long duration = System.currentTimeMillis() - startTimestamp; @@ -361,7 +429,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { flowFileRepository.updateRepository(repoRecords); } - private void transferFlowFilesToQueue(final List remoteFlowFiles, final LoadBalancedFlowFileQueue flowFileQueue) { + private void transferFlowFilesToQueue(final List remoteFlowFiles, final LoadBalancedFlowFileQueue flowFileQueue) throws IllegalClusterStateException { final List flowFiles = remoteFlowFiles.stream().map(RemoteFlowFileRecord::getFlowFile).collect(Collectors.toList()); flowFileQueue.receiveFromPeer(flowFiles); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy index 03faec699e..61d0b7a69a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy @@ -17,30 +17,17 @@ package org.apache.nifi.wali -import ch.qos.logback.classic.Level + import org.apache.commons.lang3.SystemUtils import org.apache.nifi.controller.queue.FlowFileQueue -import org.apache.nifi.controller.repository.EncryptedSchemaRepositoryRecordSerde -import org.apache.nifi.controller.repository.LiveSerializedRepositoryRecord -import org.apache.nifi.controller.repository.RepositoryRecord -import org.apache.nifi.controller.repository.RepositoryRecordType -import org.apache.nifi.controller.repository.SchemaRepositoryRecordSerde -import org.apache.nifi.controller.repository.SerializedRepositoryRecord -import org.apache.nifi.controller.repository.StandardFlowFileRecord -import org.apache.nifi.controller.repository.StandardRepositoryRecord -import org.apache.nifi.controller.repository.StandardRepositoryRecordSerdeFactory +import org.apache.nifi.controller.repository.* import org.apache.nifi.controller.repository.claim.ResourceClaimManager import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager import org.apache.nifi.repository.schema.NoOpFieldCache import org.apache.nifi.security.kms.CryptoUtils import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration import org.bouncycastle.jce.provider.BouncyCastleProvider -import org.junit.After -import org.junit.Assume -import org.junit.Before -import org.junit.BeforeClass -import org.junit.Rule -import org.junit.Test +import org.junit.* import org.junit.rules.TestName import org.junit.runner.RunWith import org.junit.runners.JUnit4 @@ -53,14 +40,11 @@ import org.wali.SingletonSerDeFactory import java.security.Security import static org.apache.nifi.security.kms.CryptoUtils.STATIC_KEY_PROVIDER_CLASS_NAME -import static org.junit.Assert.assertNotNull @RunWith(JUnit4.class) class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase { private static final Logger logger = LoggerFactory.getLogger(EncryptedSequentialAccessWriteAheadLogTest.class) - private static Level ORIGINAL_REPO_LOG_LEVEL - private static Level ORIGINAL_TEST_LOG_LEVEL private static final String REPO_LOG_PACKAGE = "org.apache.nifi.security.repository" public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier" @@ -182,16 +166,6 @@ class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase { final SequentialAccessWriteAheadLog repo = createWriteRepo(encryptedSerde) - // Turn off debugging because of the high volume - logger.debug("Temporarily turning off DEBUG logging") - def encryptorLogger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(REPO_LOG_PACKAGE) - ORIGINAL_REPO_LOG_LEVEL = encryptorLogger.getLevel() - encryptorLogger.setLevel(Level.INFO) - - def testLogger = (ch.qos.logback.classic.Logger) logger - ORIGINAL_TEST_LOG_LEVEL = testLogger.getLevel() - testLogger.setLevel(Level.INFO) - final List records = new ArrayList<>() 10_000.times { int i -> def attributes = [name: "User ${i}" as String, age: "${i}" as String] @@ -210,11 +184,6 @@ class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase { // Ensure that the same records (except now UPDATE instead of CREATE) are returned (order is not guaranteed) assert recovered.size() == records.size() assert recovered.every { it.type == RepositoryRecordType.CREATE } - - // Reset log level - encryptorLogger.setLevel(ORIGINAL_REPO_LOG_LEVEL) - testLogger.setLevel(ORIGINAL_TEST_LOG_LEVEL) - logger.debug("Re-enabled DEBUG logging") } private EncryptedSchemaRepositoryRecordSerde buildEncryptedSerDe(FlowFileRepositoryEncryptionConfiguration ffrec = flowFileREC) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java index a168b65537..abca9e7fe2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java @@ -17,42 +17,6 @@ package org.apache.nifi.controller.queue.clustered; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyCollection; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import javax.net.ssl.SSLContext; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; @@ -103,6 +67,43 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import javax.net.ssl.SSLContext; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class LoadBalancedQueueIT { private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = (sslSocket) -> sslSocket == null ? null : "authorized.mydomain.com"; private final LoadBalanceAuthorizer NEVER_AUTHORIZED = (sslSocket) -> { @@ -148,6 +149,8 @@ public class LoadBalancedQueueIT { clusterCoordinator = mock(ClusterCoordinator.class); when(clusterCoordinator.getNodeIdentifiers()).thenAnswer(invocation -> new HashSet<>(nodeIdentifiers)); when(clusterCoordinator.getLocalNodeIdentifier()).thenAnswer(invocation -> localNodeId); + when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenAnswer(invocation -> + new NodeConnectionStatus(invocation.getArgument(0, NodeIdentifier.class), NodeConnectionState.CONNECTED)); clusterEventListeners.clear(); doAnswer(new Answer() { @@ -217,7 +220,7 @@ public class LoadBalancedQueueIT { private NioAsyncLoadBalanceClientFactory createClientFactory(final SSLContext sslContext) { final FlowFileContentAccess flowFileContentAccess = flowFile -> clientContentRepo.read(flowFile.getContentClaim()); - return new NioAsyncLoadBalanceClientFactory(sslContext, 30000, flowFileContentAccess, eventReporter, new StandardLoadBalanceFlowFileCodec()); + return new NioAsyncLoadBalanceClientFactory(sslContext, 30000, flowFileContentAccess, eventReporter, new StandardLoadBalanceFlowFileCodec(), clusterCoordinator); } @Test(timeout = 20_000) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java index 57fc7d934f..bfd6f36c98 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java @@ -20,6 +20,7 @@ package org.apache.nifi.controller.queue.clustered.server; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.queue.IllegalClusterStateException; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; import org.apache.nifi.controller.repository.ContentRepository; @@ -94,7 +95,7 @@ public class TestStandardLoadBalanceProtocol { @Before - public void setup() throws IOException { + public void setup() throws IOException, IllegalClusterStateException { flowFileQueuePutRecords = new ArrayList<>(); flowFileQueueReceiveRecords = new ArrayList<>(); flowFileRepoUpdateRecords = new ArrayList<>(); @@ -177,7 +178,7 @@ public class TestStandardLoadBalanceProtocol { @Test - public void testSimpleFlowFileTransaction() throws IOException { + public void testSimpleFlowFileTransaction() throws IOException, IllegalClusterStateException { final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); final PipedInputStream serverInput = new PipedInputStream(); @@ -571,7 +572,7 @@ public class TestStandardLoadBalanceProtocol { } @Test - public void testFlowFileNoContent() throws IOException { + public void testFlowFileNoContent() throws IOException, IllegalClusterStateException { final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); final PipedInputStream serverInput = new PipedInputStream(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index e47dd517d4..dad2e10e6a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -16,40 +16,6 @@ */ package org.apache.nifi.controller.repository; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; - -import ch.qos.logback.classic.Level; -import ch.qos.logback.classic.Logger; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.read.ListAppender; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.Method; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.nio.file.StandardOpenOption; -import java.text.NumberFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SystemUtils; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; @@ -67,7 +33,35 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; public class TestFileSystemRepository { @@ -134,44 +128,6 @@ public class TestFileSystemRepository { + NumberFormat.getNumberInstance(Locale.US).format(bytesToWrite) + " bytes) for a write rate of " + mbps + " MB/s"); } - @Test - public void testMinimalArchiveCleanupIntervalHonoredAndLogged() throws Exception { - // We are going to construct our own repository using different properties, so - // we need to shutdown the existing one. - shutdown(); - - Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); - ListAppender testAppender = new ListAppender<>(); - testAppender.setName("Test"); - testAppender.start(); - root.addAppender(testAppender); - final Map addProps = new HashMap<>(); - addProps.put(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 millis"); - final NiFiProperties localProps = NiFiProperties.createBasicNiFiProperties(TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile(), addProps); - repository = new FileSystemRepository(localProps); - repository.initialize(new StandardResourceClaimManager()); - repository.purge(); - - boolean messageFound = false; - String message = "The value of nifi.content.repository.archive.cleanup.frequency property " - + "is set to '1 millis' which is below the allowed minimum of 1 second (1000 milliseconds). " - + "Minimum value of 1 sec will be used as scheduling interval for archive cleanup task."; - - // Must synchronize on testAppender, because the call to append() is synchronized and this synchronize - // keyword guards testAppender.list. Since we are accessing testAppender.list, we must do so in a thread-safe manner. - synchronized (testAppender) { - for (ILoggingEvent event : testAppender.list) { - String actualMessage = event.getFormattedMessage(); - if (actualMessage.equals(message)) { - assertEquals(event.getLevel(), Level.WARN); - messageFound = true; - break; - } - } - } - - assertTrue(messageFound); - } @Test public void testContentNotFoundExceptionThrownIfResourceClaimTooShort() throws IOException {