mirror of https://github.com/apache/nifi.git
NIFI-8353: When receiving data via load-balanced connection, throw an Exception (resulting in an ABORT_TRANSACTION status code) when attempting to add received FlowFiles to the FlowFile queue, if the node is not currently connected to cluster. In this case, ensure that we remove the received FlowFiles from the FlowFile Repository and emit a DROP event to the repository (with appropriate details) to coincide with the already-committed Provenance events. Also ensure that when a node is disconnected that we don't keep sending to that node until it reconnects. While testing the fixes via LoadBalancedQueueIT, also noticed that unit test logging was not working properly in nifi-framework-core because of the (erroneous) dependency on logback-class; removed this dependency and updated unit tests that were created that depended on its existence. (#4924)
This commit is contained in:
parent
e505e8b42d
commit
321e979673
|
@ -0,0 +1,24 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.controller.queue;
|
||||
|
||||
public class IllegalClusterStateException extends Exception {
|
||||
public IllegalClusterStateException(final String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
|
@ -25,8 +25,10 @@ public interface LoadBalancedFlowFileQueue extends FlowFileQueue {
|
|||
/**
|
||||
* Adds the given FlowFiles to this queue, as they have been received from another node in the cluster
|
||||
* @param flowFiles the FlowFiles received from the peer
|
||||
*
|
||||
* @throws IllegalClusterStateException if the node is not currently in a state in which it can receive data from other nodes
|
||||
*/
|
||||
void receiveFromPeer(Collection<FlowFileRecord> flowFiles);
|
||||
void receiveFromPeer(Collection<FlowFileRecord> flowFiles) throws IllegalClusterStateException;
|
||||
|
||||
/**
|
||||
* Distributes the given FlowFiles to the appropriate partitions. Unlike the {@link #putAll(Collection)} method,
|
||||
|
|
|
@ -76,10 +76,6 @@
|
|||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-site-to-site</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-framework-cluster-protocol</artifactId>
|
||||
|
|
|
@ -763,7 +763,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||
|
||||
final int connectionsPerNode = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_CONNECTIONS_PER_NODE, NiFiProperties.DEFAULT_LOAD_BALANCE_CONNECTIONS_PER_NODE);
|
||||
final NioAsyncLoadBalanceClientFactory asyncClientFactory = new NioAsyncLoadBalanceClientFactory(sslContext, timeoutMillis, new ContentRepositoryFlowFileAccess(contentRepository),
|
||||
eventReporter, new StandardLoadBalanceFlowFileCodec());
|
||||
eventReporter, new StandardLoadBalanceFlowFileCodec(), clusterCoordinator);
|
||||
loadBalanceClientRegistry = new NioAsyncLoadBalanceClientRegistry(asyncClientFactory, connectionsPerNode);
|
||||
|
||||
final int loadBalanceClientThreadCount = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT);
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.nifi.controller.queue.ConnectionEventListener;
|
|||
import org.apache.nifi.controller.queue.DropFlowFileRequest;
|
||||
import org.apache.nifi.controller.queue.DropFlowFileState;
|
||||
import org.apache.nifi.controller.queue.FlowFileQueueContents;
|
||||
import org.apache.nifi.controller.queue.IllegalClusterStateException;
|
||||
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
|
||||
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
|
||||
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
|
||||
|
@ -753,9 +754,16 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
|
|||
return partition;
|
||||
}
|
||||
|
||||
public void receiveFromPeer(final Collection<FlowFileRecord> flowFiles) {
|
||||
public void receiveFromPeer(final Collection<FlowFileRecord> flowFiles) throws IllegalClusterStateException {
|
||||
partitionReadLock.lock();
|
||||
try {
|
||||
if (offloaded) {
|
||||
throw new IllegalClusterStateException("Node cannot accept data from load-balanced connection because it is in the process of offloading");
|
||||
}
|
||||
if (!clusterCoordinator.isConnected()) {
|
||||
throw new IllegalClusterStateException("Node cannot accept data from load-balanced connection because it is not connected to cluster");
|
||||
}
|
||||
|
||||
if (partitioner.isRebalanceOnClusterResize()) {
|
||||
logger.debug("Received the following FlowFiles from Peer: {}. Will re-partition FlowFiles to ensure proper balancing across the cluster.", flowFiles);
|
||||
putAll(flowFiles);
|
||||
|
|
|
@ -17,6 +17,9 @@
|
|||
|
||||
package org.apache.nifi.controller.queue.clustered.client.async.nio;
|
||||
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
|
||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.controller.queue.LoadBalanceCompression;
|
||||
import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess;
|
||||
|
@ -66,6 +69,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
|
|||
private final FlowFileContentAccess flowFileContentAccess;
|
||||
private final LoadBalanceFlowFileCodec flowFileCodec;
|
||||
private final EventReporter eventReporter;
|
||||
private final ClusterCoordinator clusterCoordinator;
|
||||
|
||||
private volatile boolean running = false;
|
||||
private final AtomicLong penalizationEnd = new AtomicLong(0L);
|
||||
|
@ -88,13 +92,14 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
|
|||
|
||||
|
||||
public NioAsyncLoadBalanceClient(final NodeIdentifier nodeIdentifier, final SSLContext sslContext, final int timeoutMillis, final FlowFileContentAccess flowFileContentAccess,
|
||||
final LoadBalanceFlowFileCodec flowFileCodec, final EventReporter eventReporter) {
|
||||
final LoadBalanceFlowFileCodec flowFileCodec, final EventReporter eventReporter, final ClusterCoordinator clusterCoordinator) {
|
||||
this.nodeIdentifier = nodeIdentifier;
|
||||
this.sslContext = sslContext;
|
||||
this.timeoutMillis = timeoutMillis;
|
||||
this.flowFileContentAccess = flowFileContentAccess;
|
||||
this.flowFileCodec = flowFileCodec;
|
||||
this.eventReporter = eventReporter;
|
||||
this.clusterCoordinator = clusterCoordinator;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -347,7 +352,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
|
|||
try {
|
||||
RegisteredPartition partition;
|
||||
while ((partition = partitionQueue.poll()) != null) {
|
||||
if (partition.isEmpty() || partition.isPenalized() || !filter.test(partition)) {
|
||||
if (partition.isEmpty() || partition.isPenalized() || !checkNodeConnected() || !filter.test(partition)) {
|
||||
polledPartitions.add(partition);
|
||||
continue;
|
||||
}
|
||||
|
@ -361,6 +366,19 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
|
|||
}
|
||||
}
|
||||
|
||||
private synchronized boolean checkNodeConnected() {
|
||||
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeIdentifier);
|
||||
final boolean connected = status != null && status.getState() == NodeConnectionState.CONNECTED;
|
||||
|
||||
// If not connected but the last known state is connected, we know that the node has just transitioned to disconnected.
|
||||
// In this case we need to call #nodeDisconnected in order to allow for failover to take place
|
||||
if (!connected) {
|
||||
nodeDisconnected();
|
||||
}
|
||||
|
||||
return connected;
|
||||
}
|
||||
|
||||
private synchronized LoadBalanceSession getActiveTransaction(final RegisteredPartition proposedPartition) {
|
||||
if (loadBalanceSession != null && !loadBalanceSession.isComplete()) {
|
||||
return loadBalanceSession;
|
||||
|
|
|
@ -17,10 +17,10 @@
|
|||
|
||||
package org.apache.nifi.controller.queue.clustered.client.async.nio;
|
||||
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess;
|
||||
import org.apache.nifi.controller.queue.clustered.client.LoadBalanceFlowFileCodec;
|
||||
import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec;
|
||||
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientFactory;
|
||||
import org.apache.nifi.events.EventReporter;
|
||||
|
||||
|
@ -32,19 +32,21 @@ public class NioAsyncLoadBalanceClientFactory implements AsyncLoadBalanceClientF
|
|||
private final FlowFileContentAccess flowFileContentAccess;
|
||||
private final EventReporter eventReporter;
|
||||
private final LoadBalanceFlowFileCodec flowFileCodec;
|
||||
private final ClusterCoordinator clusterCoordinator;
|
||||
|
||||
public NioAsyncLoadBalanceClientFactory(final SSLContext sslContext, final int timeoutMillis, final FlowFileContentAccess flowFileContentAccess, final EventReporter eventReporter,
|
||||
final LoadBalanceFlowFileCodec loadBalanceFlowFileCodec) {
|
||||
final LoadBalanceFlowFileCodec loadBalanceFlowFileCodec, final ClusterCoordinator clusterCoordinator) {
|
||||
this.sslContext = sslContext;
|
||||
this.timeoutMillis = timeoutMillis;
|
||||
this.flowFileContentAccess = flowFileContentAccess;
|
||||
this.eventReporter = eventReporter;
|
||||
this.flowFileCodec = loadBalanceFlowFileCodec;
|
||||
this.clusterCoordinator = clusterCoordinator;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public NioAsyncLoadBalanceClient createClient(final NodeIdentifier nodeIdentifier) {
|
||||
return new NioAsyncLoadBalanceClient(nodeIdentifier, sslContext, timeoutMillis, flowFileContentAccess, new StandardLoadBalanceFlowFileCodec(), eventReporter);
|
||||
return new NioAsyncLoadBalanceClient(nodeIdentifier, sslContext, timeoutMillis, flowFileContentAccess, flowFileCodec, eventReporter, clusterCoordinator);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.nifi.controller.queue.clustered.server;
|
|||
import org.apache.nifi.connectable.Connection;
|
||||
import org.apache.nifi.controller.FlowController;
|
||||
import org.apache.nifi.controller.queue.FlowFileQueue;
|
||||
import org.apache.nifi.controller.queue.IllegalClusterStateException;
|
||||
import org.apache.nifi.controller.queue.LoadBalanceCompression;
|
||||
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
|
||||
import org.apache.nifi.controller.repository.ContentRepository;
|
||||
|
@ -314,12 +315,79 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
|
|||
logger.debug("Received Complete Transaction indicator from Peer {}", peerDescription);
|
||||
registerReceiveProvenanceEvents(flowFilesReceived, peerDescription, connectionId, startTimestamp);
|
||||
updateFlowFileRepository(flowFilesReceived, flowFileQueue);
|
||||
transferFlowFilesToQueue(flowFilesReceived, flowFileQueue);
|
||||
|
||||
try {
|
||||
transferFlowFilesToQueue(flowFilesReceived, flowFileQueue);
|
||||
} catch (final IllegalClusterStateException e) {
|
||||
logger.error("Failed to transferred received data into FlowFile Queue {}", flowFileQueue, e);
|
||||
out.write(ABORT_TRANSACTION);
|
||||
out.flush();
|
||||
|
||||
try {
|
||||
cleanupRepositoriesOnTransferFailure(flowFilesReceived, flowFileQueue, "Rejected transfer due to " + e.getMessage());
|
||||
} catch (final Exception e1) {
|
||||
logger.error("Failed to update FlowFile/Provenance Repositories to denote that the data that could not be received should no longer be present on this node", e1);
|
||||
}
|
||||
|
||||
// We log the error here and cleanup. We do not throw an Exception. If we did throw an Exception,
|
||||
// the caller of this method would catch the Exception and decrement the Content Claims, etc. However,
|
||||
// since we have already updated the FlowFile Repository to DROP the data, that would decrement the claims
|
||||
// twice, which could lead to data loss.
|
||||
return;
|
||||
}
|
||||
|
||||
out.write(CONFIRM_COMPLETE_TRANSACTION);
|
||||
out.flush();
|
||||
}
|
||||
|
||||
private void cleanupRepositoriesOnTransferFailure(final List<RemoteFlowFileRecord> flowFilesReceived, final FlowFileQueue flowFileQueue, final String details) throws IOException {
|
||||
dropFlowFilesFromRepository(flowFilesReceived, flowFileQueue);
|
||||
reportDropEvents(flowFilesReceived, flowFileQueue.getIdentifier(), details);
|
||||
}
|
||||
|
||||
private void dropFlowFilesFromRepository(final List<RemoteFlowFileRecord> flowFiles, final FlowFileQueue flowFileQueue) throws IOException {
|
||||
final List<RepositoryRecord> repoRecords = flowFiles.stream()
|
||||
.map(remoteFlowFile -> {
|
||||
final StandardRepositoryRecord record = new StandardRepositoryRecord(flowFileQueue, remoteFlowFile.getFlowFile());
|
||||
record.setDestination(flowFileQueue);
|
||||
record.markForDelete();
|
||||
return record;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
flowFileRepository.updateRepository(repoRecords);
|
||||
logger.debug("Updated FlowFile Repository to note that {} FlowFiles were dropped from the system because the data received from the other node could not be transferred to the FlowFile Queue",
|
||||
repoRecords);
|
||||
}
|
||||
|
||||
private void reportDropEvents(final List<RemoteFlowFileRecord> flowFilesReceived, final String connectionId, final String details) {
|
||||
final List<ProvenanceEventRecord> events = new ArrayList<>(flowFilesReceived.size());
|
||||
for (final RemoteFlowFileRecord remoteFlowFile : flowFilesReceived) {
|
||||
final FlowFileRecord flowFileRecord = remoteFlowFile.getFlowFile();
|
||||
|
||||
final ProvenanceEventBuilder provenanceEventBuilder = new StandardProvenanceEventRecord.Builder()
|
||||
.fromFlowFile(flowFileRecord)
|
||||
.setEventType(ProvenanceEventType.DROP)
|
||||
.setComponentId(connectionId)
|
||||
.setComponentType("Load Balanced Connection")
|
||||
.setDetails(details);
|
||||
|
||||
final ContentClaim contentClaim = flowFileRecord.getContentClaim();
|
||||
if (contentClaim != null) {
|
||||
final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
|
||||
provenanceEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
|
||||
contentClaim.getOffset() + flowFileRecord.getContentClaimOffset(), flowFileRecord.getSize());
|
||||
}
|
||||
|
||||
final ProvenanceEventRecord provenanceEvent = provenanceEventBuilder.build();
|
||||
events.add(provenanceEvent);
|
||||
}
|
||||
|
||||
logger.debug("Updated Provenance Repository to note that {} FlowFiles were dropped from the system because the data received from the other node could not be transferred to the FlowFile " +
|
||||
"Queue", events.size());
|
||||
provenanceRepository.registerEvents(events);
|
||||
}
|
||||
|
||||
private void registerReceiveProvenanceEvents(final List<RemoteFlowFileRecord> flowFiles, final String nodeName, final String connectionId, final long startTimestamp) {
|
||||
final long duration = System.currentTimeMillis() - startTimestamp;
|
||||
|
||||
|
@ -361,7 +429,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
|
|||
flowFileRepository.updateRepository(repoRecords);
|
||||
}
|
||||
|
||||
private void transferFlowFilesToQueue(final List<RemoteFlowFileRecord> remoteFlowFiles, final LoadBalancedFlowFileQueue flowFileQueue) {
|
||||
private void transferFlowFilesToQueue(final List<RemoteFlowFileRecord> remoteFlowFiles, final LoadBalancedFlowFileQueue flowFileQueue) throws IllegalClusterStateException {
|
||||
final List<FlowFileRecord> flowFiles = remoteFlowFiles.stream().map(RemoteFlowFileRecord::getFlowFile).collect(Collectors.toList());
|
||||
flowFileQueue.receiveFromPeer(flowFiles);
|
||||
}
|
||||
|
|
|
@ -17,30 +17,17 @@
|
|||
|
||||
package org.apache.nifi.wali
|
||||
|
||||
import ch.qos.logback.classic.Level
|
||||
|
||||
import org.apache.commons.lang3.SystemUtils
|
||||
import org.apache.nifi.controller.queue.FlowFileQueue
|
||||
import org.apache.nifi.controller.repository.EncryptedSchemaRepositoryRecordSerde
|
||||
import org.apache.nifi.controller.repository.LiveSerializedRepositoryRecord
|
||||
import org.apache.nifi.controller.repository.RepositoryRecord
|
||||
import org.apache.nifi.controller.repository.RepositoryRecordType
|
||||
import org.apache.nifi.controller.repository.SchemaRepositoryRecordSerde
|
||||
import org.apache.nifi.controller.repository.SerializedRepositoryRecord
|
||||
import org.apache.nifi.controller.repository.StandardFlowFileRecord
|
||||
import org.apache.nifi.controller.repository.StandardRepositoryRecord
|
||||
import org.apache.nifi.controller.repository.StandardRepositoryRecordSerdeFactory
|
||||
import org.apache.nifi.controller.repository.*
|
||||
import org.apache.nifi.controller.repository.claim.ResourceClaimManager
|
||||
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager
|
||||
import org.apache.nifi.repository.schema.NoOpFieldCache
|
||||
import org.apache.nifi.security.kms.CryptoUtils
|
||||
import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider
|
||||
import org.junit.After
|
||||
import org.junit.Assume
|
||||
import org.junit.Before
|
||||
import org.junit.BeforeClass
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.*
|
||||
import org.junit.rules.TestName
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.JUnit4
|
||||
|
@ -53,14 +40,11 @@ import org.wali.SingletonSerDeFactory
|
|||
import java.security.Security
|
||||
|
||||
import static org.apache.nifi.security.kms.CryptoUtils.STATIC_KEY_PROVIDER_CLASS_NAME
|
||||
import static org.junit.Assert.assertNotNull
|
||||
|
||||
@RunWith(JUnit4.class)
|
||||
class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase {
|
||||
private static final Logger logger = LoggerFactory.getLogger(EncryptedSequentialAccessWriteAheadLogTest.class)
|
||||
|
||||
private static Level ORIGINAL_REPO_LOG_LEVEL
|
||||
private static Level ORIGINAL_TEST_LOG_LEVEL
|
||||
private static final String REPO_LOG_PACKAGE = "org.apache.nifi.security.repository"
|
||||
|
||||
public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier"
|
||||
|
@ -182,16 +166,6 @@ class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase {
|
|||
|
||||
final SequentialAccessWriteAheadLog<SerializedRepositoryRecord> repo = createWriteRepo(encryptedSerde)
|
||||
|
||||
// Turn off debugging because of the high volume
|
||||
logger.debug("Temporarily turning off DEBUG logging")
|
||||
def encryptorLogger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(REPO_LOG_PACKAGE)
|
||||
ORIGINAL_REPO_LOG_LEVEL = encryptorLogger.getLevel()
|
||||
encryptorLogger.setLevel(Level.INFO)
|
||||
|
||||
def testLogger = (ch.qos.logback.classic.Logger) logger
|
||||
ORIGINAL_TEST_LOG_LEVEL = testLogger.getLevel()
|
||||
testLogger.setLevel(Level.INFO)
|
||||
|
||||
final List<SerializedRepositoryRecord> records = new ArrayList<>()
|
||||
10_000.times { int i ->
|
||||
def attributes = [name: "User ${i}" as String, age: "${i}" as String]
|
||||
|
@ -210,11 +184,6 @@ class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase {
|
|||
// Ensure that the same records (except now UPDATE instead of CREATE) are returned (order is not guaranteed)
|
||||
assert recovered.size() == records.size()
|
||||
assert recovered.every { it.type == RepositoryRecordType.CREATE }
|
||||
|
||||
// Reset log level
|
||||
encryptorLogger.setLevel(ORIGINAL_REPO_LOG_LEVEL)
|
||||
testLogger.setLevel(ORIGINAL_TEST_LOG_LEVEL)
|
||||
logger.debug("Re-enabled DEBUG logging")
|
||||
}
|
||||
|
||||
private EncryptedSchemaRepositoryRecordSerde buildEncryptedSerDe(FlowFileRepositoryEncryptionConfiguration ffrec = flowFileREC) {
|
||||
|
|
|
@ -17,42 +17,6 @@
|
|||
|
||||
package org.apache.nifi.controller.queue.clustered;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyCollection;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.UnrecoverableKeyException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
|
||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
|
||||
|
@ -103,6 +67,43 @@ import org.mockito.Mockito;
|
|||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.UnrecoverableKeyException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyCollection;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class LoadBalancedQueueIT {
|
||||
private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = (sslSocket) -> sslSocket == null ? null : "authorized.mydomain.com";
|
||||
private final LoadBalanceAuthorizer NEVER_AUTHORIZED = (sslSocket) -> {
|
||||
|
@ -148,6 +149,8 @@ public class LoadBalancedQueueIT {
|
|||
clusterCoordinator = mock(ClusterCoordinator.class);
|
||||
when(clusterCoordinator.getNodeIdentifiers()).thenAnswer(invocation -> new HashSet<>(nodeIdentifiers));
|
||||
when(clusterCoordinator.getLocalNodeIdentifier()).thenAnswer(invocation -> localNodeId);
|
||||
when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenAnswer(invocation ->
|
||||
new NodeConnectionStatus(invocation.getArgument(0, NodeIdentifier.class), NodeConnectionState.CONNECTED));
|
||||
|
||||
clusterEventListeners.clear();
|
||||
doAnswer(new Answer() {
|
||||
|
@ -217,7 +220,7 @@ public class LoadBalancedQueueIT {
|
|||
|
||||
private NioAsyncLoadBalanceClientFactory createClientFactory(final SSLContext sslContext) {
|
||||
final FlowFileContentAccess flowFileContentAccess = flowFile -> clientContentRepo.read(flowFile.getContentClaim());
|
||||
return new NioAsyncLoadBalanceClientFactory(sslContext, 30000, flowFileContentAccess, eventReporter, new StandardLoadBalanceFlowFileCodec());
|
||||
return new NioAsyncLoadBalanceClientFactory(sslContext, 30000, flowFileContentAccess, eventReporter, new StandardLoadBalanceFlowFileCodec(), clusterCoordinator);
|
||||
}
|
||||
|
||||
@Test(timeout = 20_000)
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.nifi.controller.queue.clustered.server;
|
|||
import org.apache.nifi.connectable.Connection;
|
||||
import org.apache.nifi.controller.FlowController;
|
||||
import org.apache.nifi.controller.flow.FlowManager;
|
||||
import org.apache.nifi.controller.queue.IllegalClusterStateException;
|
||||
import org.apache.nifi.controller.queue.LoadBalanceCompression;
|
||||
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
|
||||
import org.apache.nifi.controller.repository.ContentRepository;
|
||||
|
@ -94,7 +95,7 @@ public class TestStandardLoadBalanceProtocol {
|
|||
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
public void setup() throws IOException, IllegalClusterStateException {
|
||||
flowFileQueuePutRecords = new ArrayList<>();
|
||||
flowFileQueueReceiveRecords = new ArrayList<>();
|
||||
flowFileRepoUpdateRecords = new ArrayList<>();
|
||||
|
@ -177,7 +178,7 @@ public class TestStandardLoadBalanceProtocol {
|
|||
|
||||
|
||||
@Test
|
||||
public void testSimpleFlowFileTransaction() throws IOException {
|
||||
public void testSimpleFlowFileTransaction() throws IOException, IllegalClusterStateException {
|
||||
final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED);
|
||||
|
||||
final PipedInputStream serverInput = new PipedInputStream();
|
||||
|
@ -571,7 +572,7 @@ public class TestStandardLoadBalanceProtocol {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testFlowFileNoContent() throws IOException {
|
||||
public void testFlowFileNoContent() throws IOException, IllegalClusterStateException {
|
||||
final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED);
|
||||
|
||||
final PipedInputStream serverInput = new PipedInputStream();
|
||||
|
|
|
@ -16,40 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.controller.repository;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import ch.qos.logback.classic.Level;
|
||||
import ch.qos.logback.classic.Logger;
|
||||
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||
import ch.qos.logback.core.read.ListAppender;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.text.NumberFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.commons.lang3.SystemUtils;
|
||||
import org.apache.nifi.controller.repository.claim.ContentClaim;
|
||||
import org.apache.nifi.controller.repository.claim.ResourceClaim;
|
||||
|
@ -67,7 +33,35 @@ import org.junit.Before;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.text.NumberFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestFileSystemRepository {
|
||||
|
||||
|
@ -134,44 +128,6 @@ public class TestFileSystemRepository {
|
|||
+ NumberFormat.getNumberInstance(Locale.US).format(bytesToWrite) + " bytes) for a write rate of " + mbps + " MB/s");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinimalArchiveCleanupIntervalHonoredAndLogged() throws Exception {
|
||||
// We are going to construct our own repository using different properties, so
|
||||
// we need to shutdown the existing one.
|
||||
shutdown();
|
||||
|
||||
Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
|
||||
ListAppender<ILoggingEvent> testAppender = new ListAppender<>();
|
||||
testAppender.setName("Test");
|
||||
testAppender.start();
|
||||
root.addAppender(testAppender);
|
||||
final Map<String, String> addProps = new HashMap<>();
|
||||
addProps.put(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 millis");
|
||||
final NiFiProperties localProps = NiFiProperties.createBasicNiFiProperties(TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile(), addProps);
|
||||
repository = new FileSystemRepository(localProps);
|
||||
repository.initialize(new StandardResourceClaimManager());
|
||||
repository.purge();
|
||||
|
||||
boolean messageFound = false;
|
||||
String message = "The value of nifi.content.repository.archive.cleanup.frequency property "
|
||||
+ "is set to '1 millis' which is below the allowed minimum of 1 second (1000 milliseconds). "
|
||||
+ "Minimum value of 1 sec will be used as scheduling interval for archive cleanup task.";
|
||||
|
||||
// Must synchronize on testAppender, because the call to append() is synchronized and this synchronize
|
||||
// keyword guards testAppender.list. Since we are accessing testAppender.list, we must do so in a thread-safe manner.
|
||||
synchronized (testAppender) {
|
||||
for (ILoggingEvent event : testAppender.list) {
|
||||
String actualMessage = event.getFormattedMessage();
|
||||
if (actualMessage.equals(message)) {
|
||||
assertEquals(event.getLevel(), Level.WARN);
|
||||
messageFound = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(messageFound);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContentNotFoundExceptionThrownIfResourceClaimTooShort() throws IOException {
|
||||
|
|
Loading…
Reference in New Issue