From 528fce2407d092d4ced1a58fcc14d0bc6e660b89 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 17 Jul 2020 09:12:47 -0400 Subject: [PATCH] NIFI-7646, NIFI-8222: Instead of having StandardProcessSession call ContentRepository.read(ContentClaim), introduced a new ContentRepository.read(ResourceClaim) method and now hold open the InputStream to the ResourceClaim. This can't be supported by EncryptedContentRepository, so introduced a method that indicates whether or not reading an entire ResourceClaim is supported. The benefit here is that when many FlowFiles are read within a session, such as when using MergeContent/MergeRecord or a processor configured with a Run Duration, we can hold open a single InputStream instead of constantly opening FileInputStreams and seeking to the appropriate location. This is much faster. - Instead of entering a 'synchronized' block for every provenance event, serialize up to 1 MB worth of data, and then enter the synchronized block once to write that batch out. This avoids large amounts of lock contention and context switches. NIFI-7646: Removed TODO and unused Jackson dependency Signed-off-by: Matthew Burgess This closes #4818 --- .../nifi/stream/io/LimitingInputStream.java | 7 + .../repository/ContentRepository.java | 18 ++ .../repository/StandardProcessSession.java | 45 ++-- .../io/ContentClaimInputStream.java | 10 +- .../repository/FileSystemRepository.java | 16 ++ .../repository/VolatileContentRepository.java | 5 + .../crypto/EncryptedFileSystemRepository.java | 26 +- .../EncryptedFileSystemRepositoryTest.groovy | 2 +- .../repository/StandardProcessSessionIT.java | 21 ++ .../ByteArrayContentRepository.java | 14 ++ .../EncryptedSchemaRecordWriter.java | 137 ++--------- ...cryptedWriteAheadProvenanceRepository.java | 12 +- .../EventIdFirstSchemaRecordWriter.java | 232 +++++++----------- .../WriteAheadProvenanceRepository.java | 2 + .../CompressableRecordWriter.java | 19 +- .../serialization/RecordReaders.java | 23 +- .../serialization/RecordWriter.java | 19 +- .../store/WriteAheadStorePartition.java | 10 +- .../util/ByteArrayDataOutputStream.java | 39 --- .../util/ByteArrayDataOutputStreamCache.java | 52 ---- ...cryptedSchemaRecordReaderWriterTest.groovy | 6 +- .../AbstractTestRecordReaderWriter.java | 42 ++-- ...tEventIdFirstSchemaRecordReaderWriter.java | 48 ++-- 23 files changed, 343 insertions(+), 462 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java delete mode 100644 nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java index 70c6a321d8..df8391aa61 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java @@ -25,6 +25,7 @@ public class LimitingInputStream extends InputStream { private final long limit; private long bytesRead = 0; private volatile boolean limitReached = false; + private long markOffset = -1L; /** * Constructs a limited input stream whereby if the limit is reached all @@ -112,6 +113,7 @@ public class LimitingInputStream extends InputStream { @Override public void mark(int readlimit) { in.mark(readlimit); + 
markOffset = bytesRead; } @Override @@ -122,6 +124,11 @@ public class LimitingInputStream extends InputStream { @Override public void reset() throws IOException { in.reset(); + + if (markOffset >= 0) { + bytesRead = markOffset; + } + markOffset = -1; } public long getLimit() { diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java index 7636966878..97dd1005ed 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java @@ -252,6 +252,24 @@ public interface ContentRepository { */ InputStream read(ContentClaim claim) throws IOException; + /** + * Provides access to the input stream for the entire Resource Claim + * @param claim the resource claim to read from + * @return InputStream over the content of the entire Resource Claim + * @throws IOException if unable to read + */ + InputStream read(ResourceClaim claim) throws IOException; + + /** + * Indicates whether or not this Content Repository supports obtaining an InputStream for + * an entire Resource Claim. If this method returns false, the {@link #read(ResourceClaim)} method should not + * be called and instead {@link #read(ContentClaim)} should always be used. + * @return true if reading an entire Resource Claim is allowed, false otherwise + */ + default boolean isResourceClaimStreamSupported() { + return true; + } + /** * Obtains an OutputStream to the content for the given claim. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 0cdfd05059..1d24f33b36 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -59,10 +59,12 @@ import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.stream.io.ByteCountingInputStream; import org.apache.nifi.stream.io.ByteCountingOutputStream; +import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.StreamUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; import java.io.Closeable; @@ -145,8 +147,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn private int flowFilesIn = 0, flowFilesOut = 0; private long contentSizeIn = 0L, contentSizeOut = 0L; - private ContentClaim currentReadClaim = null; - private ContentClaimInputStream currentReadClaimStream = null; + private ResourceClaim currentReadClaim = null; + private ByteCountingInputStream currentReadClaimStream = null; private long processingStartTime; // List of InputStreams that have been opened by calls to {@link #read(FlowFile)} and not yet closed @@ -2259,12 +2261,12 @@ public class StandardProcessSession implements ProcessSession, 
ProvenanceEventEn context.getProvenanceRepository().registerEvents(iterable); context.getFlowFileRepository().updateRepository(expiredRecords); } catch (final IOException e) { - LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e); + LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e.toString(), e); } } - private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long offset, final boolean allowCachingOfStream) throws ContentNotFoundException { + private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long contentClaimOffset, final boolean allowCachingOfStream) throws ContentNotFoundException { // If there's no content, don't bother going to the Content Repository because it is generally expensive and we know // that there is no actual content. if (flowFile.getSize() == 0L) { @@ -2275,15 +2277,18 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn // If the recursion set is empty, we can use the same input stream that we already have open. However, if // the recursion set is NOT empty, we can't do this because we may be reading the input of FlowFile 1 while in the // callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1. - if (allowCachingOfStream && readRecursionSet.isEmpty() && writeRecursionSet.isEmpty()) { - if (currentReadClaim == claim) { - if (currentReadClaimStream != null && currentReadClaimStream.getCurrentOffset() <= offset) { - final long bytesToSkip = offset - currentReadClaimStream.getCurrentOffset(); + if (allowCachingOfStream && readRecursionSet.isEmpty() && !writeRecursionSet.contains(flowFile) && context.getContentRepository().isResourceClaimStreamSupported()) { + if (currentReadClaim == claim.getResourceClaim()) { + final long resourceClaimOffset = claim.getOffset() + contentClaimOffset; + if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= resourceClaimOffset) { + final long bytesToSkip = resourceClaimOffset - currentReadClaimStream.getBytesConsumed(); if (bytesToSkip > 0) { StreamUtils.skip(currentReadClaimStream, bytesToSkip); } - return new DisableOnCloseInputStream(currentReadClaimStream); + final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize()); + final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream); + return contentClaimInputStream; } } @@ -2293,17 +2298,25 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn currentReadClaimStream.close(); } - currentReadClaim = claim; - currentReadClaimStream = new ContentClaimInputStream(context.getContentRepository(), claim, offset); + currentReadClaim = claim.getResourceClaim(); + final InputStream contentRepoStream = context.getContentRepository().read(claim.getResourceClaim()); + StreamUtils.skip(contentRepoStream, claim.getOffset()); + final InputStream bufferedContentStream = new BufferedInputStream(contentRepoStream); + final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset()); + currentReadClaimStream = byteCountingInputStream; - // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can - // reuse the same InputStream 
for the next FlowFile - final InputStream disableOnClose = new DisableOnCloseInputStream(currentReadClaimStream); - return disableOnClose; + // Use a non-closeable stream (DisableOnCloseInputStream) because we want to keep it open after the callback has finished so that we can + // reuse the same InputStream for the next FlowFile. We then need to use a LimitingInputStream to ensure that we don't allow the InputStream + // to be read past the end of the FlowFile (since multiple FlowFiles' contents may be in the given Resource Claim Input Stream). + // Finally, we need to wrap the InputStream in a ContentClaimInputStream so that if mark/reset is used, we can provide that capability + // without buffering data in memory. + final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize()); + final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream); + return contentClaimInputStream; } else { claimCache.flush(claim); - final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, offset); + final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset); return rawInStream; } } catch (final ContentNotFoundException cnfe) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java index 94b9d2e3f7..169f0e2b24 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java @@ -39,11 +39,16 @@ public class ContentClaimInputStream extends InputStream { private long markOffset; public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset) { + this(contentRepository, contentClaim, claimOffset, null); + } + + public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset, final InputStream initialDelegate) { this.contentRepository = contentRepository; this.contentClaim = contentClaim; this.claimOffset = claimOffset; this.currentOffset = claimOffset; + this.delegate = initialDelegate; } private InputStream getDelegate() throws IOException { @@ -132,7 +137,10 @@ public class ContentClaimInputStream extends InputStream { } if (currentOffset != markOffset) { - delegate.close(); + if (delegate != null) { + delegate.close(); + } + formDelegate(); StreamUtils.skip(delegate, markOffset - claimOffset); currentOffset = markOffset; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 498852ad99..71e7d6c3b5 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -552,6 +552,11 @@ public class FileSystemRepository implements ContentRepository { return containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId()); } + public Path getPath(final ResourceClaim resourceClaim, final boolean verifyExists) throws ContentNotFoundException { + final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); + return getPath(contentClaim, verifyExists); + } + public Path getPath(final ContentClaim claim, final boolean verifyExists) throws ContentNotFoundException { final ResourceClaim resourceClaim = claim.getResourceClaim(); final Path containerPath = containers.get(resourceClaim.getContainer()); @@ -874,6 +879,17 @@ public class FileSystemRepository implements ContentRepository { return claim.getLength(); } + @Override + public InputStream read(final ResourceClaim claim) throws IOException { + if (claim == null) { + return new ByteArrayInputStream(new byte[0]); + } + + final Path path = getPath(claim, true); + final FileInputStream fis = new FileInputStream(path.toFile()); + return fis; + } + @Override public InputStream read(final ContentClaim claim) throws IOException { if (claim == null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index ba5a3f96d3..a8c867f954 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -468,6 +468,11 @@ public class VolatileContentRepository implements ContentRepository { return backupClaim == null ? 
getContent(claim).read() : getBackupRepository().read(backupClaim); } + @Override + public InputStream read(final ResourceClaim claim) throws IOException { + return read(new StandardContentClaim(claim, 0L)); + } + @Override public OutputStream write(final ContentClaim claim) throws IOException { final ContentClaim backupClaim = getBackupClaim(claim); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java index 676639cfd8..f9041d94ed 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java @@ -16,16 +16,10 @@ */ package org.apache.nifi.controller.repository.crypto; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.file.Path; -import java.security.KeyManagementException; -import javax.crypto.CipherOutputStream; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.controller.repository.FileSystemRepository; import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.security.kms.EncryptionException; import org.apache.nifi.security.kms.KeyProvider; @@ -40,6 +34,14 @@ import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.crypto.CipherOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.security.KeyManagementException; + /** * This class is an implementation of the {@link FileSystemRepository} content repository which provides transparent * streaming encryption/decryption of content claim data during file system interaction. As of Apache NiFi 1.10.0 @@ -155,6 +157,16 @@ public class EncryptedFileSystemRepository extends FileSystemRepository { return super.exportTo(claim, destination, append, offset, length); } + @Override + public InputStream read(final ResourceClaim claim) { + throw new UnsupportedOperationException("Cannot read full ResourceClaim as a Stream when using EncryptedFileSystemRepository"); + } + + @Override + public boolean isResourceClaimStreamSupported() { + return false; + } + /** * Returns an InputStream (actually a {@link javax.crypto.CipherInputStream}) which wraps * the {@link java.io.FileInputStream} from the content repository claim on disk. 
This diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepositoryTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepositoryTest.groovy index 5c03690022..7425f33544 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepositoryTest.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepositoryTest.groovy @@ -161,7 +161,7 @@ class EncryptedFileSystemRepositoryTest { @Test void testReadNullContentClaimShouldReturnEmptyInputStream() { - final InputStream inputStream = repository.read(null) + final InputStream inputStream = repository.read((ContentClaim) null) final int read = inputStream.read() assert read == -1 } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java index 505bc3f03b..9c65dce7e3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java @@ -2745,6 +2745,10 @@ public class StandardProcessSessionIT { private Path getPath(final ContentClaim contentClaim) { final ResourceClaim claim = contentClaim.getResourceClaim(); + return getPath(claim); + } + + private Path getPath(final ResourceClaim claim) { return Paths.get("target").resolve("contentRepo").resolve(claim.getContainer()).resolve(claim.getSection()).resolve(claim.getId()); } @@ -2806,6 +2810,23 @@ public class StandardProcessSessionIT { } } + @Override + public InputStream read(final ResourceClaim claim) throws IOException { + if (disableRead) { + throw new IOException("Reading from repo is disabled by unit test"); + } + + if (claim == null) { + return new ByteArrayInputStream(new byte[0]); + } + + try { + return new FileInputStream(getPath(claim).toFile()); + } catch (final FileNotFoundException fnfe) { + throw new ContentNotFoundException(null, fnfe); + } + } + @Override public OutputStream write(final ContentClaim claim) throws IOException { final Path path = getPath(claim); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java index 9c42d7e26a..f20acc5224 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java @@ -213,6 +213,20 @@ public class 
ByteArrayContentRepository implements ContentRepository { return byteArrayContentClaim.read(); } + @Override + public InputStream read(final ResourceClaim claim) throws IOException { + if (claim == null) { + return new ByteArrayInputStream(new byte[0]); + } + + if (!(claim instanceof ByteArrayResourceClaim)) { + throw new IllegalArgumentException("Cannot access Resource Claim " + claim + " because the Resource Claim does not belong to this Content Repository"); + } + + final ByteArrayResourceClaim byteArrayResourceClaim = (ByteArrayResourceClaim) claim; + return byteArrayResourceClaim.read(); + } + @Override public OutputStream write(final ContentClaim claim) { final ByteArrayContentClaim byteArrayContentClaim = verifyClaim(claim); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java index f84ca4891d..bc88efe5f6 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java @@ -16,49 +16,28 @@ */ package org.apache.nifi.provenance; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.security.KeyManagementException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.nifi.provenance.serialization.StorageSummary; import org.apache.nifi.provenance.toc.TocWriter; -import org.apache.nifi.util.timebuffer.LongEntityAccess; -import org.apache.nifi.util.timebuffer.TimedBuffer; -import org.apache.nifi.util.timebuffer.TimestampedLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.security.KeyManagementException; +import java.util.concurrent.atomic.AtomicLong; + public class EncryptedSchemaRecordWriter extends EventIdFirstSchemaRecordWriter { private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordWriter.class); - - private static final int DEFAULT_DEBUG_FREQUENCY = 1_000_000; - - private ProvenanceEventEncryptor provenanceEventEncryptor; - - private static final TimedBuffer encryptTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess()); - - private String keyId; - - private int debugFrequency; + public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter"; public static final int SERIALIZATION_VERSION = 1; - public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter"; - - public EncryptedSchemaRecordWriter(final File file, final AtomicLong idGenerator, final TocWriter writer, final boolean compressed, - final int uncompressedBlockSize, final IdentifierLookup idLookup, - ProvenanceEventEncryptor provenanceEventEncryptor) throws IOException, EncryptionException { - this(file, idGenerator, writer, compressed, uncompressedBlockSize, idLookup, provenanceEventEncryptor, DEFAULT_DEBUG_FREQUENCY); - } + private ProvenanceEventEncryptor provenanceEventEncryptor; + private String keyId; public EncryptedSchemaRecordWriter(final File file, final AtomicLong 
idGenerator, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize, final IdentifierLookup idLookup, ProvenanceEventEncryptor provenanceEventEncryptor, int debugFrequency) throws IOException, EncryptionException { super(file, idGenerator, writer, compressed, uncompressedBlockSize, idLookup); this.provenanceEventEncryptor = provenanceEventEncryptor; - this.debugFrequency = debugFrequency; try { this.keyId = getNextAvailableKeyId(); @@ -69,101 +48,21 @@ public class EncryptedSchemaRecordWriter extends EventIdFirstSchemaRecordWriter } @Override - public StorageSummary writeRecord(final ProvenanceEventRecord record) throws IOException { - final long encryptStart = System.nanoTime(); - byte[] cipherBytes; + protected byte[] serializeEvent(final ProvenanceEventRecord event) throws IOException { + final byte[] serialized = super.serializeEvent(event); + final String eventId = event.getBestEventIdentifier(); + try { - byte[] serialized; - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256); - final DataOutputStream dos = new DataOutputStream(baos)) { - writeRecord(record, 0L, dos); - serialized = baos.toByteArray(); - } - String eventId = record.getBestEventIdentifier(); - cipherBytes = encrypt(serialized, eventId); + final byte[] cipherBytes = encrypt(serialized, eventId); + return cipherBytes; } catch (EncryptionException e) { logger.error("Encountered an error: ", e); throw new IOException("Error encrypting the provenance record", e); } - final long encryptStop = System.nanoTime(); - - final long lockStart = System.nanoTime(); - final long writeStart; - final long startBytes; - final long endBytes; - final long recordIdentifier; - synchronized (this) { - writeStart = System.nanoTime(); - try { - recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId(); - startBytes = getBytesWritten(); - - ensureStreamState(recordIdentifier, startBytes); - - final DataOutputStream out = getBufferedOutputStream(); - final int recordIdOffset = (int) (recordIdentifier - getFirstEventId()); - out.writeInt(recordIdOffset); - out.writeInt(cipherBytes.length); - out.write(cipherBytes); - - getRecordCount().incrementAndGet(); - endBytes = getBytesWritten(); - } catch (final IOException ioe) { - markDirty(); - throw ioe; - } - } - - if (logger.isDebugEnabled()) { - // Collect stats and periodically dump them if log level is set to at least info. - final long writeNanos = System.nanoTime() - writeStart; - getWriteTimes().add(new TimestampedLong(writeNanos)); - - final long serializeNanos = lockStart - encryptStart; - getSerializeTimes().add(new TimestampedLong(serializeNanos)); - - final long encryptNanos = encryptStop - encryptStart; - getEncryptTimes().add(new TimestampedLong(encryptNanos)); - - final long lockNanos = writeStart - lockStart; - getLockTimes().add(new TimestampedLong(lockNanos)); - getBytesWrittenBuffer().add(new TimestampedLong(endBytes - startBytes)); - - final long recordCount = getTotalRecordCount().incrementAndGet(); - if (recordCount % debugFrequency == 0) { - printStats(); - } - } - - final long serializedLength = endBytes - startBytes; - final TocWriter tocWriter = getTocWriter(); - final Integer blockIndex = tocWriter == null ? 
null : tocWriter.getCurrentBlockIndex(); - final File file = getFile(); - final String storageLocation = file.getParentFile().getName() + "/" + file.getName(); - return new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes); } - private void printStats() { - final long sixtySecondsAgo = System.currentTimeMillis() - 60000L; - final Long writeNanosLast60 = getWriteTimes().getAggregateValue(sixtySecondsAgo).getValue(); - final Long lockNanosLast60 = getLockTimes().getAggregateValue(sixtySecondsAgo).getValue(); - final Long serializeNanosLast60 = getSerializeTimes().getAggregateValue(sixtySecondsAgo).getValue(); - final Long encryptNanosLast60 = getEncryptTimes().getAggregateValue(sixtySecondsAgo).getValue(); - final Long bytesWrittenLast60 = getBytesWrittenBuffer().getAggregateValue(sixtySecondsAgo).getValue(); - logger.debug("In the last 60 seconds, have spent {} millis writing to file ({} MB), {} millis waiting on synchronize block, {} millis serializing events, {} millis encrypting events", - TimeUnit.NANOSECONDS.toMillis(writeNanosLast60), - bytesWrittenLast60 / 1024 / 1024, - TimeUnit.NANOSECONDS.toMillis(lockNanosLast60), - TimeUnit.NANOSECONDS.toMillis(serializeNanosLast60), - TimeUnit.NANOSECONDS.toMillis(encryptNanosLast60)); - } - - static TimedBuffer getEncryptTimes() { - return encryptTimes; - } - - private byte[] encrypt(byte[] serialized, String eventId) throws IOException, EncryptionException { - String keyId = getKeyId(); + private byte[] encrypt(byte[] serialized, String eventId) throws EncryptionException { + final String keyId = getKeyId(); try { return provenanceEventEncryptor.encrypt(serialized, eventId, keyId); } catch (Exception e) { @@ -192,8 +91,6 @@ public class EncryptedSchemaRecordWriter extends EventIdFirstSchemaRecordWriter @Override public String toString() { - return "EncryptedSchemaRecordWriter" + - " using " + provenanceEventEncryptor + - " and current keyId " + keyId; + return "EncryptedSchemaRecordWriter[keyId=" + keyId + ", encryptor=" + provenanceEventEncryptor + "]"; } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java index f49e8a6eba..24ee0b5f1f 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java @@ -16,10 +16,6 @@ */ package org.apache.nifi.provenance; -import java.io.IOException; -import java.security.KeyManagementException; -import javax.crypto.SecretKey; -import javax.crypto.spec.SecretKeySpec; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; import org.apache.nifi.authorization.Authorizer; @@ -38,6 +34,11 @@ import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; +import java.io.IOException; +import java.security.KeyManagementException; + /** * This class is an implementation of the {@link WriteAheadProvenanceRepository} 
provenance repository which provides transparent * block encryption/decryption of provenance event data during file system interaction. As of Apache NiFi 1.10.0 @@ -53,10 +54,13 @@ public class EncryptedWriteAheadProvenanceRepository extends WriteAheadProvenanc /** * This constructor exists solely for the use of the Java Service Loader mechanism and should not be used. */ + @SuppressWarnings("unused") public EncryptedWriteAheadProvenanceRepository() { super(); } + // Created via reflection from FlowController + @SuppressWarnings("unused") public EncryptedWriteAheadProvenanceRepository(final NiFiProperties nifiProperties) { super(RepositoryConfiguration.create(nifiProperties)); } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java index 05e6736c27..a60dcf4b0c 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java @@ -17,19 +17,6 @@ package org.apache.nifi.provenance; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.provenance.schema.EventFieldNames; import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema; import org.apache.nifi.provenance.schema.LookupTableEventRecord; @@ -37,21 +24,24 @@ import org.apache.nifi.provenance.schema.LookupTableEventSchema; import org.apache.nifi.provenance.serialization.CompressableRecordWriter; import org.apache.nifi.provenance.serialization.StorageSummary; import org.apache.nifi.provenance.toc.TocWriter; -import org.apache.nifi.provenance.util.ByteArrayDataOutputStream; -import org.apache.nifi.provenance.util.ByteArrayDataOutputStreamCache; import org.apache.nifi.repository.schema.FieldMapRecord; import org.apache.nifi.repository.schema.Record; import org.apache.nifi.repository.schema.RecordSchema; import org.apache.nifi.repository.schema.SchemaRecordWriter; -import org.apache.nifi.util.timebuffer.LongEntityAccess; -import org.apache.nifi.util.timebuffer.TimedBuffer; -import org.apache.nifi.util.timebuffer.TimestampedLong; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter { - private static final Logger logger = LoggerFactory.getLogger(EventIdFirstSchemaRecordWriter.class); - private static final RecordSchema eventSchema = LookupTableEventSchema.EVENT_SCHEMA; private 
static final RecordSchema contentClaimSchema = new RecordSchema(eventSchema.getField(EventFieldNames.CONTENT_CLAIM).getSubFields()); private static final RecordSchema previousContentClaimSchema = new RecordSchema(eventSchema.getField(EventFieldNames.PREVIOUS_CONTENT_CLAIM).getSubFields()); @@ -70,14 +60,6 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter { private static final Map<ProvenanceEventType, Integer> eventTypeMap; private static final List<String> eventTypeNames; - private static final TimedBuffer<TimestampedLong> serializeTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess()); - private static final TimedBuffer<TimestampedLong> lockTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess()); - private static final TimedBuffer<TimestampedLong> writeTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess()); - private static final TimedBuffer<TimestampedLong> bytesWritten = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess()); - private static final AtomicLong totalRecordCount = new AtomicLong(0L); - - private static final ByteArrayDataOutputStreamCache streamCache = new ByteArrayDataOutputStreamCache(32, 8 * 1024, 256 * 1024); - private long firstEventId; private long systemTimeOffset; @@ -102,94 +84,89 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter { queueIdMap = idLookup.invertQueueIdentifiers(); } - public EventIdFirstSchemaRecordWriter(final OutputStream out, final String storageLocation, final AtomicLong idGenerator, final TocWriter tocWriter, final boolean compressed, - final int uncompressedBlockSize, final IdentifierLookup idLookup) throws IOException { - super(out, storageLocation, idGenerator, tocWriter, compressed, uncompressedBlockSize); - - this.idLookup = idLookup; - componentIdMap = idLookup.invertComponentIdentifiers(); - componentTypeMap = idLookup.invertComponentTypes(); - queueIdMap = idLookup.invertQueueIdentifiers(); - } @Override - public StorageSummary writeRecord(final ProvenanceEventRecord record) throws IOException { + public Map<ProvenanceEventRecord, StorageSummary> writeRecords(final Iterable<ProvenanceEventRecord> events) throws IOException { if (isDirty()) { throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository"); } - final long lockStart; - final long writeStart; - final long startBytes; - final long endBytes; - final long recordIdentifier; + final int heapThreshold = 1_000_000; + final Map<ProvenanceEventRecord, StorageSummary> storageSummaries = new HashMap<>(); - final long serializeStart = System.nanoTime(); - final ByteArrayDataOutputStream bados = streamCache.checkOut(); - try { - writeRecord(record, 0L, bados.getDataOutputStream()); + final Map<ProvenanceEventRecord, byte[]> serializedEvents = new LinkedHashMap<>(); + int totalBytes = 0; - lockStart = System.nanoTime(); - synchronized (this) { - writeStart = System.nanoTime(); - try { - recordIdentifier = record.getEventId() == -1L ? 
getIdGenerator().getAndIncrement() : record.getEventId(); - startBytes = getBytesWritten(); + for (final ProvenanceEventRecord event : events) { + final byte[] serialized = serializeEvent(event); + serializedEvents.put(event, serialized); + totalBytes += serialized.length; - ensureStreamState(recordIdentifier, startBytes); - - final DataOutputStream out = getBufferedOutputStream(); - final int recordIdOffset = (int) (recordIdentifier - firstEventId); - out.writeInt(recordIdOffset); - - final ByteArrayOutputStream baos = bados.getByteArrayOutputStream(); - out.writeInt(baos.size()); - baos.writeTo(out); - - recordCount.incrementAndGet(); - endBytes = getBytesWritten(); - } catch (final IOException ioe) { - markDirty(); - throw ioe; - } - } - } finally { - streamCache.checkIn(bados); - } - - if (logger.isDebugEnabled()) { - // Collect stats and periodically dump them if log level is set to at least info. - final long writeNanos = System.nanoTime() - writeStart; - writeTimes.add(new TimestampedLong(writeNanos)); - - final long serializeNanos = lockStart - serializeStart; - serializeTimes.add(new TimestampedLong(serializeNanos)); - - final long lockNanos = writeStart - lockStart; - lockTimes.add(new TimestampedLong(lockNanos)); - bytesWritten.add(new TimestampedLong(endBytes - startBytes)); - - final long recordCount = totalRecordCount.incrementAndGet(); - if (recordCount % 1_000_000 == 0) { - final long sixtySecondsAgo = System.currentTimeMillis() - 60000L; - final Long writeNanosLast60 = writeTimes.getAggregateValue(sixtySecondsAgo).getValue(); - final Long lockNanosLast60 = lockTimes.getAggregateValue(sixtySecondsAgo).getValue(); - final Long serializeNanosLast60 = serializeTimes.getAggregateValue(sixtySecondsAgo).getValue(); - final Long bytesWrittenLast60 = bytesWritten.getAggregateValue(sixtySecondsAgo).getValue(); - logger.debug("In the last 60 seconds, have spent {} millis writing to file ({} MB), {} millis waiting on synchronize block, {} millis serializing events", - TimeUnit.NANOSECONDS.toMillis(writeNanosLast60), - bytesWrittenLast60 / 1024 / 1024, - TimeUnit.NANOSECONDS.toMillis(lockNanosLast60), - TimeUnit.NANOSECONDS.toMillis(serializeNanosLast60)); + if (totalBytes >= heapThreshold) { + storeEvents(serializedEvents, storageSummaries); + recordCount.addAndGet(serializedEvents.size()); + serializedEvents.clear(); + totalBytes = 0; } } - final long serializedLength = endBytes - startBytes; - final TocWriter tocWriter = getTocWriter(); - final Integer blockIndex = tocWriter == null ? 
null : tocWriter.getCurrentBlockIndex(); - final File file = getFile(); - final String storageLocation = file.getParentFile().getName() + "/" + file.getName(); - return new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes); + storeEvents(serializedEvents, storageSummaries); + recordCount.addAndGet(serializedEvents.size()); + + return storageSummaries; + } + + protected byte[] serializeEvent(final ProvenanceEventRecord event) throws IOException { + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream dataOutputStream = new DataOutputStream(baos)) { + writeRecord(event, 0L, dataOutputStream); + dataOutputStream.flush(); + return baos.toByteArray(); + } + } + + private synchronized void storeEvents(final Map<ProvenanceEventRecord, byte[]> serializedEvents, final Map<ProvenanceEventRecord, StorageSummary> summaryMap) throws IOException { + for (final Map.Entry<ProvenanceEventRecord, byte[]> entry : serializedEvents.entrySet()) { + final ProvenanceEventRecord event = entry.getKey(); + final byte[] serialized = entry.getValue(); + + final long startBytes; + final long endBytes; + final long recordIdentifier; + + try { + recordIdentifier = event.getEventId() == -1 ? getIdGenerator().getAndIncrement() : event.getEventId(); + startBytes = getBytesWritten(); + + ensureStreamState(recordIdentifier, startBytes); + + final DataOutputStream out = getBufferedOutputStream(); + final int recordIdOffset = (int) (recordIdentifier - firstEventId); + out.writeInt(recordIdOffset); + + out.writeInt(serialized.length); + out.write(serialized); + + endBytes = getBytesWritten(); + } catch (final IOException ioe) { + markDirty(); + throw ioe; + } + + final long serializedLength = endBytes - startBytes; + final TocWriter tocWriter = getTocWriter(); + final Integer blockIndex = tocWriter == null ? null : tocWriter.getCurrentBlockIndex(); + final File file = getFile(); + final String storageLocation = file.getParentFile().getName() + "/" + file.getName(); + final StorageSummary storageSummary = new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes); + summaryMap.put(event, storageSummary); + } + } + + @Override + public StorageSummary writeRecord(final ProvenanceEventRecord record) { + // This method should never be called: its only caller is the default writeRecords() implementation, which this class overrides and never delegates here. 
+ throw new UnsupportedOperationException(); } @Override @@ -245,47 +222,4 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter { protected String getSerializationName() { return SERIALIZATION_NAME; } - - /* Getters for internal state written to by subclass EncryptedSchemaRecordWriter */ - - IdentifierLookup getIdLookup() { - return idLookup; - } - - SchemaRecordWriter getSchemaRecordWriter() { - return schemaRecordWriter; - } - - AtomicInteger getRecordCount() { - return recordCount; - } - - static TimedBuffer getSerializeTimes() { - return serializeTimes; - } - - static TimedBuffer getLockTimes() { - return lockTimes; - } - - static TimedBuffer getWriteTimes() { - return writeTimes; - } - - static TimedBuffer getBytesWrittenBuffer() { - return bytesWritten; - } - - static AtomicLong getTotalRecordCount() { - return totalRecordCount; - } - - long getFirstEventId() { - return firstEventId; - } - - long getSystemTimeOffset() { - return systemTimeOffset; - } - } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java index 2dc249d5f1..07eab61c9b 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java @@ -108,6 +108,8 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository { config = null; } + // Created via reflection from FlowController + @SuppressWarnings("unused") public WriteAheadProvenanceRepository(final NiFiProperties nifiProperties) { this(RepositoryConfiguration.create(nifiProperties)); } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java index 21e2c0767b..bb679e7cc0 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java @@ -17,14 +17,6 @@ package org.apache.nifi.provenance.serialization; -import java.io.BufferedOutputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.provenance.AbstractRecordWriter; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.toc.TocWriter; @@ -34,6 +26,14 @@ import org.apache.nifi.stream.io.NonCloseableOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import 
java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicLong; + public abstract class CompressableRecordWriter extends AbstractRecordWriter { private static final Logger logger = LoggerFactory.getLogger(CompressableRecordWriter.class); @@ -145,7 +145,7 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter { } } - protected synchronized void ensureStreamState(final long recordIdentifier, final long startBytes) throws IOException { + protected void ensureStreamState(final long recordIdentifier, final long startBytes) throws IOException { // add a new block to the TOC if needed. if (getTocWriter() != null && (startBytes - blockStartOffset >= uncompressedBlockSize)) { blockStartOffset = startBytes; @@ -222,4 +222,5 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter { protected abstract int getSerializationVersion(); protected abstract String getSerializationName(); + } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java index 475c750d93..ad0c5b5289 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java @@ -16,17 +16,6 @@ */ package org.apache.nifi.provenance.serialization; -import java.io.BufferedInputStream; -import java.io.DataInputStream; -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Path; -import java.util.Collection; -import java.util.zip.GZIPInputStream; import org.apache.nifi.properties.NiFiPropertiesLoader; import org.apache.nifi.provenance.ByteArraySchemaRecordReader; import org.apache.nifi.provenance.ByteArraySchemaRecordWriter; @@ -44,6 +33,18 @@ import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.Collection; +import java.util.zip.GZIPInputStream; + public class RecordReaders { private static Logger logger = LoggerFactory.getLogger(RecordReaders.class); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java index c9d2a22adf..5b5a9d6cdf 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java +++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java @@ -16,12 +16,14 @@ */ package org.apache.nifi.provenance.serialization; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.toc.TocWriter; + import java.io.Closeable; import java.io.File; import java.io.IOException; - -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.provenance.toc.TocWriter; +import java.util.HashMap; +import java.util.Map; public interface RecordWriter extends Closeable { @@ -42,6 +44,17 @@ public interface RecordWriter extends Closeable { */ StorageSummary writeRecord(ProvenanceEventRecord record) throws IOException; + default Map<ProvenanceEventRecord, StorageSummary> writeRecords(Iterable<ProvenanceEventRecord> events) throws IOException { + final Map<ProvenanceEventRecord, StorageSummary> locationMap = new HashMap<>(); + + for (final ProvenanceEventRecord nextEvent : events) { + final StorageSummary writerSummary = writeRecord(nextEvent); + locationMap.put(nextEvent, writerSummary); + } + + return locationMap; + } + /** * Flushes any data that is held in a buffer to the underlying storage mechanism * diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java index 1df84f6d41..c32de77336 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java @@ -320,11 +320,15 @@ public class WriteAheadStorePartition implements EventStorePartition { try { long maxId = -1L; int numEvents = 0; - for (final ProvenanceEventRecord nextEvent : events) { - final StorageSummary writerSummary = writer.writeRecord(nextEvent); + + final Map<ProvenanceEventRecord, StorageSummary> writerSummaries = writer.writeRecords(events); + for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : writerSummaries.entrySet()) { + final ProvenanceEventRecord eventRecord = entry.getKey(); + final StorageSummary writerSummary = entry.getValue(); + final StorageSummary summaryWithIndex = new StorageSummary(writerSummary.getEventId(), writerSummary.getStorageLocation(), this.partitionName, writerSummary.getBlockIndex(), writerSummary.getSerializedLength(), writerSummary.getBytesWritten()); - locationMap.put(nextEvent, summaryWithIndex); + locationMap.put(eventRecord, summaryWithIndex); maxId = summaryWithIndex.getEventId(); numEvents++; } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java deleted file mode 100644 index 23aefb3ce9..0000000000 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.provenance.util; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; - -public class ByteArrayDataOutputStream { - private final ByteArrayOutputStream baos; - private final DataOutputStream dos; - - public ByteArrayDataOutputStream(final int initialCapacity) { - baos = new ByteArrayOutputStream(initialCapacity); - dos = new DataOutputStream(baos); - } - - public ByteArrayOutputStream getByteArrayOutputStream() { - return baos; - } - - public DataOutputStream getDataOutputStream() { - return dos; - } -} diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java deleted file mode 100644 index 953559093f..0000000000 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
index c146982c2b..784eb9ac03 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
@@ -199,7 +199,7 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
         // Act
         int encryptedRecordId = idGenerator.get()
         encryptedWriter.writeHeader(encryptedRecordId)
-        encryptedWriter.writeRecord(record)
+        encryptedWriter.writeRecords(Collections.singletonList(record))
         encryptedWriter.close()
         logger.info("Wrote encrypted record ${encryptedRecordId} to journal")
 
@@ -242,13 +242,13 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
         // Act
         int standardRecordId = idGenerator.get()
         standardWriter.writeHeader(standardRecordId)
-        standardWriter.writeRecord(record)
+        standardWriter.writeRecords(Collections.singletonList(record))
         standardWriter.close()
         logger.info("Wrote standard record ${standardRecordId} to journal")
 
         int encryptedRecordId = idGenerator.get()
         encryptedWriter.writeHeader(encryptedRecordId)
-        encryptedWriter.writeRecord(record)
+        encryptedWriter.writeRecords(Collections.singletonList(record))
         encryptedWriter.close()
         logger.info("Wrote encrypted record ${encryptedRecordId} to journal")
 
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
index 4b2ca50bd0..ddb6d0219e 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
@@ -17,21 +17,6 @@
  */
 package org.apache.nifi.provenance;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.toc.StandardTocReader;
@@ -43,6 +28,23 @@ import org.apache.nifi.util.file.FileUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 public abstract class AbstractTestRecordReaderWriter {
 
     @BeforeClass
@@ -62,7 +64,7 @@ public abstract class AbstractTestRecordReaderWriter {
         final RecordWriter writer = createWriter(journalFile, tocWriter, false, 1024 * 1024);
 
         writer.writeHeader(1L);
-        writer.writeRecord(createEvent());
+        writer.writeRecords(Collections.singletonList(createEvent()));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -96,7 +98,7 @@ public abstract class AbstractTestRecordReaderWriter {
         final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
 
         writer.writeHeader(1L);
-        writer.writeRecord(createEvent());
+        writer.writeRecords(Collections.singletonList(createEvent()));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -117,7 +119,7 @@ public abstract class AbstractTestRecordReaderWriter {
         writer.writeHeader(1L);
 
         for (int i = 0; i < 10; i++) {
-            writer.writeRecord(createEvent());
+            writer.writeRecords(Collections.singletonList(createEvent()));
         }
         writer.close();
 
@@ -156,7 +158,7 @@ public abstract class AbstractTestRecordReaderWriter {
         writer.writeHeader(1L);
 
         for (int i = 0; i < 10; i++) {
-            writer.writeRecord(createEvent());
+            writer.writeRecords(Collections.singletonList(createEvent()));
        }
         writer.close();
 
@@ -198,7 +200,7 @@ public abstract class AbstractTestRecordReaderWriter {
         for (int i = 0; i < numEvents; i++) {
             final ProvenanceEventRecord event = createEvent();
             events.add(event);
-            writer.writeRecord(event);
+            writer.writeRecords(Collections.singletonList(event));
         }
 
         writer.close();
@@ -208,6 +210,8 @@ public abstract class AbstractTestRecordReaderWriter {
             final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
 
             for (int i = 0; i < numEvents; i++) {
+                System.out.println(i);
+
                 final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(i);
                 assertTrue(eventOption.isPresent());
                 assertEquals(i, eventOption.get().getEventId());
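Because writeRecords accepts any Iterable, the tests above are not limited to singleton lists; the one-record-per-call form simply preserves the original test structure. A short sketch of the batch form, reusing this test class's createEvent() helper and assumed to run inside a test method like those above:

    // Hypothetical batch variant of the singleton-list calls above.
    final List<ProvenanceEventRecord> events = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        events.add(createEvent());
    }

    writer.writeHeader(1L);
    final Map<ProvenanceEventRecord, StorageSummary> summaries = writer.writeRecords(events);
    writer.close();

    assertEquals(10, summaries.size());  // writeRecords returns one StorageSummary per event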
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestEventIdFirstSchemaRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestEventIdFirstSchemaRecordReaderWriter.java
index a6833b44d7..b6b86a2926 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestEventIdFirstSchemaRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestEventIdFirstSchemaRecordReaderWriter.java
@@ -17,9 +17,19 @@
  */
 package org.apache.nifi.provenance;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -34,19 +44,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordWriter;
-import org.apache.nifi.provenance.toc.StandardTocReader;
-import org.apache.nifi.provenance.toc.StandardTocWriter;
-import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.nifi.provenance.toc.TocUtil;
-import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter {
     private final AtomicLong idGenerator = new AtomicLong(0L);
@@ -88,7 +88,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
         final ProvenanceEventRecord record = builder.build();
 
         writer.writeHeader(1L);
-        writer.writeRecord(record);
+        writer.writeRecords(Collections.singletonList(record));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -146,7 +146,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
         final ProvenanceEventRecord record = builder.build();
 
         writer.writeHeader(1L);
-        writer.writeRecord(record);
+        writer.writeRecords(Collections.singletonList(record));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -203,7 +203,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
         final ProvenanceEventRecord record = builder.build();
 
         writer.writeHeader(1L);
-        writer.writeRecord(record);
+        writer.writeRecords(Collections.singletonList(record));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -261,7 +261,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
         final ProvenanceEventRecord record = builder.build();
 
         writer.writeHeader(1L);
-        writer.writeRecord(record);
+        writer.writeRecords(Collections.singletonList(record));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -322,7 +322,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
         final ProvenanceEventRecord record = builder.build();
 
         writer.writeHeader(500_000L);
-        writer.writeRecord(record);
+        writer.writeRecords(Collections.singletonList(record));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -382,12 +382,12 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
         builder.setCurrentContentClaim("container-2", "section-2", "identifier-2", 2L, 2L);
 
         writer.writeHeader(500_000L);
-        writer.writeRecord(builder.build());
+        writer.writeRecords(Collections.singletonList(builder.build()));
 
         builder.setEventId(1_000_001L);
         builder.setComponentId("4444");
         builder.setComponentType("unit-test-component-1");
-        writer.writeRecord(builder.build());
+        writer.writeRecords(Collections.singletonList(builder.build()));
 
         writer.close();
 
@@ -435,7 +435,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
             writer.writeHeader(0L);
 
             for (int i = 0; i < 100_000; i++) {
-                writer.writeRecord(createEvent());
+                writer.writeRecords(Collections.singletonList(createEvent()));
             }
         }