From 9f95a10df93f4e1cb5c80b62e06afe8b1d64314f Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 25 Jan 2018 12:16:56 -0500 Subject: [PATCH] NIFI-4794: Updated event writers to avoid creating a lot of byte[] by reusing buffers. Also removed synchronization on EventWriter when rolling over the writer and just moved the writing of the header to happen before making the writer available to any other threads. This reduces thread contention during rollover. Signed-off-by: Matthew Burgess This closes #2437 --- .../repository/schema/ByteArrayCache.java | 48 +++++++++++++ .../repository/schema/SchemaRecordWriter.java | 71 ++++++++++++++----- .../EncryptedSchemaRecordReader.java | 17 +---- .../EventIdFirstSchemaRecordWriter.java | 59 ++++++++------- .../CompressableRecordWriter.java | 11 ++- .../store/WriteAheadStorePartition.java | 60 +++++++--------- .../util/ByteArrayDataOutputStream.java | 39 ++++++++++ .../util/ByteArrayDataOutputStreamCache.java | 52 ++++++++++++++ 8 files changed, 261 insertions(+), 96 deletions(-) create mode 100644 nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/ByteArrayCache.java create mode 100644 nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java create mode 100644 nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/ByteArrayCache.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/ByteArrayCache.java new file mode 100644 index 0000000000..32e851a01d --- /dev/null +++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/ByteArrayCache.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.repository.schema; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class ByteArrayCache { + private final BlockingQueue queue; + private final int bufferSize; + + public ByteArrayCache(final int maxCapacity, final int bufferSize) { + this.queue = new LinkedBlockingQueue<>(maxCapacity); + this.bufferSize = bufferSize; + } + + public byte[] checkOut() { + final byte[] array = queue.poll(); + if (array != null) { + return array; + } + + return new byte[bufferSize]; + } + + public void checkIn(final byte[] array) { + if (array.length != bufferSize) { + return; + } + + queue.offer(array); + } +} diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java index 5305e5bd01..67d558ae47 100644 --- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java +++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java @@ -34,39 +34,47 @@ public class SchemaRecordWriter { public static final int MAX_ALLOWED_UTF_LENGTH = 65_535; private static final Logger logger = LoggerFactory.getLogger(SchemaRecordWriter.class); + private static final int CACHE_BUFFER_SIZE = 65536; + private static final ByteArrayCache byteArrayCache = new ByteArrayCache(32, CACHE_BUFFER_SIZE); public void writeRecord(final Record record, final OutputStream out) throws IOException { // write sentinel value to indicate that there is a record. This allows the reader to then read one // byte and check if -1. If so, the reader knows there are no more records. If not, then the reader // knows that it should be able to continue reading. out.write(1); - writeRecordFields(record, out); + + final byte[] buffer = byteArrayCache.checkOut(); + try { + writeRecordFields(record, out, buffer); + } finally { + byteArrayCache.checkIn(buffer); + } } - private void writeRecordFields(final Record record, final OutputStream out) throws IOException { - writeRecordFields(record, record.getSchema(), out); + private void writeRecordFields(final Record record, final OutputStream out, final byte[] buffer) throws IOException { + writeRecordFields(record, record.getSchema(), out, buffer); } - private void writeRecordFields(final Record record, final RecordSchema schema, final OutputStream out) throws IOException { + private void writeRecordFields(final Record record, final RecordSchema schema, final OutputStream out, final byte[] buffer) throws IOException { final DataOutputStream dos = out instanceof DataOutputStream ? (DataOutputStream) out : new DataOutputStream(out); for (final RecordField field : schema.getFields()) { final Object value = record.getFieldValue(field); try { - writeFieldRepetitionAndValue(field, value, dos); + writeFieldRepetitionAndValue(field, value, dos, buffer); } catch (final Exception e) { throw new IOException("Failed to write field '" + field.getFieldName() + "'", e); } } } - private void writeFieldRepetitionAndValue(final RecordField field, final Object value, final DataOutputStream dos) throws IOException { + private void writeFieldRepetitionAndValue(final RecordField field, final Object value, final DataOutputStream dos, final byte[] buffer) throws IOException { switch (field.getRepetition()) { case EXACTLY_ONE: { if (value == null) { throw new IllegalArgumentException("Record does not have a value for the '" + field.getFieldName() + "' but the field is required"); } - writeFieldValue(field, value, dos); + writeFieldValue(field, value, dos, buffer); break; } case ZERO_OR_MORE: { @@ -83,7 +91,7 @@ public class SchemaRecordWriter { final Collection collection = (Collection) value; dos.writeInt(collection.size()); for (final Object fieldValue : collection) { - writeFieldValue(field, fieldValue, dos); + writeFieldValue(field, fieldValue, dos, buffer); } break; } @@ -93,14 +101,25 @@ public class SchemaRecordWriter { break; } dos.write(1); - writeFieldValue(field, value, dos); + writeFieldValue(field, value, dos, buffer); break; } } } + private boolean allSingleByteInUtf8(final String value) { + for (int i = 0; i < value.length(); i++) { + final char ch = value.charAt(i); + if (ch < 1 || ch > 127) { + return false; + } + } + + return true; + } + @SuppressWarnings("unchecked") - private void writeFieldValue(final RecordField field, final Object value, final DataOutputStream out) throws IOException { + private void writeFieldValue(final RecordField field, final Object value, final DataOutputStream out, final byte[] buffer) throws IOException { switch (field.getFieldType()) { case BOOLEAN: out.writeBoolean((boolean) value); @@ -120,9 +139,27 @@ public class SchemaRecordWriter { writeUTFLimited(out, (String) value, field.getFieldName()); break; case LONG_STRING: - final byte[] charArray = ((String) value).getBytes(StandardCharsets.UTF_8); - out.writeInt(charArray.length); - out.write(charArray); + // In many cases, we will see a String value that consists solely of values in the range of + // 1-127, which means that in UTF-8 they will translate into a single byte each. If all characters + // in the string adhere to this, then we can skip calling String.getBytes() because that will allocate + // a new byte[] every time, which results in a lot of pressure on the garbage collector. + final String string = (String) value; + final int length = string.length(); + + if (length <= buffer.length && allSingleByteInUtf8(string)) { + out.writeInt(length); + + for (int i = 0; i < length; i++) { + final char ch = string.charAt(i); + buffer[i] = (byte) ch; + } + + out.write(buffer, 0, length); + } else { + final byte[] charArray = ((String) value).getBytes(StandardCharsets.UTF_8); + out.writeInt(charArray.length); + out.write(charArray); + } break; case MAP: final Map map = (Map) value; @@ -132,19 +169,19 @@ public class SchemaRecordWriter { final RecordField valueField = subFields.get(1); for (final Map.Entry entry : map.entrySet()) { - writeFieldRepetitionAndValue(keyField, entry.getKey(), out); - writeFieldRepetitionAndValue(valueField, entry.getValue(), out); + writeFieldRepetitionAndValue(keyField, entry.getKey(), out, buffer); + writeFieldRepetitionAndValue(valueField, entry.getValue(), out, buffer); } break; case UNION: final NamedValue namedValue = (NamedValue) value; writeUTFLimited(out, namedValue.getName(), field.getFieldName()); final Record childRecord = (Record) namedValue.getValue(); - writeRecordFields(childRecord, out); + writeRecordFields(childRecord, out, buffer); break; case COMPLEX: final Record record = (Record) value; - writeRecordFields(record, out); + writeRecordFields(record, out, buffer); break; } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java index fcd7fee53e..e83ce20f23 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordReader.java @@ -23,42 +23,29 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collection; import java.util.Optional; -import java.util.concurrent.TimeUnit; + import org.apache.nifi.provenance.schema.LookupTableEventRecord; import org.apache.nifi.provenance.toc.TocReader; import org.apache.nifi.repository.schema.Record; import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.timebuffer.LongEntityAccess; -import org.apache.nifi.util.timebuffer.TimedBuffer; -import org.apache.nifi.util.timebuffer.TimestampedLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class EncryptedSchemaRecordReader extends EventIdFirstSchemaRecordReader { private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordReader.class); - private static final int DEFAULT_DEBUG_FREQUENCY = 1_000_000; - private ProvenanceEventEncryptor provenanceEventEncryptor; - private static final TimedBuffer decryptTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess()); - - private int debugFrequency = DEFAULT_DEBUG_FREQUENCY; public static final int SERIALIZATION_VERSION = 1; public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter"; - public EncryptedSchemaRecordReader(final InputStream inputStream, final String filename, final TocReader tocReader, final int maxAttributeChars, - ProvenanceEventEncryptor provenanceEventEncryptor) throws IOException { - this(inputStream, filename, tocReader, maxAttributeChars, provenanceEventEncryptor, DEFAULT_DEBUG_FREQUENCY); - } public EncryptedSchemaRecordReader(final InputStream inputStream, final String filename, final TocReader tocReader, final int maxAttributeChars, - ProvenanceEventEncryptor provenanceEventEncryptor, int debugFrequency) throws IOException { + ProvenanceEventEncryptor provenanceEventEncryptor) throws IOException { super(inputStream, filename, tocReader, maxAttributeChars); this.provenanceEventEncryptor = provenanceEventEncryptor; - this.debugFrequency = debugFrequency; } @Override diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java index 8f5b2b2103..05e6736c27 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; + import org.apache.nifi.provenance.schema.EventFieldNames; import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema; import org.apache.nifi.provenance.schema.LookupTableEventRecord; @@ -36,6 +37,8 @@ import org.apache.nifi.provenance.schema.LookupTableEventSchema; import org.apache.nifi.provenance.serialization.CompressableRecordWriter; import org.apache.nifi.provenance.serialization.StorageSummary; import org.apache.nifi.provenance.toc.TocWriter; +import org.apache.nifi.provenance.util.ByteArrayDataOutputStream; +import org.apache.nifi.provenance.util.ByteArrayDataOutputStreamCache; import org.apache.nifi.repository.schema.FieldMapRecord; import org.apache.nifi.repository.schema.Record; import org.apache.nifi.repository.schema.RecordSchema; @@ -73,6 +76,8 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter { private static final TimedBuffer bytesWritten = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess()); private static final AtomicLong totalRecordCount = new AtomicLong(0L); + private static final ByteArrayDataOutputStreamCache streamCache = new ByteArrayDataOutputStreamCache(32, 8 * 1024, 256 * 1024); + private long firstEventId; private long systemTimeOffset; @@ -113,39 +118,43 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter { throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository"); } - final long serializeStart = System.nanoTime(); - final byte[] serialized; - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256); - final DataOutputStream dos = new DataOutputStream(baos)) { - writeRecord(record, 0L, dos); - serialized = baos.toByteArray(); - } - - final long lockStart = System.nanoTime(); + final long lockStart; final long writeStart; final long startBytes; final long endBytes; final long recordIdentifier; - synchronized (this) { - writeStart = System.nanoTime(); - try { - recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId(); - startBytes = getBytesWritten(); - ensureStreamState(recordIdentifier, startBytes); + final long serializeStart = System.nanoTime(); + final ByteArrayDataOutputStream bados = streamCache.checkOut(); + try { + writeRecord(record, 0L, bados.getDataOutputStream()); - final DataOutputStream out = getBufferedOutputStream(); - final int recordIdOffset = (int) (recordIdentifier - firstEventId); - out.writeInt(recordIdOffset); - out.writeInt(serialized.length); - out.write(serialized); + lockStart = System.nanoTime(); + synchronized (this) { + writeStart = System.nanoTime(); + try { + recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId(); + startBytes = getBytesWritten(); - recordCount.incrementAndGet(); - endBytes = getBytesWritten(); - } catch (final IOException ioe) { - markDirty(); - throw ioe; + ensureStreamState(recordIdentifier, startBytes); + + final DataOutputStream out = getBufferedOutputStream(); + final int recordIdOffset = (int) (recordIdentifier - firstEventId); + out.writeInt(recordIdOffset); + + final ByteArrayOutputStream baos = bados.getByteArrayOutputStream(); + out.writeInt(baos.size()); + baos.writeTo(out); + + recordCount.incrementAndGet(); + endBytes = getBytesWritten(); + } catch (final IOException ioe) { + markDirty(); + throw ioe; + } } + } finally { + streamCache.checkIn(bados); } if (logger.isDebugEnabled()) { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java index b5646009b0..21e2c0767b 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java @@ -56,7 +56,7 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter { this.compressed = compressed; this.fos = new FileOutputStream(file); - rawOutStream = new ByteCountingOutputStream(fos); + rawOutStream = new ByteCountingOutputStream(new BufferedOutputStream(fos)); this.uncompressedBlockSize = uncompressedBlockSize; this.idGenerator = idGenerator; } @@ -68,7 +68,7 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter { this.compressed = compressed; this.uncompressedBlockSize = uncompressedBlockSize; - this.rawOutStream = new ByteCountingOutputStream(out); + this.rawOutStream = new ByteCountingOutputStream(new BufferedOutputStream(out)); this.idGenerator = idGenerator; } @@ -114,7 +114,6 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter { final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten(); final TocWriter tocWriter = getTocWriter(); - final OutputStream writableStream; if (compressed) { // because of the way that GZIPOutputStream works, we need to call close() on it in order for it // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap @@ -128,16 +127,16 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter { tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId); } - writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536); + final OutputStream writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536); + this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset); } else { if (tocWriter != null && eventId != null) { tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId); } - writableStream = new BufferedOutputStream(rawOutStream, 65536); + this.byteCountingOut = rawOutStream; } - this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset); this.out = new DataOutputStream(byteCountingOut); resetDirtyFlag(); } catch (final IOException ioe) { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java index 22d2a5f07a..2de78f8a85 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java @@ -64,7 +64,6 @@ import org.slf4j.LoggerFactory; public class WriteAheadStorePartition implements EventStorePartition { private static final Logger logger = LoggerFactory.getLogger(WriteAheadStorePartition.class); - private final RepositoryConfiguration config; private final File partitionDirectory; private final String partitionName; @@ -253,48 +252,43 @@ public class WriteAheadStorePartition implements EventStorePartition { final long nextEventId = idGenerator.get(); final File updatedEventFile = new File(partitionDirectory, nextEventId + ".prov"); final RecordWriter updatedWriter = recordWriterFactory.createWriter(updatedEventFile, idGenerator, false, true); + updatedWriter.writeHeader(nextEventId); - // Synchronize on the writer to ensure that no other thread is able to obtain the writer and start writing events to it until after it has - // been fully initialized (i.e., the header has been written, etc.) - synchronized (updatedWriter) { - final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount()); - final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease); + final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount()); + final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease); - if (updated) { - if (lease != null) { - lease.close(); - } + if (!updated) { + try { + updatedWriter.close(); + } catch (final Exception e) { + logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", updatedWriter, e); + } - updatedWriter.writeHeader(nextEventId); + updatedEventFile.delete(); + return false; + } - synchronized (minEventIdToPathMap) { - minEventIdToPathMap.put(nextEventId, updatedEventFile); - } + if (lease != null) { + lease.close(); + } - if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) { - boolean offered = false; - while (!offered && !closed) { - try { - offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS); - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression"); - } - } - } + synchronized (minEventIdToPathMap) { + minEventIdToPathMap.put(nextEventId, updatedEventFile); + } - return true; - } else { + if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) { + boolean offered = false; + while (!offered && !closed) { try { - updatedWriter.close(); - } catch (final Exception e) { - logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", updatedWriter, e); + offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression"); } - - updatedEventFile.delete(); - return false; } } + + return true; } private Map addEvents(final Iterable events, final RecordWriter writer) throws IOException { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java new file mode 100644 index 0000000000..23aefb3ce9 --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.util; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; + +public class ByteArrayDataOutputStream { + private final ByteArrayOutputStream baos; + private final DataOutputStream dos; + + public ByteArrayDataOutputStream(final int initialCapacity) { + baos = new ByteArrayOutputStream(initialCapacity); + dos = new DataOutputStream(baos); + } + + public ByteArrayOutputStream getByteArrayOutputStream() { + return baos; + } + + public DataOutputStream getDataOutputStream() { + return dos; + } +} diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java new file mode 100644 index 0000000000..953559093f --- /dev/null +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.provenance.util; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class ByteArrayDataOutputStreamCache { + private final BlockingQueue queue; + private final int initialBufferSize; + private final int maxBufferSize; + + public ByteArrayDataOutputStreamCache(final int maxCapacity, final int initialBufferSize, final int maxBufferSize) { + this.queue = new LinkedBlockingQueue<>(maxCapacity); + this.initialBufferSize = initialBufferSize; + this.maxBufferSize = maxBufferSize; + } + + public ByteArrayDataOutputStream checkOut() { + final ByteArrayDataOutputStream stream = queue.poll(); + if (stream != null) { + return stream; + } + + return new ByteArrayDataOutputStream(initialBufferSize); + } + + public void checkIn(final ByteArrayDataOutputStream bados) { + final int size = bados.getByteArrayOutputStream().size(); + if (size > maxBufferSize) { + return; + } + + bados.getByteArrayOutputStream().reset(); + queue.offer(bados); + } +}