NIFI-4794: Updated event writers to avoid creating a lot of byte[] by reusing buffers. Also removed synchronization on EventWriter when rolling over the writer and just moved the writing of the header to happen before making the writer available to any other threads. This reduces thread contention during rollover.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2437
This commit is contained in:
Mark Payne 2018-01-25 12:16:56 -05:00 committed by Matthew Burgess
parent 0bcb241db3
commit 9f95a10df9
8 changed files with 261 additions and 96 deletions

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.repository.schema;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ByteArrayCache {
private final BlockingQueue<byte[]> queue;
private final int bufferSize;
public ByteArrayCache(final int maxCapacity, final int bufferSize) {
this.queue = new LinkedBlockingQueue<>(maxCapacity);
this.bufferSize = bufferSize;
}
public byte[] checkOut() {
final byte[] array = queue.poll();
if (array != null) {
return array;
}
return new byte[bufferSize];
}
public void checkIn(final byte[] array) {
if (array.length != bufferSize) {
return;
}
queue.offer(array);
}
}

View File

@ -34,39 +34,47 @@ public class SchemaRecordWriter {
public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;
private static final Logger logger = LoggerFactory.getLogger(SchemaRecordWriter.class);
private static final int CACHE_BUFFER_SIZE = 65536;
private static final ByteArrayCache byteArrayCache = new ByteArrayCache(32, CACHE_BUFFER_SIZE);
public void writeRecord(final Record record, final OutputStream out) throws IOException {
// write sentinel value to indicate that there is a record. This allows the reader to then read one
// byte and check if -1. If so, the reader knows there are no more records. If not, then the reader
// knows that it should be able to continue reading.
out.write(1);
writeRecordFields(record, out);
final byte[] buffer = byteArrayCache.checkOut();
try {
writeRecordFields(record, out, buffer);
} finally {
byteArrayCache.checkIn(buffer);
}
}
private void writeRecordFields(final Record record, final OutputStream out) throws IOException {
writeRecordFields(record, record.getSchema(), out);
private void writeRecordFields(final Record record, final OutputStream out, final byte[] buffer) throws IOException {
writeRecordFields(record, record.getSchema(), out, buffer);
}
private void writeRecordFields(final Record record, final RecordSchema schema, final OutputStream out) throws IOException {
private void writeRecordFields(final Record record, final RecordSchema schema, final OutputStream out, final byte[] buffer) throws IOException {
final DataOutputStream dos = out instanceof DataOutputStream ? (DataOutputStream) out : new DataOutputStream(out);
for (final RecordField field : schema.getFields()) {
final Object value = record.getFieldValue(field);
try {
writeFieldRepetitionAndValue(field, value, dos);
writeFieldRepetitionAndValue(field, value, dos, buffer);
} catch (final Exception e) {
throw new IOException("Failed to write field '" + field.getFieldName() + "'", e);
}
}
}
private void writeFieldRepetitionAndValue(final RecordField field, final Object value, final DataOutputStream dos) throws IOException {
private void writeFieldRepetitionAndValue(final RecordField field, final Object value, final DataOutputStream dos, final byte[] buffer) throws IOException {
switch (field.getRepetition()) {
case EXACTLY_ONE: {
if (value == null) {
throw new IllegalArgumentException("Record does not have a value for the '" + field.getFieldName() + "' but the field is required");
}
writeFieldValue(field, value, dos);
writeFieldValue(field, value, dos, buffer);
break;
}
case ZERO_OR_MORE: {
@ -83,7 +91,7 @@ public class SchemaRecordWriter {
final Collection<?> collection = (Collection<?>) value;
dos.writeInt(collection.size());
for (final Object fieldValue : collection) {
writeFieldValue(field, fieldValue, dos);
writeFieldValue(field, fieldValue, dos, buffer);
}
break;
}
@ -93,14 +101,25 @@ public class SchemaRecordWriter {
break;
}
dos.write(1);
writeFieldValue(field, value, dos);
writeFieldValue(field, value, dos, buffer);
break;
}
}
}
/**
 * Determines whether every character of the given String lies in the range 1-127 (inclusive),
 * i.e., whether each character translates into exactly one byte when encoded as UTF-8.
 *
 * @param value the String to inspect
 * @return <code>true</code> if the String is empty or consists solely of characters in the range 1-127,
 *         <code>false</code> as soon as a character outside of that range is encountered
 */
private boolean allSingleByteInUtf8(final String value) {
    final int charCount = value.length();
    int index = 0;

    while (index < charCount) {
        final char current = value.charAt(index);
        final boolean singleByte = current >= 1 && current <= 127;
        if (!singleByte) {
            return false;
        }

        index++;
    }

    return true;
}
@SuppressWarnings("unchecked")
private void writeFieldValue(final RecordField field, final Object value, final DataOutputStream out) throws IOException {
private void writeFieldValue(final RecordField field, final Object value, final DataOutputStream out, final byte[] buffer) throws IOException {
switch (field.getFieldType()) {
case BOOLEAN:
out.writeBoolean((boolean) value);
@ -120,9 +139,27 @@ public class SchemaRecordWriter {
writeUTFLimited(out, (String) value, field.getFieldName());
break;
case LONG_STRING:
final byte[] charArray = ((String) value).getBytes(StandardCharsets.UTF_8);
out.writeInt(charArray.length);
out.write(charArray);
// In many cases, we will see a String value that consists solely of values in the range of
// 1-127, which means that in UTF-8 they will translate into a single byte each. If all characters
// in the string adhere to this, then we can skip calling String.getBytes() because that will allocate
// a new byte[] every time, which results in a lot of pressure on the garbage collector.
final String string = (String) value;
final int length = string.length();
if (length <= buffer.length && allSingleByteInUtf8(string)) {
out.writeInt(length);
for (int i = 0; i < length; i++) {
final char ch = string.charAt(i);
buffer[i] = (byte) ch;
}
out.write(buffer, 0, length);
} else {
final byte[] charArray = ((String) value).getBytes(StandardCharsets.UTF_8);
out.writeInt(charArray.length);
out.write(charArray);
}
break;
case MAP:
final Map<Object, Object> map = (Map<Object, Object>) value;
@ -132,19 +169,19 @@ public class SchemaRecordWriter {
final RecordField valueField = subFields.get(1);
for (final Map.Entry<Object, Object> entry : map.entrySet()) {
writeFieldRepetitionAndValue(keyField, entry.getKey(), out);
writeFieldRepetitionAndValue(valueField, entry.getValue(), out);
writeFieldRepetitionAndValue(keyField, entry.getKey(), out, buffer);
writeFieldRepetitionAndValue(valueField, entry.getValue(), out, buffer);
}
break;
case UNION:
final NamedValue namedValue = (NamedValue) value;
writeUTFLimited(out, namedValue.getName(), field.getFieldName());
final Record childRecord = (Record) namedValue.getValue();
writeRecordFields(childRecord, out);
writeRecordFields(childRecord, out, buffer);
break;
case COMPLEX:
final Record record = (Record) value;
writeRecordFields(record, out);
writeRecordFields(record, out, buffer);
break;
}
}

View File

@ -23,42 +23,29 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.provenance.schema.LookupTableEventRecord;
import org.apache.nifi.provenance.toc.TocReader;
import org.apache.nifi.repository.schema.Record;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EncryptedSchemaRecordReader extends EventIdFirstSchemaRecordReader {
private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordReader.class);
private static final int DEFAULT_DEBUG_FREQUENCY = 1_000_000;
private ProvenanceEventEncryptor provenanceEventEncryptor;
private static final TimedBuffer<TimestampedLong> decryptTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
private int debugFrequency = DEFAULT_DEBUG_FREQUENCY;
public static final int SERIALIZATION_VERSION = 1;
public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter";
public EncryptedSchemaRecordReader(final InputStream inputStream, final String filename, final TocReader tocReader, final int maxAttributeChars,
ProvenanceEventEncryptor provenanceEventEncryptor) throws IOException {
this(inputStream, filename, tocReader, maxAttributeChars, provenanceEventEncryptor, DEFAULT_DEBUG_FREQUENCY);
}
public EncryptedSchemaRecordReader(final InputStream inputStream, final String filename, final TocReader tocReader, final int maxAttributeChars,
ProvenanceEventEncryptor provenanceEventEncryptor, int debugFrequency) throws IOException {
ProvenanceEventEncryptor provenanceEventEncryptor) throws IOException {
super(inputStream, filename, tocReader, maxAttributeChars);
this.provenanceEventEncryptor = provenanceEventEncryptor;
this.debugFrequency = debugFrequency;
}
@Override

View File

@ -29,6 +29,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.provenance.schema.EventFieldNames;
import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema;
import org.apache.nifi.provenance.schema.LookupTableEventRecord;
@ -36,6 +37,8 @@ import org.apache.nifi.provenance.schema.LookupTableEventSchema;
import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.toc.TocWriter;
import org.apache.nifi.provenance.util.ByteArrayDataOutputStream;
import org.apache.nifi.provenance.util.ByteArrayDataOutputStreamCache;
import org.apache.nifi.repository.schema.FieldMapRecord;
import org.apache.nifi.repository.schema.Record;
import org.apache.nifi.repository.schema.RecordSchema;
@ -73,6 +76,8 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
private static final TimedBuffer<TimestampedLong> bytesWritten = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
private static final AtomicLong totalRecordCount = new AtomicLong(0L);
private static final ByteArrayDataOutputStreamCache streamCache = new ByteArrayDataOutputStreamCache(32, 8 * 1024, 256 * 1024);
private long firstEventId;
private long systemTimeOffset;
@ -113,39 +118,43 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
}
final long serializeStart = System.nanoTime();
final byte[] serialized;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
final DataOutputStream dos = new DataOutputStream(baos)) {
writeRecord(record, 0L, dos);
serialized = baos.toByteArray();
}
final long lockStart = System.nanoTime();
final long lockStart;
final long writeStart;
final long startBytes;
final long endBytes;
final long recordIdentifier;
synchronized (this) {
writeStart = System.nanoTime();
try {
recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId();
startBytes = getBytesWritten();
ensureStreamState(recordIdentifier, startBytes);
final long serializeStart = System.nanoTime();
final ByteArrayDataOutputStream bados = streamCache.checkOut();
try {
writeRecord(record, 0L, bados.getDataOutputStream());
final DataOutputStream out = getBufferedOutputStream();
final int recordIdOffset = (int) (recordIdentifier - firstEventId);
out.writeInt(recordIdOffset);
out.writeInt(serialized.length);
out.write(serialized);
lockStart = System.nanoTime();
synchronized (this) {
writeStart = System.nanoTime();
try {
recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId();
startBytes = getBytesWritten();
recordCount.incrementAndGet();
endBytes = getBytesWritten();
} catch (final IOException ioe) {
markDirty();
throw ioe;
ensureStreamState(recordIdentifier, startBytes);
final DataOutputStream out = getBufferedOutputStream();
final int recordIdOffset = (int) (recordIdentifier - firstEventId);
out.writeInt(recordIdOffset);
final ByteArrayOutputStream baos = bados.getByteArrayOutputStream();
out.writeInt(baos.size());
baos.writeTo(out);
recordCount.incrementAndGet();
endBytes = getBytesWritten();
} catch (final IOException ioe) {
markDirty();
throw ioe;
}
}
} finally {
streamCache.checkIn(bados);
}
if (logger.isDebugEnabled()) {

View File

@ -56,7 +56,7 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
this.compressed = compressed;
this.fos = new FileOutputStream(file);
rawOutStream = new ByteCountingOutputStream(fos);
rawOutStream = new ByteCountingOutputStream(new BufferedOutputStream(fos));
this.uncompressedBlockSize = uncompressedBlockSize;
this.idGenerator = idGenerator;
}
@ -68,7 +68,7 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
this.compressed = compressed;
this.uncompressedBlockSize = uncompressedBlockSize;
this.rawOutStream = new ByteCountingOutputStream(out);
this.rawOutStream = new ByteCountingOutputStream(new BufferedOutputStream(out));
this.idGenerator = idGenerator;
}
@ -114,7 +114,6 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
final TocWriter tocWriter = getTocWriter();
final OutputStream writableStream;
if (compressed) {
// because of the way that GZIPOutputStream works, we need to call close() on it in order for it
// to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
@ -128,16 +127,16 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
}
writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
final OutputStream writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
} else {
if (tocWriter != null && eventId != null) {
tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
}
writableStream = new BufferedOutputStream(rawOutStream, 65536);
this.byteCountingOut = rawOutStream;
}
this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
this.out = new DataOutputStream(byteCountingOut);
resetDirtyFlag();
} catch (final IOException ioe) {

View File

@ -64,7 +64,6 @@ import org.slf4j.LoggerFactory;
public class WriteAheadStorePartition implements EventStorePartition {
private static final Logger logger = LoggerFactory.getLogger(WriteAheadStorePartition.class);
private final RepositoryConfiguration config;
private final File partitionDirectory;
private final String partitionName;
@ -253,48 +252,43 @@ public class WriteAheadStorePartition implements EventStorePartition {
final long nextEventId = idGenerator.get();
final File updatedEventFile = new File(partitionDirectory, nextEventId + ".prov");
final RecordWriter updatedWriter = recordWriterFactory.createWriter(updatedEventFile, idGenerator, false, true);
updatedWriter.writeHeader(nextEventId);
// Synchronize on the writer to ensure that no other thread is able to obtain the writer and start writing events to it until after it has
// been fully initialized (i.e., the header has been written, etc.)
synchronized (updatedWriter) {
final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount());
final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease);
final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount());
final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease);
if (updated) {
if (lease != null) {
lease.close();
}
if (!updated) {
try {
updatedWriter.close();
} catch (final Exception e) {
logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", updatedWriter, e);
}
updatedWriter.writeHeader(nextEventId);
updatedEventFile.delete();
return false;
}
synchronized (minEventIdToPathMap) {
minEventIdToPathMap.put(nextEventId, updatedEventFile);
}
if (lease != null) {
lease.close();
}
if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) {
boolean offered = false;
while (!offered && !closed) {
try {
offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression");
}
}
}
synchronized (minEventIdToPathMap) {
minEventIdToPathMap.put(nextEventId, updatedEventFile);
}
return true;
} else {
if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) {
boolean offered = false;
while (!offered && !closed) {
try {
updatedWriter.close();
} catch (final Exception e) {
logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", updatedWriter, e);
offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression");
}
updatedEventFile.delete();
return false;
}
}
return true;
}
private Map<ProvenanceEventRecord, StorageSummary> addEvents(final Iterable<ProvenanceEventRecord> events, final RecordWriter writer) throws IOException {

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.util;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
/**
 * Pairs an in-memory {@link ByteArrayOutputStream} with a {@link DataOutputStream} that writes to it,
 * so that both views of the same buffer can be kept (and reused) together.
 */
public class ByteArrayDataOutputStream {
    private final ByteArrayOutputStream byteSink;
    private final DataOutputStream dataView;

    /**
     * @param initialCapacity the initial size, in bytes, of the underlying in-memory buffer
     */
    public ByteArrayDataOutputStream(final int initialCapacity) {
        final ByteArrayOutputStream backing = new ByteArrayOutputStream(initialCapacity);
        this.byteSink = backing;
        this.dataView = new DataOutputStream(backing);
    }

    /**
     * @return the in-memory stream that accumulates everything written via {@link #getDataOutputStream()}
     */
    public ByteArrayOutputStream getByteArrayOutputStream() {
        return this.byteSink;
    }

    /**
     * @return the DataOutputStream whose output is collected by {@link #getByteArrayOutputStream()}
     */
    public DataOutputStream getDataOutputStream() {
        return this.dataView;
    }
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.util;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * A bounded pool of reusable {@link ByteArrayDataOutputStream} instances.
 *
 * <p>Callers borrow a stream with {@link #checkOut()} and hand it back with
 * {@link #checkIn(ByteArrayDataOutputStream)}. Idle streams are held in a capacity-bounded
 * {@link LinkedBlockingQueue}; neither operation blocks.</p>
 */
public class ByteArrayDataOutputStreamCache {
    private final BlockingQueue<ByteArrayDataOutputStream> queue;
    private final int initialBufferSize;
    private final int maxBufferSize;

    /**
     * Creates an empty cache.
     *
     * @param maxCapacity the maximum number of idle streams to retain
     * @param initialBufferSize the initial capacity, in bytes, given to each newly created stream
     * @param maxBufferSize streams holding more than this many bytes when checked in are discarded rather than retained
     */
    public ByteArrayDataOutputStreamCache(final int maxCapacity, final int initialBufferSize, final int maxBufferSize) {
        this.queue = new LinkedBlockingQueue<>(maxCapacity);
        this.initialBufferSize = initialBufferSize;
        this.maxBufferSize = maxBufferSize;
    }

    /**
     * Borrows a stream, reusing an idle one when available and creating a new one otherwise.
     *
     * @return a stream to write to; never {@code null}
     */
    public ByteArrayDataOutputStream checkOut() {
        final ByteArrayDataOutputStream stream = queue.poll();
        if (stream != null) {
            return stream;
        }

        return new ByteArrayDataOutputStream(initialBufferSize);
    }

    /**
     * Returns a stream to the cache so that it can be handed out again.
     *
     * <p>The stream is silently discarded if it is {@code null}, if it currently holds more than
     * {@code maxBufferSize} bytes, or if the cache already holds its maximum number of idle streams.
     * A retained stream is reset before it is made available again.</p>
     *
     * @param bados the stream to return; may be {@code null}
     */
    public void checkIn(final ByteArrayDataOutputStream bados) {
        // This method is typically invoked from a finally block; it must never throw,
        // otherwise it would mask whatever exception is already propagating.
        if (bados == null) {
            return;
        }

        // The number of bytes written during this use serves as the measure of how large
        // the stream's buffer has become; oversized streams are dropped instead of pooled.
        final int size = bados.getByteArrayOutputStream().size();
        if (size > maxBufferSize) {
            return;
        }

        // Discard the written content before pooling the stream for reuse.
        bados.getByteArrayOutputStream().reset();

        // offer() (not put()) so that a full cache simply drops the stream instead of blocking.
        queue.offer(bados);
    }
}