NIFI-9823 Removed VolatileContentRepository

- Removed Volatile Content Repository documentation references

This closes #5889

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
exceptionfactory 2022-03-17 15:29:33 -05:00 committed by Mike Thomsen
parent 2a6e521549
commit bf51687291
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
6 changed files with 4 additions and 895 deletions

View File

@ -34,7 +34,7 @@ FlowFile Repository:
out period: 5 sec
out threads: 4
Content Repository:
implementation: org.apache.nifi.controller.repository.VolatileContentRepository
implementation: org.apache.nifi.controller.repository.OtherContentRepository
content claim max appendable size: 10 MB
content claim max flow files: 100
always sync: false

View File

@ -3318,14 +3318,14 @@ FlowFile Repository, if also on that disk, could become corrupt. To avoid this s
|====
|*Property*|*Description*
|`nifi.content.repository.implementation`|The Content Repository implementation. The default value is `org.apache.nifi.controller.repository.FileSystemRepository` and should only be changed with caution. To store flowfile content in memory instead of on disk (at the risk of data loss in the event of power/machine failure), set this property to `org.apache.nifi.controller.repository.VolatileContentRepository`.
|`nifi.content.repository.implementation`|The Content Repository implementation. The default value is `org.apache.nifi.controller.repository.FileSystemRepository`.
|====
=== File System Content Repository Properties
|====
|*Property*|*Description*
|`nifi.content.repository.implementation`|The Content Repository implementation. The default value is `org.apache.nifi.controller.repository.FileSystemRepository` and should only be changed with caution. To store flowfile content in memory instead of on disk (at the risk of data loss in the event of power/machine failure), set this property to `org.apache.nifi.controller.repository.VolatileContentRepository`.
|`nifi.content.repository.implementation`|The Content Repository implementation. The default value is `org.apache.nifi.controller.repository.FileSystemRepository`.
|`nifi.content.claim.max.appendable.size`|When NiFi processes many small FlowFiles, the contents of those FlowFiles are stored in the content repository, but we do not store the content of each
individual FlowFile as a separate file in the content repository. Doing so would be very detrimental to performance, if each 120 byte FlowFile, for instance, was written to its own file. Instead,
we continue writing to the same file until it reaches some threshold. This property configures that threshold. Setting the value too small can result in poor performance due to reading from and
@ -3376,14 +3376,6 @@ All of the properties defined above (see <<file-system-content-repository-proper
|`nifi.content.repository.encryption.key.id.`*|Allows for additional keys to be specified for the `StaticKeyProvider`. For example, the line `nifi.content.repository.encryption.key.id.Key2=012...210` would provide an available key `Key2`.
|====
=== Volatile Content Repository Properties
|====
|*Property*|*Description*
|`nifi.volatile.content.repository.max.size`|The Content Repository maximum size in memory. The default value is `100 MB`.
|`nifi.volatile.content.repository.block.size`|The Content Repository block size. The default value is `32 KB`.
|====
=== Provenance Repository
The Provenance Repository contains the information related to Data Provenance. The next four sections are for Provenance Repository properties.

View File

@ -1,694 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream;
import org.apache.nifi.controller.repository.io.MemoryManager;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* <p>
* An in-memory implementation of the {@link ContentRepository} interface. This
* implementation stores FlowFile content in the Java heap and keeps track of
* the number of bytes used. If the number of bytes used by FlowFile content
* exceeds some threshold (configured via the
* <code>nifi.volatile.content.repository.max.size</code> property in the NiFi
* properties with a default of 100 MB), one of two situations will occur:
* </p>
*
* <ul>
* <li><b>Backup Repository:</b> If a Backup Repository has been specified (via
* the {@link #setBackupRepository(ContentRepository)} method), the content will
* be stored in the backup repository and all access to the FlowFile content
* will automatically and transparently be proxied to the backup repository.
* </li>
* <li>
* <b>Without Backup Repository:</b> If no Backup Repository has been specified,
* when the threshold is exceeded, an IOException will be thrown.
* </li>
* </ul>
*
* <p>
* When a Content Claim is created via the {@link #create(boolean)} method, if
* the <code>lossTolerant</code> flag is set to <code>false</code>, the Backup
* Repository will be used to create the Content Claim and any accesses to the
* ContentClaim will be proxied to the Backup Repository. If the Backup
* Repository has not been specified, attempting to create a non-loss-tolerant
* ContentClaim will result in an {@link IllegalStateException} being thrown.
* </p>
*/
public class VolatileContentRepository implements ContentRepository {
private final Logger logger = LoggerFactory.getLogger(VolatileContentRepository.class);
public static String CONTAINER_NAME = "in-memory";
public static final int DEFAULT_BLOCK_SIZE_KB = 32;
public static final String MAX_SIZE_PROPERTY = "nifi.volatile.content.repository.max.size";
public static final String BLOCK_SIZE_PROPERTY = "nifi.volatile.content.repository.block.size";
private final ScheduledExecutorService executor = new FlowEngine(3, "VolatileContentRepository Workers", true);
private final ConcurrentMap<ContentClaim, ContentBlock> claimMap = new ConcurrentHashMap<>(256);
private final AtomicLong repoSize = new AtomicLong(0L);
private final AtomicLong idGenerator = new AtomicLong(0L);
private final long maxBytes;
private final MemoryManager memoryManager;
private final ConcurrentMap<ContentClaim, ContentClaim> backupRepoClaimMap = new ConcurrentHashMap<>(256);
private final AtomicReference<ContentRepository> backupRepositoryRef = new AtomicReference<>(null);
private ResourceClaimManager claimManager; // effectively final
/**
* Default no args constructor for service loading only
*/
public VolatileContentRepository() {
maxBytes = 0;
memoryManager = null;
}
public VolatileContentRepository(final NiFiProperties nifiProperties) {
final String maxSize = nifiProperties.getProperty(MAX_SIZE_PROPERTY);
final String blockSizeVal = nifiProperties.getProperty(BLOCK_SIZE_PROPERTY);
if (maxSize == null) {
maxBytes = (long) DataUnit.B.convert(100D, DataUnit.MB);
} else {
maxBytes = DataUnit.parseDataSize(maxSize, DataUnit.B).longValue();
}
final int blockSize;
if (blockSizeVal == null) {
blockSize = (int) DataUnit.B.convert(DEFAULT_BLOCK_SIZE_KB, DataUnit.KB);
} else {
blockSize = DataUnit.parseDataSize(blockSizeVal, DataUnit.B).intValue();
}
memoryManager = new MemoryManager(maxBytes, blockSize);
}
@Override
public void initialize(final ContentRepositoryContext context) {
this.claimManager = context.getResourceClaimManager();
for (int i = 0; i < 3; i++) {
executor.scheduleWithFixedDelay(new CleanupOldClaims(), 1000, 10, TimeUnit.MILLISECONDS);
}
}
@Override
public void shutdown() {
executor.shutdown();
}
/**
* Specifies a Backup Repository where data should be written if this
* Repository fills up
*
* @param backup repo backup
*/
public void setBackupRepository(final ContentRepository backup) {
final boolean updated = backupRepositoryRef.compareAndSet(null, backup);
if (!updated) {
throw new IllegalStateException("Cannot change BackupRepository after it has already been set");
}
}
public ContentRepository getBackupRepository() {
return backupRepositoryRef.get();
}
private StandardContentClaim resolveClaim(final ContentClaim claim) {
if (!(claim instanceof StandardContentClaim)) {
throw new IllegalArgumentException("Cannot increment ClaimantCount of " + claim + " because it does not belong to this ContentRepository");
}
return (StandardContentClaim) claim;
}
private ContentClaim getBackupClaim(final ContentClaim claim) {
if (claim == null) {
return null;
}
return backupRepoClaimMap.get(claim);
}
@Override
public long getContainerCapacity(final String containerName) throws IOException {
return maxBytes;
}
@Override
public Set<String> getContainerNames() {
return Collections.singleton(CONTAINER_NAME);
}
@Override
public long getContainerUsableSpace(String containerName) throws IOException {
return maxBytes - repoSize.get();
}
@Override
public String getContainerFileStoreName(String containerName) {
return null;
}
@Override
public ContentClaim create(boolean lossTolerant) throws IOException {
if (lossTolerant) {
return createLossTolerant();
} else {
final ContentRepository backupRepo = getBackupRepository();
if (backupRepo == null) {
// TODO: Loss Tolerance is not yet configurable.
// Therefore, since this is an in-memory content repository, assume that this claim is loss-tolerant if there
// is not a backup repository
return createLossTolerant();
}
final ContentClaim backupClaim = backupRepo.create(lossTolerant);
backupRepoClaimMap.put(backupClaim, backupClaim);
return backupClaim;
}
}
private ContentClaim createLossTolerant() {
final long id = idGenerator.getAndIncrement();
final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true, false);
final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L);
final ContentBlock contentBlock = new ContentBlock(claim, repoSize);
claimManager.incrementClaimantCount(resourceClaim, true);
claimMap.put(claim, contentBlock);
logger.debug("Created {} and mapped to {}", claim, contentBlock);
return claim;
}
@Override
public int incrementClaimaintCount(final ContentClaim claim) {
if (claim == null) {
return 0;
}
final ContentClaim backupClaim = getBackupClaim(claim);
if (backupClaim == null) {
return claimManager.incrementClaimantCount(resolveClaim(claim).getResourceClaim());
} else {
return getBackupRepository().incrementClaimaintCount(backupClaim);
}
}
@Override
public int decrementClaimantCount(final ContentClaim claim) {
if (claim == null) {
return 0;
}
final ContentClaim backupClaim = getBackupClaim(claim);
if (backupClaim == null) {
return claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim());
} else {
return getBackupRepository().decrementClaimantCount(backupClaim);
}
}
@Override
public int getClaimantCount(final ContentClaim claim) {
if (claim == null) {
return 0;
}
final ContentClaim backupClaim = getBackupClaim(claim);
if (backupClaim == null) {
return claimManager.getClaimantCount(resolveClaim(claim).getResourceClaim());
} else {
return getBackupRepository().getClaimantCount(backupClaim);
}
}
@Override
public boolean remove(final ContentClaim claim) {
if (claim == null) {
return false;
}
final ContentClaim backupClaim = getBackupClaim(claim);
if (backupClaim == null) {
final ContentBlock content = claimMap.remove(claim);
if (content == null) {
logger.debug("Removed {} from repo but it did not exist", claim);
} else {
logger.debug("Removed {} from repo; Content = {}", claim, content);
content.destroy();
}
} else {
getBackupRepository().remove(backupClaim);
}
return true;
}
private boolean remove(final ResourceClaim claim) {
if (claim == null) {
return false;
}
final Set<ContentClaim> contentClaims = new HashSet<>();
for (final Map.Entry<ContentClaim, ContentBlock> entry : claimMap.entrySet()) {
final ContentClaim contentClaim = entry.getKey();
if (contentClaim.getResourceClaim().equals(claim)) {
contentClaims.add(contentClaim);
}
}
boolean removed = false;
for (final ContentClaim contentClaim : contentClaims) {
if (remove(contentClaim)) {
removed = true;
}
}
return removed;
}
@Override
public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException {
final ContentClaim createdClaim = create(lossTolerant);
try (final InputStream dataIn = read(original)) {
final ContentRepository createdClaimRepo = lossTolerant ? this : getBackupRepository();
if (createdClaimRepo == null) {
throw new IllegalStateException("Cannot create non-loss-tolerant ContentClaim because there is no persistent Content Repository configured");
}
try (final OutputStream dataOut = createdClaimRepo.write(createdClaim)) {
StreamUtils.copy(dataIn, dataOut);
}
}
return createdClaim;
}
@Override
public long merge(final Collection<ContentClaim> claims, final ContentClaim destination, final byte[] header, final byte[] footer, final byte[] demarcator) throws IOException {
long bytes = 0L;
try (final OutputStream out = write(destination)) {
if (header != null) {
out.write(header);
bytes += header.length;
}
final Iterator<ContentClaim> itr = claims.iterator();
while (itr.hasNext()) {
final ContentClaim readClaim = itr.next();
try (final InputStream in = read(readClaim)) {
bytes += StreamUtils.copy(in, out);
}
if (itr.hasNext() && demarcator != null) {
bytes += demarcator.length;
out.write(demarcator);
}
}
if (footer != null) {
bytes += footer.length;
out.write(footer);
}
return bytes;
}
}
@Override
public long importFrom(final Path content, final ContentClaim claim) throws IOException {
try (final InputStream in = new FileInputStream(content.toFile())) {
return importFrom(in, claim);
}
}
@Override
public long importFrom(final InputStream in, final ContentClaim claim) throws IOException {
final ContentClaim backupClaim = getBackupClaim(claim);
if (backupClaim == null) {
final ContentBlock content = getContent(claim);
content.reset();
return StreamUtils.copy(in, content.write());
} else {
return getBackupRepository().importFrom(in, claim);
}
}
@Override
public long exportTo(final ContentClaim claim, final Path destination, final boolean append) throws IOException {
return exportTo(claim, destination, append, 0L, size(claim));
}
@Override
public long exportTo(final ContentClaim claim, final Path destination, final boolean append, final long offset, final long length) throws IOException {
if (claim == null) {
if (append) {
return 0L;
}
Files.createFile(destination);
return 0L;
}
final StandardOpenOption openOption = append ? StandardOpenOption.APPEND : StandardOpenOption.CREATE;
try (final InputStream in = read(claim);
final OutputStream destinationStream = Files.newOutputStream(destination, openOption)) {
if (offset > 0) {
StreamUtils.skip(in, offset);
}
StreamUtils.copy(in, destinationStream, length);
return length;
}
}
@Override
public long exportTo(ContentClaim claim, OutputStream destination) throws IOException {
final InputStream in = read(claim);
try {
return StreamUtils.copy(in, destination);
} finally {
IOUtils.closeQuietly(in);
}
}
@Override
public long exportTo(ContentClaim claim, OutputStream destination, long offset, long length) throws IOException {
final InputStream in = read(claim);
try {
StreamUtils.skip(in, offset);
StreamUtils.copy(in, destination, length);
} finally {
IOUtils.closeQuietly(in);
}
return length;
}
private ContentBlock getContent(final ContentClaim claim) throws ContentNotFoundException {
final ContentBlock content = claimMap.get(claim);
if (content == null) {
throw new ContentNotFoundException(claim);
}
return content;
}
@Override
public long size(final ContentClaim claim) throws IOException {
if (claim == null) {
return 0L;
}
final ContentClaim backupClaim = getBackupClaim(claim);
return backupClaim == null ? getContent(claim).getSize() : getBackupRepository().size(claim);
}
@Override
public long size(final ResourceClaim claim) throws IOException {
return 0;
}
@Override
public InputStream read(final ContentClaim claim) throws IOException {
if (claim == null) {
return new ByteArrayInputStream(new byte[0]);
}
final ContentClaim backupClaim = getBackupClaim(claim);
return backupClaim == null ? getContent(claim).read() : getBackupRepository().read(backupClaim);
}
@Override
public InputStream read(final ResourceClaim claim) throws IOException {
return read(new StandardContentClaim(claim, 0L));
}
@Override
public OutputStream write(final ContentClaim claim) throws IOException {
final ContentClaim backupClaim = getBackupClaim(claim);
return backupClaim == null ? getContent(claim).write() : getBackupRepository().write(backupClaim);
}
@Override
public void purge() {
for (final ContentClaim claim : claimMap.keySet()) {
claimManager.decrementClaimantCount(resolveClaim(claim).getResourceClaim());
final ContentClaim backup = getBackupClaim(claim);
if (backup != null) {
getBackupRepository().remove(backup);
}
}
}
@Override
public void cleanup() {
}
@Override
public boolean isAccessible(final ContentClaim claim) throws IOException {
if (claim == null) {
return false;
}
final ContentClaim backupClaim = getBackupClaim(claim);
if (backupClaim == null) {
final ContentBlock contentBlock = claimMap.get(claim);
return contentBlock != null;
}
return getBackupRepository().isAccessible(backupClaim);
}
class ContentBlock {
private final ClaimSwitchingOutputStream out;
private final ContentClaim claim;
private final AtomicLong repoSizeCounter;
public ContentBlock(final ContentClaim claim, final AtomicLong repoSizeCounter) {
this.claim = claim;
this.repoSizeCounter = repoSizeCounter;
out = new ClaimSwitchingOutputStream(new ArrayManagedOutputStream(memoryManager) {
@Override
public void write(int b) throws IOException {
try {
final long bufferLengthBefore = getBufferLength();
super.write(b);
final long bufferLengthAfter = getBufferLength();
final long bufferSpaceAdded = bufferLengthAfter - bufferLengthBefore;
if (bufferSpaceAdded > 0) {
repoSizeCounter.addAndGet(bufferSpaceAdded);
}
} catch (final IOException e) {
final byte[] buff = new byte[1];
buff[0] = (byte) (b & 0xFF);
redirect(buff, 0, 1);
}
}
@Override
public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
try {
final long bufferLengthBefore = getBufferLength();
super.write(b, off, len);
final long bufferLengthAfter = getBufferLength();
final long bufferSpaceAdded = bufferLengthAfter - bufferLengthBefore;
if (bufferSpaceAdded > 0) {
repoSizeCounter.addAndGet(bufferSpaceAdded);
}
} catch (final IOException e) {
redirect(b, off, len);
}
}
private void redirect(byte[] b, int off, int len) throws IOException {
logger.debug("Redirecting {}", claim);
out.redirect();
out.write(b, off, len);
}
});
}
public synchronized long getSize() throws IOException {
return out.getSize();
}
public synchronized OutputStream write() {
return out;
}
public synchronized InputStream read() throws IOException {
return out.read();
}
public synchronized void reset() {
out.reset();
}
public synchronized void destroy() {
out.destroy();
}
private class ClaimSwitchingOutputStream extends FilterOutputStream {
private ArrayManagedOutputStream amos;
private OutputStream out;
public ClaimSwitchingOutputStream(final ArrayManagedOutputStream out) {
super(out);
this.amos = out;
this.out = out;
}
@Override
public void write(final byte[] b) throws IOException {
out.write(b);
}
@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
out.write(b, off, len);
}
@Override
public void write(final int b) throws IOException {
out.write(b);
}
public void destroy() {
final int vcosLength = amos.getBufferLength();
amos.destroy();
amos = null;
repoSizeCounter.addAndGet(-vcosLength);
}
@Override
public void flush() throws IOException {
out.flush();
}
@Override
public void close() throws IOException {
out.close();
}
public void reset() {
amos.reset();
}
private void redirect() throws IOException {
final ContentRepository backupRepo = getBackupRepository();
if (backupRepo == null) {
throw new IOException("Content Repository is out of space");
}
final ContentClaim backupClaim = backupRepo.create(true);
backupRepoClaimMap.put(claim, backupClaim);
out = backupRepo.write(backupClaim);
amos.writeTo(out);
amos.destroy();
amos = null;
}
public long getSize() throws IOException {
if (amos == null) {
final ContentClaim backupClaim = getBackupClaim(claim);
return getBackupRepository().size(backupClaim);
} else {
return amos.size();
}
}
public InputStream read() throws IOException {
if (amos == null) {
final ContentClaim backupClaim = getBackupClaim(claim);
return getBackupRepository().read(backupClaim);
} else {
return amos.newInputStream();
}
}
}
}
private class CleanupOldClaims implements Runnable {
@Override
public void run() {
final List<ResourceClaim> destructable = new ArrayList<>(1000);
while (true) {
destructable.clear();
claimManager.drainDestructableClaims(destructable, 1000, 5, TimeUnit.SECONDS);
if (destructable.isEmpty()) {
return;
}
for (final ResourceClaim claim : destructable) {
remove(claim);
}
}
}
}
}

View File

@ -13,5 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.controller.repository.FileSystemRepository
org.apache.nifi.controller.repository.VolatileContentRepository
org.apache.nifi.controller.repository.crypto.EncryptedFileSystemRepository

View File

@ -1,188 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
public class TestVolatileContentRepository {
private ResourceClaimManager claimManager;
@Before
public void setup() {
claimManager = new StandardResourceClaimManager();
}
@Test
public void testRedirects() throws IOException {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestVolatileContentRepository.class.getResource("/conf/nifi.properties").getFile());
final Map<String, String> addProps = new HashMap<>();
addProps.put(VolatileContentRepository.MAX_SIZE_PROPERTY, "10 MB");
final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, addProps);
final VolatileContentRepository contentRepo = new VolatileContentRepository(nifiProps);
contentRepo.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
final ContentClaim claim = contentRepo.create(true);
final OutputStream out = contentRepo.write(claim);
final byte[] oneK = new byte[1024];
Arrays.fill(oneK, (byte) 55);
// Write 10 MB to the repo
for (int i = 0; i < 10240; i++) {
out.write(oneK);
}
try {
out.write(1);
Assert.fail("Expected to be out of space on content repo");
} catch (final IOException e) {
}
try {
out.write(1);
Assert.fail("Expected to be out of space on content repo");
} catch (final IOException e) {
}
final ContentRepository mockRepo = Mockito.mock(ContentRepository.class);
contentRepo.setBackupRepository(mockRepo);
final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
Mockito.when(mockRepo.create(ArgumentMatchers.anyBoolean())).thenReturn(contentClaim);
final ByteArrayOutputStream overflowStream = new ByteArrayOutputStream();
Mockito.when(mockRepo.write(ArgumentMatchers.any(ContentClaim.class))).thenReturn(overflowStream);
out.write(10);
assertEquals(1024 * 1024 * 10 + 1, overflowStream.size());
final byte[] overflowBuffer = overflowStream.toByteArray();
assertEquals(55, overflowBuffer[0]);
for (int i = 0; i < overflowBuffer.length; i++) {
if (i == overflowBuffer.length - 1) {
assertEquals(10, overflowBuffer[i]);
} else {
assertEquals(55, overflowBuffer[i]);
}
}
}
@Test
public void testMemoryIsFreed() throws IOException, InterruptedException {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestVolatileContentRepository.class.getResource("/conf/nifi.properties").getFile());
final Map<String, String> addProps = new HashMap<>();
addProps.put(VolatileContentRepository.MAX_SIZE_PROPERTY, "11 MB");
final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, addProps);
final VolatileContentRepository contentRepo = new VolatileContentRepository(nifiProps);
contentRepo.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
final byte[] oneK = new byte[1024];
Arrays.fill(oneK, (byte) 55);
final ExecutorService exec = Executors.newFixedThreadPool(10);
for (int t = 0; t < 10; t++) {
final Runnable r = new Runnable() {
@Override
public void run() {
try {
for (int j = 0; j < 10000; j++) {
final ContentClaim claim = contentRepo.create(true);
final OutputStream out = contentRepo.write(claim);
// Write 1 MB to the repo
for (int i = 0; i < 1024; i++) {
out.write(oneK);
}
final int count = contentRepo.decrementClaimantCount(claim);
if (count <= 0) {
contentRepo.remove(claim);
}
}
} catch (final Exception e) {
e.printStackTrace();
}
}
};
exec.submit(r);
}
exec.shutdown();
exec.awaitTermination(100000, TimeUnit.MINUTES);
}
@Test
public void testSimpleReadWrite() throws IOException {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestVolatileContentRepository.class.getResource("/conf/nifi.properties").getFile());
final Map<String, String> addProps = new HashMap<>();
addProps.put(VolatileContentRepository.MAX_SIZE_PROPERTY, "11 MB");
final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, addProps);
final VolatileContentRepository contentRepo = new VolatileContentRepository(nifiProps);
contentRepo.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
final ContentClaim claim = contentRepo.create(true);
final OutputStream out = contentRepo.write(claim);
final int byteCount = 2398473 * 4;
final byte[] x = new byte[4];
x[0] = 48;
x[1] = 29;
x[2] = 49;
x[3] = 51;
for (int i = 0; i < byteCount / 4; i++) {
out.write(x);
}
out.close();
final InputStream in = contentRepo.read(claim);
for (int i = 0; i < byteCount; i++) {
final int val = in.read();
final int index = i % 4;
final byte expectedVal = x[index];
assertEquals(expectedVal, val);
}
assertEquals(-1, in.read());
}
}

View File

@ -48,7 +48,7 @@ nifi.swap.out.period=5 sec
nifi.swap.out.threads=4
# Content Repository
nifi.content.repository.implementation=org.apache.nifi.controller.repository.VolatileContentRepository
nifi.content.repository.implementation=org.apache.nifi.controller.repository.FileSystemRepository
nifi.volatile.content.repository.max.size=1 KB
nifi.volatile.content.repository.block.size=1 KB
nifi.content.claim.max.appendable.size=10 MB