NIFI-3736: Changed to honor the nifi.content.claim.max.appendable.size and nifi.content.claim.max.flow.files properties. Added a 100 MB cap for NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE

This closes #2010.
Authored by m-hogue on 2017-07-14 12:50:45 -04:00, committed by Mark Payne
parent d334532b16
commit c54b2ad81c
4 changed files with 58 additions and 13 deletions

NiFiProperties.java

@@ -220,6 +220,7 @@ public abstract class NiFiProperties {
     public static final String DEFAULT_FLOWFILE_REPO_PARTITIONS = "256";
     public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "2 min";
     public static final int DEFAULT_MAX_FLOWFILES_PER_CLAIM = 100;
+    public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "1 MB";
     public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000;
     public static final String DEFAULT_SWAP_STORAGE_LOCATION = "./flowfile_repository/swap";
     public static final String DEFAULT_SWAP_IN_PERIOD = "1 sec";
@@ -924,6 +925,15 @@ public abstract class NiFiProperties {
         return provenanceRepositoryPaths;
     }
 
+    /**
+     * Returns the number of claims to keep open for writing. Ideally, this will be at
+     * least as large as the number of threads that will be updating the repository simultaneously but we don't want
+     * to get too large because it will hold open up to this many FileOutputStreams.
+     *
+     * Default is {@link #DEFAULT_MAX_FLOWFILES_PER_CLAIM}
+     *
+     * @return the maximum number of flow files per claim
+     */
     public int getMaxFlowFilesPerClaim() {
         try {
             return Integer.parseInt(getProperty(MAX_FLOWFILES_PER_CLAIM));
@@ -932,8 +942,16 @@ public abstract class NiFiProperties {
         }
     }
 
+    /**
+     * Returns the maximum size, in bytes, that claims should grow before writing a new file. This means that we won't continually write to one
+     * file that keeps growing but gives us a chance to bunch together many small files.
+     *
+     * Default is {@link #DEFAULT_MAX_APPENDABLE_CLAIM_SIZE}
+     *
+     * @return the maximum appendable claim size
+     */
     public String getMaxAppendableClaimSize() {
-        return getProperty(MAX_APPENDABLE_CLAIM_SIZE);
+        return getProperty(MAX_APPENDABLE_CLAIM_SIZE, DEFAULT_MAX_APPENDABLE_CLAIM_SIZE);
     }
 
     public String getProperty(final String key, final String defaultValue) {
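As a quick illustration (not part of this commit), the sketch below shows how a repository implementation might consume the two getters added above. The SketchRepository class name is hypothetical; DataUnit.parseDataSize is the same parser this commit uses in FileSystemRepository, and the snippet assumes the NiFi framework classes are on the classpath.

    import org.apache.nifi.processor.DataUnit;
    import org.apache.nifi.util.NiFiProperties;

    class SketchRepository {
        // bounds the queue of claims kept open for writing
        private final int maxFlowFilesPerClaim;
        // size, in bytes, past which a claim is no longer recycled for appending
        private final long maxAppendableClaimLength;

        SketchRepository(final NiFiProperties props) {
            // falls back to DEFAULT_MAX_FLOWFILES_PER_CLAIM (100) if the property is unset or unparsable
            this.maxFlowFilesPerClaim = props.getMaxFlowFilesPerClaim();
            // getMaxAppendableClaimSize() now defaults to "1 MB"; convert the size string to bytes
            this.maxAppendableClaimLength = DataUnit.parseDataSize(props.getMaxAppendableClaimSize(), DataUnit.B).longValue();
        }
    }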

FileSystemRepository.java

@@ -66,6 +66,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
@@ -85,6 +86,9 @@ public class FileSystemRepository implements ContentRepository {
     public static final int SECTIONS_PER_CONTAINER = 1024;
     public static final long MIN_CLEANUP_INTERVAL_MILLIS = 1000;
     public static final String ARCHIVE_DIR_NAME = "archive";
+    // 100 MB cap for the configurable NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE property to prevent
+    // unnecessarily large resource claim files
+    public static final String APPENDABLE_CLAIM_LENGTH_CAP = "100 MB";
     public static final Pattern MAX_ARCHIVE_SIZE_PATTERN = Pattern.compile("\\d{1,2}%");
 
     private static final Logger LOG = LoggerFactory.getLogger(FileSystemRepository.class);
@@ -97,22 +101,23 @@ public class FileSystemRepository implements ContentRepository {
     private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true);
     private final ConcurrentMap<String, BlockingQueue<ResourceClaim>> reclaimable = new ConcurrentHashMap<>();
     private final Map<String, ContainerState> containerStateMap = new HashMap<>();
-    // 1 MB. This could be adjusted but 1 MB seems reasonable, as it means that we won't continually write to one
-    // file that keeps growing but gives us a chance to bunch together a lot of small files. Before, we had issues
-    // with creating and deleting too many files, as we had to delete 100's of thousands of files every 2 minutes
-    // in order to avoid backpressure on session commits. With 1 MB as the target file size, 100's of thousands of
-    // files would mean that we are writing gigabytes per second - quite a bit faster than any disks can handle now.
-    static final int MAX_APPENDABLE_CLAIM_LENGTH = 1024 * 1024;
 
-    // Queue for claims that are kept open for writing. Size of 100 is pretty arbitrary. Ideally, this will be at
+    // Queue for claims that are kept open for writing. Ideally, this will be at
     // least as large as the number of threads that will be updating the repository simultaneously but we don't want
     // to get too large because it will hold open up to this many FileOutputStreams.
     // The queue is used to determine which claim to write to and then the corresponding Map can be used to obtain
     // the OutputStream that we can use for writing to the claim.
-    private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100);
+    private final BlockingQueue<ClaimLengthPair> writableClaimQueue;
     private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100);
 
     private final boolean archiveData;
+    // 1 MB default, as it means that we won't continually write to one
+    // file that keeps growing but gives us a chance to bunch together a lot of small files. Before, we had issues
+    // with creating and deleting too many files, as we had to delete 100's of thousands of files every 2 minutes
+    // in order to avoid backpressure on session commits. With 1 MB as the target file size, 100's of thousands of
+    // files would mean that we are writing gigabytes per second - quite a bit faster than any disks can handle now.
+    private final long maxAppendableClaimLength;
+    private final int maxFlowFilesPerClaim;
     private final long maxArchiveMillis;
     private final Map<String, Long> minUsableContainerBytesForArchive = new HashMap<>();
     private final boolean alwaysSync;
@@ -140,6 +145,9 @@ public class FileSystemRepository implements ContentRepository {
         alwaysSync = false;
         containerCleanupExecutor = null;
         nifiProperties = null;
+        maxAppendableClaimLength = 0;
+        maxFlowFilesPerClaim = 0;
+        writableClaimQueue = null;
     }
 
     public FileSystemRepository(final NiFiProperties nifiProperties) throws IOException {
@@ -149,6 +157,20 @@ public class FileSystemRepository implements ContentRepository {
         for (final Path path : fileRespositoryPaths.values()) {
             Files.createDirectories(path);
         }
+        this.maxFlowFilesPerClaim = nifiProperties.getMaxFlowFilesPerClaim();
+        this.writableClaimQueue = new LinkedBlockingQueue<>(maxFlowFilesPerClaim);
+        final long configuredAppendableClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue();
+        final long appendableClaimLengthCap = DataUnit.parseDataSize(APPENDABLE_CLAIM_LENGTH_CAP, DataUnit.B).longValue();
+        if (configuredAppendableClaimLength > appendableClaimLengthCap) {
+            LOG.warn("Configured property '{}' with value {} exceeds cap of {}. Setting value to {}",
+                    NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE,
+                    configuredAppendableClaimLength,
+                    APPENDABLE_CLAIM_LENGTH_CAP,
+                    APPENDABLE_CLAIM_LENGTH_CAP);
+            this.maxAppendableClaimLength = appendableClaimLengthCap;
+        } else {
+            this.maxAppendableClaimLength = configuredAppendableClaimLength;
+        }
 
         this.containers = new HashMap<>(fileRespositoryPaths);
         this.containerNames = new ArrayList<>(containers.keySet());
@@ -948,7 +970,7 @@ public class FileSystemRepository implements ContentRepository {
             // is called. In this case, we don't have to actually close the file stream. Instead, we
             // can just add it onto the queue and continue to use it for the next content claim.
             final long resourceClaimLength = scc.getOffset() + scc.getLength();
-            if (recycle && resourceClaimLength < MAX_APPENDABLE_CLAIM_LENGTH) {
+            if (recycle && resourceClaimLength < maxAppendableClaimLength) {
                 final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength);
 
                 // We are checking that writableClaimStreams contains the resource claim as a key, as a sanity check.
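To make the constructor change above easier to follow, here is a small stand-alone sketch of the capping rule; it is not the committed code. The ClaimSizeCapSketch class and capClaimLength method are hypothetical names, the sketch omits the warning the real constructor logs when it clamps, and the printed values assume NiFi's DataUnit parses sizes with 1024-based units, consistent with the 1024 * 1024 constant this commit removes.

    import org.apache.nifi.processor.DataUnit;

    final class ClaimSizeCapSketch {
        // same cap string as FileSystemRepository.APPENDABLE_CLAIM_LENGTH_CAP
        static final String CAP = "100 MB";

        // returns the configured size in bytes, clamped to the cap
        static long capClaimLength(final String configuredSize) {
            final long configured = DataUnit.parseDataSize(configuredSize, DataUnit.B).longValue();
            final long cap = DataUnit.parseDataSize(CAP, DataUnit.B).longValue();
            return Math.min(configured, cap);
        }

        public static void main(final String[] args) {
            System.out.println(capClaimLength("1 MB"));    // 1048576, the new default
            System.out.println(capClaimLength("500 MB"));  // 104857600, clamped to the cap
        }
    }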

TestFileSystemRepository.java

@@ -50,6 +50,7 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.util.DiskUtils;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.junit.After;
@@ -286,10 +287,12 @@ public class TestFileSystemRepository {
         repository.incrementClaimaintCount(claim);
         final Path claimPath = getPath(claim);
 
+        final String maxAppendableClaimLength = nifiProperties.getMaxAppendableClaimSize();
+        final int maxClaimLength = DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue();
         // Create the file.
         try (final OutputStream out = repository.write(claim)) {
-            out.write(new byte[FileSystemRepository.MAX_APPENDABLE_CLAIM_LENGTH]);
+            out.write(new byte[maxClaimLength]);
         }
 
         int count = repository.decrementClaimantCount(claim);
@@ -495,7 +498,9 @@ public class TestFileSystemRepository {
             // write at least 1 MB to the output stream so that when we close the output stream
             // the repo won't keep the stream open.
-            final byte[] buff = new byte[FileSystemRepository.MAX_APPENDABLE_CLAIM_LENGTH];
+            final String maxAppendableClaimLength = nifiProperties.getMaxAppendableClaimSize();
+            final int maxClaimLength = DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue();
+            final byte[] buff = new byte[maxClaimLength];
             out.write(buff);
             out.write(buff);
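A minimal JUnit 4 sketch of the buffer-sizing pattern the updated tests rely on; the test class and method names are hypothetical, and the expected value assumes the 1024-based parsing noted above.

    import org.apache.nifi.processor.DataUnit;
    import org.junit.Assert;
    import org.junit.Test;

    public class MaxAppendableClaimSizeSketchTest {
        @Test
        public void parsesConfiguredClaimSizeIntoByteCount() {
            // mirror the test changes: turn the configured size string into a byte count
            final int maxClaimLength = DataUnit.parseDataSize("1 MB", DataUnit.B).intValue();
            // assumes 1024-based units, matching the removed 1024 * 1024 constant
            Assert.assertEquals(1024 * 1024, maxClaimLength);
            // writing a buffer of this size pushes the claim past the appendable threshold
            final byte[] buff = new byte[maxClaimLength];
            Assert.assertEquals(maxClaimLength, buff.length);
        }
    }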

nifi.properties (test configuration)

@@ -45,7 +45,7 @@ nifi.swap.out.period=5 sec
 nifi.swap.out.threads=4
 
 # Content Repository
-nifi.content.claim.max.appendable.size=10 MB
+nifi.content.claim.max.appendable.size=1 MB
 nifi.content.claim.max.flow.files=100
 nifi.content.repository.directory.default=./target/content_repository