diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
index 70c6a321d8..df8391aa61 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
@@ -25,6 +25,7 @@ public class LimitingInputStream extends InputStream {
private final long limit;
private long bytesRead = 0;
private volatile boolean limitReached = false;
+ private long markOffset = -1L;
/**
* Constructs a limited input stream whereby if the limit is reached all
@@ -112,6 +113,7 @@ public class LimitingInputStream extends InputStream {
@Override
public void mark(int readlimit) {
in.mark(readlimit);
+ markOffset = bytesRead;
}
@Override
@@ -122,6 +124,11 @@ public class LimitingInputStream extends InputStream {
@Override
public void reset() throws IOException {
in.reset();
+
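+ // Rewind the local byte count to the marked position so the limit accounting stays in
+ // sync with the underlying stream after a reset (assumes mark() was called first;
+ // otherwise markOffset is negative and bytesRead is left untouched).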
+ if (markOffset >= 0) {
+ bytesRead = markOffset;
+ }
+ markOffset = -1;
}
public long getLimit() {
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index 7636966878..97dd1005ed 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -252,6 +252,24 @@ public interface ContentRepository {
*/
InputStream read(ContentClaim claim) throws IOException;
+ /**
+ * Provides access to the input stream for the entire Resource Claim
+ * @param claim the resource claim to read from
+ * @return InputStream over the content of the entire Resource Claim
+ * @throws IOException if unable to read
+ */
+ InputStream read(ResourceClaim claim) throws IOException;
+
+ /**
+ * Indicates whether or not this Content Repository supports obtaining an InputStream for
+ * an entire Resource Claim. If this method returns false, the {@link #read(ResourceClaim)} should not
+ * be called and instead {@link #read(ContentClaim)} should always be used
+ * @return true if reading an entire Resource Claim is allowed, false otherwise
+ */
+ default boolean isResourceClaimStreamSupported() {
+ return true;
+ }
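+ // A hedged usage sketch (illustrative only, not part of the contract): callers should
+ // guard whole-claim reads, e.g.:
+ //   final InputStream in = repo.isResourceClaimStreamSupported()
+ //       ? repo.read(contentClaim.getResourceClaim())
+ //       : repo.read(contentClaim);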
+
/**
* Obtains an OutputStream to the content for the given claim.
*
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 0cdfd05059..1d24f33b36 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -59,10 +59,12 @@ import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
@@ -145,8 +147,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
private int flowFilesIn = 0, flowFilesOut = 0;
private long contentSizeIn = 0L, contentSizeOut = 0L;
- private ContentClaim currentReadClaim = null;
- private ContentClaimInputStream currentReadClaimStream = null;
+ private ResourceClaim currentReadClaim = null;
+ private ByteCountingInputStream currentReadClaimStream = null;
private long processingStartTime;
// List of InputStreams that have been opened by calls to {@link #read(FlowFile)} and not yet closed
@@ -2259,12 +2261,12 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
context.getProvenanceRepository().registerEvents(iterable);
context.getFlowFileRepository().updateRepository(expiredRecords);
} catch (final IOException e) {
- LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e);
+ LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e.toString(), e);
}
}
- private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long offset, final boolean allowCachingOfStream) throws ContentNotFoundException {
+ private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long contentClaimOffset, final boolean allowCachingOfStream) throws ContentNotFoundException {
// If there's no content, don't bother going to the Content Repository because it is generally expensive and we know
// that there is no actual content.
if (flowFile.getSize() == 0L) {
@@ -2275,15 +2277,18 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
// If the recursion set is empty, we can use the same input stream that we already have open. However, if
// the recursion set is NOT empty, we can't do this because we may be reading the input of FlowFile 1 while in the
// callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1.
- if (allowCachingOfStream && readRecursionSet.isEmpty() && writeRecursionSet.isEmpty()) {
- if (currentReadClaim == claim) {
- if (currentReadClaimStream != null && currentReadClaimStream.getCurrentOffset() <= offset) {
- final long bytesToSkip = offset - currentReadClaimStream.getCurrentOffset();
+ if (allowCachingOfStream && readRecursionSet.isEmpty() && !writeRecursionSet.contains(flowFile) && context.getContentRepository().isResourceClaimStreamSupported()) {
+ if (currentReadClaim == claim.getResourceClaim()) {
+ final long resourceClaimOffset = claim.getOffset() + contentClaimOffset;
+ if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= resourceClaimOffset) {
+ final long bytesToSkip = resourceClaimOffset - currentReadClaimStream.getBytesConsumed();
if (bytesToSkip > 0) {
StreamUtils.skip(currentReadClaimStream, bytesToSkip);
}
- return new DisableOnCloseInputStream(currentReadClaimStream);
+ final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize());
+ final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream);
+ return contentClaimInputStream;
}
}
@@ -2293,17 +2298,25 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
currentReadClaimStream.close();
}
- currentReadClaim = claim;
- currentReadClaimStream = new ContentClaimInputStream(context.getContentRepository(), claim, offset);
+ currentReadClaim = claim.getResourceClaim();
+ final InputStream contentRepoStream = context.getContentRepository().read(claim.getResourceClaim());
+ StreamUtils.skip(contentRepoStream, claim.getOffset());
+ final InputStream bufferedContentStream = new BufferedInputStream(contentRepoStream);
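+ // Start the byte count at claim.getOffset(): those bytes were skipped above, so
+ // getBytesConsumed() reflects the absolute position within the Resource Claim.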
+ final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset());
+ currentReadClaimStream = byteCountingInputStream;
- // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
- // reuse the same InputStream for the next FlowFile
- final InputStream disableOnClose = new DisableOnCloseInputStream(currentReadClaimStream);
- return disableOnClose;
+ // Use a non-closeable stream (DisableOnCloseInputStream) because we want to keep it open after the callback has finished so that we can
+ // reuse the same InputStream for the next FlowFile. We then need to use a LimitingInputStream to ensure that we don't allow the InputStream
+ // to be read past the end of the FlowFile (since multiple FlowFiles' contents may be in the given Resource Claim Input Stream).
+ // Finally, we need to wrap the InputStream in a ContentClaimInputStream so that if mark/reset is used, we can provide that capability
+ // without buffering data in memory.
+ final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize());
+ final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream);
+ return contentClaimInputStream;
} else {
claimCache.flush(claim);
- final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, offset);
+ final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset);
return rawInStream;
}
} catch (final ContentNotFoundException cnfe) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
index 94b9d2e3f7..169f0e2b24 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
@@ -39,11 +39,16 @@ public class ContentClaimInputStream extends InputStream {
private long markOffset;
public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset) {
+ this(contentRepository, contentClaim, claimOffset, null);
+ }
+
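+ // initialDelegate may be null. When supplied, it serves as the starting stream and is
+ // replaced by a freshly formed delegate if mark/reset later needs to re-open the claim.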
+ public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset, final InputStream initialDelegate) {
this.contentRepository = contentRepository;
this.contentClaim = contentClaim;
this.claimOffset = claimOffset;
this.currentOffset = claimOffset;
+ this.delegate = initialDelegate;
}
private InputStream getDelegate() throws IOException {
@@ -132,7 +137,10 @@ public class ContentClaimInputStream extends InputStream {
}
if (currentOffset != markOffset) {
- delegate.close();
+ if (delegate != null) {
+ delegate.close();
+ }
+
formDelegate();
StreamUtils.skip(delegate, markOffset - claimOffset);
currentOffset = markOffset;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 498852ad99..71e7d6c3b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -552,6 +552,11 @@ public class FileSystemRepository implements ContentRepository {
return containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
}
+ public Path getPath(final ResourceClaim resourceClaim, final boolean verifyExists) throws ContentNotFoundException {
+ final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
+ return getPath(contentClaim, verifyExists);
+ }
+
public Path getPath(final ContentClaim claim, final boolean verifyExists) throws ContentNotFoundException {
final ResourceClaim resourceClaim = claim.getResourceClaim();
final Path containerPath = containers.get(resourceClaim.getContainer());
@@ -874,6 +879,17 @@ public class FileSystemRepository implements ContentRepository {
return claim.getLength();
}
+ @Override
+ public InputStream read(final ResourceClaim claim) throws IOException {
+ if (claim == null) {
+ return new ByteArrayInputStream(new byte[0]);
+ }
+
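+ // Note that the returned stream spans the entire backing file, which may contain the
+ // content of many Content Claims; callers are expected to skip and limit as needed.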
+ final Path path = getPath(claim, true);
+ final FileInputStream fis = new FileInputStream(path.toFile());
+ return fis;
+ }
+
@Override
public InputStream read(final ContentClaim claim) throws IOException {
if (claim == null) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index ba5a3f96d3..a8c867f954 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -468,6 +468,11 @@ public class VolatileContentRepository implements ContentRepository {
return backupClaim == null ? getContent(claim).read() : getBackupRepository().read(backupClaim);
}
+ @Override
+ public InputStream read(final ResourceClaim claim) throws IOException {
+ return read(new StandardContentClaim(claim, 0L));
+ }
+
@Override
public OutputStream write(final ContentClaim claim) throws IOException {
final ContentClaim backupClaim = getBackupClaim(claim);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
index 676639cfd8..f9041d94ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
@@ -16,16 +16,10 @@
*/
package org.apache.nifi.controller.repository.crypto;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-import java.security.KeyManagementException;
-import javax.crypto.CipherOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.repository.FileSystemRepository;
import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.security.kms.EncryptionException;
import org.apache.nifi.security.kms.KeyProvider;
@@ -40,6 +34,14 @@ import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.crypto.CipherOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.security.KeyManagementException;
+
/**
* This class is an implementation of the {@link FileSystemRepository} content repository which provides transparent
* streaming encryption/decryption of content claim data during file system interaction. As of Apache NiFi 1.10.0
@@ -155,6 +157,16 @@ public class EncryptedFileSystemRepository extends FileSystemRepository {
return super.exportTo(claim, destination, append, offset, length);
}
+ @Override
+ public InputStream read(final ResourceClaim claim) {
+ throw new UnsupportedOperationException("Cannot read full ResourceClaim as a Stream when using EncryptedFileSystemRepository");
+ }
+
+ @Override
+ public boolean isResourceClaimStreamSupported() {
+ return false;
+ }
+
/**
* Returns an InputStream (actually a {@link javax.crypto.CipherInputStream}) which wraps
* the {@link java.io.FileInputStream} from the content repository claim on disk. This
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepositoryTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepositoryTest.groovy
index 5c03690022..7425f33544 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepositoryTest.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepositoryTest.groovy
@@ -161,7 +161,7 @@ class EncryptedFileSystemRepositoryTest {
@Test
void testReadNullContentClaimShouldReturnEmptyInputStream() {
- final InputStream inputStream = repository.read(null)
+ final InputStream inputStream = repository.read((ContentClaim) null)
final int read = inputStream.read()
assert read == -1
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index 505bc3f03b..9c65dce7e3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -2745,6 +2745,10 @@ public class StandardProcessSessionIT {
private Path getPath(final ContentClaim contentClaim) {
final ResourceClaim claim = contentClaim.getResourceClaim();
+ return getPath(claim);
+ }
+
+ private Path getPath(final ResourceClaim claim) {
return Paths.get("target").resolve("contentRepo").resolve(claim.getContainer()).resolve(claim.getSection()).resolve(claim.getId());
}
@@ -2806,6 +2810,23 @@ public class StandardProcessSessionIT {
}
}
+ @Override
+ public InputStream read(final ResourceClaim claim) throws IOException {
+ if (disableRead) {
+ throw new IOException("Reading from repo is disabled by unit test");
+ }
+
+ if (claim == null) {
+ return new ByteArrayInputStream(new byte[0]);
+ }
+
+ try {
+ return new FileInputStream(getPath(claim).toFile());
+ } catch (final FileNotFoundException fnfe) {
+ throw new ContentNotFoundException(null, fnfe);
+ }
+ }
+
@Override
public OutputStream write(final ContentClaim claim) throws IOException {
final Path path = getPath(claim);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
index 9c42d7e26a..f20acc5224 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
@@ -213,6 +213,20 @@ public class ByteArrayContentRepository implements ContentRepository {
return byteArrayContentClaim.read();
}
+ @Override
+ public InputStream read(final ResourceClaim claim) throws IOException {
+ if (claim == null) {
+ return new ByteArrayInputStream(new byte[0]);
+ }
+
+ if (!(claim instanceof ByteArrayResourceClaim)) {
+ throw new IllegalArgumentException("Cannot access Resource Claim " + claim + " because the Resource Claim does not belong to this Content Repository");
+ }
+
+ final ByteArrayResourceClaim byteArrayResourceClaim = (ByteArrayResourceClaim) claim;
+ return byteArrayResourceClaim.read();
+ }
+
@Override
public OutputStream write(final ContentClaim claim) {
final ByteArrayContentClaim byteArrayContentClaim = verifyClaim(claim);
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java
index f84ca4891d..bc88efe5f6 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java
@@ -16,49 +16,28 @@
*/
package org.apache.nifi.provenance;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.util.timebuffer.LongEntityAccess;
-import org.apache.nifi.util.timebuffer.TimedBuffer;
-import org.apache.nifi.util.timebuffer.TimestampedLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.util.concurrent.atomic.AtomicLong;
+
public class EncryptedSchemaRecordWriter extends EventIdFirstSchemaRecordWriter {
private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordWriter.class);
-
- private static final int DEFAULT_DEBUG_FREQUENCY = 1_000_000;
-
- private ProvenanceEventEncryptor provenanceEventEncryptor;
-
- private static final TimedBuffer<TimestampedLong> encryptTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
-
- private String keyId;
-
- private int debugFrequency;
+ public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter";
public static final int SERIALIZATION_VERSION = 1;
- public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter";
-
- public EncryptedSchemaRecordWriter(final File file, final AtomicLong idGenerator, final TocWriter writer, final boolean compressed,
- final int uncompressedBlockSize, final IdentifierLookup idLookup,
- ProvenanceEventEncryptor provenanceEventEncryptor) throws IOException, EncryptionException {
- this(file, idGenerator, writer, compressed, uncompressedBlockSize, idLookup, provenanceEventEncryptor, DEFAULT_DEBUG_FREQUENCY);
- }
+ private ProvenanceEventEncryptor provenanceEventEncryptor;
+ private String keyId;
public EncryptedSchemaRecordWriter(final File file, final AtomicLong idGenerator, final TocWriter writer, final boolean compressed,
final int uncompressedBlockSize, final IdentifierLookup idLookup,
ProvenanceEventEncryptor provenanceEventEncryptor, int debugFrequency) throws IOException, EncryptionException {
super(file, idGenerator, writer, compressed, uncompressedBlockSize, idLookup);
this.provenanceEventEncryptor = provenanceEventEncryptor;
- this.debugFrequency = debugFrequency;
try {
this.keyId = getNextAvailableKeyId();
@@ -69,101 +48,21 @@ public class EncryptedSchemaRecordWriter extends EventIdFirstSchemaRecordWriter
}
@Override
- public StorageSummary writeRecord(final ProvenanceEventRecord record) throws IOException {
- final long encryptStart = System.nanoTime();
- byte[] cipherBytes;
+ protected byte[] serializeEvent(final ProvenanceEventRecord event) throws IOException {
+ final byte[] serialized = super.serializeEvent(event);
+ final String eventId = event.getBestEventIdentifier();
+
try {
- byte[] serialized;
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
- final DataOutputStream dos = new DataOutputStream(baos)) {
- writeRecord(record, 0L, dos);
- serialized = baos.toByteArray();
- }
- String eventId = record.getBestEventIdentifier();
- cipherBytes = encrypt(serialized, eventId);
+ final byte[] cipherBytes = encrypt(serialized, eventId);
+ return cipherBytes;
} catch (EncryptionException e) {
logger.error("Encountered an error: ", e);
throw new IOException("Error encrypting the provenance record", e);
}
- final long encryptStop = System.nanoTime();
-
- final long lockStart = System.nanoTime();
- final long writeStart;
- final long startBytes;
- final long endBytes;
- final long recordIdentifier;
- synchronized (this) {
- writeStart = System.nanoTime();
- try {
- recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId();
- startBytes = getBytesWritten();
-
- ensureStreamState(recordIdentifier, startBytes);
-
- final DataOutputStream out = getBufferedOutputStream();
- final int recordIdOffset = (int) (recordIdentifier - getFirstEventId());
- out.writeInt(recordIdOffset);
- out.writeInt(cipherBytes.length);
- out.write(cipherBytes);
-
- getRecordCount().incrementAndGet();
- endBytes = getBytesWritten();
- } catch (final IOException ioe) {
- markDirty();
- throw ioe;
- }
- }
-
- if (logger.isDebugEnabled()) {
- // Collect stats and periodically dump them if log level is set to at least info.
- final long writeNanos = System.nanoTime() - writeStart;
- getWriteTimes().add(new TimestampedLong(writeNanos));
-
- final long serializeNanos = lockStart - encryptStart;
- getSerializeTimes().add(new TimestampedLong(serializeNanos));
-
- final long encryptNanos = encryptStop - encryptStart;
- getEncryptTimes().add(new TimestampedLong(encryptNanos));
-
- final long lockNanos = writeStart - lockStart;
- getLockTimes().add(new TimestampedLong(lockNanos));
- getBytesWrittenBuffer().add(new TimestampedLong(endBytes - startBytes));
-
- final long recordCount = getTotalRecordCount().incrementAndGet();
- if (recordCount % debugFrequency == 0) {
- printStats();
- }
- }
-
- final long serializedLength = endBytes - startBytes;
- final TocWriter tocWriter = getTocWriter();
- final Integer blockIndex = tocWriter == null ? null : tocWriter.getCurrentBlockIndex();
- final File file = getFile();
- final String storageLocation = file.getParentFile().getName() + "/" + file.getName();
- return new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes);
}
- private void printStats() {
- final long sixtySecondsAgo = System.currentTimeMillis() - 60000L;
- final Long writeNanosLast60 = getWriteTimes().getAggregateValue(sixtySecondsAgo).getValue();
- final Long lockNanosLast60 = getLockTimes().getAggregateValue(sixtySecondsAgo).getValue();
- final Long serializeNanosLast60 = getSerializeTimes().getAggregateValue(sixtySecondsAgo).getValue();
- final Long encryptNanosLast60 = getEncryptTimes().getAggregateValue(sixtySecondsAgo).getValue();
- final Long bytesWrittenLast60 = getBytesWrittenBuffer().getAggregateValue(sixtySecondsAgo).getValue();
- logger.debug("In the last 60 seconds, have spent {} millis writing to file ({} MB), {} millis waiting on synchronize block, {} millis serializing events, {} millis encrypting events",
- TimeUnit.NANOSECONDS.toMillis(writeNanosLast60),
- bytesWrittenLast60 / 1024 / 1024,
- TimeUnit.NANOSECONDS.toMillis(lockNanosLast60),
- TimeUnit.NANOSECONDS.toMillis(serializeNanosLast60),
- TimeUnit.NANOSECONDS.toMillis(encryptNanosLast60));
- }
-
- static TimedBuffer<TimestampedLong> getEncryptTimes() {
- return encryptTimes;
- }
-
- private byte[] encrypt(byte[] serialized, String eventId) throws IOException, EncryptionException {
- String keyId = getKeyId();
+ private byte[] encrypt(byte[] serialized, String eventId) throws EncryptionException {
+ final String keyId = getKeyId();
try {
return provenanceEventEncryptor.encrypt(serialized, eventId, keyId);
} catch (Exception e) {
@@ -192,8 +91,6 @@ public class EncryptedSchemaRecordWriter extends EventIdFirstSchemaRecordWriter
@Override
public String toString() {
- return "EncryptedSchemaRecordWriter" +
- " using " + provenanceEventEncryptor +
- " and current keyId " + keyId;
+ return "EncryptedSchemaRecordWriter[keyId=" + keyId + ", encryptor=" + provenanceEventEncryptor + "]";
}
}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java
index f49e8a6eba..24ee0b5f1f 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java
@@ -16,10 +16,6 @@
*/
package org.apache.nifi.provenance;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import javax.crypto.SecretKey;
-import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.authorization.Authorizer;
@@ -38,6 +34,11 @@ import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.IOException;
+import java.security.KeyManagementException;
+
/**
* This class is an implementation of the {@link WriteAheadProvenanceRepository} provenance repository which provides transparent
* block encryption/decryption of provenance event data during file system interaction. As of Apache NiFi 1.10.0
@@ -53,10 +54,13 @@ public class EncryptedWriteAheadProvenanceRepository extends WriteAheadProvenanc
/**
* This constructor exists solely for the use of the Java Service Loader mechanism and should not be used.
*/
+ @SuppressWarnings("unused")
public EncryptedWriteAheadProvenanceRepository() {
super();
}
+ // Created via reflection from FlowController
+ @SuppressWarnings("unused")
public EncryptedWriteAheadProvenanceRepository(final NiFiProperties nifiProperties) {
super(RepositoryConfiguration.create(nifiProperties));
}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
index 05e6736c27..a60dcf4b0c 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
@@ -17,19 +17,6 @@
package org.apache.nifi.provenance;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.nifi.provenance.schema.EventFieldNames;
import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema;
import org.apache.nifi.provenance.schema.LookupTableEventRecord;
@@ -37,21 +24,24 @@ import org.apache.nifi.provenance.schema.LookupTableEventSchema;
import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.provenance.util.ByteArrayDataOutputStream;
-import org.apache.nifi.provenance.util.ByteArrayDataOutputStreamCache;
import org.apache.nifi.repository.schema.FieldMapRecord;
import org.apache.nifi.repository.schema.Record;
import org.apache.nifi.repository.schema.RecordSchema;
import org.apache.nifi.repository.schema.SchemaRecordWriter;
-import org.apache.nifi.util.timebuffer.LongEntityAccess;
-import org.apache.nifi.util.timebuffer.TimedBuffer;
-import org.apache.nifi.util.timebuffer.TimestampedLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
- private static final Logger logger = LoggerFactory.getLogger(EventIdFirstSchemaRecordWriter.class);
-
private static final RecordSchema eventSchema = LookupTableEventSchema.EVENT_SCHEMA;
private static final RecordSchema contentClaimSchema = new RecordSchema(eventSchema.getField(EventFieldNames.CONTENT_CLAIM).getSubFields());
private static final RecordSchema previousContentClaimSchema = new RecordSchema(eventSchema.getField(EventFieldNames.PREVIOUS_CONTENT_CLAIM).getSubFields());
@@ -70,14 +60,6 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
private static final Map<ProvenanceEventType, Integer> eventTypeMap;
private static final List<String> eventTypeNames;
- private static final TimedBuffer<TimestampedLong> serializeTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
- private static final TimedBuffer<TimestampedLong> lockTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
- private static final TimedBuffer<TimestampedLong> writeTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
- private static final TimedBuffer<TimestampedLong> bytesWritten = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
- private static final AtomicLong totalRecordCount = new AtomicLong(0L);
-
- private static final ByteArrayDataOutputStreamCache streamCache = new ByteArrayDataOutputStreamCache(32, 8 * 1024, 256 * 1024);
-
private long firstEventId;
private long systemTimeOffset;
@@ -102,94 +84,89 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
queueIdMap = idLookup.invertQueueIdentifiers();
}
- public EventIdFirstSchemaRecordWriter(final OutputStream out, final String storageLocation, final AtomicLong idGenerator, final TocWriter tocWriter, final boolean compressed,
- final int uncompressedBlockSize, final IdentifierLookup idLookup) throws IOException {
- super(out, storageLocation, idGenerator, tocWriter, compressed, uncompressedBlockSize);
-
- this.idLookup = idLookup;
- componentIdMap = idLookup.invertComponentIdentifiers();
- componentTypeMap = idLookup.invertComponentTypes();
- queueIdMap = idLookup.invertQueueIdentifiers();
- }
@Override
- public StorageSummary writeRecord(final ProvenanceEventRecord record) throws IOException {
+ public Map<ProvenanceEventRecord, StorageSummary> writeRecords(final Iterable<ProvenanceEventRecord> events) throws IOException {
if (isDirty()) {
throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
}
- final long lockStart;
- final long writeStart;
- final long startBytes;
- final long endBytes;
- final long recordIdentifier;
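+ // Serialize events into memory and flush them to the stream in batches, capping the
+ // amount of serialized data held on the heap at roughly 1 MB.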
+ final int heapThreshold = 1_000_000;
+ final Map<ProvenanceEventRecord, StorageSummary> storageSummaries = new HashMap<>();
- final long serializeStart = System.nanoTime();
- final ByteArrayDataOutputStream bados = streamCache.checkOut();
- try {
- writeRecord(record, 0L, bados.getDataOutputStream());
+ final Map<ProvenanceEventRecord, byte[]> serializedEvents = new LinkedHashMap<>();
+ int totalBytes = 0;
- lockStart = System.nanoTime();
- synchronized (this) {
- writeStart = System.nanoTime();
- try {
- recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId();
- startBytes = getBytesWritten();
+ for (final ProvenanceEventRecord event : events) {
+ final byte[] serialized = serializeEvent(event);
+ serializedEvents.put(event, serialized);
+ totalBytes += serialized.length;
- ensureStreamState(recordIdentifier, startBytes);
-
- final DataOutputStream out = getBufferedOutputStream();
- final int recordIdOffset = (int) (recordIdentifier - firstEventId);
- out.writeInt(recordIdOffset);
-
- final ByteArrayOutputStream baos = bados.getByteArrayOutputStream();
- out.writeInt(baos.size());
- baos.writeTo(out);
-
- recordCount.incrementAndGet();
- endBytes = getBytesWritten();
- } catch (final IOException ioe) {
- markDirty();
- throw ioe;
- }
- }
- } finally {
- streamCache.checkIn(bados);
- }
-
- if (logger.isDebugEnabled()) {
- // Collect stats and periodically dump them if log level is set to at least info.
- final long writeNanos = System.nanoTime() - writeStart;
- writeTimes.add(new TimestampedLong(writeNanos));
-
- final long serializeNanos = lockStart - serializeStart;
- serializeTimes.add(new TimestampedLong(serializeNanos));
-
- final long lockNanos = writeStart - lockStart;
- lockTimes.add(new TimestampedLong(lockNanos));
- bytesWritten.add(new TimestampedLong(endBytes - startBytes));
-
- final long recordCount = totalRecordCount.incrementAndGet();
- if (recordCount % 1_000_000 == 0) {
- final long sixtySecondsAgo = System.currentTimeMillis() - 60000L;
- final Long writeNanosLast60 = writeTimes.getAggregateValue(sixtySecondsAgo).getValue();
- final Long lockNanosLast60 = lockTimes.getAggregateValue(sixtySecondsAgo).getValue();
- final Long serializeNanosLast60 = serializeTimes.getAggregateValue(sixtySecondsAgo).getValue();
- final Long bytesWrittenLast60 = bytesWritten.getAggregateValue(sixtySecondsAgo).getValue();
- logger.debug("In the last 60 seconds, have spent {} millis writing to file ({} MB), {} millis waiting on synchronize block, {} millis serializing events",
- TimeUnit.NANOSECONDS.toMillis(writeNanosLast60),
- bytesWrittenLast60 / 1024 / 1024,
- TimeUnit.NANOSECONDS.toMillis(lockNanosLast60),
- TimeUnit.NANOSECONDS.toMillis(serializeNanosLast60));
+ if (totalBytes >= heapThreshold) {
+ storeEvents(serializedEvents, storageSummaries);
+ recordCount.addAndGet(serializedEvents.size());
+ serializedEvents.clear();
+ totalBytes = 0;
}
}
- final long serializedLength = endBytes - startBytes;
- final TocWriter tocWriter = getTocWriter();
- final Integer blockIndex = tocWriter == null ? null : tocWriter.getCurrentBlockIndex();
- final File file = getFile();
- final String storageLocation = file.getParentFile().getName() + "/" + file.getName();
- return new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes);
+ storeEvents(serializedEvents, storageSummaries);
+ recordCount.addAndGet(serializedEvents.size());
+
+ return storageSummaries;
+ }
+
+ protected byte[] serializeEvent(final ProvenanceEventRecord event) throws IOException {
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream dataOutputStream = new DataOutputStream(baos)) {
+ writeRecord(event, 0L, dataOutputStream);
+ dataOutputStream.flush();
+ return baos.toByteArray();
+ }
+ }
+
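+ // Synchronized so that each batch of serialized events is written to the stream
+ // contiguously; serialization itself happens outside of the lock in writeRecords.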
+ private synchronized void storeEvents(final Map<ProvenanceEventRecord, byte[]> serializedEvents, final Map<ProvenanceEventRecord, StorageSummary> summaryMap) throws IOException {
+ for (final Map.Entry<ProvenanceEventRecord, byte[]> entry : serializedEvents.entrySet()) {
+ final ProvenanceEventRecord event = entry.getKey();
+ final byte[] serialized = entry.getValue();
+
+ final long startBytes;
+ final long endBytes;
+ final long recordIdentifier;
+
+ try {
+ recordIdentifier = event.getEventId() == -1 ? getIdGenerator().getAndIncrement() : event.getEventId();
+ startBytes = getBytesWritten();
+
+ ensureStreamState(recordIdentifier, startBytes);
+
+ final DataOutputStream out = getBufferedOutputStream();
+ final int recordIdOffset = (int) (recordIdentifier - firstEventId);
+ out.writeInt(recordIdOffset);
+
+ out.writeInt(serialized.length);
+ out.write(serialized);
+
+ endBytes = getBytesWritten();
+ } catch (final IOException ioe) {
+ markDirty();
+ throw ioe;
+ }
+
+ final long serializedLength = endBytes - startBytes;
+ final TocWriter tocWriter = getTocWriter();
+ final Integer blockIndex = tocWriter == null ? null : tocWriter.getCurrentBlockIndex();
+ final File file = getFile();
+ final String storageLocation = file.getParentFile().getName() + "/" + file.getName();
+ final StorageSummary storageSummary = new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes);
+ summaryMap.put(event, storageSummary);
+ }
+ }
+
+ @Override
+ public StorageSummary writeRecord(final ProvenanceEventRecord record) {
+ // This method should never be invoked: its only caller is super.writeRecords, which this class overrides without delegating to this method.
+ throw new UnsupportedOperationException();
}
@Override
@@ -245,47 +222,4 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
protected String getSerializationName() {
return SERIALIZATION_NAME;
}
-
- /* Getters for internal state written to by subclass EncryptedSchemaRecordWriter */
-
- IdentifierLookup getIdLookup() {
- return idLookup;
- }
-
- SchemaRecordWriter getSchemaRecordWriter() {
- return schemaRecordWriter;
- }
-
- AtomicInteger getRecordCount() {
- return recordCount;
- }
-
- static TimedBuffer<TimestampedLong> getSerializeTimes() {
- return serializeTimes;
- }
-
- static TimedBuffer<TimestampedLong> getLockTimes() {
- return lockTimes;
- }
-
- static TimedBuffer<TimestampedLong> getWriteTimes() {
- return writeTimes;
- }
-
- static TimedBuffer<TimestampedLong> getBytesWrittenBuffer() {
- return bytesWritten;
- }
-
- static AtomicLong getTotalRecordCount() {
- return totalRecordCount;
- }
-
- long getFirstEventId() {
- return firstEventId;
- }
-
- long getSystemTimeOffset() {
- return systemTimeOffset;
- }
-
}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
index 2dc249d5f1..07eab61c9b 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
@@ -108,6 +108,8 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
config = null;
}
+ // Created via reflection from FlowController
+ @SuppressWarnings("unused")
public WriteAheadProvenanceRepository(final NiFiProperties nifiProperties) {
this(RepositoryConfiguration.create(nifiProperties));
}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
index 21e2c0767b..bb679e7cc0 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
@@ -17,14 +17,6 @@
package org.apache.nifi.provenance.serialization;
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.nifi.provenance.AbstractRecordWriter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.toc.TocWriter;
@@ -34,6 +26,14 @@ import org.apache.nifi.stream.io.NonCloseableOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicLong;
+
public abstract class CompressableRecordWriter extends AbstractRecordWriter {
private static final Logger logger = LoggerFactory.getLogger(CompressableRecordWriter.class);
@@ -145,7 +145,7 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
}
}
- protected synchronized void ensureStreamState(final long recordIdentifier, final long startBytes) throws IOException {
+ protected void ensureStreamState(final long recordIdentifier, final long startBytes) throws IOException {
// add a new block to the TOC if needed.
if (getTocWriter() != null && (startBytes - blockStartOffset >= uncompressedBlockSize)) {
blockStartOffset = startBytes;
@@ -222,4 +222,5 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
protected abstract int getSerializationVersion();
protected abstract String getSerializationName();
+
}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index 475c750d93..ad0c5b5289 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -16,17 +16,6 @@
*/
package org.apache.nifi.provenance.serialization;
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.zip.GZIPInputStream;
import org.apache.nifi.properties.NiFiPropertiesLoader;
import org.apache.nifi.provenance.ByteArraySchemaRecordReader;
import org.apache.nifi.provenance.ByteArraySchemaRecordWriter;
@@ -44,6 +33,18 @@ import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.zip.GZIPInputStream;
+
public class RecordReaders {
private static Logger logger = LoggerFactory.getLogger(RecordReaders.class);
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index c9d2a22adf..5b5a9d6cdf 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -16,12 +16,14 @@
*/
package org.apache.nifi.provenance.serialization;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocWriter;
+
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
-
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.toc.TocWriter;
+import java.util.HashMap;
+import java.util.Map;
public interface RecordWriter extends Closeable {
@@ -42,6 +44,17 @@ public interface RecordWriter extends Closeable {
*/
StorageSummary writeRecord(ProvenanceEventRecord record) throws IOException;
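+ /**
+ * Writes the given events to the repository, returning a mapping of each event to its
+ * StorageSummary. The default implementation delegates to {@link #writeRecord(ProvenanceEventRecord)}
+ * once per event; implementations may override this to batch writes more efficiently.
+ */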
+ default Map<ProvenanceEventRecord, StorageSummary> writeRecords(Iterable<ProvenanceEventRecord> events) throws IOException {
+ final Map<ProvenanceEventRecord, StorageSummary> locationMap = new HashMap<>();
+
+ for (final ProvenanceEventRecord nextEvent : events) {
+ final StorageSummary writerSummary = writeRecord(nextEvent);
+ locationMap.put(nextEvent, writerSummary);
+ }
+
+ return locationMap;
+ }
+
/**
* Flushes any data that is held in a buffer to the underlying storage mechanism
*
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
index 1df84f6d41..c32de77336 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
@@ -320,11 +320,15 @@ public class WriteAheadStorePartition implements EventStorePartition {
try {
long maxId = -1L;
int numEvents = 0;
- for (final ProvenanceEventRecord nextEvent : events) {
- final StorageSummary writerSummary = writer.writeRecord(nextEvent);
+
+ final Map<ProvenanceEventRecord, StorageSummary> writerSummaries = writer.writeRecords(events);
+ for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : writerSummaries.entrySet()) {
+ final ProvenanceEventRecord eventRecord = entry.getKey();
+ final StorageSummary writerSummary = entry.getValue();
+
final StorageSummary summaryWithIndex = new StorageSummary(writerSummary.getEventId(), writerSummary.getStorageLocation(), this.partitionName,
writerSummary.getBlockIndex(), writerSummary.getSerializedLength(), writerSummary.getBytesWritten());
- locationMap.put(nextEvent, summaryWithIndex);
+ locationMap.put(eventRecord, summaryWithIndex);
maxId = summaryWithIndex.getEventId();
numEvents++;
}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java
deleted file mode 100644
index 23aefb3ce9..0000000000
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.provenance.util;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-
-public class ByteArrayDataOutputStream {
- private final ByteArrayOutputStream baos;
- private final DataOutputStream dos;
-
- public ByteArrayDataOutputStream(final int initialCapacity) {
- baos = new ByteArrayOutputStream(initialCapacity);
- dos = new DataOutputStream(baos);
- }
-
- public ByteArrayOutputStream getByteArrayOutputStream() {
- return baos;
- }
-
- public DataOutputStream getDataOutputStream() {
- return dos;
- }
-}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java
deleted file mode 100644
index 953559093f..0000000000
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.provenance.util;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class ByteArrayDataOutputStreamCache {
- private final BlockingQueue<ByteArrayDataOutputStream> queue;
- private final int initialBufferSize;
- private final int maxBufferSize;
-
- public ByteArrayDataOutputStreamCache(final int maxCapacity, final int initialBufferSize, final int maxBufferSize) {
- this.queue = new LinkedBlockingQueue<>(maxCapacity);
- this.initialBufferSize = initialBufferSize;
- this.maxBufferSize = maxBufferSize;
- }
-
- public ByteArrayDataOutputStream checkOut() {
- final ByteArrayDataOutputStream stream = queue.poll();
- if (stream != null) {
- return stream;
- }
-
- return new ByteArrayDataOutputStream(initialBufferSize);
- }
-
- public void checkIn(final ByteArrayDataOutputStream bados) {
- final int size = bados.getByteArrayOutputStream().size();
- if (size > maxBufferSize) {
- return;
- }
-
- bados.getByteArrayOutputStream().reset();
- queue.offer(bados);
- }
-}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
index c146982c2b..784eb9ac03 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
@@ -199,7 +199,7 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
// Act
int encryptedRecordId = idGenerator.get()
encryptedWriter.writeHeader(encryptedRecordId)
- encryptedWriter.writeRecord(record)
+ encryptedWriter.writeRecords(Collections.singletonList(record))
encryptedWriter.close()
logger.info("Wrote encrypted record ${encryptedRecordId} to journal")
@@ -242,13 +242,13 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
// Act
int standardRecordId = idGenerator.get()
standardWriter.writeHeader(standardRecordId)
- standardWriter.writeRecord(record)
+ standardWriter.writeRecords(Collections.singletonList(record))
standardWriter.close()
logger.info("Wrote standard record ${standardRecordId} to journal")
int encryptedRecordId = idGenerator.get()
encryptedWriter.writeHeader(encryptedRecordId)
- encryptedWriter.writeRecord(record)
+ encryptedWriter.writeRecords(Collections.singletonList(record))
encryptedWriter.close()
logger.info("Wrote encrypted record ${encryptedRecordId} to journal")
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
index 4b2ca50bd0..ddb6d0219e 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
@@ -17,21 +17,6 @@
package org.apache.nifi.provenance;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.toc.StandardTocReader;
@@ -43,6 +28,23 @@ import org.apache.nifi.util.file.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
public abstract class AbstractTestRecordReaderWriter {
@BeforeClass
@@ -62,7 +64,7 @@ public abstract class AbstractTestRecordReaderWriter {
final RecordWriter writer = createWriter(journalFile, tocWriter, false, 1024 * 1024);
writer.writeHeader(1L);
- writer.writeRecord(createEvent());
+ writer.writeRecords(Collections.singletonList(createEvent()));
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
@@ -96,7 +98,7 @@ public abstract class AbstractTestRecordReaderWriter {
final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
writer.writeHeader(1L);
- writer.writeRecord(createEvent());
+ writer.writeRecords(Collections.singletonList(createEvent()));
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
@@ -117,7 +119,7 @@ public abstract class AbstractTestRecordReaderWriter {
writer.writeHeader(1L);
for (int i = 0; i < 10; i++) {
- writer.writeRecord(createEvent());
+ writer.writeRecords(Collections.singletonList(createEvent()));
}
writer.close();
@@ -156,7 +158,7 @@ public abstract class AbstractTestRecordReaderWriter {
writer.writeHeader(1L);
for (int i = 0; i < 10; i++) {
- writer.writeRecord(createEvent());
+ writer.writeRecords(Collections.singletonList(createEvent()));
}
writer.close();
@@ -198,7 +200,7 @@ public abstract class AbstractTestRecordReaderWriter {
for (int i = 0; i < numEvents; i++) {
final ProvenanceEventRecord event = createEvent();
events.add(event);
- writer.writeRecord(event);
+ writer.writeRecords(Collections.singletonList(event));
}
writer.close();
@@ -208,6 +210,8 @@ public abstract class AbstractTestRecordReaderWriter {
final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
for (int i = 0; i < numEvents; i++) {
+ System.out.println(i);
+
final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(i);
assertTrue(eventOption.isPresent());
assertEquals(i, eventOption.get().getEventId());
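
[editor's note] The loop above exercises random access: for every written event ID, skipToEvent(i) must return a present Optional whose event ID matches. As a reusable helper, the same lookup might look like this (hypothetical class; assumes skipToEvent(long) returns Optional<ProvenanceEventRecord> and throws IOException, as the test usage indicates):

    import java.io.IOException;
    import java.util.Optional;

    import org.apache.nifi.provenance.ProvenanceEventRecord;
    import org.apache.nifi.provenance.serialization.RecordReader;

    // Hypothetical helper: seek to a given event ID and fail loudly when the
    // journal does not contain it, mirroring the assertions in the test.
    public final class JournalLookup {

        private JournalLookup() {
        }

        public static ProvenanceEventRecord requireEvent(final RecordReader reader, final long eventId) throws IOException {
            final Optional<ProvenanceEventRecord> event = reader.skipToEvent(eventId);
            if (!event.isPresent()) {
                throw new IOException("Journal contains no event with ID " + eventId);
            }
            return event.get();
        }
    }
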
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestEventIdFirstSchemaRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestEventIdFirstSchemaRecordReaderWriter.java
index a6833b44d7..b6b86a2926 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestEventIdFirstSchemaRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestEventIdFirstSchemaRecordReaderWriter.java
@@ -17,9 +17,19 @@
package org.apache.nifi.provenance;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
@@ -34,19 +44,9 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordWriter;
-import org.apache.nifi.provenance.toc.StandardTocReader;
-import org.apache.nifi.provenance.toc.StandardTocWriter;
-import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.nifi.provenance.toc.TocUtil;
-import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter {
private final AtomicLong idGenerator = new AtomicLong(0L);
@@ -88,7 +88,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
final ProvenanceEventRecord record = builder.build();
writer.writeHeader(1L);
- writer.writeRecord(record);
+ writer.writeRecords(Collections.singletonList(record));
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
@@ -146,7 +146,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
final ProvenanceEventRecord record = builder.build();
writer.writeHeader(1L);
- writer.writeRecord(record);
+ writer.writeRecords(Collections.singletonList(record));
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
@@ -203,7 +203,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
final ProvenanceEventRecord record = builder.build();
writer.writeHeader(1L);
- writer.writeRecord(record);
+ writer.writeRecords(Collections.singletonList(record));
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
@@ -261,7 +261,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
final ProvenanceEventRecord record = builder.build();
writer.writeHeader(1L);
- writer.writeRecord(record);
+ writer.writeRecords(Collections.singletonList(record));
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
@@ -322,7 +322,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
final ProvenanceEventRecord record = builder.build();
writer.writeHeader(500_000L);
- writer.writeRecord(record);
+ writer.writeRecords(Collections.singletonList(record));
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
@@ -382,12 +382,12 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
builder.setCurrentContentClaim("container-2", "section-2", "identifier-2", 2L, 2L);
writer.writeHeader(500_000L);
- writer.writeRecord(builder.build());
+ writer.writeRecords(Collections.singletonList(builder.build()));
builder.setEventId(1_000_001L);
builder.setComponentId("4444");
builder.setComponentType("unit-test-component-1");
- writer.writeRecord(builder.build());
+ writer.writeRecords(Collections.singletonList(builder.build()));
writer.close();
@@ -435,7 +435,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
writer.writeHeader(0L);
for (int i = 0; i < 100_000; i++) {
- writer.writeRecord(createEvent());
+ writer.writeRecords(Collections.singletonList(createEvent()));
}
}
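
[editor's note] The last hunk writes 100,000 events one singleton list at a time. Since the new API is batch-oriented, a caller that already holds several events could presumably hand them over in a single call; a sketch under that assumption (hypothetical helper; batching semantics such as block boundaries may differ from per-event writes):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.nifi.provenance.ProvenanceEventRecord;
    import org.apache.nifi.provenance.serialization.RecordWriter;

    // Sketch only: assumes writeRecords accepts a multi-element list just as it
    // accepts the singleton lists used in the tests above.
    public final class BatchWriteExample {

        private BatchWriteExample() {
        }

        public static void writeAll(final RecordWriter writer, final List<ProvenanceEventRecord> events) throws IOException {
            // One batch call instead of one writeRecords(singletonList(...)) per event.
            writer.writeRecords(new ArrayList<>(events));
        }
    }
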