NIFI-744: Addressed feedback from review, mostly adding documentation to a few points in the code

Mark Payne 2015-08-21 10:52:40 -04:00
parent 68d94cc01b
commit 15a8699dc4
6 changed files with 46 additions and 83 deletions

ContentRepository.java

@@ -164,24 +164,6 @@ public interface ContentRepository {
      */
     long importFrom(Path content, ContentClaim claim) throws IOException;
-
-    /**
-     * Imports content from the given path to the specified claim, appending or
-     * replacing the current claim, according to the value of the append
-     * argument
-     *
-     * @return the size of the claim
-     * @param content to import from
-     * @param claim the claim to write imported content to
-     * @param append if true, the content will be appended to the claim; if
-     * false, the content will replace the contents of the claim
-     * @throws IOException if unable to read content
-     *
-     * @deprecated if needing to append to a content claim, the contents of the claim should be
-     * copied to a new claim and then the data to append should be written to that new claim.
-     */
-    @Deprecated
-    long importFrom(Path content, ContentClaim claim, boolean append) throws IOException;
 
     /**
      * Imports content from the given stream creating a new content object and
      * claim within the repository.
@@ -193,22 +175,6 @@ public interface ContentRepository {
      */
     long importFrom(InputStream content, ContentClaim claim) throws IOException;
-
-    /**
-     * Imports content from the given stream, appending or replacing the current
-     * claim, according to the value of the appen dargument
-     *
-     * @param content to import from
-     * @param claim to write to
-     * @param append whether to append or replace
-     * @return length of data imported in bytes
-     * @throws IOException if failure to read or write stream
-     *
-     * @deprecated if needing to append to a content claim, the contents of the claim should be
-     * copied to a new claim and then the data to append should be written to that new claim.
-     */
-    @Deprecated
-    long importFrom(InputStream content, ContentClaim claim, boolean append) throws IOException;
 
     /**
      * Exports the content of the given claim to the given destination.
      *
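Both removed Javadoc blocks point to the same copy-then-append replacement pattern. A minimal sketch of that pattern against this interface, assuming the create(boolean lossTolerant), read(ContentClaim), and write(ContentClaim) methods declared elsewhere in ContentRepository; the helper name appendToNewClaim is hypothetical, not part of the API:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.nifi.controller.repository.ContentRepository;
    import org.apache.nifi.controller.repository.claim.ContentClaim;
    import org.apache.nifi.stream.io.StreamUtils;

    public class ClaimAppendExample {
        // Hypothetical helper: rather than appending in place, copy the original
        // claim's content into a brand-new claim, then write the bytes to append.
        public static ContentClaim appendToNewClaim(final ContentRepository repo,
                final ContentClaim original, final InputStream toAppend) throws IOException {
            final ContentClaim combined = repo.create(true); // lossTolerant=true is arbitrary for this sketch
            try (final InputStream in = repo.read(original);
                    final OutputStream out = repo.write(combined)) {
                StreamUtils.copy(in, out);       // 1. copy the existing content
                StreamUtils.copy(toAppend, out); // 2. then append the new data
            }
            return combined;
        }
    }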

FlowController.java

@@ -3318,6 +3318,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         lineageIdentifiers.add(parentUUID);
         final String newFlowFileUUID = UUID.randomUUID().toString();
+
+        // We need to create a new FlowFile by populating it with information from the
+        // Provenance Event. Particularly of note here is that we are setting the FlowFile's
+        // contentClaimOffset to 0. This is done for backward compatibility reasons. ContentClaim
+        // used to not have a concept of an offset, and the offset was tied only to the FlowFile. This
+        // was later refactored, so that the offset was part of the ContentClaim. If we set the offset
+        // in both places, we'll end up skipping over that many bytes twice instead of once (once to get
+        // to the beginning of the Content Claim and again to get to the offset within that Content Claim).
+        // To avoid this, we just always set the offset in the Content Claim itself and set the
+        // FlowFileRecord's contentClaimOffset to 0.
         final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
                 // Copy relevant info from source FlowFile
                 .addAttributes(event.getPreviousAttributes())
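To make the double-skip concern in that comment concrete, here is a standalone illustration; the 512-byte figure is invented for the example and is not taken from the repository code:

    public class OffsetExample {
        public static void main(String[] args) {
            // Suppose this FlowFile's content begins 512 bytes into the resource file.
            final long claimOffset = 512L;   // offset carried by the ContentClaim
            final long flowFileOffset = 0L;  // FlowFileRecord's contentClaimOffset, forced to 0
            System.out.println(claimOffset + flowFileOffset); // 512: the correct start
            // Had both fields carried 512, the sum would be 1024 and a reader that
            // honored both offsets would skip the leading bytes twice.
        }
    }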

FileSystemRepository.java

@@ -66,8 +66,8 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.LongHolder;
 import org.apache.nifi.util.NiFiProperties;
@@ -94,7 +94,18 @@ public class FileSystemRepository implements ContentRepository {
     private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true);
     private final ConcurrentMap<String, BlockingQueue<ResourceClaim>> reclaimable = new ConcurrentHashMap<>();
     private final Map<String, ContainerState> containerStateMap = new HashMap<>();
 
-    private final long maxAppendClaimLength = 1024L * 1024L; // 1 MB
+    // 1 MB. This could be adjusted but 1 MB seems reasonable, as it means that we won't continually write to one
+    // file that keeps growing but gives us a chance to bunch together a lot of small files. Before, we had issues
+    // with creating and deleting too many files, as we had to delete 100's of thousands of files every 2 minutes
+    // in order to avoid backpressure on session commits. With 1 MB as the target file size, 100's of thousands of
+    // files would mean that we are writing gigabytes per second - quite a bit faster than any disks can handle now.
+    private final long maxAppendClaimLength = 1024L * 1024L;
+
+    // Queue for claims that are kept open for writing. Size of 100 is pretty arbitrary. Ideally, this will be at
+    // least as large as the number of threads that will be updating the repository simultaneously but we don't want
+    // to get too large because it will hold open up to this many FileOutputStreams.
+    // The queue is used to determine which claim to write to and then the corresponding Map can be used to obtain
+    // the OutputStream that we can use for writing to the claim.
     private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100);
     private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100);
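As a rough sanity check on the comment's throughput claim: 250,000 one-megabyte files per two-minute cleanup cycle is about 250 GB per cycle, i.e. roughly 2 GB/s of sustained writes. The queue-plus-map pairing the new comment describes might look like the following simplified sketch; the Claim type and method names are stand-ins, not NiFi's actual write path:

    import java.io.ByteArrayOutputStream;
    import java.io.OutputStream;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.LinkedBlockingQueue;

    public class WritableClaimSketch {
        static final class Claim { } // stand-in for NiFi's ResourceClaim

        private final BlockingQueue<Claim> writableClaimQueue = new LinkedBlockingQueue<>(100);
        private final ConcurrentMap<Claim, OutputStream> writableClaimStreams = new ConcurrentHashMap<>(100);

        public OutputStream nextWritableStream() {
            // 1. The queue decides WHICH claim a writer should use next...
            Claim claim = writableClaimQueue.poll();
            if (claim == null) {
                // No reusable claim available: start a new one (a new file on disk in the real repo).
                claim = new Claim();
                writableClaimStreams.put(claim, new ByteArrayOutputStream());
            }
            // 2. ...and the map yields the stream already open for that claim.
            return writableClaimStreams.get(claim);
        }

        public void doneWriting(final Claim claim, final long bytesWritten, final long maxAppendClaimLength) {
            // Offer the claim back only while it is under the 1 MB target, so small
            // writes keep getting bunched into the same underlying file.
            if (bytesWritten >= maxAppendClaimLength || !writableClaimQueue.offer(claim)) {
                writableClaimStreams.remove(claim); // the real code also closes the stream here
            }
        }
    }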
@@ -235,6 +246,13 @@ public class FileSystemRepository implements ContentRepository {
         executor.shutdown();
         containerCleanupExecutor.shutdown();
 
+        // Close any of the writable claim streams that are currently open.
+        // Other threads may be writing to these streams, and that's okay.
+        // If that happens, we will simply close the stream, resulting in an
+        // IOException that will roll back the session. Since this is called
+        // only on shutdown of the application, we don't have to worry about
+        // partially written files - on restart, we will simply start writing
+        // to new files and leave those trailing bytes alone.
         for (final OutputStream out : writableClaimStreams.values()) {
             try {
                 out.close();
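The shutdown behavior the new comment describes relies on a basic property of Java streams: closing a stream out from under a writer makes the writer's next operation throw, which is what triggers the session rollback. A minimal illustration, independent of NiFi; the file name is arbitrary:

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class CloseDuringWriteExample {
        public static void main(String[] args) throws Exception {
            final OutputStream out = new FileOutputStream("example.bin");
            out.write(1);
            out.close();        // what the repository does on shutdown
            try {
                out.write(2);   // the "other thread's" next write...
            } catch (IOException e) {
                // ...fails, which in NiFi rolls back the in-progress session.
                System.out.println("write after close failed: " + e);
            }
        }
    }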
@@ -482,7 +500,13 @@ public class FileSystemRepository implements ContentRepository {
         // the queue and incrementing the associated claimant count MUST be done atomically.
         // This way, if the claimant count is decremented to 0, we can ensure that the
         // claim is not then pulled from the queue and used as another thread is destroying/archiving
-        // the claim.
+        // the claim. The logic in the remove() method dictates that the underlying file can be
+        // deleted (or archived) only if the claimant count becomes <= 0 AND there is no other claim on
+        // the queue that references that file. As a result, we need to ensure that those two conditions
+        // can be evaluated atomically. In order for that to be the case, we need to also treat the
+        // removal of a claim from the queue and the incrementing of its claimant count as an atomic
+        // action to ensure that the comparison of those two conditions is atomic also. As a result,
+        // we will synchronize on the queue while performing those actions.
         final long resourceOffset;
         synchronized (writableClaimQueue) {
             final ClaimLengthPair pair = writableClaimQueue.poll();
@@ -571,7 +595,9 @@ public class FileSystemRepository implements ContentRepository {
         // we synchronize on the queue here because if the claimant count is 0,
         // we need to be able to remove any instance of that resource claim from the
         // queue atomically (i.e., the checking of the claimant count plus removal from the queue
-        // must be atomic)
+        // must be atomic). The create() method also synchronizes on the queue whenever it
+        // polls from the queue and increments a claimant count in order to ensure that these
+        // two conditions can be checked atomically.
         synchronized (writableClaimQueue) {
             final int claimantCount = resourceClaimManager.getClaimantCount(claim);
             if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
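The invariant described in both of those comments is that "claimant count reaches zero" and "no entry for the claim remains on the queue" must be observed as one atomic condition. A condensed sketch of the two synchronized sections, with a plain AtomicInteger standing in for NiFi's ResourceClaimManager:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ClaimAtomicitySketch {
        static final class Claim {
            final AtomicInteger claimantCount = new AtomicInteger(); // stand-in for ResourceClaimManager
        }

        private final BlockingQueue<Claim> writableClaimQueue = new LinkedBlockingQueue<>(100);

        // create() side: polling the queue and incrementing the claimant count form
        // one atomic step, so remove() cannot archive the file in between.
        public Claim takeForWriting() {
            synchronized (writableClaimQueue) {
                final Claim claim = writableClaimQueue.poll();
                if (claim != null) {
                    claim.claimantCount.incrementAndGet();
                }
                return claim;
            }
        }

        // remove() side: the underlying file may be deleted or archived only if nobody
        // claims it AND no queue entry references it - both checked under the same lock.
        public boolean canDestroy(final Claim claim) {
            synchronized (writableClaimQueue) {
                return claim.claimantCount.get() <= 0 && !writableClaimQueue.contains(claim);
            }
        }
    }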
@@ -647,24 +673,14 @@ public class FileSystemRepository implements ContentRepository {
     @Override
     public long importFrom(final Path content, final ContentClaim claim) throws IOException {
-        return importFrom(content, claim, false);
-    }
-
-    @Override
-    public long importFrom(final Path content, final ContentClaim claim, final boolean append) throws IOException {
         try (final InputStream in = Files.newInputStream(content, StandardOpenOption.READ)) {
-            return importFrom(in, claim, append);
+            return importFrom(in, claim);
         }
     }
 
     @Override
     public long importFrom(final InputStream content, final ContentClaim claim) throws IOException {
-        return importFrom(content, claim, false);
-    }
-
-    @Override
-    public long importFrom(final InputStream content, final ContentClaim claim, final boolean append) throws IOException {
-        try (final OutputStream out = write(claim, append)) {
+        try (final OutputStream out = write(claim, false)) {
             return StreamUtils.copy(content, out);
         }
     }

VolatileContentRepository.java

@@ -352,33 +352,21 @@ public class VolatileContentRepository implements ContentRepository {
     @Override
     public long importFrom(final Path content, final ContentClaim claim) throws IOException {
-        return importFrom(content, claim, false);
-    }
-
-    @Override
-    public long importFrom(final Path content, final ContentClaim claim, boolean append) throws IOException {
         try (final InputStream in = new FileInputStream(content.toFile())) {
-            return importFrom(in, claim, append);
+            return importFrom(in, claim);
         }
     }
 
     @Override
-    public long importFrom(final InputStream content, final ContentClaim claim) throws IOException {
-        return importFrom(content, claim, false);
-    }
-
-    @Override
-    public long importFrom(final InputStream in, final ContentClaim claim, final boolean append) throws IOException {
+    public long importFrom(final InputStream in, final ContentClaim claim) throws IOException {
         final ContentClaim backupClaim = getBackupClaim(claim);
         if (backupClaim == null) {
             final ContentBlock content = getContent(claim);
-            if (!append) {
-                content.reset();
-            }
+            content.reset();
             return StreamUtils.copy(in, content.write());
         } else {
-            return getBackupRepository().importFrom(in, claim, append);
+            return getBackupRepository().importFrom(in, claim);
         }
     }

[test class in package org.apache.nifi.controller]

@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.controller;
 
-import org.apache.nifi.controller.FileSystemSwapManager;
 import static org.junit.Assert.assertEquals;
 
 import java.io.BufferedInputStream;

TestStandardProcessSession.java

@@ -1089,28 +1089,12 @@ public class TestStandardProcessSession {
             return Files.size(content);
         }
 
-        @Override
-        public long importFrom(Path content, ContentClaim claim, boolean append) throws IOException {
-            if (append) {
-                throw new UnsupportedOperationException();
-            }
-            return importFrom(content, claim);
-        }
-
         @Override
         public long importFrom(InputStream content, ContentClaim claim) throws IOException {
             Files.copy(content, getPath(claim));
             return Files.size(getPath(claim));
         }
 
-        @Override
-        public long importFrom(InputStream content, ContentClaim claim, boolean append) throws IOException {
-            if (append) {
-                throw new UnsupportedOperationException();
-            }
-            return importFrom(content, claim);
-        }
-
         @Override
         public long exportTo(ContentClaim claim, Path destination, boolean append) throws IOException {
             throw new UnsupportedOperationException();