NIFI-9434: Added bulletin to alert FileSystemRepository backpressure (#5596)

This commit is contained in:
Lehel Boér 2022-01-04 21:59:32 +01:00 committed by GitHub
parent 4346dd8faf
commit be138ab656
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 170 additions and 42 deletions

View File

@ -18,7 +18,6 @@ package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import java.io.IOException;
import java.io.InputStream;
@ -37,13 +36,12 @@ public interface ContentRepository {
/**
* Initializes the Content Repository, providing to it the
* ContentClaimManager that is to be used for interacting with Content
* Claims
* ContentRepositoryContext.
*
* @param claimManager to handle claims
* @param context to initialize repository
* @throws java.io.IOException if unable to init
*/
void initialize(ResourceClaimManager claimManager) throws IOException;
void initialize(ContentRepositoryContext context) throws IOException;
/**
* Shuts down the Content Repository, freeing any resources that may be

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
/**
* Initialization context for {@link ContentRepository}.
*/
public interface ContentRepositoryContext {
/**
* Provides a {@link ResourceClaimManager} for {@link ContentRepository}
* that is to be used for interacting with ContentClaims.
*
* @return a ResourceClaimManager instance
*/
ResourceClaimManager getResourceClaimManager();
/**
* Provides a {@link EventReporter} for {@link ContentRepository}
* that is used to emit bulletins.
*
* @return an EventReporter instance
*/
EventReporter getEventReporter();
}

View File

@ -88,6 +88,7 @@ import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.QueueProvider;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.repository.StandardContentRepositoryContext;
import org.apache.nifi.controller.repository.StandardCounterRepository;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardQueueProvider;
@ -1169,7 +1170,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
try {
final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, ContentRepository.class, properties);
synchronized (contentRepo) {
contentRepo.initialize(resourceClaimManager);
contentRepo.initialize(new StandardContentRepositoryContext(resourceClaimManager, createEventReporter()));
}
return contentRepo;
} catch (final Exception e) {

View File

@ -23,7 +23,9 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
@ -80,7 +82,6 @@ import java.util.regex.Pattern;
/**
* Is thread safe
*
*/
public class FileSystemRepository implements ContentRepository {
@ -126,6 +127,7 @@ public class FileSystemRepository implements ContentRepository {
private final ScheduledExecutorService containerCleanupExecutor;
private ResourceClaimManager resourceClaimManager; // effectively final
private EventReporter eventReporter;
// Map of container to archived files that should be deleted next.
private final Map<String, BlockingQueue<ArchiveInfo>> archivedFiles = new HashMap<>();
@ -256,8 +258,9 @@ public class FileSystemRepository implements ContentRepository {
}
@Override
public void initialize(final ResourceClaimManager claimManager) {
this.resourceClaimManager = claimManager;
public void initialize(final ContentRepositoryContext context) {
this.resourceClaimManager = context.getResourceClaimManager();
this.eventReporter = context.getEventReporter();
final Map<String, Path> fileRespositoryPaths = nifiProperties.getContentRepositoryPaths();
@ -1694,7 +1697,10 @@ public class FileSystemRepository implements ContentRepository {
try {
while (isWaitRequired()) {
try {
LOG.info("Unable to write to container {} due to archive file size constraints; waiting for archive cleanup", containerName);
final String message = String.format("Unable to write flowfile content to content repository container %s due to archive file size constraints;" +
" waiting for archive cleanup", containerName);
LOG.warn(message);
eventReporter.reportEvent(Severity.WARNING, "FileSystemRepository", message);
condition.await();
} catch (final InterruptedException e) {
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
public class StandardContentRepositoryContext implements ContentRepositoryContext {
private final ResourceClaimManager resourceClaimManager;
private final EventReporter eventReporter;
public StandardContentRepositoryContext(ResourceClaimManager resourceClaimManager, EventReporter eventReporter) {
this.resourceClaimManager = resourceClaimManager;
this.eventReporter = eventReporter;
}
@Override
public ResourceClaimManager getResourceClaimManager() {
return resourceClaimManager;
}
@Override
public EventReporter getEventReporter() {
return eventReporter;
}
}

View File

@ -135,8 +135,8 @@ public class VolatileContentRepository implements ContentRepository {
}
@Override
public void initialize(final ResourceClaimManager claimManager) {
this.claimManager = claimManager;
public void initialize(final ContentRepositoryContext context) {
this.claimManager = context.getResourceClaimManager();
for (int i = 0; i < 3; i++) {
executor.scheduleWithFixedDelay(new CleanupOldClaims(), 1000, 10, TimeUnit.MILLISECONDS);

View File

@ -17,9 +17,11 @@
package org.apache.nifi.controller.repository.crypto
import org.apache.commons.lang3.SystemUtils
import org.apache.nifi.controller.repository.StandardContentRepositoryContext
import org.apache.nifi.controller.repository.claim.ContentClaim
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager
import org.apache.nifi.controller.repository.util.DiskUtils
import org.apache.nifi.events.EventReporter
import org.apache.nifi.security.kms.StaticKeyProvider
import org.apache.nifi.util.NiFiProperties
import org.bouncycastle.jce.provider.BouncyCastleProvider
@ -94,7 +96,7 @@ class EncryptedFileSystemRepositoryTest {
EncryptedFileSystemRepository repository = new EncryptedFileSystemRepository(nifiProperties)
StandardResourceClaimManager claimManager = new StandardResourceClaimManager()
repository.initialize(claimManager)
repository.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP))
repository.purge()
logger.info("Created EFSR with nifi.properties [${nifiPropertiesPath}] and ${additionalProperties.size()} additional properties: ${additionalProperties}")

View File

@ -33,6 +33,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
@ -202,7 +203,7 @@ public class StandardProcessSessionIT {
when(connectable.getConnections()).thenReturn(new HashSet<>(connList));
contentRepo = new MockContentRepository();
contentRepo.initialize(new StandardResourceClaimManager());
contentRepo.initialize(new StandardContentRepositoryContext(new StandardResourceClaimManager(), EventReporter.NO_OP));
flowFileRepo = new MockFlowFileRepository(contentRepo);
stateManager = new MockStateManager(connectable);
@ -3175,8 +3176,8 @@ public class StandardProcessSessionIT {
}
@Override
public void initialize(ResourceClaimManager claimManager) throws IOException {
this.claimManager = claimManager;
public void initialize(ContentRepositoryContext context) throws IOException {
this.claimManager = context.getResourceClaimManager();
}
}
}

View File

@ -23,6 +23,7 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.util.DiskUtils;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
@ -65,8 +66,6 @@ import static org.junit.Assert.assertTrue;
public class TestFileSystemRepository {
public static final int NUM_REPO_SECTIONS = 1;
public static final File helloWorldFile = new File("src/test/resources/hello.txt");
private FileSystemRepository repository = null;
@ -87,7 +86,7 @@ public class TestFileSystemRepository {
}
repository = new FileSystemRepository(nifiProperties);
claimManager = new StandardResourceClaimManager();
repository.initialize(claimManager);
repository.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
repository.purge();
}
@ -183,7 +182,7 @@ public class TestFileSystemRepository {
bogus.setReadable(false);
repository = new FileSystemRepository(nifiProperties);
repository.initialize(new StandardResourceClaimManager());
repository.initialize(new StandardContentRepositoryContext(new StandardResourceClaimManager(), EventReporter.NO_OP));
} finally {
bogus.setReadable(true);
assertTrue(bogus.delete());
@ -267,7 +266,7 @@ public class TestFileSystemRepository {
Thread.sleep(1000L);
repository = new FileSystemRepository(nifiProperties);
repository.initialize(new StandardResourceClaimManager());
repository.initialize(new StandardContentRepositoryContext(new StandardResourceClaimManager(), EventReporter.NO_OP));
repository.purge();
final ContentClaim claim2 = repository.create(false);
@ -567,7 +566,7 @@ public class TestFileSystemRepository {
};
final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
repository.initialize(claimManager);
repository.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
repository.purge();
final ContentClaim claim = repository.create(false);
@ -621,7 +620,7 @@ public class TestFileSystemRepository {
};
final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
repository.initialize(claimManager);
repository.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
repository.purge();
final ContentClaim claim = repository.create(false);
@ -699,7 +698,8 @@ public class TestFileSystemRepository {
};
final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
repository.initialize(claimManager);
repository.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
repository.purge();
final ContentClaim claim = repository.create(false);

View File

@ -34,6 +34,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Assert;
import org.junit.Before;
@ -57,7 +58,7 @@ public class TestVolatileContentRepository {
addProps.put(VolatileContentRepository.MAX_SIZE_PROPERTY, "10 MB");
final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, addProps);
final VolatileContentRepository contentRepo = new VolatileContentRepository(nifiProps);
contentRepo.initialize(claimManager);
contentRepo.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
final ContentClaim claim = contentRepo.create(true);
final OutputStream out = contentRepo.write(claim);
@ -112,7 +113,7 @@ public class TestVolatileContentRepository {
final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, addProps);
final VolatileContentRepository contentRepo = new VolatileContentRepository(nifiProps);
contentRepo.initialize(claimManager);
contentRepo.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
final byte[] oneK = new byte[1024];
Arrays.fill(oneK, (byte) 55);
@ -157,7 +158,7 @@ public class TestVolatileContentRepository {
addProps.put(VolatileContentRepository.MAX_SIZE_PROPERTY, "11 MB");
final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, addProps);
final VolatileContentRepository contentRepo = new VolatileContentRepository(nifiProps);
contentRepo.initialize(claimManager);
contentRepo.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
final ContentClaim claim = contentRepo.create(true);
final OutputStream out = contentRepo.write(claim);

View File

@ -25,8 +25,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.nifi.controller.repository.FileSystemRepository;
import org.apache.nifi.controller.repository.StandardContentRepositoryContext;
import org.apache.nifi.controller.repository.TestFileSystemRepository;
import org.apache.nifi.controller.repository.util.DiskUtils;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
@ -49,7 +51,7 @@ public class TestContentClaimWriteCache {
}
repository = new FileSystemRepository(nifiProperties);
claimManager = new StandardResourceClaimManager();
repository.initialize(claimManager);
repository.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
repository.purge();
}

View File

@ -20,6 +20,7 @@ package org.apache.nifi.stateless.flow;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.ContentRepositoryContext;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRepository;
@ -33,6 +34,7 @@ import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptionMethod;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.encrypt.PropertyEncryptorBuilder;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.extensions.ExtensionClient;
@ -51,6 +53,7 @@ import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.InMemoryFlowRegistry;
import org.apache.nifi.registry.flow.StandardFlowRegistryClient;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
import org.apache.nifi.stateless.config.ExtensionClientDefinition;
@ -215,7 +218,21 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
// Initialize components. This is generally needed because of the interdependencies between the components.
// There are some circular dependencies that are resolved by passing objects via initialization rather than by providing to the constructors.
final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
contentRepo.initialize(resourceClaimManager);
final EventReporter eventReporter = (severity, category, message) -> {
final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
bulletinRepository.addBulletin(bulletin);
};
contentRepo.initialize(new ContentRepositoryContext() {
@Override
public ResourceClaimManager getResourceClaimManager() {
return resourceClaimManager;
}
@Override
public EventReporter getEventReporter() {
return eventReporter;
}
});
flowFileRepo.initialize(resourceClaimManager);
flowManager.initialize(controllerServiceProvider);

View File

@ -18,6 +18,7 @@
package org.apache.nifi.stateless.repository;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.ContentRepositoryContext;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
@ -43,8 +44,8 @@ public class ByteArrayContentRepository implements ContentRepository {
private ResourceClaimManager resourceClaimManager;
@Override
public void initialize(final ResourceClaimManager claimManager) {
resourceClaimManager = claimManager;
public void initialize(final ContentRepositoryContext context) {
resourceClaimManager = context.getResourceClaimManager();
}
@Override

View File

@ -18,6 +18,7 @@
package org.apache.nifi.stateless.repository;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.ContentRepositoryContext;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
@ -69,8 +70,8 @@ public class StatelessFileSystemContentRepository implements ContentRepository {
}
@Override
public void initialize(final ResourceClaimManager claimManager) throws IOException {
this.resourceClaimManager = claimManager;
public void initialize(final ContentRepositoryContext context) throws IOException {
this.resourceClaimManager = context.getResourceClaimManager();
if (!directory.exists() && !directory.mkdirs()) {
throw new IOException("Cannot initialize Content Repository because " + directory.getAbsolutePath() + " does not exist and cannot be created");
}

View File

@ -17,9 +17,12 @@
package org.apache.nifi.stateless.repository;
import org.apache.nifi.controller.repository.ContentRepositoryContext;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.stream.io.StreamUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -41,10 +44,22 @@ public class TestStatelessFileSystemContentRepository {
private final File repoDirectory = new File("target/test-stateless-file-system-repository");
private StatelessFileSystemContentRepository repository;
private final ContentRepositoryContext contentRepositoryContext = new ContentRepositoryContext() {
@Override
public ResourceClaimManager getResourceClaimManager() {
return new StandardResourceClaimManager();
}
@Override
public EventReporter getEventReporter() {
return EventReporter.NO_OP;
}
};
@BeforeEach
public void setup() throws IOException {
repository = new StatelessFileSystemContentRepository(repoDirectory);
repository.initialize(new StandardResourceClaimManager());
repository.initialize(contentRepositoryContext);
}
@AfterEach