diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/ByteArrayCache.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/ByteArrayCache.java
new file mode 100644
index 0000000000..32e851a01d
--- /dev/null
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/ByteArrayCache.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.repository.schema;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ByteArrayCache {
+    private final BlockingQueue<byte[]> queue;
+    private final int bufferSize;
+
+    public ByteArrayCache(final int maxCapacity, final int bufferSize) {
+        this.queue = new LinkedBlockingQueue<>(maxCapacity);
+        this.bufferSize = bufferSize;
+    }
+
+    public byte[] checkOut() {
+        final byte[] array = queue.poll();
+        if (array != null) {
+            return array;
+        }
+
+        return new byte[bufferSize];
+    }
+
+    public void checkIn(final byte[] array) {
+        if (array.length != bufferSize) {
+            return;
+        }
+
+        queue.offer(array);
+    }
+}
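ByteArrayCache is a simple check-out/check-in pool: poll() hands back a pooled array when one is available, and arrays of the wrong size are silently dropped on check-in. For reference, a minimal usage sketch (not part of the patch; the sizes here are illustrative):

```java
// Borrow a buffer, use it, and always return it in a finally block so the
// pool is replenished even when the work in between throws.
final ByteArrayCache cache = new ByteArrayCache(32, 65536);

final byte[] buffer = cache.checkOut();   // pooled array if available, else a fresh allocation
try {
    // ... fill and drain the buffer ...
} finally {
    cache.checkIn(buffer);                // no-op for arrays whose size does not match
}
```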
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
index 5305e5bd01..67d558ae47 100644
--- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
@@ -34,39 +34,47 @@ public class SchemaRecordWriter {
     public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;
 
     private static final Logger logger = LoggerFactory.getLogger(SchemaRecordWriter.class);
+    private static final int CACHE_BUFFER_SIZE = 65536;
+    private static final ByteArrayCache byteArrayCache = new ByteArrayCache(32, CACHE_BUFFER_SIZE);
 
     public void writeRecord(final Record record, final OutputStream out) throws IOException {
         // write sentinel value to indicate that there is a record. This allows the reader to then read one
         // byte and check if -1. If so, the reader knows there are no more records. If not, then the reader
         // knows that it should be able to continue reading.
         out.write(1);
-        writeRecordFields(record, out);
+
+        final byte[] buffer = byteArrayCache.checkOut();
+        try {
+            writeRecordFields(record, out, buffer);
+        } finally {
+            byteArrayCache.checkIn(buffer);
+        }
     }
 
-    private void writeRecordFields(final Record record, final OutputStream out) throws IOException {
-        writeRecordFields(record, record.getSchema(), out);
+    private void writeRecordFields(final Record record, final OutputStream out, final byte[] buffer) throws IOException {
+        writeRecordFields(record, record.getSchema(), out, buffer);
     }
 
-    private void writeRecordFields(final Record record, final RecordSchema schema, final OutputStream out) throws IOException {
+    private void writeRecordFields(final Record record, final RecordSchema schema, final OutputStream out, final byte[] buffer) throws IOException {
         final DataOutputStream dos = out instanceof DataOutputStream ? (DataOutputStream) out : new DataOutputStream(out);
         for (final RecordField field : schema.getFields()) {
             final Object value = record.getFieldValue(field);
 
             try {
-                writeFieldRepetitionAndValue(field, value, dos);
+                writeFieldRepetitionAndValue(field, value, dos, buffer);
             } catch (final Exception e) {
                 throw new IOException("Failed to write field '" + field.getFieldName() + "'", e);
             }
         }
     }
 
-    private void writeFieldRepetitionAndValue(final RecordField field, final Object value, final DataOutputStream dos) throws IOException {
+    private void writeFieldRepetitionAndValue(final RecordField field, final Object value, final DataOutputStream dos, final byte[] buffer) throws IOException {
         switch (field.getRepetition()) {
             case EXACTLY_ONE: {
                 if (value == null) {
                     throw new IllegalArgumentException("Record does not have a value for the '" + field.getFieldName() + "' but the field is required");
                 }
-                writeFieldValue(field, value, dos);
+                writeFieldValue(field, value, dos, buffer);
                 break;
             }
             case ZERO_OR_MORE: {
@@ -83,7 +91,7 @@ public class SchemaRecordWriter {
                 final Collection<Object> collection = (Collection<Object>) value;
                 dos.writeInt(collection.size());
                 for (final Object fieldValue : collection) {
-                    writeFieldValue(field, fieldValue, dos);
+                    writeFieldValue(field, fieldValue, dos, buffer);
                 }
                 break;
             }
@@ -93,14 +101,25 @@ public class SchemaRecordWriter {
                     break;
                 }
                 dos.write(1);
-                writeFieldValue(field, value, dos);
+                writeFieldValue(field, value, dos, buffer);
                 break;
             }
         }
     }
 
+    private boolean allSingleByteInUtf8(final String value) {
+        for (int i = 0; i < value.length(); i++) {
+            final char ch = value.charAt(i);
+            if (ch < 1 || ch > 127) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     @SuppressWarnings("unchecked")
-    private void writeFieldValue(final RecordField field, final Object value, final DataOutputStream out) throws IOException {
+    private void writeFieldValue(final RecordField field, final Object value, final DataOutputStream out, final byte[] buffer) throws IOException {
         switch (field.getFieldType()) {
             case BOOLEAN:
                 out.writeBoolean((boolean) value);
@@ -120,9 +139,27 @@
                 writeUTFLimited(out, (String) value, field.getFieldName());
                 break;
             case LONG_STRING:
-                final byte[] charArray = ((String) value).getBytes(StandardCharsets.UTF_8);
-                out.writeInt(charArray.length);
-                out.write(charArray);
+                // In many cases, we will see a String value that consists solely of values in the range of
+                // 1-127, which means that in UTF-8 they will translate into a single byte each. If all characters
+                // in the string adhere to this, then we can skip calling String.getBytes() because that will allocate
+                // a new byte[] every time, which results in a lot of pressure on the garbage collector.
+                final String string = (String) value;
+                final int length = string.length();
+
+                if (length <= buffer.length && allSingleByteInUtf8(string)) {
+                    out.writeInt(length);
+
+                    for (int i = 0; i < length; i++) {
+                        final char ch = string.charAt(i);
+                        buffer[i] = (byte) ch;
+                    }
+
+                    out.write(buffer, 0, length);
+                } else {
+                    final byte[] charArray = ((String) value).getBytes(StandardCharsets.UTF_8);
+                    out.writeInt(charArray.length);
+                    out.write(charArray);
+                }
                 break;
             case MAP:
                 final Map<Object, Object> map = (Map<Object, Object>) value;
@@ -132,19 +169,19 @@
                 final RecordField valueField = subFields.get(1);
 
                 for (final Map.Entry<Object, Object> entry : map.entrySet()) {
-                    writeFieldRepetitionAndValue(keyField, entry.getKey(), out);
-                    writeFieldRepetitionAndValue(valueField, entry.getValue(), out);
+                    writeFieldRepetitionAndValue(keyField, entry.getKey(), out, buffer);
+                    writeFieldRepetitionAndValue(valueField, entry.getValue(), out, buffer);
                 }
                 break;
             case UNION:
                 final NamedValue namedValue = (NamedValue) value;
                 writeUTFLimited(out, namedValue.getName(), field.getFieldName());
                 final Record childRecord = (Record) namedValue.getValue();
-                writeRecordFields(childRecord, out);
+                writeRecordFields(childRecord, out, buffer);
                 break;
             case COMPLEX:
                 final Record record = (Record) value;
-                writeRecordFields(record, out);
+                writeRecordFields(record, out, buffer);
                 break;
         }
     }
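The LONG_STRING fast path works because every char in the range 1-127 encodes to exactly one identical byte in UTF-8, so the string can be copied into a reusable buffer instead of allocating through String.getBytes(). A self-contained sketch of the same idea, assuming a caller-supplied scratch buffer (class and method names here are illustrative, not NiFi API):

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LongStringEncoding {
    // Writes (int length, UTF-8 bytes), avoiding String.getBytes() for pure-ASCII values.
    static void writeLongString(final DataOutputStream out, final String value, final byte[] scratch) throws IOException {
        boolean singleByte = value.length() <= scratch.length;
        for (int i = 0; singleByte && i < value.length(); i++) {
            final char ch = value.charAt(i);
            singleByte = ch >= 1 && ch <= 127;          // such chars are one identical byte in UTF-8
        }

        if (singleByte) {
            for (int i = 0; i < value.length(); i++) {
                scratch[i] = (byte) value.charAt(i);    // safe narrowing: each value fits in 7 bits
            }
            out.writeInt(value.length());
            out.write(scratch, 0, value.length());
        } else {
            final byte[] encoded = value.getBytes(StandardCharsets.UTF_8);  // slow path allocates
            out.writeInt(encoded.length);
            out.write(encoded);
        }
    }
}
```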
"EncryptedSchemaRecordWriter"; - public EncryptedSchemaRecordReader(final InputStream inputStream, final String filename, final TocReader tocReader, final int maxAttributeChars, - ProvenanceEventEncryptor provenanceEventEncryptor) throws IOException { - this(inputStream, filename, tocReader, maxAttributeChars, provenanceEventEncryptor, DEFAULT_DEBUG_FREQUENCY); - } public EncryptedSchemaRecordReader(final InputStream inputStream, final String filename, final TocReader tocReader, final int maxAttributeChars, - ProvenanceEventEncryptor provenanceEventEncryptor, int debugFrequency) throws IOException { + ProvenanceEventEncryptor provenanceEventEncryptor) throws IOException { super(inputStream, filename, tocReader, maxAttributeChars); this.provenanceEventEncryptor = provenanceEventEncryptor; - this.debugFrequency = debugFrequency; } @Override diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java index 8f5b2b2103..05e6736c27 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; + import org.apache.nifi.provenance.schema.EventFieldNames; import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema; import org.apache.nifi.provenance.schema.LookupTableEventRecord; @@ -36,6 +37,8 @@ import org.apache.nifi.provenance.schema.LookupTableEventSchema; import org.apache.nifi.provenance.serialization.CompressableRecordWriter; import org.apache.nifi.provenance.serialization.StorageSummary; import org.apache.nifi.provenance.toc.TocWriter; +import org.apache.nifi.provenance.util.ByteArrayDataOutputStream; +import org.apache.nifi.provenance.util.ByteArrayDataOutputStreamCache; import org.apache.nifi.repository.schema.FieldMapRecord; import org.apache.nifi.repository.schema.Record; import org.apache.nifi.repository.schema.RecordSchema; @@ -73,6 +76,8 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter { private static final TimedBuffer bytesWritten = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess()); private static final AtomicLong totalRecordCount = new AtomicLong(0L); + private static final ByteArrayDataOutputStreamCache streamCache = new ByteArrayDataOutputStreamCache(32, 8 * 1024, 256 * 1024); + private long firstEventId; private long systemTimeOffset; @@ -113,39 +118,43 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter { throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository"); } - final long serializeStart = System.nanoTime(); - final byte[] serialized; - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256); - final DataOutputStream dos = new DataOutputStream(baos)) { - writeRecord(record, 0L, dos); - serialized = baos.toByteArray(); - } - - final long lockStart = System.nanoTime(); + 
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
index 8f5b2b2103..05e6736c27 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.nifi.provenance.schema.EventFieldNames;
 import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema;
 import org.apache.nifi.provenance.schema.LookupTableEventRecord;
@@ -36,6 +37,8 @@ import org.apache.nifi.provenance.schema.LookupTableEventSchema;
 import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
 import org.apache.nifi.provenance.serialization.StorageSummary;
 import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.provenance.util.ByteArrayDataOutputStream;
+import org.apache.nifi.provenance.util.ByteArrayDataOutputStreamCache;
 import org.apache.nifi.repository.schema.FieldMapRecord;
 import org.apache.nifi.repository.schema.Record;
 import org.apache.nifi.repository.schema.RecordSchema;
@@ -73,6 +76,8 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
     private static final TimedBuffer<TimestampedLong> bytesWritten = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
     private static final AtomicLong totalRecordCount = new AtomicLong(0L);
 
+    private static final ByteArrayDataOutputStreamCache streamCache = new ByteArrayDataOutputStreamCache(32, 8 * 1024, 256 * 1024);
+
     private long firstEventId;
     private long systemTimeOffset;
 
@@ -113,39 +118,43 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
             throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
         }
 
-        final long serializeStart = System.nanoTime();
-        final byte[] serialized;
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
-            final DataOutputStream dos = new DataOutputStream(baos)) {
-            writeRecord(record, 0L, dos);
-            serialized = baos.toByteArray();
-        }
-
-        final long lockStart = System.nanoTime();
+        final long lockStart;
         final long writeStart;
         final long startBytes;
         final long endBytes;
        final long recordIdentifier;
-        synchronized (this) {
-            writeStart = System.nanoTime();
-            try {
-                recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId();
-                startBytes = getBytesWritten();
-                ensureStreamState(recordIdentifier, startBytes);
+        final long serializeStart = System.nanoTime();
+        final ByteArrayDataOutputStream bados = streamCache.checkOut();
+        try {
+            writeRecord(record, 0L, bados.getDataOutputStream());
 
-                final DataOutputStream out = getBufferedOutputStream();
-                final int recordIdOffset = (int) (recordIdentifier - firstEventId);
-                out.writeInt(recordIdOffset);
-                out.writeInt(serialized.length);
-                out.write(serialized);
+            lockStart = System.nanoTime();
+            synchronized (this) {
+                writeStart = System.nanoTime();
+                try {
+                    recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId();
+                    startBytes = getBytesWritten();
 
-                recordCount.incrementAndGet();
-                endBytes = getBytesWritten();
-            } catch (final IOException ioe) {
-                markDirty();
-                throw ioe;
+                    ensureStreamState(recordIdentifier, startBytes);
+
+                    final DataOutputStream out = getBufferedOutputStream();
+                    final int recordIdOffset = (int) (recordIdentifier - firstEventId);
+                    out.writeInt(recordIdOffset);
+
+                    final ByteArrayOutputStream baos = bados.getByteArrayOutputStream();
+                    out.writeInt(baos.size());
+                    baos.writeTo(out);
+
+                    recordCount.incrementAndGet();
+                    endBytes = getBytesWritten();
+                } catch (final IOException ioe) {
+                    markDirty();
+                    throw ioe;
+                }
             }
+        } finally {
+            streamCache.checkIn(bados);
        }
 
        if (logger.isDebugEnabled()) {
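The reworked updateRecord() serializes each event into a pooled stream before taking the monitor, so the synchronized block shrinks to just the append. Condensed, the new structure is (a sketch of the change above, bookkeeping omitted):

```java
// Serialize with no lock held, then hold the monitor only for the append itself.
final ByteArrayDataOutputStream bados = streamCache.checkOut();
try {
    writeRecord(record, 0L, bados.getDataOutputStream());      // CPU-heavy, contention-free

    synchronized (this) {
        final ByteArrayOutputStream baos = bados.getByteArrayOutputStream();
        out.writeInt(baos.size());
        baos.writeTo(out);   // streams the pooled bytes directly into the shared output
    }
} finally {
    streamCache.checkIn(bados);                                 // resets and repools the buffer
}
```

Note that baos.writeTo(out) also replaces the old toByteArray() call, which copied the entire serialized event into yet another freshly allocated array.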
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
index b5646009b0..21e2c0767b 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
@@ -56,7 +56,7 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
         this.compressed = compressed;
         this.fos = new FileOutputStream(file);
 
-        rawOutStream = new ByteCountingOutputStream(fos);
+        rawOutStream = new ByteCountingOutputStream(new BufferedOutputStream(fos));
         this.uncompressedBlockSize = uncompressedBlockSize;
         this.idGenerator = idGenerator;
     }
@@ -68,7 +68,7 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
         this.compressed = compressed;
         this.uncompressedBlockSize = uncompressedBlockSize;
 
-        this.rawOutStream = new ByteCountingOutputStream(out);
+        this.rawOutStream = new ByteCountingOutputStream(new BufferedOutputStream(out));
         this.idGenerator = idGenerator;
     }
 
@@ -114,7 +114,6 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
             final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
             final TocWriter tocWriter = getTocWriter();
 
-            final OutputStream writableStream;
             if (compressed) {
                 // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
                 // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
@@ -128,16 +127,16 @@
                     tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
                 }
 
-                writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
+                final OutputStream writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
+                this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
             } else {
                 if (tocWriter != null && eventId != null) {
                     tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
                 }
 
-                writableStream = new BufferedOutputStream(rawOutStream, 65536);
+                this.byteCountingOut = rawOutStream;
             }
 
-            this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
             this.out = new DataOutputStream(byteCountingOut);
             resetDirtyFlag();
         } catch (final IOException ioe) {
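The comment in the compressed branch explains why the GZIP stream is wrapped in NiFi's NonCloseableOutputStream: GZIPOutputStream only emits its trailer bytes on close(), yet the underlying file stream must stay open for the next block. A minimal sketch of such a close-shielding wrapper, using nothing beyond java.io (NiFi ships its own implementation; this is only illustrative):

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// close() flushes but deliberately leaves the delegate open, so a wrapping
// GZIPOutputStream can be closed (writing its trailer) at each block boundary
// while the underlying file stream keeps accepting the next block.
public class CloseShieldOutputStream extends FilterOutputStream {
    public CloseShieldOutputStream(final OutputStream delegate) {
        super(delegate);
    }

    @Override
    public void write(final byte[] b, final int off, final int len) throws IOException {
        out.write(b, off, len);  // bypass FilterOutputStream's byte-at-a-time default
    }

    @Override
    public void close() throws IOException {
        out.flush();             // shield: do not close the delegate
    }
}
```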
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
index 22d2a5f07a..2de78f8a85 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
@@ -64,7 +64,6 @@ import org.slf4j.LoggerFactory;
 public class WriteAheadStorePartition implements EventStorePartition {
     private static final Logger logger = LoggerFactory.getLogger(WriteAheadStorePartition.class);
 
-    private final RepositoryConfiguration config;
 
     private final File partitionDirectory;
     private final String partitionName;
@@ -253,48 +252,43 @@ public class WriteAheadStorePartition implements EventStorePartition {
         final long nextEventId = idGenerator.get();
         final File updatedEventFile = new File(partitionDirectory, nextEventId + ".prov");
         final RecordWriter updatedWriter = recordWriterFactory.createWriter(updatedEventFile, idGenerator, false, true);
+        updatedWriter.writeHeader(nextEventId);
 
-        // Synchronize on the writer to ensure that no other thread is able to obtain the writer and start writing events to it until after it has
-        // been fully initialized (i.e., the header has been written, etc.)
-        synchronized (updatedWriter) {
-            final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount());
-            final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease);
+        final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount());
+        final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease);
 
-            if (updated) {
-                if (lease != null) {
-                    lease.close();
-                }
+        if (!updated) {
+            try {
+                updatedWriter.close();
+            } catch (final Exception e) {
+                logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", updatedWriter, e);
+            }
 
-                updatedWriter.writeHeader(nextEventId);
+            updatedEventFile.delete();
+            return false;
+        }
 
-                synchronized (minEventIdToPathMap) {
-                    minEventIdToPathMap.put(nextEventId, updatedEventFile);
-                }
+        if (lease != null) {
+            lease.close();
+        }
 
-                if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) {
-                    boolean offered = false;
-                    while (!offered && !closed) {
-                        try {
-                            offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS);
-                        } catch (final InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                            throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression");
-                        }
-                    }
-                }
+        synchronized (minEventIdToPathMap) {
+            minEventIdToPathMap.put(nextEventId, updatedEventFile);
+        }
 
-                return true;
-            } else {
+        if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) {
+            boolean offered = false;
+            while (!offered && !closed) {
                 try {
-                    updatedWriter.close();
-                } catch (final Exception e) {
-                    logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", updatedWriter, e);
+                    offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS);
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression");
                 }
-
-                updatedEventFile.delete();
-                return false;
             }
         }
+
+        return true;
     }
 
     private Map<ProvenanceEventRecord, StorageSummary> addEvents(final Iterable<ProvenanceEventRecord> events, final RecordWriter writer) throws IOException {
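Rollover now publishes the new writer with a single compareAndSet instead of synchronizing on it: the header is written before the writer becomes visible, so the loser of the race simply closes and deletes its never-published file. The pattern in isolation, with illustrative types (not NiFi API):

```java
import java.util.concurrent.atomic.AtomicReference;

// Initialize fully, then publish by CAS; only the winner's instance ever becomes
// visible, so the loser can discard its own copy without any coordination.
final class CasPublish {
    static <T extends AutoCloseable> boolean publish(final AtomicReference<T> ref, final T expected, final T fresh) throws Exception {
        if (ref.compareAndSet(expected, fresh)) {
            return true;     // fresh is now the shared instance
        }

        fresh.close();       // lost the race; clean up the private, unpublished instance
        return false;
    }
}
```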
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java
new file mode 100644
index 0000000000..23aefb3ce9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+public class ByteArrayDataOutputStream {
+    private final ByteArrayOutputStream baos;
+    private final DataOutputStream dos;
+
+    public ByteArrayDataOutputStream(final int initialCapacity) {
+        baos = new ByteArrayOutputStream(initialCapacity);
+        dos = new DataOutputStream(baos);
+    }
+
+    public ByteArrayOutputStream getByteArrayOutputStream() {
+        return baos;
+    }
+
+    public DataOutputStream getDataOutputStream() {
+        return dos;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java
new file mode 100644
index 0000000000..953559093f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.util;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ByteArrayDataOutputStreamCache {
+    private final BlockingQueue<ByteArrayDataOutputStream> queue;
+    private final int initialBufferSize;
+    private final int maxBufferSize;
+
+    public ByteArrayDataOutputStreamCache(final int maxCapacity, final int initialBufferSize, final int maxBufferSize) {
+        this.queue = new LinkedBlockingQueue<>(maxCapacity);
+        this.initialBufferSize = initialBufferSize;
+        this.maxBufferSize = maxBufferSize;
+    }
+
+    public ByteArrayDataOutputStream checkOut() {
+        final ByteArrayDataOutputStream stream = queue.poll();
+        if (stream != null) {
+            return stream;
+        }
+
+        return new ByteArrayDataOutputStream(initialBufferSize);
+    }
+
+    public void checkIn(final ByteArrayDataOutputStream bados) {
+        final int size = bados.getByteArrayOutputStream().size();
+        if (size > maxBufferSize) {
+            return;
+        }
+
+        bados.getByteArrayOutputStream().reset();
+        queue.offer(bados);
+    }
+}
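One cycle through the cache, as a sketch (the constructor values mirror the constants used above; the output target is illustrative): checkOut() hands back a previously grown stream when one is pooled, checkIn() resets it so the grown backing array is reused, and streams that grew past maxBufferSize are dropped rather than pooled so a single huge event cannot pin memory indefinitely.

```java
final ByteArrayDataOutputStreamCache cache = new ByteArrayDataOutputStreamCache(32, 8 * 1024, 256 * 1024);

final ByteArrayDataOutputStream bados = cache.checkOut();
try {
    bados.getDataOutputStream().writeUTF("example event");   // serialize into the pooled stream
    bados.getByteArrayOutputStream().writeTo(System.out);    // drain without a toByteArray() copy
} finally {
    cache.checkIn(bados);   // reset() keeps the grown array; oversized streams are discarded
}
```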