NIFI-7646, NIFI-8222: Instead of having StandardProcessSession call ContentRepository.read(ContentClaim) for each FlowFile, introduced a new ContentRepository.read(ResourceClaim) and hold the InputStream to the Resource Claim open across reads. This is not supported by the EncryptedFileSystemRepository, so also introduced a method that indicates whether or not a repository supports it. The benefit is that when many FlowFiles are read within a single session, such as when using MergeContent/MergeRecord or a processor configured with a Run Duration, we can hold a single InputStream open instead of repeatedly opening FileInputStreams and seeking to the appropriate location, which is much faster.
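
As a rough illustration of the idea (a minimal sketch using hypothetical names such as SharedClaimReader and readNext, not the actual NiFi classes), the point is to keep one stream open over the backing resource-claim file and skip forward to each FlowFile's offset, rather than opening and seeking a new FileInputStream per FlowFile:

    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Hypothetical sketch only: one InputStream is held open over the whole resource
    // claim (the backing file), and each FlowFile's content is reached by skipping
    // forward from the current position instead of opening a new FileInputStream.
    class SharedClaimReader implements AutoCloseable {
        private final InputStream stream;
        private long position;

        SharedClaimReader(final String resourceClaimFile) throws IOException {
            this.stream = new BufferedInputStream(new FileInputStream(resourceClaimFile));
            this.position = 0L;
        }

        // Read one "FlowFile" that lives at [offset, offset + length) within the
        // resource claim. Offsets must be non-decreasing across calls.
        byte[] readNext(final long offset, final int length) throws IOException {
            final long toSkip = offset - position;
            if (toSkip < 0) {
                throw new IOException("Offsets must be read in increasing order");
            }

            long skipped = 0;
            while (skipped < toSkip) {
                final long n = stream.skip(toSkip - skipped);
                if (n <= 0) {
                    throw new IOException("Unable to skip to offset " + offset);
                }
                skipped += n;
            }
            position = offset;

            final byte[] buffer = new byte[length];
            int read = 0;
            while (read < length) {
                final int n = stream.read(buffer, read, length - read);
                if (n < 0) {
                    throw new IOException("Unexpected end of resource claim");
                }
                read += n;
            }
            position += length;
            return buffer;
        }

        @Override
        public void close() throws IOException {
            stream.close();
        }
    }

The actual implementation layers DisableOnCloseInputStream, LimitingInputStream, and ContentClaimInputStream on top of the shared stream, as shown in the StandardProcessSession diff below.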

- Instead of entering a 'synchronized' block for every provenance event, serialize up to 1 MB worth of data and then enter the synchronized block to write that data out. This avoids a large amount of lock contention and context switching. See the sketch below.
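
The shape of that change, as a simplified sketch (hypothetical BatchingEventWriter type and serializeEvent helper; the real code lives in EventIdFirstSchemaRecordWriter below): serialize events with no lock held, accumulate roughly 1 MB of serialized bytes, and only then enter the synchronized section to write the whole batch:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.ArrayList;
    import java.util.List;

    // Simplified sketch of the batched write pattern. Events are serialized outside
    // the lock, and the synchronized block is entered once per ~1 MB of serialized
    // data rather than once per event.
    class BatchingEventWriter {
        private static final int HEAP_THRESHOLD = 1_000_000; // ~1 MB of serialized data

        private final OutputStream out;

        BatchingEventWriter(final OutputStream out) {
            this.out = out;
        }

        void writeEvents(final Iterable<String> events) throws IOException {
            final List<byte[]> batch = new ArrayList<>();
            int batchBytes = 0;

            for (final String event : events) {
                final byte[] serialized = serializeEvent(event); // no lock held here
                batch.add(serialized);
                batchBytes += serialized.length;

                if (batchBytes >= HEAP_THRESHOLD) {
                    storeBatch(batch);
                    batch.clear();
                    batchBytes = 0;
                }
            }

            storeBatch(batch);
        }

        private byte[] serializeEvent(final String event) throws IOException {
            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 final DataOutputStream dos = new DataOutputStream(baos)) {
                dos.writeUTF(event);
                dos.flush();
                return baos.toByteArray();
            }
        }

        // Lock contention is amortized: one synchronized section per batch of events.
        private synchronized void storeBatch(final List<byte[]> batch) throws IOException {
            for (final byte[] serialized : batch) {
                out.write(serialized);
            }
        }
    }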

NIFI-7646: Removed TODO and unused Jackson dependency

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4818
Authored by Mark Payne on 2020-07-17 09:12:47 -04:00; committed by Matthew Burgess
parent 200c04c6d0
commit 528fce2407
23 changed files with 345 additions and 462 deletions

@@ -25,6 +25,7 @@ public class LimitingInputStream extends InputStream {
     private final long limit;
     private long bytesRead = 0;
     private volatile boolean limitReached = false;
+    private long markOffset = -1L;
 
     /**
      * Constructs a limited input stream whereby if the limit is reached all
@@ -112,6 +113,7 @@ public class LimitingInputStream extends InputStream {
     @Override
     public void mark(int readlimit) {
         in.mark(readlimit);
+        markOffset = bytesRead;
     }
 
     @Override
@@ -122,6 +124,11 @@ public class LimitingInputStream extends InputStream {
     @Override
     public void reset() throws IOException {
         in.reset();
+
+        if (markOffset >= 0) {
+            bytesRead = markOffset;
+        }
+        markOffset = -1;
     }
 
     public long getLimit() {

@@ -252,6 +252,24 @@ public interface ContentRepository {
      */
     InputStream read(ContentClaim claim) throws IOException;
 
+    /**
+     * Provides access to the input stream for the entire Resource Claim
+     * @param claim the resource claim to read from
+     * @return InputStream over the content of the entire Resource Claim
+     * @throws IOException if unable to read
+     */
+    InputStream read(ResourceClaim claim) throws IOException;
+
+    /**
+     * Indicates whether or not this Content Repository supports obtaining an InputStream for
+     * an entire Resource Claim. If this method returns <code>false</code>, the {@link #read(ResourceClaim)} should not
+     * be called and instead {@link #read(ContentClaim)} should always be used
+     *
+     * @return <code>true</code> if reading an entire Resource Claim is allowed, <code>false</code> otherwise
+     */
+    default boolean isResourceClaimStreamSupported() {
+        return true;
+    }
+
     /**
      * Obtains an OutputStream to the content for the given claim.
      *

@@ -59,10 +59,12 @@ import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
@@ -145,8 +147,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
     private int flowFilesIn = 0, flowFilesOut = 0;
     private long contentSizeIn = 0L, contentSizeOut = 0L;
 
-    private ContentClaim currentReadClaim = null;
-    private ContentClaimInputStream currentReadClaimStream = null;
+    private ResourceClaim currentReadClaim = null;
+    private ByteCountingInputStream currentReadClaimStream = null;
     private long processingStartTime;
 
     // List of InputStreams that have been opened by calls to {@link #read(FlowFile)} and not yet closed
@@ -2259,12 +2261,12 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
             context.getProvenanceRepository().registerEvents(iterable);
             context.getFlowFileRepository().updateRepository(expiredRecords);
         } catch (final IOException e) {
-            LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e);
+            LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e.toString(), e);
         }
     }
 
-    private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long offset, final boolean allowCachingOfStream) throws ContentNotFoundException {
+    private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long contentClaimOffset, final boolean allowCachingOfStream) throws ContentNotFoundException {
         // If there's no content, don't bother going to the Content Repository because it is generally expensive and we know
         // that there is no actual content.
         if (flowFile.getSize() == 0L) {
@@ -2275,15 +2277,18 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
             // If the recursion set is empty, we can use the same input stream that we already have open. However, if
             // the recursion set is NOT empty, we can't do this because we may be reading the input of FlowFile 1 while in the
             // callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1.
-            if (allowCachingOfStream && readRecursionSet.isEmpty() && writeRecursionSet.isEmpty()) {
-                if (currentReadClaim == claim) {
-                    if (currentReadClaimStream != null && currentReadClaimStream.getCurrentOffset() <= offset) {
-                        final long bytesToSkip = offset - currentReadClaimStream.getCurrentOffset();
+            if (allowCachingOfStream && readRecursionSet.isEmpty() && !writeRecursionSet.contains(flowFile) && context.getContentRepository().isResourceClaimStreamSupported()) {
+                if (currentReadClaim == claim.getResourceClaim()) {
+                    final long resourceClaimOffset = claim.getOffset() + contentClaimOffset;
+                    if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= resourceClaimOffset) {
+                        final long bytesToSkip = resourceClaimOffset - currentReadClaimStream.getBytesConsumed();
                         if (bytesToSkip > 0) {
                             StreamUtils.skip(currentReadClaimStream, bytesToSkip);
                         }
 
-                        return new DisableOnCloseInputStream(currentReadClaimStream);
+                        final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize());
+                        final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream);
+                        return contentClaimInputStream;
                    }
                }
@@ -2293,17 +2298,25 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
                    currentReadClaimStream.close();
                }
 
-                currentReadClaim = claim;
-                currentReadClaimStream = new ContentClaimInputStream(context.getContentRepository(), claim, offset);
+                currentReadClaim = claim.getResourceClaim();
+                final InputStream contentRepoStream = context.getContentRepository().read(claim.getResourceClaim());
+                StreamUtils.skip(contentRepoStream, claim.getOffset());
+                final InputStream bufferedContentStream = new BufferedInputStream(contentRepoStream);
+                final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset());
+                currentReadClaimStream = byteCountingInputStream;
 
-                // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
-                // reuse the same InputStream for the next FlowFile
-                final InputStream disableOnClose = new DisableOnCloseInputStream(currentReadClaimStream);
-                return disableOnClose;
+                // Use a non-closeable stream (DisableOnCloseInputStream) because we want to keep it open after the callback has finished so that we can
+                // reuse the same InputStream for the next FlowFile. We then need to use a LimitingInputStream to ensure that we don't allow the InputStream
+                // to be read past the end of the FlowFile (since multiple FlowFiles' contents may be in the given Resource Claim Input Stream).
+                // Finally, we need to wrap the InputStream in a ContentClaimInputStream so that if mark/reset is used, we can provide that capability
+                // without buffering data in memory.
+                final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize());
+                final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream);
+                return contentClaimInputStream;
            } else {
                claimCache.flush(claim);
-                final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, offset);
+                final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset);
                return rawInStream;
            }
        } catch (final ContentNotFoundException cnfe) {

@@ -39,11 +39,16 @@ public class ContentClaimInputStream extends InputStream {
     private long markOffset;
 
     public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset) {
+        this(contentRepository, contentClaim, claimOffset, null);
+    }
+
+    public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset, final InputStream initialDelegate) {
         this.contentRepository = contentRepository;
         this.contentClaim = contentClaim;
         this.claimOffset = claimOffset;
         this.currentOffset = claimOffset;
+        this.delegate = initialDelegate;
     }
 
     private InputStream getDelegate() throws IOException {
@@ -132,7 +137,10 @@ public class ContentClaimInputStream extends InputStream {
         }
 
         if (currentOffset != markOffset) {
+            if (delegate != null) {
                 delegate.close();
+            }
+
             formDelegate();
             StreamUtils.skip(delegate, markOffset - claimOffset);
             currentOffset = markOffset;

@@ -552,6 +552,11 @@ public class FileSystemRepository implements ContentRepository {
         return containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
     }
 
+    public Path getPath(final ResourceClaim resourceClaim, final boolean verifyExists) throws ContentNotFoundException {
+        final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
+        return getPath(contentClaim, verifyExists);
+    }
+
     public Path getPath(final ContentClaim claim, final boolean verifyExists) throws ContentNotFoundException {
         final ResourceClaim resourceClaim = claim.getResourceClaim();
         final Path containerPath = containers.get(resourceClaim.getContainer());
@@ -874,6 +879,17 @@ public class FileSystemRepository implements ContentRepository {
         return claim.getLength();
     }
 
+    @Override
+    public InputStream read(final ResourceClaim claim) throws IOException {
+        if (claim == null) {
+            return new ByteArrayInputStream(new byte[0]);
+        }
+
+        final Path path = getPath(claim, true);
+        final FileInputStream fis = new FileInputStream(path.toFile());
+        return fis;
+    }
+
     @Override
     public InputStream read(final ContentClaim claim) throws IOException {
         if (claim == null) {

@@ -468,6 +468,11 @@ public class VolatileContentRepository implements ContentRepository {
         return backupClaim == null ? getContent(claim).read() : getBackupRepository().read(backupClaim);
     }
 
+    @Override
+    public InputStream read(final ResourceClaim claim) throws IOException {
+        return read(new StandardContentClaim(claim, 0L));
+    }
+
     @Override
     public OutputStream write(final ContentClaim claim) throws IOException {
         final ContentClaim backupClaim = getBackupClaim(claim);

@@ -16,16 +16,10 @@
  */
 package org.apache.nifi.controller.repository.crypto;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-import java.security.KeyManagementException;
-import javax.crypto.CipherOutputStream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.controller.repository.FileSystemRepository;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.security.kms.EncryptionException;
 import org.apache.nifi.security.kms.KeyProvider;
@@ -40,6 +34,14 @@ import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.crypto.CipherOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.security.KeyManagementException;
+
 /**
  * This class is an implementation of the {@link FileSystemRepository} content repository which provides transparent
  * streaming encryption/decryption of content claim data during file system interaction. As of Apache NiFi 1.10.0
@@ -155,6 +157,16 @@ public class EncryptedFileSystemRepository extends FileSystemRepository {
         return super.exportTo(claim, destination, append, offset, length);
     }
 
+    @Override
+    public InputStream read(final ResourceClaim claim) {
+        throw new UnsupportedOperationException("Cannot read full ResourceClaim as a Stream when using EncryptedFileSystemRepository");
+    }
+
+    @Override
+    public boolean isResourceClaimStreamSupported() {
+        return false;
+    }
+
     /**
      * Returns an InputStream (actually a {@link javax.crypto.CipherInputStream}) which wraps
      * the {@link java.io.FileInputStream} from the content repository claim on disk. This

@@ -161,7 +161,7 @@ class EncryptedFileSystemRepositoryTest {
     @Test
     void testReadNullContentClaimShouldReturnEmptyInputStream() {
-        final InputStream inputStream = repository.read(null)
+        final InputStream inputStream = repository.read((ContentClaim) null)
         final int read = inputStream.read()
         assert read == -1
     }

@@ -2745,6 +2745,10 @@ public class StandardProcessSessionIT {
         private Path getPath(final ContentClaim contentClaim) {
             final ResourceClaim claim = contentClaim.getResourceClaim();
+            return getPath(claim);
+        }
+
+        private Path getPath(final ResourceClaim claim) {
             return Paths.get("target").resolve("contentRepo").resolve(claim.getContainer()).resolve(claim.getSection()).resolve(claim.getId());
         }
@@ -2806,6 +2810,23 @@ public class StandardProcessSessionIT {
             }
         }
 
+        @Override
+        public InputStream read(final ResourceClaim claim) throws IOException {
+            if (disableRead) {
+                throw new IOException("Reading from repo is disabled by unit test");
+            }
+
+            if (claim == null) {
+                return new ByteArrayInputStream(new byte[0]);
+            }
+
+            try {
+                return new FileInputStream(getPath(claim).toFile());
+            } catch (final FileNotFoundException fnfe) {
+                throw new ContentNotFoundException(null, fnfe);
+            }
+        }
+
         @Override
         public OutputStream write(final ContentClaim claim) throws IOException {
             final Path path = getPath(claim);

@@ -213,6 +213,20 @@ public class ByteArrayContentRepository implements ContentRepository {
         return byteArrayContentClaim.read();
     }
 
+    @Override
+    public InputStream read(final ResourceClaim claim) throws IOException {
+        if (claim == null) {
+            return new ByteArrayInputStream(new byte[0]);
+        }
+
+        if (!(claim instanceof ByteArrayResourceClaim)) {
+            throw new IllegalArgumentException("Cannot access Resource Claim " + claim + " because the Resource Claim does not belong to this Content Repository");
+        }
+
+        final ByteArrayResourceClaim byteArrayResourceClaim = (ByteArrayResourceClaim) claim;
+        return byteArrayResourceClaim.read();
+    }
+
     @Override
     public OutputStream write(final ContentClaim claim) {
         final ByteArrayContentClaim byteArrayContentClaim = verifyClaim(claim);

@@ -16,49 +16,28 @@
  */
 package org.apache.nifi.provenance;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.nifi.provenance.serialization.StorageSummary;
 import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.util.timebuffer.LongEntityAccess;
-import org.apache.nifi.util.timebuffer.TimedBuffer;
-import org.apache.nifi.util.timebuffer.TimestampedLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class EncryptedSchemaRecordWriter extends EventIdFirstSchemaRecordWriter {
     private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordWriter.class);
+    public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter";
 
-    private static final int DEFAULT_DEBUG_FREQUENCY = 1_000_000;
-
-    private ProvenanceEventEncryptor provenanceEventEncryptor;
-    private static final TimedBuffer<TimestampedLong> encryptTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
-
-    private String keyId;
-    private int debugFrequency;
-
     public static final int SERIALIZATION_VERSION = 1;
-    public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter";
+    private ProvenanceEventEncryptor provenanceEventEncryptor;
+    private String keyId;
 
-    public EncryptedSchemaRecordWriter(final File file, final AtomicLong idGenerator, final TocWriter writer, final boolean compressed,
-                                       final int uncompressedBlockSize, final IdentifierLookup idLookup,
-                                       ProvenanceEventEncryptor provenanceEventEncryptor) throws IOException, EncryptionException {
-        this(file, idGenerator, writer, compressed, uncompressedBlockSize, idLookup, provenanceEventEncryptor, DEFAULT_DEBUG_FREQUENCY);
-    }
-
     public EncryptedSchemaRecordWriter(final File file, final AtomicLong idGenerator, final TocWriter writer, final boolean compressed,
                                        final int uncompressedBlockSize, final IdentifierLookup idLookup,
                                        ProvenanceEventEncryptor provenanceEventEncryptor, int debugFrequency) throws IOException, EncryptionException {
         super(file, idGenerator, writer, compressed, uncompressedBlockSize, idLookup);
         this.provenanceEventEncryptor = provenanceEventEncryptor;
-        this.debugFrequency = debugFrequency;
 
         try {
             this.keyId = getNextAvailableKeyId();
@@ -69,101 +48,21 @@ public class EncryptedSchemaRecordWriter extends EventIdFirstSchemaRecordWriter
     }
 
     @Override
-    public StorageSummary writeRecord(final ProvenanceEventRecord record) throws IOException {
-        final long encryptStart = System.nanoTime();
-        byte[] cipherBytes;
+    protected byte[] serializeEvent(final ProvenanceEventRecord event) throws IOException {
+        final byte[] serialized = super.serializeEvent(event);
+        final String eventId = event.getBestEventIdentifier();
         try {
-            byte[] serialized;
-            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
-                 final DataOutputStream dos = new DataOutputStream(baos)) {
-                writeRecord(record, 0L, dos);
-                serialized = baos.toByteArray();
-            }
-
-            String eventId = record.getBestEventIdentifier();
-            cipherBytes = encrypt(serialized, eventId);
+            final byte[] cipherBytes = encrypt(serialized, eventId);
+            return cipherBytes;
         } catch (EncryptionException e) {
             logger.error("Encountered an error: ", e);
             throw new IOException("Error encrypting the provenance record", e);
         }
-
-        final long encryptStop = System.nanoTime();
-
-        final long lockStart = System.nanoTime();
-        final long writeStart;
-        final long startBytes;
-        final long endBytes;
-        final long recordIdentifier;
-        synchronized (this) {
-            writeStart = System.nanoTime();
-            try {
-                recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId();
-                startBytes = getBytesWritten();
-
-                ensureStreamState(recordIdentifier, startBytes);
-
-                final DataOutputStream out = getBufferedOutputStream();
-                final int recordIdOffset = (int) (recordIdentifier - getFirstEventId());
-                out.writeInt(recordIdOffset);
-                out.writeInt(cipherBytes.length);
-                out.write(cipherBytes);
-
-                getRecordCount().incrementAndGet();
-                endBytes = getBytesWritten();
-            } catch (final IOException ioe) {
-                markDirty();
-                throw ioe;
-            }
-        }
     }
 
-        if (logger.isDebugEnabled()) {
-            // Collect stats and periodically dump them if log level is set to at least info.
-            final long writeNanos = System.nanoTime() - writeStart;
-            getWriteTimes().add(new TimestampedLong(writeNanos));
-
-            final long serializeNanos = lockStart - encryptStart;
-            getSerializeTimes().add(new TimestampedLong(serializeNanos));
-
-            final long encryptNanos = encryptStop - encryptStart;
-            getEncryptTimes().add(new TimestampedLong(encryptNanos));
-
-            final long lockNanos = writeStart - lockStart;
-            getLockTimes().add(new TimestampedLong(lockNanos));
-            getBytesWrittenBuffer().add(new TimestampedLong(endBytes - startBytes));
-
-            final long recordCount = getTotalRecordCount().incrementAndGet();
-            if (recordCount % debugFrequency == 0) {
-                printStats();
-            }
-        }
-
-        final long serializedLength = endBytes - startBytes;
-        final TocWriter tocWriter = getTocWriter();
-        final Integer blockIndex = tocWriter == null ? null : tocWriter.getCurrentBlockIndex();
-        final File file = getFile();
-        final String storageLocation = file.getParentFile().getName() + "/" + file.getName();
-        return new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes);
-    }
-
-    private void printStats() {
-        final long sixtySecondsAgo = System.currentTimeMillis() - 60000L;
-        final Long writeNanosLast60 = getWriteTimes().getAggregateValue(sixtySecondsAgo).getValue();
-        final Long lockNanosLast60 = getLockTimes().getAggregateValue(sixtySecondsAgo).getValue();
-        final Long serializeNanosLast60 = getSerializeTimes().getAggregateValue(sixtySecondsAgo).getValue();
-        final Long encryptNanosLast60 = getEncryptTimes().getAggregateValue(sixtySecondsAgo).getValue();
-        final Long bytesWrittenLast60 = getBytesWrittenBuffer().getAggregateValue(sixtySecondsAgo).getValue();
-        logger.debug("In the last 60 seconds, have spent {} millis writing to file ({} MB), {} millis waiting on synchronize block, {} millis serializing events, {} millis encrypting events",
-            TimeUnit.NANOSECONDS.toMillis(writeNanosLast60),
-            bytesWrittenLast60 / 1024 / 1024,
-            TimeUnit.NANOSECONDS.toMillis(lockNanosLast60),
-            TimeUnit.NANOSECONDS.toMillis(serializeNanosLast60),
-            TimeUnit.NANOSECONDS.toMillis(encryptNanosLast60));
-    }
-
-    static TimedBuffer<TimestampedLong> getEncryptTimes() {
-        return encryptTimes;
-    }
-
-    private byte[] encrypt(byte[] serialized, String eventId) throws IOException, EncryptionException {
-        String keyId = getKeyId();
-
+    private byte[] encrypt(byte[] serialized, String eventId) throws EncryptionException {
+        final String keyId = getKeyId();
         try {
             return provenanceEventEncryptor.encrypt(serialized, eventId, keyId);
         } catch (Exception e) {
@@ -192,8 +91,6 @@
 
     @Override
     public String toString() {
-        return "EncryptedSchemaRecordWriter" +
-                " using " + provenanceEventEncryptor +
-                " and current keyId " + keyId;
+        return "EncryptedSchemaRecordWriter[keyId=" + keyId + ", encryptor=" + provenanceEventEncryptor + "]";
     }
 }

@@ -16,10 +16,6 @@
  */
 package org.apache.nifi.provenance;
 
-import java.io.IOException;
-import java.security.KeyManagementException;
-import javax.crypto.SecretKey;
-import javax.crypto.spec.SecretKeySpec;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.nifi.authorization.Authorizer;
@@ -38,6 +34,11 @@ import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.IOException;
+import java.security.KeyManagementException;
+
 /**
  * This class is an implementation of the {@link WriteAheadProvenanceRepository} provenance repository which provides transparent
  * block encryption/decryption of provenance event data during file system interaction. As of Apache NiFi 1.10.0
@@ -53,10 +54,13 @@ public class EncryptedWriteAheadProvenanceRepository extends WriteAheadProvenanc
     /**
      * This constructor exists solely for the use of the Java Service Loader mechanism and should not be used.
      */
+    @SuppressWarnings("unused")
     public EncryptedWriteAheadProvenanceRepository() {
         super();
     }
 
+    // Created via reflection from FlowController
+    @SuppressWarnings("unused")
     public EncryptedWriteAheadProvenanceRepository(final NiFiProperties nifiProperties) {
         super(RepositoryConfiguration.create(nifiProperties));
     }

@@ -17,19 +17,6 @@
 package org.apache.nifi.provenance;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.provenance.schema.EventFieldNames;
 import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema;
 import org.apache.nifi.provenance.schema.LookupTableEventRecord;
@@ -37,21 +24,24 @@ import org.apache.nifi.provenance.schema.LookupTableEventSchema;
 import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
 import org.apache.nifi.provenance.serialization.StorageSummary;
 import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.provenance.util.ByteArrayDataOutputStream;
-import org.apache.nifi.provenance.util.ByteArrayDataOutputStreamCache;
 import org.apache.nifi.repository.schema.FieldMapRecord;
 import org.apache.nifi.repository.schema.Record;
 import org.apache.nifi.repository.schema.RecordSchema;
 import org.apache.nifi.repository.schema.SchemaRecordWriter;
-import org.apache.nifi.util.timebuffer.LongEntityAccess;
-import org.apache.nifi.util.timebuffer.TimedBuffer;
-import org.apache.nifi.util.timebuffer.TimestampedLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
-    private static final Logger logger = LoggerFactory.getLogger(EventIdFirstSchemaRecordWriter.class);
-
     private static final RecordSchema eventSchema = LookupTableEventSchema.EVENT_SCHEMA;
     private static final RecordSchema contentClaimSchema = new RecordSchema(eventSchema.getField(EventFieldNames.CONTENT_CLAIM).getSubFields());
     private static final RecordSchema previousContentClaimSchema = new RecordSchema(eventSchema.getField(EventFieldNames.PREVIOUS_CONTENT_CLAIM).getSubFields());
@@ -70,14 +60,6 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
     private static final Map<String, Integer> eventTypeMap;
     private static final List<String> eventTypeNames;
 
-    private static final TimedBuffer<TimestampedLong> serializeTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
-    private static final TimedBuffer<TimestampedLong> lockTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
-    private static final TimedBuffer<TimestampedLong> writeTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
-    private static final TimedBuffer<TimestampedLong> bytesWritten = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
-    private static final AtomicLong totalRecordCount = new AtomicLong(0L);
-
-    private static final ByteArrayDataOutputStreamCache streamCache = new ByteArrayDataOutputStreamCache(32, 8 * 1024, 256 * 1024);
-
     private long firstEventId;
     private long systemTimeOffset;
@@ -102,38 +84,58 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
         queueIdMap = idLookup.invertQueueIdentifiers();
     }
 
-    public EventIdFirstSchemaRecordWriter(final OutputStream out, final String storageLocation, final AtomicLong idGenerator, final TocWriter tocWriter, final boolean compressed,
-                                          final int uncompressedBlockSize, final IdentifierLookup idLookup) throws IOException {
-        super(out, storageLocation, idGenerator, tocWriter, compressed, uncompressedBlockSize);
-
-        this.idLookup = idLookup;
-        componentIdMap = idLookup.invertComponentIdentifiers();
-        componentTypeMap = idLookup.invertComponentTypes();
-        queueIdMap = idLookup.invertQueueIdentifiers();
-    }
-
     @Override
-    public StorageSummary writeRecord(final ProvenanceEventRecord record) throws IOException {
+    public Map<ProvenanceEventRecord, StorageSummary> writeRecords(final Iterable<ProvenanceEventRecord> events) throws IOException {
         if (isDirty()) {
             throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
         }
 
-        final long lockStart;
-        final long writeStart;
+        final int heapThreshold = 1_000_000;
+        final Map<ProvenanceEventRecord, StorageSummary> storageSummaries = new HashMap<>();
+        final Map<ProvenanceEventRecord, byte[]> serializedEvents = new LinkedHashMap<>();
+        int totalBytes = 0;
+
+        for (final ProvenanceEventRecord event : events) {
+            final byte[] serialized = serializeEvent(event);
+            serializedEvents.put(event, serialized);
+            totalBytes += serialized.length;
+
+            if (totalBytes >= heapThreshold) {
+                storeEvents(serializedEvents, storageSummaries);
+                recordCount.addAndGet(serializedEvents.size());
+                serializedEvents.clear();
+                totalBytes = 0;
+            }
+        }
+
+        storeEvents(serializedEvents, storageSummaries);
+        recordCount.addAndGet(serializedEvents.size());
+
+        return storageSummaries;
+    }
+
+    protected byte[] serializeEvent(final ProvenanceEventRecord event) throws IOException {
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             final DataOutputStream dataOutputStream = new DataOutputStream(baos)) {
+            writeRecord(event, 0L, dataOutputStream);
+            dataOutputStream.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    private synchronized void storeEvents(final Map<ProvenanceEventRecord, byte[]> serializedEvents, final Map<ProvenanceEventRecord, StorageSummary> summaryMap) throws IOException {
+        for (final Map.Entry<ProvenanceEventRecord, byte[]> entry : serializedEvents.entrySet()) {
+            final ProvenanceEventRecord event = entry.getKey();
+            final byte[] serialized = entry.getValue();
+
             final long startBytes;
             final long endBytes;
             final long recordIdentifier;
 
-        final long serializeStart = System.nanoTime();
-        final ByteArrayDataOutputStream bados = streamCache.checkOut();
             try {
-            writeRecord(record, 0L, bados.getDataOutputStream());
-
-            lockStart = System.nanoTime();
-            synchronized (this) {
-                writeStart = System.nanoTime();
-                try {
-                    recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId();
+                recordIdentifier = event.getEventId() == -1 ? getIdGenerator().getAndIncrement() : event.getEventId();
                 startBytes = getBytesWritten();
 
                 ensureStreamState(recordIdentifier, startBytes);
@@ -142,54 +144,29 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
                 final int recordIdOffset = (int) (recordIdentifier - firstEventId);
                 out.writeInt(recordIdOffset);
-                final ByteArrayOutputStream baos = bados.getByteArrayOutputStream();
-                out.writeInt(baos.size());
-                baos.writeTo(out);
-
-                recordCount.incrementAndGet();
+                out.writeInt(serialized.length);
+                out.write(serialized);
+
                 endBytes = getBytesWritten();
             } catch (final IOException ioe) {
                 markDirty();
                 throw ioe;
             }
-            }
-        } finally {
-            streamCache.checkIn(bados);
-        }
-
-        if (logger.isDebugEnabled()) {
-            // Collect stats and periodically dump them if log level is set to at least info.
-            final long writeNanos = System.nanoTime() - writeStart;
-            writeTimes.add(new TimestampedLong(writeNanos));
-
-            final long serializeNanos = lockStart - serializeStart;
-            serializeTimes.add(new TimestampedLong(serializeNanos));
-
-            final long lockNanos = writeStart - lockStart;
-            lockTimes.add(new TimestampedLong(lockNanos));
-            bytesWritten.add(new TimestampedLong(endBytes - startBytes));
-
-            final long recordCount = totalRecordCount.incrementAndGet();
-            if (recordCount % 1_000_000 == 0) {
-                final long sixtySecondsAgo = System.currentTimeMillis() - 60000L;
-                final Long writeNanosLast60 = writeTimes.getAggregateValue(sixtySecondsAgo).getValue();
-                final Long lockNanosLast60 = lockTimes.getAggregateValue(sixtySecondsAgo).getValue();
-                final Long serializeNanosLast60 = serializeTimes.getAggregateValue(sixtySecondsAgo).getValue();
-                final Long bytesWrittenLast60 = bytesWritten.getAggregateValue(sixtySecondsAgo).getValue();
-                logger.debug("In the last 60 seconds, have spent {} millis writing to file ({} MB), {} millis waiting on synchronize block, {} millis serializing events",
-                    TimeUnit.NANOSECONDS.toMillis(writeNanosLast60),
-                    bytesWrittenLast60 / 1024 / 1024,
-                    TimeUnit.NANOSECONDS.toMillis(lockNanosLast60),
-                    TimeUnit.NANOSECONDS.toMillis(serializeNanosLast60));
-            }
-        }
-
             final long serializedLength = endBytes - startBytes;
             final TocWriter tocWriter = getTocWriter();
             final Integer blockIndex = tocWriter == null ? null : tocWriter.getCurrentBlockIndex();
             final File file = getFile();
             final String storageLocation = file.getParentFile().getName() + "/" + file.getName();
-        return new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes);
+            final StorageSummary storageSummary = new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes);
+            summaryMap.put(event, storageSummary);
+        }
+    }
+
+    @Override
+    public StorageSummary writeRecord(final ProvenanceEventRecord record) {
+        // This method should never be called because it's only called by super.writeRecords. That method is overridden in this class and never delegates to this method.
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -245,47 +222,4 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
     protected String getSerializationName() {
         return SERIALIZATION_NAME;
     }
-
-    /* Getters for internal state written to by subclass EncryptedSchemaRecordWriter */
-    IdentifierLookup getIdLookup() {
-        return idLookup;
-    }
-
-    SchemaRecordWriter getSchemaRecordWriter() {
-        return schemaRecordWriter;
-    }
-
-    AtomicInteger getRecordCount() {
-        return recordCount;
-    }
-
-    static TimedBuffer<TimestampedLong> getSerializeTimes() {
-        return serializeTimes;
-    }
-
-    static TimedBuffer<TimestampedLong> getLockTimes() {
-        return lockTimes;
-    }
-
-    static TimedBuffer<TimestampedLong> getWriteTimes() {
-        return writeTimes;
-    }
-
-    static TimedBuffer<TimestampedLong> getBytesWrittenBuffer() {
-        return bytesWritten;
-    }
-
-    static AtomicLong getTotalRecordCount() {
-        return totalRecordCount;
-    }
-
-    long getFirstEventId() {
-        return firstEventId;
-    }
-
-    long getSystemTimeOffset() {
-        return systemTimeOffset;
-    }
 }

@@ -108,6 +108,8 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
         config = null;
     }
 
+    // Created via reflection from FlowController
+    @SuppressWarnings("unused")
     public WriteAheadProvenanceRepository(final NiFiProperties nifiProperties) {
         this(RepositoryConfiguration.create(nifiProperties));
     }

@@ -17,14 +17,6 @@
 package org.apache.nifi.provenance.serialization;
 
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.provenance.AbstractRecordWriter;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.toc.TocWriter;
@@ -34,6 +26,14 @@ import org.apache.nifi.stream.io.NonCloseableOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicLong;
+
 public abstract class CompressableRecordWriter extends AbstractRecordWriter {
     private static final Logger logger = LoggerFactory.getLogger(CompressableRecordWriter.class);
@@ -145,7 +145,7 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
         }
     }
 
-    protected synchronized void ensureStreamState(final long recordIdentifier, final long startBytes) throws IOException {
+    protected void ensureStreamState(final long recordIdentifier, final long startBytes) throws IOException {
         // add a new block to the TOC if needed.
         if (getTocWriter() != null && (startBytes - blockStartOffset >= uncompressedBlockSize)) {
             blockStartOffset = startBytes;
@@ -222,4 +222,5 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
 
     protected abstract int getSerializationVersion();
 
     protected abstract String getSerializationName();
+
 }

@@ -16,17 +16,6 @@
  */
 package org.apache.nifi.provenance.serialization;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.zip.GZIPInputStream;
 import org.apache.nifi.properties.NiFiPropertiesLoader;
 import org.apache.nifi.provenance.ByteArraySchemaRecordReader;
 import org.apache.nifi.provenance.ByteArraySchemaRecordWriter;
@@ -44,6 +33,18 @@ import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.zip.GZIPInputStream;
+
 public class RecordReaders {
 
     private static Logger logger = LoggerFactory.getLogger(RecordReaders.class);

@@ -16,12 +16,14 @@
  */
 package org.apache.nifi.provenance.serialization;
 
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocWriter;
+
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.toc.TocWriter;
+import java.util.HashMap;
+import java.util.Map;
 
 public interface RecordWriter extends Closeable {
@@ -42,6 +44,17 @@ public interface RecordWriter extends Closeable {
      */
     StorageSummary writeRecord(ProvenanceEventRecord record) throws IOException;
 
+    default Map<ProvenanceEventRecord, StorageSummary> writeRecords(Iterable<ProvenanceEventRecord> events) throws IOException {
+        final Map<ProvenanceEventRecord, StorageSummary> locationMap = new HashMap<>();
+
+        for (final ProvenanceEventRecord nextEvent : events) {
+            final StorageSummary writerSummary = writeRecord(nextEvent);
+            locationMap.put(nextEvent, writerSummary);
+        }
+
+        return locationMap;
+    }
+
     /**
      * Flushes any data that is held in a buffer to the underlying storage mechanism
      *

@@ -320,11 +320,15 @@ public class WriteAheadStorePartition implements EventStorePartition {
         try {
             long maxId = -1L;
             int numEvents = 0;
-            for (final ProvenanceEventRecord nextEvent : events) {
-                final StorageSummary writerSummary = writer.writeRecord(nextEvent);
+
+            final Map<ProvenanceEventRecord, StorageSummary> writerSummaries = writer.writeRecords(events);
+            for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : writerSummaries.entrySet()) {
+                final ProvenanceEventRecord eventRecord = entry.getKey();
+                final StorageSummary writerSummary = entry.getValue();
+
                 final StorageSummary summaryWithIndex = new StorageSummary(writerSummary.getEventId(), writerSummary.getStorageLocation(), this.partitionName,
                     writerSummary.getBlockIndex(), writerSummary.getSerializedLength(), writerSummary.getBytesWritten());
-                locationMap.put(nextEvent, summaryWithIndex);
+                locationMap.put(eventRecord, summaryWithIndex);
                 maxId = summaryWithIndex.getEventId();
                 numEvents++;
             }

@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.util;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-
-public class ByteArrayDataOutputStream {
-    private final ByteArrayOutputStream baos;
-    private final DataOutputStream dos;
-
-    public ByteArrayDataOutputStream(final int initialCapacity) {
-        baos = new ByteArrayOutputStream(initialCapacity);
-        dos = new DataOutputStream(baos);
-    }
-
-    public ByteArrayOutputStream getByteArrayOutputStream() {
-        return baos;
-    }
-
-    public DataOutputStream getDataOutputStream() {
-        return dos;
-    }
-}

@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.util;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class ByteArrayDataOutputStreamCache {
-    private final BlockingQueue<ByteArrayDataOutputStream> queue;
-    private final int initialBufferSize;
-    private final int maxBufferSize;
-
-    public ByteArrayDataOutputStreamCache(final int maxCapacity, final int initialBufferSize, final int maxBufferSize) {
-        this.queue = new LinkedBlockingQueue<>(maxCapacity);
-        this.initialBufferSize = initialBufferSize;
-        this.maxBufferSize = maxBufferSize;
-    }
-
-    public ByteArrayDataOutputStream checkOut() {
-        final ByteArrayDataOutputStream stream = queue.poll();
-        if (stream != null) {
-            return stream;
-        }
-
-        return new ByteArrayDataOutputStream(initialBufferSize);
-    }
-
-    public void checkIn(final ByteArrayDataOutputStream bados) {
-        final int size = bados.getByteArrayOutputStream().size();
-        if (size > maxBufferSize) {
-            return;
-        }
-
-        bados.getByteArrayOutputStream().reset();
-        queue.offer(bados);
-    }
-}

@@ -199,7 +199,7 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
         // Act
         int encryptedRecordId = idGenerator.get()
         encryptedWriter.writeHeader(encryptedRecordId)
-        encryptedWriter.writeRecord(record)
+        encryptedWriter.writeRecords(Collections.singletonList(record))
         encryptedWriter.close()
         logger.info("Wrote encrypted record ${encryptedRecordId} to journal")
@@ -242,13 +242,13 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
         // Act
         int standardRecordId = idGenerator.get()
         standardWriter.writeHeader(standardRecordId)
-        standardWriter.writeRecord(record)
+        standardWriter.writeRecords(Collections.singletonList(record))
        standardWriter.close()
        logger.info("Wrote standard record ${standardRecordId} to journal")
 
        int encryptedRecordId = idGenerator.get()
        encryptedWriter.writeHeader(encryptedRecordId)
-        encryptedWriter.writeRecord(record)
+        encryptedWriter.writeRecords(Collections.singletonList(record))
        encryptedWriter.close()
        logger.info("Wrote encrypted record ${encryptedRecordId} to journal")

@ -17,21 +17,6 @@
package org.apache.nifi.provenance;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.toc.StandardTocReader;

@ -43,6 +28,23 @@ import org.apache.nifi.util.file.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;

public abstract class AbstractTestRecordReaderWriter {
    @BeforeClass
@ -62,7 +64,7 @@ public abstract class AbstractTestRecordReaderWriter {
        final RecordWriter writer = createWriter(journalFile, tocWriter, false, 1024 * 1024);
        writer.writeHeader(1L);
-       writer.writeRecord(createEvent());
+       writer.writeRecords(Collections.singletonList(createEvent()));
        writer.close();
        final TocReader tocReader = new StandardTocReader(tocFile);

@ -96,7 +98,7 @@ public abstract class AbstractTestRecordReaderWriter {
        final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
        writer.writeHeader(1L);
-       writer.writeRecord(createEvent());
+       writer.writeRecords(Collections.singletonList(createEvent()));
        writer.close();
        final TocReader tocReader = new StandardTocReader(tocFile);

@ -117,7 +119,7 @@ public abstract class AbstractTestRecordReaderWriter {
        writer.writeHeader(1L);
        for (int i = 0; i < 10; i++) {
-           writer.writeRecord(createEvent());
+           writer.writeRecords(Collections.singletonList(createEvent()));
        }
        writer.close();

@ -156,7 +158,7 @@ public abstract class AbstractTestRecordReaderWriter {
        writer.writeHeader(1L);
        for (int i = 0; i < 10; i++) {
-           writer.writeRecord(createEvent());
+           writer.writeRecords(Collections.singletonList(createEvent()));
        }
        writer.close();

@ -198,7 +200,7 @@ public abstract class AbstractTestRecordReaderWriter {
        for (int i = 0; i < numEvents; i++) {
            final ProvenanceEventRecord event = createEvent();
            events.add(event);
-           writer.writeRecord(event);
+           writer.writeRecords(Collections.singletonList(event));
        }
        writer.close();

@ -208,6 +210,8 @@ public abstract class AbstractTestRecordReaderWriter {
            final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
            for (int i = 0; i < numEvents; i++) {
+               System.out.println(i);
                final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(i);
                assertTrue(eventOption.isPresent());
                assertEquals(i, eventOption.get().getEventId());
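For context, the read-back side exercised in the hunk above jumps straight to an event by ID rather than iterating every record. A hedged sketch of that pattern, reusing only calls already shown in this test (createReader(...) is the test's own helper, and 2048 is simply the buffer size the test uses):

// Sketch only: looks up a single event by ID via the journal's table of contents.
final TocReader tocReader = new StandardTocReader(tocFile);
try (final FileInputStream fis = new FileInputStream(journalFile);
     final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
    final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(42);
    if (eventOption.isPresent()) {
        System.out.println("Found event " + eventOption.get().getEventId());
    }
}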


@ -17,9 +17,19 @@
package org.apache.nifi.provenance;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;

@ -34,19 +44,9 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordWriter;
-import org.apache.nifi.provenance.toc.StandardTocReader;
-import org.apache.nifi.provenance.toc.StandardTocWriter;
-import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.nifi.provenance.toc.TocUtil;
-import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;

public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter {
    private final AtomicLong idGenerator = new AtomicLong(0L);
@ -88,7 +88,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
        final ProvenanceEventRecord record = builder.build();
        writer.writeHeader(1L);
-       writer.writeRecord(record);
+       writer.writeRecords(Collections.singletonList(record));
        writer.close();
        final TocReader tocReader = new StandardTocReader(tocFile);

@ -146,7 +146,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
        final ProvenanceEventRecord record = builder.build();
        writer.writeHeader(1L);
-       writer.writeRecord(record);
+       writer.writeRecords(Collections.singletonList(record));
        writer.close();
        final TocReader tocReader = new StandardTocReader(tocFile);

@ -203,7 +203,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
        final ProvenanceEventRecord record = builder.build();
        writer.writeHeader(1L);
-       writer.writeRecord(record);
+       writer.writeRecords(Collections.singletonList(record));
        writer.close();
        final TocReader tocReader = new StandardTocReader(tocFile);

@ -261,7 +261,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
        final ProvenanceEventRecord record = builder.build();
        writer.writeHeader(1L);
-       writer.writeRecord(record);
+       writer.writeRecords(Collections.singletonList(record));
        writer.close();
        final TocReader tocReader = new StandardTocReader(tocFile);

@ -322,7 +322,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
        final ProvenanceEventRecord record = builder.build();
        writer.writeHeader(500_000L);
-       writer.writeRecord(record);
+       writer.writeRecords(Collections.singletonList(record));
        writer.close();
        final TocReader tocReader = new StandardTocReader(tocFile);

@ -382,12 +382,12 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
        builder.setCurrentContentClaim("container-2", "section-2", "identifier-2", 2L, 2L);
        writer.writeHeader(500_000L);
-       writer.writeRecord(builder.build());
+       writer.writeRecords(Collections.singletonList(builder.build()));
        builder.setEventId(1_000_001L);
        builder.setComponentId("4444");
        builder.setComponentType("unit-test-component-1");
-       writer.writeRecord(builder.build());
+       writer.writeRecords(Collections.singletonList(builder.build()));
        writer.close();

@ -435,7 +435,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
        writer.writeHeader(0L);
        for (int i = 0; i < 100_000; i++) {
-           writer.writeRecord(createEvent());
+           writer.writeRecords(Collections.singletonList(createEvent()));
        }
    }