NIFI-5533: Be more efficient with heap utilization

- Updated FlowFile Repo / Write Ahead Log so that any update that writes more than 1 MB of data is written to a file inside the FlowFile Repo rather than being buffered in memory
- Updated SplitText so that it does not hold FlowFiles that are not the latest version in heap. Holding them prevents them from being garbage collected: while the Process Session holds the latest version of the FlowFile, SplitText would be holding an older version, and this results in two copies of the same FlowFile object
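
For orientation, below is a minimal, self-contained sketch of the overflow idea that LengthDelimitedJournal implements later in this diff. It is illustrative only: the class name, the writeUpdate helper, and the byte[]-per-record representation are made up; the real journal works through the SerDe interface and defaults its in-heap limit to 5 MB (DEFAULT_MAX_IN_HEAP_SERIALIZATION_BYTES).

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.UUID;

public class OverflowSketch {

    // Illustrative threshold; the journal below defaults to 5 MB.
    private static final int MAX_IN_HEAP_BYTES = 5 * 1024 * 1024;

    public static void writeUpdate(final List<byte[]> serializedRecords, final File overflowDirectory) throws IOException {
        final ByteArrayOutputStream journalBuffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(journalBuffer);
        File overflowFile = null;
        FileOutputStream overflowOut = null;

        try {
            for (final byte[] record : serializedRecords) {
                out.write(record);

                if (overflowFile == null && journalBuffer.size() > MAX_IN_HEAP_BYTES) {
                    // Threshold exceeded: dump what has been buffered so far to an overflow file
                    // and direct all subsequent records there, so heap usage stays bounded.
                    overflowFile = new File(overflowDirectory, UUID.randomUUID().toString());
                    overflowOut = new FileOutputStream(overflowFile);
                    journalBuffer.writeTo(overflowOut);
                    journalBuffer.reset();
                    out = new DataOutputStream(new BufferedOutputStream(overflowOut));

                    // The journal itself now carries only a pointer to the overflow file.
                    new DataOutputStream(journalBuffer).writeUTF(overflowFile.getAbsolutePath());
                }
            }

            out.flush();
            if (overflowOut != null) {
                // Sync the overflow file before the journal is updated, so the journal never
                // references data that has not reached disk.
                overflowOut.getFD().sync();
            }
        } finally {
            if (overflowOut != null) {
                overflowOut.close();
            }
        }

        // Whatever remains in journalBuffer (the whole update, or just the pointer) is what
        // would be appended to the journal file.
    }
}

The actual implementation additionally checks whether the SerDe supports external file references and deletes the overflow file if the update fails; see the LengthDelimitedJournal changes below.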

NIFI-5533: Checkpoint

NIFI-5533: Bug Fixes

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2974
Mark Payne 2018-08-17 14:08:14 -04:00 committed by Matthew Burgess
parent c87d791938
commit c425bd2880
21 changed files with 827 additions and 225 deletions

View File

@ -16,12 +16,6 @@
*/
package org.apache.nifi.attribute.expression.language;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.attribute.expression.language.evaluation.Evaluator;
import org.apache.nifi.attribute.expression.language.evaluation.literals.StringLiteralEvaluator;
import org.apache.nifi.attribute.expression.language.evaluation.selection.AllAttributesEvaluator;
@ -34,7 +28,14 @@ import org.apache.nifi.attribute.expression.language.evaluation.selection.MultiN
import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.processor.exception.ProcessException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class StandardPreparedQuery implements PreparedQuery {
private static final String EMPTY_STRING = "";
private final List<Expression> expressions;
private volatile VariableImpact variableImpact;
@ -45,6 +46,14 @@ public class StandardPreparedQuery implements PreparedQuery {
@Override
public String evaluateExpressions(final Map<String, String> valMap, final AttributeValueDecorator decorator, final Map<String, String> stateVariables) throws ProcessException {
if (expressions.isEmpty()) {
return EMPTY_STRING;
}
if (expressions.size() == 1) {
final String evaluated = expressions.get(0).evaluate(valMap, decorator, stateVariables);
return evaluated == null ? EMPTY_STRING : evaluated;
}
final StringBuilder sb = new StringBuilder();
for (final Expression expression : expressions) {

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.repository.schema;
import java.io.Closeable;
import java.io.IOException;
public interface RecordIterator extends Closeable {
Record next() throws IOException;
boolean isNext() throws IOException;
}
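
A short usage sketch for the new iterator, assuming a SchemaRecordReader and an InputStream positioned at the next update; the readAll helper is illustrative, and SchemaRecordReader#readRecords in the next file shows how the iterator is produced.

import java.io.IOException;
import java.io.InputStream;

import org.apache.nifi.repository.schema.Record;
import org.apache.nifi.repository.schema.RecordIterator;
import org.apache.nifi.repository.schema.SchemaRecordReader;

public class RecordIteratorUsage {

    // Drains every record of the next update, whether it was written inline or
    // overflowed to an external file.
    static void readAll(final SchemaRecordReader reader, final InputStream in) throws IOException {
        final RecordIterator iterator = reader.readRecords(in);
        if (iterator == null) {
            return; // end of stream
        }

        try {
            while (iterator.isNext()) {
                final Record record = iterator.next();
                // process the record
            }
        } finally {
            iterator.close();
        }
    }
}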

View File

@ -17,8 +17,11 @@
package org.apache.nifi.repository.schema;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@ -30,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
public class SchemaRecordReader {
private final RecordSchema schema;
@ -56,15 +58,24 @@ public class SchemaRecordReader {
}
public Record readRecord(final InputStream in) throws IOException {
final int sentinelByte = in.read();
if (sentinelByte < 0) {
final int recordIndicator = in.read();
if (recordIndicator < 0) {
return null;
}
if (sentinelByte != 1) {
throw new IOException("Expected to read a Sentinel Byte of '1' but got a value of '" + sentinelByte + "' instead");
if (recordIndicator == SchemaRecordWriter.EXTERNAL_FILE_INDICATOR) {
throw new IOException("Expected to read a Sentinel Byte of '1' indicating that the next record is inline but the Sentinel value was '" + SchemaRecordWriter.EXTERNAL_FILE_INDICATOR
+ ", indicating that data was written to an External File. This data cannot be recovered via calls to #readRecord(InputStream) but must be recovered via #readRecords(InputStream)");
}
if (recordIndicator != 1) {
throw new IOException("Expected to read a Sentinel Byte of '1' but got a value of '" + recordIndicator + "' instead");
}
return readInlineRecord(in);
}
private Record readInlineRecord(final InputStream in) throws IOException {
final List<RecordField> schemaFields = schema.getFields();
final Map<RecordField, Object> fields = new HashMap<>(schemaFields.size());
@ -76,6 +87,53 @@ public class SchemaRecordReader {
return new FieldMapRecord(fields, schema);
}
public RecordIterator readRecords(final InputStream in) throws IOException {
final int recordIndicator = in.read();
if (recordIndicator < 0) {
return null;
}
if (recordIndicator == SchemaRecordWriter.INLINE_RECORD_INDICATOR) {
final Record nextRecord = readInlineRecord(in);
return new SingleRecordIterator(nextRecord);
}
if (recordIndicator != SchemaRecordWriter.EXTERNAL_FILE_INDICATOR) {
throw new IOException("Expected to read a Sentinel Byte of '" + SchemaRecordWriter.INLINE_RECORD_INDICATOR + "' or '" + SchemaRecordWriter.EXTERNAL_FILE_INDICATOR
+ "' but encountered a value of '" + recordIndicator + "' instead");
}
final DataInputStream dis = new DataInputStream(in);
final String externalFilename = dis.readUTF();
final File externalFile = new File(externalFilename);
final FileInputStream fis = new FileInputStream(externalFile);
final InputStream bufferedIn = new BufferedInputStream(fis);
final RecordIterator recordIterator = new RecordIterator() {
@Override
public Record next() throws IOException {
return readRecord(bufferedIn);
}
@Override
public boolean isNext() throws IOException {
bufferedIn.mark(1);
final int nextByte = bufferedIn.read();
bufferedIn.reset();
return (nextByte > -1);
}
@Override
public void close() throws IOException {
bufferedIn.close();
}
};
return recordIterator;
}
private Object readField(final InputStream in, final RecordField field) throws IOException {
switch (field.getRepetition()) {

View File

@ -21,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
@ -30,6 +31,8 @@ import java.util.List;
import java.util.Map;
public class SchemaRecordWriter {
static final int INLINE_RECORD_INDICATOR = 1;
static final int EXTERNAL_FILE_INDICATOR = 8;
public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;
@ -41,7 +44,7 @@ public class SchemaRecordWriter {
// write sentinel value to indicate that there is a record. This allows the reader to then read one
// byte and check if -1. If so, the reader knows there are no more records. If not, then the reader
// knows that it should be able to continue reading.
out.write(1);
out.write(INLINE_RECORD_INDICATOR);
final byte[] buffer = byteArrayCache.checkOut();
try {
@ -226,4 +229,8 @@ public class SchemaRecordWriter {
return charsInOriginal;
}
public void writeExternalFileReference(final DataOutputStream out, final File externalFile) throws IOException {
out.write(EXTERNAL_FILE_INDICATOR);
out.writeUTF(externalFile.getAbsolutePath());
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.repository.schema;
public class SingleRecordIterator implements RecordIterator {
private final Record record;
private boolean consumed = false;
public SingleRecordIterator(final Record record) {
this.record = record;
}
@Override
public Record next() {
if (consumed) {
return null;
}
consumed = true;
return record;
}
@Override
public void close() {
}
@Override
public boolean isNext() {
return !consumed;
}
}

View File

@ -39,15 +39,19 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.text.DecimalFormat;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
private static final Logger logger = LoggerFactory.getLogger(LengthDelimitedJournal.class);
private static final int DEFAULT_MAX_IN_HEAP_SERIALIZATION_BYTES = 5 * 1024 * 1024; // 5 MB
private static final JournalSummary INACTIVE_JOURNAL_SUMMARY = new StandardJournalSummary(-1L, -1L, 0);
private static final int JOURNAL_ENCODING_VERSION = 1;
private static final byte TRANSACTION_FOLLOWS = 64;
@ -55,9 +59,11 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
private static final int NUL_BYTE = 0;
private final File journalFile;
private final File overflowDirectory;
private final long initialTransactionId;
private final SerDeFactory<T> serdeFactory;
private final ObjectPool<ByteArrayDataOutputStream> streamPool;
private final int maxInHeapSerializationBytes;
private SerDe<T> serde;
private FileOutputStream fileOut;
@ -72,13 +78,56 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
private final ByteBuffer transactionPreamble = ByteBuffer.allocate(12); // guarded by synchronized block
public LengthDelimitedJournal(final File journalFile, final SerDeFactory<T> serdeFactory, final ObjectPool<ByteArrayDataOutputStream> streamPool, final long initialTransactionId) {
this(journalFile, serdeFactory, streamPool, initialTransactionId, DEFAULT_MAX_IN_HEAP_SERIALIZATION_BYTES);
}
public LengthDelimitedJournal(final File journalFile, final SerDeFactory<T> serdeFactory, final ObjectPool<ByteArrayDataOutputStream> streamPool, final long initialTransactionId,
final int maxInHeapSerializationBytes) {
this.journalFile = journalFile;
this.overflowDirectory = new File(journalFile.getParentFile(), "overflow-" + getBaseFilename(journalFile));
this.serdeFactory = serdeFactory;
this.serde = serdeFactory.createSerDe(null);
this.streamPool = streamPool;
this.initialTransactionId = initialTransactionId;
this.currentTransactionId = initialTransactionId;
this.maxInHeapSerializationBytes = maxInHeapSerializationBytes;
}
public void dispose() {
logger.debug("Deleting Journal {} because it is now encapsulated in the latest Snapshot", journalFile.getName());
if (!journalFile.delete() && journalFile.exists()) {
logger.warn("Unable to delete expired journal file " + journalFile + "; this file should be deleted manually.");
}
if (overflowDirectory.exists()) {
final File[] overflowFiles = overflowDirectory.listFiles();
if (overflowFiles == null) {
logger.warn("Unable to obtain listing of files that exist in 'overflow directory' " + overflowDirectory
+ " - this directory and any files within it can now be safely removed manually");
return;
}
for (final File overflowFile : overflowFiles) {
if (!overflowFile.delete() && overflowFile.exists()) {
logger.warn("After expiring journal file " + journalFile + ", unable to remove 'overflow file' " + overflowFile + " - this file should be removed manually");
}
}
if (!overflowDirectory.delete()) {
logger.warn("After expiring journal file " + journalFile + ", unable to remove 'overflow directory' " + overflowDirectory + " - this file should be removed manually");
}
}
}
private static String getBaseFilename(final File file) {
final String name = file.getName();
final int index = name.lastIndexOf(".");
if (index < 0) {
return name;
}
return name.substring(0, index);
}
private synchronized OutputStream getOutputStream() throws FileNotFoundException {
@ -181,12 +230,64 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
checkState();
File overflowFile = null;
final ByteArrayDataOutputStream bados = streamPool.borrowObject();
try {
for (final T record : records) {
final Object recordId = serde.getRecordIdentifier(record);
final T previousRecordState = recordLookup.lookup(recordId);
serde.serializeEdit(previousRecordState, record, bados.getDataOutputStream());
FileOutputStream overflowFileOut = null;
try {
DataOutputStream dataOut = bados.getDataOutputStream();
for (final T record : records) {
final Object recordId = serde.getRecordIdentifier(record);
final T previousRecordState = recordLookup.lookup(recordId);
serde.serializeEdit(previousRecordState, record, dataOut);
final int size = bados.getByteArrayOutputStream().size();
if (serde.isWriteExternalFileReferenceSupported() && size > maxInHeapSerializationBytes) {
if (!overflowDirectory.exists()) {
Files.createDirectory(overflowDirectory.toPath());
}
// If we have exceeded our threshold for how much to serialize in memory,
// flush the in-memory representation to an 'overflow file' and then update
// the Data Output Stream that is used to write to the file also.
overflowFile = new File(overflowDirectory, UUID.randomUUID().toString());
logger.debug("Length of update with {} records exceeds in-memory max of {} bytes. Overflowing to {}", records.size(), maxInHeapSerializationBytes, overflowFile);
overflowFileOut = new FileOutputStream(overflowFile);
bados.getByteArrayOutputStream().writeTo(overflowFileOut);
bados.getByteArrayOutputStream().reset();
// change dataOut to point to the File's Output Stream so that all subsequent records are written to the file.
dataOut = new DataOutputStream(new BufferedOutputStream(overflowFileOut));
// We now need to write to the ByteArrayOutputStream a pointer to the overflow file
// so that what is written to the actual journal is that pointer.
serde.writeExternalFileReference(overflowFile, bados.getDataOutputStream());
}
}
dataOut.flush();
// If we overflowed to an external file, we need to be sure that we sync to disk before
// updating the Journal. Otherwise, we could get to a state where the Journal was flushed to disk without the
// external file being flushed. This would result in a missed update to the FlowFile Repository.
if (overflowFileOut != null) {
if (logger.isDebugEnabled()) { // avoid calling File.length() if not necessary
logger.debug("Length of update to overflow file is {} bytes", overflowFile.length());
}
overflowFileOut.getFD().sync();
}
} finally {
if (overflowFileOut != null) {
try {
overflowFileOut.close();
} catch (final Exception e) {
logger.warn("Failed to close open file handle to overflow file {}", overflowFile, e);
}
}
}
final ByteArrayOutputStream baos = bados.getByteArrayOutputStream();
@ -210,12 +311,20 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
logger.debug("Wrote Transaction {} to journal {} with length {} and {} records", transactionId, journalFile, baos.size(), records.size());
} catch (final Throwable t) {
poison(t);
if (overflowFile != null) {
if (!overflowFile.delete() && overflowFile.exists()) {
logger.warn("Failed to cleanup temporary overflow file " + overflowFile + " - this file should be cleaned up manually.");
}
}
throw t;
} finally {
streamPool.returnObject(bados);
}
}
private void checkState() throws IOException {
if (poisoned) {
throw new IOException("Cannot update journal file " + journalFile + " because this journal has already encountered a failure when attempting to write to the file. "
@ -335,7 +444,7 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
final ByteCountingInputStream transactionByteCountingIn = new ByteCountingInputStream(transactionLimitingIn);
final DataInputStream transactionDis = new DataInputStream(transactionByteCountingIn);
while (transactionByteCountingIn.getBytesConsumed() < transactionLength) {
while (transactionByteCountingIn.getBytesConsumed() < transactionLength || serde.isMoreInExternalFile()) {
final T record = serde.deserializeEdit(transactionDis, recordMap, serdeAndVersion.getVersion());
// Update our RecordMap so that we have the most up-to-date version of the Record.

View File

@ -300,10 +300,8 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
snapshot.writeSnapshot(snapshotCapture);
for (final File existingJournal : existingJournals) {
logger.debug("Deleting Journal {} because it is now encapsulated in the latest Snapshot", existingJournal.getName());
if (!existingJournal.delete() && existingJournal.exists()) {
logger.warn("Unable to delete expired journal file " + existingJournal + "; this file should be deleted manually.");
}
final WriteAheadJournal journal = new LengthDelimitedJournal<>(existingJournal, serdeFactory, streamPool, nextTransactionId);
journal.dispose();
}
final long totalNanos = System.nanoTime() - startNanos;

View File

@ -53,4 +53,9 @@ public interface WriteAheadJournal<T> extends Closeable {
* @return <code>true</code> if the journal is healthy and can be written to, <code>false</code> if either the journal has been closed or is poisoned
*/
boolean isHealthy();
/**
* Destroys any resources that the journal occupies
*/
void dispose();
}

View File

@ -1103,6 +1103,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
final S record;
try {
record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion);
if (record == null) {
throw new EOFException();
}
} catch (final EOFException eof) {
throw eof;
} catch (final Exception e) {

View File

@ -18,6 +18,7 @@ package org.wali;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Map;
@ -151,4 +152,37 @@ public interface SerDe<T> {
*/
default void close() throws IOException {
}
/**
* Optional method that a SerDe can support that indicates that the contents of the next update should be found
* in the given external File.
*
* @param externalFile the file that contains the update information
* @param out the DataOutputStream to write the external file reference to
* @throws IOException if unable to write the update
* @throws UnsupportedOperationException if this SerDe does not support this operation
*/
default void writeExternalFileReference(File externalFile, DataOutputStream out) throws IOException {
throw new UnsupportedOperationException();
}
/**
* Indicates whether or not a call to {@link #writeExternalFileReference(File, DataOutputStream)} is valid for this implementation
* @return <code>true</code> if calls to {@link #writeExternalFileReference(File, DataOutputStream)} are supported, <code>false</code> if calling
* the method will result in an {@link UnsupportedOperationException} being thrown.
*/
default boolean isWriteExternalFileReferenceSupported() {
return false;
}
/**
* If the last call to read data from this SerDe resulted in data being read from an External File, and there is more data in that External File,
* then this method will return <code>true</code>. Otherwise, it will return <code>false</code>.
*
* @return <code>true</code> if more data available in External File, <code>false</code> otherwise.
* @throws IOException if unable to read from External File to determine data availability
*/
default boolean isMoreInExternalFile() throws IOException {
return false;
}
}
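
A compact, illustrative skeleton of a SerDe that opts into these hooks; DummyRecordSerde later in this commit is the real in-tree example, and the class name here is made up. The indicator byte and wire format mirror that test SerDe.

import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;

import org.wali.SerDe;

public abstract class ExternalFileAwareSerDe<T> implements SerDe<T> {

    private static final int EXTERNAL_FILE_INDICATOR = 8;

    @Override
    public boolean isWriteExternalFileReferenceSupported() {
        return true;
    }

    @Override
    public void writeExternalFileReference(final File externalFile, final DataOutputStream out) throws IOException {
        // Write a marker byte, then the location of the file that holds the actual records.
        out.write(EXTERNAL_FILE_INDICATOR);
        out.writeUTF(externalFile.getAbsolutePath());
    }

    // deserializeEdit(...) would detect the marker, open the referenced file, and keep
    // returning records from it; isMoreInExternalFile() reports whether any remain.
}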

View File

@ -89,7 +89,7 @@ public interface WriteAheadRepository<T> {
* <p>
* Recovers all External Swap locations that were persisted. If this method
* is to be called, it must be called AFTER {@link #recoverRecords()} and
* BEFORE {@link update}.
* BEFORE {@link #update(Collection, boolean)}.
* </p>
*
* @return swap location

View File

@ -17,10 +17,17 @@
package org.apache.nifi.wali;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.wali.DummyRecord;
import org.wali.DummyRecordSerde;
import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;
import org.wali.WriteAheadRepository;
import java.io.File;
import java.io.IOException;
@ -38,22 +45,69 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.wali.DummyRecord;
import org.wali.DummyRecordSerde;
import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;
import org.wali.WriteAheadRepository;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class TestSequentialAccessWriteAheadLog {
@Rule
public TestName testName = new TestName();
@Test
public void testUpdateWithExternalFile() throws IOException {
final DummyRecordSerde serde = new DummyRecordSerde();
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo(serde);
final List<DummyRecord> records = new ArrayList<>();
for (int i = 0; i < 350_000; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
records.add(record);
}
repo.update(records, false);
repo.shutdown();
assertEquals(1, serde.getExternalFileReferences().size());
final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords();
// ensure that we get the same records back, but the order may be different, so wrap both collections
// in a HashSet so that we can compare unordered collections of the same type.
assertEquals(new HashSet<>(records), new HashSet<>(recovered));
}
@Test
public void testUpdateWithExternalFileFollowedByInlineUpdate() throws IOException {
final DummyRecordSerde serde = new DummyRecordSerde();
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo(serde);
final List<DummyRecord> records = new ArrayList<>();
for (int i = 0; i < 350_000; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
records.add(record);
}
repo.update(records, false);
final DummyRecord subsequentRecord = new DummyRecord("350001", UpdateType.CREATE);
repo.update(Collections.singleton(subsequentRecord), false);
repo.shutdown();
assertEquals(1, serde.getExternalFileReferences().size());
final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords();
// ensure that we get the same records back, but the order may be different, so wrap both collections
// in a HashSet so that we can compare unordered collections of the same type.
final Set<DummyRecord> expectedRecords = new HashSet<>(records);
expectedRecords.add(subsequentRecord);
assertEquals(expectedRecords, new HashSet<>(recovered));
}
@Test
public void testRecoverWithNoCheckpoint() throws IOException {
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo();
@ -145,12 +199,15 @@ public class TestSequentialAccessWriteAheadLog {
}
private SequentialAccessWriteAheadLog<DummyRecord> createWriteRepo() throws IOException {
return createWriteRepo(new DummyRecordSerde());
}
private SequentialAccessWriteAheadLog<DummyRecord> createWriteRepo(final DummyRecordSerde serde) throws IOException {
final File targetDir = new File("target");
final File storageDir = new File(targetDir, testName.getMethodName());
deleteRecursively(storageDir);
assertTrue(storageDir.mkdirs());
final DummyRecordSerde serde = new DummyRecordSerde();
final SerDeFactory<DummyRecord> serdeFactory = new SingletonSerDeFactory<>(serde);
final SequentialAccessWriteAheadLog<DummyRecord> repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory);

View File

@ -16,17 +16,31 @@
*/
package org.wali;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
public class DummyRecordSerde implements SerDe<DummyRecord> {
private static final int INLINE_RECORD_INDICATOR = 1;
private static final int EXTERNAL_FILE_INDICATOR = 8;
private int throwIOEAfterNserializeEdits = -1;
private int throwOOMEAfterNserializeEdits = -1;
private int serializeEditCount = 0;
private final Set<File> externalFilesWritten = new HashSet<>();
private Queue<DummyRecord> externalRecords;
@SuppressWarnings("fallthrough")
@Override
public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException {
@ -37,6 +51,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
throw new OutOfMemoryError("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw OOME");
}
out.write(INLINE_RECORD_INDICATOR);
out.writeUTF(record.getUpdateType().name());
out.writeUTF(record.getId());
@ -72,6 +87,57 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
@Override
@SuppressWarnings("fallthrough")
public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
if (externalRecords != null) {
final DummyRecord record = externalRecords.poll();
if (record != null) {
return record;
}
externalRecords = null;
}
final int recordLocationIndicator = in.read();
if (recordLocationIndicator == EXTERNAL_FILE_INDICATOR) {
final String externalFilename = in.readUTF();
final File externalFile = new File(externalFilename);
try (final InputStream fis = new FileInputStream(externalFile);
final InputStream bufferedIn = new BufferedInputStream(fis);
final DataInputStream dis = new DataInputStream(bufferedIn)) {
externalRecords = new LinkedBlockingQueue<>();
DummyRecord record;
while ((record = deserializeRecordInline(dis, version, true)) != null) {
externalRecords.offer(record);
}
return externalRecords.poll();
}
} else if (recordLocationIndicator == INLINE_RECORD_INDICATOR) {
return deserializeRecordInline(in, version, false);
} else {
throw new IOException("Encountered invalid record location indicator: " + recordLocationIndicator);
}
}
@Override
public boolean isMoreInExternalFile() {
return externalRecords != null && !externalRecords.isEmpty();
}
private DummyRecord deserializeRecordInline(final DataInputStream in, final int version, final boolean expectInlineRecordIndicator) throws IOException {
if (expectInlineRecordIndicator) {
final int locationIndicator = in.read();
if (locationIndicator < 0) {
return null;
}
if (locationIndicator != INLINE_RECORD_INDICATOR) {
throw new IOException("Expected inline record indicator but encountered " + locationIndicator);
}
}
final String updateTypeName = in.readUTF();
final UpdateType updateType = UpdateType.valueOf(updateTypeName);
final String id = in.readUTF();
@ -135,4 +201,21 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
public String getLocation(final DummyRecord record) {
return record.getSwapLocation();
}
@Override
public boolean isWriteExternalFileReferenceSupported() {
return true;
}
@Override
public void writeExternalFileReference(final File externalFile, final DataOutputStream out) throws IOException {
out.write(EXTERNAL_FILE_INDICATOR);
out.writeUTF(externalFile.getAbsolutePath());
externalFilesWritten.add(externalFile);
}
public Set<File> getExternalFileReferences() {
return Collections.unmodifiableSet(externalFilesWritten);
}
}

View File

@ -17,6 +17,10 @@
package org.apache.nifi.schema.access;
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
@ -26,10 +30,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.record.RecordSchema;
public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
private final Map<RecordSchema, String> avroSchemaTextCache = new LinkedHashMap<RecordSchema, String>() {
@Override
@ -53,11 +53,21 @@ public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
}
}
String schemaText = avroSchemaTextCache.get(schema);
String schemaText;
synchronized (avroSchemaTextCache) {
schemaText = avroSchemaTextCache.get(schema);
}
if (schemaText == null) {
final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
schemaText = avroSchema.toString();
avroSchemaTextCache.put(schema, schemaText);
synchronized (avroSchemaTextCache) {
final String existing = avroSchemaTextCache.putIfAbsent(schema, schemaText);
if (existing != null) {
schemaText = existing;
}
}
}
return Collections.singletonMap("avro.schema", schemaText);

View File

@ -17,12 +17,6 @@
package org.apache.nifi.controller.repository;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Map;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
@ -34,6 +28,7 @@ import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate;
import org.apache.nifi.repository.schema.FieldType;
import org.apache.nifi.repository.schema.Record;
import org.apache.nifi.repository.schema.RecordIterator;
import org.apache.nifi.repository.schema.RecordSchema;
import org.apache.nifi.repository.schema.Repetition;
import org.apache.nifi.repository.schema.SchemaRecordReader;
@ -43,6 +38,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.SerDe;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.Map;
public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
private static final Logger logger = LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class);
private static final int MAX_ENCODING_VERSION = 2;
@ -51,7 +53,8 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1;
private final ResourceClaimManager resourceClaimManager;
private volatile RecordSchema recoverySchema;
private volatile SchemaRecordReader reader;
private RecordIterator recordIterator = null;
public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) {
this.resourceClaimManager = resourceClaimManager;
@ -101,7 +104,8 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
@Override
public void readHeader(final DataInputStream in) throws IOException {
recoverySchema = RecordSchema.readFrom(in);
final RecordSchema recoverySchema = RecordSchema.readFrom(in);
reader = SchemaRecordReader.fromSchema(recoverySchema);
}
@Override
@ -120,8 +124,41 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
@Override
public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema);
final Record updateRecord = reader.readRecord(in);
if (recordIterator != null) {
final RepositoryRecord record = nextRecord();
if (record != null) {
return record;
}
recordIterator.close();
}
recordIterator = reader.readRecords(in);
if (recordIterator == null) {
return null;
}
return nextRecord();
}
private RepositoryRecord nextRecord() throws IOException {
final Record record;
try {
record = recordIterator.next();
} catch (final Exception e) {
recordIterator.close();
recordIterator = null;
throw e;
}
if (record == null) {
return null;
}
return createRepositoryRecord(record);
}
private RepositoryRecord createRepositoryRecord(final Record updateRecord) throws IOException {
if (updateRecord == null) {
// null may be returned by reader.readRecord() if it encounters end-of-stream
return null;
@ -246,4 +283,18 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
return MAX_ENCODING_VERSION;
}
@Override
public boolean isWriteExternalFileReferenceSupported() {
return true;
}
@Override
public void writeExternalFileReference(final File externalFile, final DataOutputStream out) throws IOException {
new SchemaRecordWriter().writeExternalFileReference(out, externalFile);
}
@Override
public boolean isMoreInExternalFile() throws IOException {
return recordIterator != null && recordIterator.isNext();
}
}

View File

@ -578,7 +578,7 @@ public class FileSystemRepository implements ContentRepository {
}
final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER;
final String section = String.valueOf(modulatedSectionIndex);
final String section = String.valueOf(modulatedSectionIndex).intern();
final String claimId = System.currentTimeMillis() + "-" + currentIndex;
resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant, true);
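
The added .intern() deduplicates section names on the heap: the modulo bounds the set of distinct section strings per container, but without interning every ResourceClaim would hold its own copy. A tiny, illustrative demonstration (the value is made up):

public class InternDemo {
    public static void main(String[] args) {
        // Without intern(), each String.valueOf(...) call allocates a distinct String even when
        // the contents are equal; with intern(), identical section names share one canonical instance.
        final String sectionA = String.valueOf(7L).intern();
        final String sectionB = String.valueOf(7L).intern();
        System.out.println(sectionA == sectionB); // true
    }
}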

View File

@ -68,6 +68,7 @@ import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -98,6 +99,11 @@ import java.util.stream.Collectors;
* <p/>
*/
public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
private static final int SOURCE_EVENT_BIT_INDEXES = (1 << ProvenanceEventType.CREATE.ordinal())
| (1 << ProvenanceEventType.FORK.ordinal())
| (1 << ProvenanceEventType.JOIN.ordinal())
| (1 << ProvenanceEventType.RECEIVE.ordinal())
| (1 << ProvenanceEventType.FETCH.ordinal());
private static final AtomicLong idGenerator = new AtomicLong(0L);
private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
@ -110,7 +116,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private static final Logger claimLog = LoggerFactory.getLogger(StandardProcessSession.class.getSimpleName() + ".claims");
private static final int MAX_ROLLBACK_FLOWFILES_TO_LOG = 5;
private final Map<FlowFileRecord, StandardRepositoryRecord> records = new ConcurrentHashMap<>();
private final Map<Long, StandardRepositoryRecord> records = new ConcurrentHashMap<>();
private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<>();
private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<>();
private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new ConcurrentHashMap<>();
@ -253,7 +259,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
List<ProvenanceEventRecord> autoTerminatedEvents = null;
// validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
final Map<Long, StandardRepositoryRecord> toAdd = new HashMap<>();
for (final StandardRepositoryRecord record : records.values()) {
if (record.isMarkedForDelete()) {
continue;
@ -317,7 +323,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
newRecord.setDestination(destination.getFlowFileQueue());
newRecord.setTransferRelationship(record.getTransferRelationship());
// put the mapping into toAdd because adding to records now will cause a ConcurrentModificationException
toAdd.put(clone, newRecord);
toAdd.put(clone.getId(), newRecord);
}
}
}
@ -365,10 +371,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
* points to the Original Claim -- which has already been removed!
*
*/
for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
final FlowFile flowFile = entry.getKey();
final StandardRepositoryRecord record = entry.getValue();
for (final StandardRepositoryRecord record : checkpoint.records.values()) {
if (record.isMarkedForDelete()) {
// if the working claim is not the same as the original claim, we can immediately destroy the working claim
// because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
@ -380,10 +383,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// an issue if we only updated the FlowFile attributes.
decrementClaimCount(record.getOriginalClaim());
}
final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
final Connectable connectable = context.getConnectable();
final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
if (LOG.isInfoEnabled()) {
final FlowFileRecord flowFile = record.getCurrent();
final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
final Connectable connectable = context.getConnectable();
final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
}
} else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
// records which have been updated - remove original if exists
decrementClaimCount(record.getOriginalClaim());
@ -544,10 +551,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
flowFileEvent.setFlowFilesSent(flowFilesSent);
flowFileEvent.setBytesSent(bytesSent);
final long now = System.currentTimeMillis();
long lineageMillis = 0L;
for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
final FlowFile flowFile = entry.getKey();
final long lineageDuration = System.currentTimeMillis() - flowFile.getLineageStartDate();
for (final StandardRepositoryRecord record : checkpoint.records.values()) {
final FlowFile flowFile = record.getCurrent();
final long lineageDuration = now - flowFile.getLineageStartDate();
lineageMillis += lineageDuration;
}
flowFileEvent.setAggregateLineageMillis(lineageMillis);
@ -566,13 +574,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
private Map<String, Long> combineCounters(final Map<String, Long> first, final Map<String, Long> second) {
if (first == null && second == null) {
final boolean firstEmpty = first == null || first.isEmpty();
final boolean secondEmpty = second == null || second.isEmpty();
if (firstEmpty && secondEmpty) {
return null;
}
if (first == null) {
if (firstEmpty) {
return second;
}
if (second == null) {
if (secondEmpty) {
return first;
}
@ -582,14 +593,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return combined;
}
private void addEventType(final Map<String, Set<ProvenanceEventType>> map, final String id, final ProvenanceEventType eventType) {
Set<ProvenanceEventType> eventTypes = map.get(id);
if (eventTypes == null) {
eventTypes = new HashSet<>();
map.put(id, eventTypes);
}
private void addEventType(final Map<String, BitSet> map, final String id, final ProvenanceEventType eventType) {
final BitSet eventTypes = map.computeIfAbsent(id, key -> new BitSet());
eventTypes.set(eventType.ordinal());
}
eventTypes.add(eventType);
private StandardRepositoryRecord getRecord(final FlowFile flowFile) {
return records.get(flowFile.getId());
}
private void updateProvenanceRepo(final Checkpoint checkpoint) {
@ -600,7 +610,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// in case the Processor developer submitted the same events to the reporter. So we use a LinkedHashSet
// for this, so that we are able to ensure that the events are submitted in the proper order.
final Set<ProvenanceEventRecord> recordsToSubmit = new LinkedHashSet<>();
final Map<String, Set<ProvenanceEventType>> eventTypesPerFlowFileId = new HashMap<>();
final Map<String, BitSet> eventTypesPerFlowFileId = new HashMap<>();
final Set<ProvenanceEventRecord> processorGenerated = checkpoint.reportedEvents;
@ -613,7 +623,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ProvenanceEventBuilder builder = entry.getValue();
final FlowFile flowFile = entry.getKey();
updateEventContentClaims(builder, flowFile, checkpoint.records.get(flowFile));
updateEventContentClaims(builder, flowFile, checkpoint.getRecord(flowFile));
final ProvenanceEventRecord event = builder.build();
if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
@ -692,14 +702,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
if (checkpoint.createdFlowFiles.contains(flowFileId)) {
final Set<ProvenanceEventType> registeredTypes = eventTypesPerFlowFileId.get(flowFileId);
final BitSet registeredTypes = eventTypesPerFlowFileId.get(flowFileId);
boolean creationEventRegistered = false;
if (registeredTypes != null) {
if (registeredTypes.contains(ProvenanceEventType.CREATE)
|| registeredTypes.contains(ProvenanceEventType.FORK)
|| registeredTypes.contains(ProvenanceEventType.JOIN)
|| registeredTypes.contains(ProvenanceEventType.RECEIVE)
|| registeredTypes.contains(ProvenanceEventType.FETCH)) {
if (registeredTypes.get(ProvenanceEventType.CREATE.ordinal())
|| registeredTypes.get(ProvenanceEventType.FORK.ordinal())
|| registeredTypes.get(ProvenanceEventType.JOIN.ordinal())
|| registeredTypes.get(ProvenanceEventType.RECEIVE.ordinal())
|| registeredTypes.get(ProvenanceEventType.FETCH.ordinal())) {
creationEventRegistered = true;
}
}
@ -802,7 +813,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile, final long commitNanos) {
verifyTaskActive();
final StandardRepositoryRecord repoRecord = records.get(flowFile);
final StandardRepositoryRecord repoRecord = getRecord(flowFile);
if (repoRecord == null) {
throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
}
@ -839,12 +850,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
private StandardProvenanceEventRecord enrich(
final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records,
final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<Long, StandardRepositoryRecord> records,
final boolean updateAttributes, final long commitNanos) {
final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
if (eventFlowFile != null) {
final StandardRepositoryRecord repoRecord = records.get(eventFlowFile);
final StandardRepositoryRecord repoRecord = records.get(eventFlowFile.getId());
if (repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) {
final ContentClaim currentClaim = repoRecord.getCurrentClaim();
@ -910,7 +921,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
* @param records records
* @return true if spurious route
*/
private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<FlowFileRecord, StandardRepositoryRecord> records) {
private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<Long, StandardRepositoryRecord> records) {
if (event.getEventType() == ProvenanceEventType.ROUTE) {
final String relationshipName = event.getRelationship();
final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
@ -919,10 +930,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event,
// as it may be cloning the FlowFile and adding to multiple connections.
if (connectionsForRelationship.size() == 1) {
for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet()) {
final FlowFileRecord flowFileRecord = entry.getKey();
for (final StandardRepositoryRecord repoRecord : records.values()) {
final FlowFileRecord flowFileRecord = repoRecord.getCurrent();
if (event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key()))) {
final StandardRepositoryRecord repoRecord = entry.getValue();
if (repoRecord.getOriginalQueue() == null) {
return false;
}
@ -1077,35 +1087,35 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final StringBuilder details = new StringBuilder(1024).append("[");
final int initLen = details.length();
int filesListed = 0;
for (Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet()) {
for (StandardRepositoryRecord repoRecord : records.values()) {
if (filesListed >= MAX_ROLLBACK_FLOWFILES_TO_LOG) {
break;
}
filesListed++;
final FlowFileRecord entryKey = entry.getKey();
final StandardRepositoryRecord entryValue = entry.getValue();
if (details.length() > initLen) {
details.append(", ");
}
if (entryValue.getOriginalQueue() != null && entryValue.getOriginalQueue().getIdentifier() != null) {
if (repoRecord.getOriginalQueue() != null && repoRecord.getOriginalQueue().getIdentifier() != null) {
details.append("queue=")
.append(entryValue.getOriginalQueue().getIdentifier())
.append(repoRecord.getOriginalQueue().getIdentifier())
.append("/");
}
details.append("filename=")
.append(entryKey.getAttribute(CoreAttributes.FILENAME.key()))
.append(repoRecord.getCurrent().getAttribute(CoreAttributes.FILENAME.key()))
.append("/uuid=")
.append(entryKey.getAttribute(CoreAttributes.UUID.key()));
.append(repoRecord.getCurrent().getAttribute(CoreAttributes.UUID.key()));
}
if (records.entrySet().size() > MAX_ROLLBACK_FLOWFILES_TO_LOG) {
if (records.size() > MAX_ROLLBACK_FLOWFILES_TO_LOG) {
if (details.length() > initLen) {
details.append(", ");
}
details.append(records.entrySet().size() - MAX_ROLLBACK_FLOWFILES_TO_LOG)
details.append(records.size() - MAX_ROLLBACK_FLOWFILES_TO_LOG)
.append(" additional Flowfiles not listed");
} else if (filesListed == 0) {
details.append("none");
}
details.append("]");
return details.toString();
}
@ -1216,7 +1226,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
}
final StandardRepositoryRecord record = records.get(flowFile);
final StandardRepositoryRecord record = getRecord(flowFile);
if (record == null) {
throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
}
@ -1275,8 +1285,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
for (final FlowFile flowFile : flowFiles) {
final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
final StandardRepositoryRecord repoRecord = this.records.remove(flowFile);
newOwner.records.put(flowFileRecord, repoRecord);
final StandardRepositoryRecord repoRecord = this.records.remove(flowFile.getId());
newOwner.records.put(flowFileRecord.getId(), repoRecord);
// Adjust the counts for Connections for each FlowFile that was pulled from a Connection.
// We do not have to worry about accounting for 'input counts' on connections because those
@ -1348,9 +1358,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Set<String> modifiedFlowFileIds = new HashSet<>();
int largestTransferSetSize = 0;
for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
final FlowFile flowFile = entry.getKey();
for (final Map.Entry<Long, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
final StandardRepositoryRecord record = entry.getValue();
final FlowFile flowFile = record.getCurrent();
final Relationship relationship = record.getTransferRelationship();
if (Relationship.SELF.equals(relationship)) {
@ -1479,7 +1489,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private void registerDequeuedRecord(final FlowFileRecord flowFile, final Connection connection) {
final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
records.put(flowFile, record);
records.put(flowFile.getId(), record);
flowFilesIn++;
contentSizeIn += flowFile.getSize();
@ -1655,16 +1665,17 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
final String uuid = UUID.randomUUID().toString();
attrs.put(CoreAttributes.FILENAME.key(), uuid);
attrs.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
attrs.put(CoreAttributes.UUID.key(), uuid);
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
.addAttributes(attrs)
.build();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(fFile, attrs);
records.put(fFile, record);
records.put(fFile.getId(), record);
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
return fFile;
}
@ -1681,7 +1692,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
example = validateRecordState(example);
final StandardRepositoryRecord exampleRepoRecord = records.get(example);
final StandardRepositoryRecord exampleRepoRecord = getRecord(example);
final FlowFileRecord currRec = exampleRepoRecord.getCurrent();
final ContentClaim claim = exampleRepoRecord.getCurrentClaim();
if (offset + size > example.getSize()) {
@ -1702,7 +1713,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(clone, clone.getAttributes());
records.put(clone, record);
records.put(clone.getId(), record);
if (offset == 0L && size == example.getSize()) {
provenanceReporter.clone(example, clone);
@ -1730,7 +1741,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
eventBuilder.setComponentType(processorType);
eventBuilder.addParentFlowFile(parent);
updateEventContentClaims(eventBuilder, parent, records.get(parent));
updateEventContentClaims(eventBuilder, parent, getRecord(parent));
forkEventBuilders.put(parent, eventBuilder);
}
@ -1752,7 +1763,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
final StandardRepositoryRecord record = getRecord(flowFile);
final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build();
record.setWorking(newFile);
@ -1768,7 +1779,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return flowFile;
}
final StandardRepositoryRecord record = records.get(flowFile);
final StandardRepositoryRecord record = getRecord(flowFile);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttribute(key, value).build();
record.setWorking(newFile, key, value);
@ -1780,7 +1791,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
final StandardRepositoryRecord record = getRecord(flowFile);
final Map<String, String> updatedAttributes;
if (attributes.containsKey(CoreAttributes.UUID.key())) {
@ -1794,6 +1805,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final FlowFileRecord newFile = ffBuilder.build();
record.setWorking(newFile, updatedAttributes);
return newFile;
}
@ -1806,7 +1818,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return flowFile;
}
final StandardRepositoryRecord record = records.get(flowFile);
final StandardRepositoryRecord record = getRecord(flowFile);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(key).build();
record.setWorking(newFile, key, null);
return newFile;
@ -1821,7 +1833,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return flowFile;
}
final StandardRepositoryRecord record = records.get(flowFile);
final StandardRepositoryRecord record = getRecord(flowFile);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keys).build();
final Map<String, String> updatedAttrs = new HashMap<>();
@ -1842,7 +1854,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
final StandardRepositoryRecord record = getRecord(flowFile);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build();
if (keyPattern == null) {
@ -1895,7 +1907,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// the relationship specified is not known in this session/context
throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
}
final StandardRepositoryRecord record = records.get(flowFile);
final StandardRepositoryRecord record = getRecord(flowFile);
record.setTransferRelationship(relationship);
updateLastQueuedDate(record);
@ -1913,7 +1925,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
final StandardRepositoryRecord record = getRecord(flowFile);
if (record.getOriginalQueue() == null) {
throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self");
}
@ -1951,7 +1963,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final long queuedTime = System.currentTimeMillis();
long contentSize = 0L;
for (final FlowFile flowFile : flowFiles) {
final StandardRepositoryRecord record = records.get(flowFile);
final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
final StandardRepositoryRecord record = getRecord(flowFileRecord);
record.setTransferRelationship(relationship);
updateLastQueuedDate(record, queuedTime);
@ -1972,7 +1985,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
final StandardRepositoryRecord record = getRecord(flowFile);
record.markForDelete();
removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
@ -1996,7 +2009,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
flowFiles = validateRecordState(flowFiles);
for (final FlowFile flowFile : flowFiles) {
final StandardRepositoryRecord record = records.get(flowFile);
final StandardRepositoryRecord record = getRecord(flowFile);
record.markForDelete();
removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
@ -2195,7 +2208,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
source = validateRecordState(source, true);
final StandardRepositoryRecord record = records.get(source);
final StandardRepositoryRecord record = getRecord(source);
try {
ensureNotAppending(record.getCurrentClaim());
@ -2251,7 +2264,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
source = validateRecordState(source, true);
final StandardRepositoryRecord record = records.get(source);
final StandardRepositoryRecord record = getRecord(source);
try {
ensureNotAppending(record.getCurrentClaim());
@ -2400,7 +2413,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Collection<StandardRepositoryRecord> sourceRecords = new ArrayList<>();
for (final FlowFile source : sources) {
final StandardRepositoryRecord record = records.get(source);
final StandardRepositoryRecord record = getRecord(source);
sourceRecords.add(record);
try {
@ -2411,7 +2424,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
final StandardRepositoryRecord destinationRecord = records.get(destination);
final StandardRepositoryRecord destinationRecord = getRecord(destination);
final ContentRepository contentRepo = context.getContentRepository();
final ContentClaim newClaim;
try {
@ -2437,7 +2450,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final boolean useDemarcator = demarcator != null && demarcator.length > 0;
final int numSources = sources.size();
for (final FlowFile source : sources) {
final StandardRepositoryRecord sourceRecord = records.get(source);
final StandardRepositoryRecord sourceRecord = getRecord(source);
final long copied = contentRepo.exportTo(sourceRecord.getCurrentClaim(), out, sourceRecord.getCurrentClaimOffset(), source.getSize());
writtenCount += copied;
@ -2473,7 +2486,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
removeTemporaryClaim(destinationRecord);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(destinationRecord.getCurrent()).contentClaim(newClaim).contentClaimOffset(0L).size(writtenCount).build();
destinationRecord.setWorking(newFile);
records.put(newFile, destinationRecord);
return newFile;
}
@ -2495,7 +2507,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
public OutputStream write(FlowFile source) {
verifyTaskActive();
source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
final StandardRepositoryRecord record = getRecord(source);
ContentClaim newClaim = null;
try {
@ -2618,7 +2630,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
public FlowFile write(FlowFile source, final OutputStreamCallback writer) {
verifyTaskActive();
source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
final StandardRepositoryRecord record = getRecord(source);
long writtenToFlowFile = 0L;
ContentClaim newClaim = null;
@ -2677,7 +2689,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
final StandardRepositoryRecord record = getRecord(source);
long newSize = 0L;
// Get the current Content Claim from the record and see if we already have
@ -2858,7 +2870,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
public FlowFile write(FlowFile source, final StreamCallback writer) {
verifyTaskActive();
source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
final StandardRepositoryRecord record = getRecord(source);
final ContentClaim currClaim = record.getCurrentClaim();
long writtenToFlowFile = 0L;
@ -2932,6 +2944,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
.build();
record.setWorking(newFile);
return newFile;
}
@ -2946,7 +2959,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
throw new FlowFileAccessException("Cannot write to path " + source.getParent().toFile().getAbsolutePath() + " so cannot delete file; will not import.");
}
final StandardRepositoryRecord record = records.get(destination);
final StandardRepositoryRecord record = getRecord(destination);
final ContentClaim newClaim;
final long claimOffset;
@ -2992,7 +3005,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
destination = validateRecordState(destination);
final StandardRepositoryRecord record = records.get(destination);
final StandardRepositoryRecord record = getRecord(destination);
ContentClaim newClaim = null;
final long claimOffset = 0L;
@ -3030,7 +3043,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
public void exportTo(FlowFile source, final Path destination, final boolean append) {
verifyTaskActive();
source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
final StandardRepositoryRecord record = getRecord(source);
try {
ensureNotAppending(record.getCurrentClaim());
@ -3049,7 +3062,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
public void exportTo(FlowFile source, final OutputStream destination) {
verifyTaskActive();
source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
final StandardRepositoryRecord record = getRecord(source);
if(record.getCurrentClaim() == null) {
return;
@ -3137,7 +3150,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
throw new IllegalStateException(flowFile + " already in use for an active callback or an OutputStream created by ProcessSession.write(FlowFile) has not been closed");
}
final StandardRepositoryRecord record = records.get(flowFile);
final StandardRepositoryRecord record = getRecord(flowFile);
if (record == null) {
rollback();
throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
@ -3170,11 +3183,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
* <code>false</code> otherwise.
*/
boolean isFlowFileKnown(final FlowFile flowFile) {
return records.containsKey(flowFile);
return records.containsKey(flowFile.getId());
}
private FlowFile getMostRecent(final FlowFile flowFile) {
final StandardRepositoryRecord existingRecord = records.get(flowFile);
final StandardRepositoryRecord existingRecord = getRecord(flowFile);
return existingRecord == null ? flowFile : existingRecord.getCurrent();
}
@ -3183,10 +3196,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
parent = getMostRecent(parent);
final String uuid = UUID.randomUUID().toString();
final Map<String, String> newAttributes = new HashMap<>(3);
newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
newAttributes.put(CoreAttributes.UUID.key(), uuid);
final StandardFlowFileRecord.Builder fFileBuilder = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence());
@ -3210,7 +3225,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final FlowFileRecord fFile = fFileBuilder.build();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(fFile, newAttributes);
records.put(fFile, record);
records.put(fFile.getId(), record);
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
registerForkEvent(parent, fFile);
@ -3247,9 +3262,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
final String uuid = UUID.randomUUID().toString();
newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
newAttributes.put(CoreAttributes.UUID.key(), uuid);
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
.addAttributes(newAttributes)
@ -3258,7 +3274,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(fFile, newAttributes);
records.put(fFile, record);
records.put(fFile.getId(), record);
createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
registerJoinEvent(fFile, parents);
@ -3339,7 +3355,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final List<ProvenanceEventRecord> autoTerminatedEvents = new ArrayList<>();
private final Set<ProvenanceEventRecord> reportedEvents = new LinkedHashSet<>();
private final Map<FlowFileRecord, StandardRepositoryRecord> records = new ConcurrentHashMap<>();
private final Map<Long, StandardRepositoryRecord> records = new ConcurrentHashMap<>();
private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<>();
private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<>();
@ -3392,5 +3408,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
this.contentSizeIn += session.contentSizeIn;
this.contentSizeOut += session.contentSizeOut;
}
private StandardRepositoryRecord getRecord(final FlowFile flowFile) {
return records.get(flowFile.getId());
}
}
}
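Keying the records map on the FlowFile's numeric id (a Long) instead of on the FlowFileRecord object means the session no longer pins each superseded FlowFile version in heap as a map key; only the id, which is stable across versions, is retained. The following standalone sketch of that pattern is illustrative only and is not part of this commit; the class and interface names are hypothetical, not NiFi APIs.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: key the index by the numeric id so that older
// versions of the same logical FlowFile are not kept reachable as map keys.
class SessionRecordIndex {

    // Hypothetical stand-in for FlowFileRecord; only the stable id matters here.
    interface VersionedFlowFile {
        long getId();
    }

    static final class RepositoryRecord {
    }

    // Keyed by id rather than by the FlowFile object itself.
    private final Map<Long, RepositoryRecord> records = new ConcurrentHashMap<>();

    RepositoryRecord getRecord(final VersionedFlowFile flowFile) {
        return records.get(flowFile.getId());
    }

    void track(final VersionedFlowFile flowFile, final RepositoryRecord record) {
        records.put(flowFile.getId(), record);
    }

    public static void main(final String[] args) {
        final SessionRecordIndex index = new SessionRecordIndex();
        final RepositoryRecord record = new RepositoryRecord();
        index.track(() -> 42L, record);
        // A different FlowFile object carrying the same id resolves to the same record.
        System.out.println(index.getRecord(() -> 42L) == record); // true
    }
}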

View File

@ -16,14 +16,6 @@
*/
package org.apache.nifi.processor;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
@ -41,6 +33,15 @@ import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.util.Connectables;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class StandardProcessContext implements ProcessContext, ControllerServiceLookup {
private final ProcessorNode procNode;
@ -49,6 +50,7 @@ public class StandardProcessContext implements ProcessContext, ControllerService
private final StringEncryptor encryptor;
private final StateManager stateManager;
private final TaskTermination taskTermination;
private final Map<PropertyDescriptor, String> properties;
public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager,
final TaskTermination taskTermination) {
@ -71,6 +73,8 @@ public class StandardProcessContext implements ProcessContext, ControllerService
preparedQueries.put(desc, pq);
}
}
properties = Collections.unmodifiableMap(processorNode.getProperties());
}
private void verifyTaskActive() {
@ -82,7 +86,17 @@ public class StandardProcessContext implements ProcessContext, ControllerService
@Override
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
verifyTaskActive();
return getProperty(descriptor.getName());
final String setPropertyValue = properties.get(descriptor);
if (setPropertyValue != null) {
return new StandardPropertyValue(setPropertyValue, this, preparedQueries.get(descriptor), procNode.getVariableRegistry());
}
// Get the "canonical" Property Descriptor from the Processor
final PropertyDescriptor canonicalDescriptor = procNode.getProcessor().getPropertyDescriptor(descriptor.getName());
final String defaultValue = canonicalDescriptor.getDefaultValue();
return new StandardPropertyValue(defaultValue, this, preparedQueries.get(descriptor), procNode.getVariableRegistry());
}
/**
@ -99,7 +113,7 @@ public class StandardProcessContext implements ProcessContext, ControllerService
return null;
}
final String setPropertyValue = procNode.getProperty(descriptor);
final String setPropertyValue = properties.get(descriptor);
final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;
return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor), procNode.getVariableRegistry());
@ -138,7 +152,7 @@ public class StandardProcessContext implements ProcessContext, ControllerService
@Override
public Map<PropertyDescriptor, String> getProperties() {
verifyTaskActive();
return procNode.getProperties();
return properties;
}
@Override

View File

@ -16,30 +16,31 @@
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.processor.Relationship;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.processor.Relationship;
public class StandardRepositoryRecord implements RepositoryRecord {
private RepositoryRecordType type = null;
private RepositoryRecordType type;
private FlowFileRecord workingFlowFileRecord = null;
private Relationship transferRelationship = null;
private FlowFileQueue destination = null;
private final FlowFileRecord originalFlowFileRecord;
private final FlowFileQueue originalQueue;
private String swapLocation;
private final Map<String, String> updatedAttributes = new HashMap<>();
private final Map<String, String> originalAttributes;
private Map<String, String> updatedAttributes = null;
private List<ContentClaim> transientClaims;
private final long startNanos = System.nanoTime();
/**
* Creates a new record which has no original claim or flow file - it is entirely new
*
@ -66,7 +67,7 @@ public class StandardRepositoryRecord implements RepositoryRecord {
this.originalFlowFileRecord = originalFlowFileRecord;
this.type = RepositoryRecordType.SWAP_OUT;
this.swapLocation = swapLocation;
this.originalAttributes = originalFlowFileRecord == null ? Collections.<String, String>emptyMap() : originalFlowFileRecord.getAttributes();
this.originalAttributes = originalFlowFileRecord == null ? Collections.emptyMap() : originalFlowFileRecord.getAttributes();
}
@Override
@ -113,30 +114,48 @@ public class StandardRepositoryRecord implements RepositoryRecord {
workingFlowFileRecord = flowFile;
}
private Map<String, String> initializeUpdatedAttributes() {
if (updatedAttributes == null) {
updatedAttributes = new HashMap<>();
}
return updatedAttributes;
}
public void setWorking(final FlowFileRecord flowFile, final String attributeKey, final String attributeValue) {
workingFlowFileRecord = flowFile;
// In the case that the type is CREATE, we know that all attributes are updated attributes, so no need to store them.
if (type == RepositoryRecordType.CREATE) {
return;
}
// If setting attribute to same value as original, don't add to updated attributes
final String currentValue = originalAttributes.get(attributeKey);
if (currentValue == null || !currentValue.equals(attributeValue)) {
updatedAttributes.put(attributeKey, attributeValue);
initializeUpdatedAttributes().put(attributeKey, attributeValue);
}
}
public void setWorking(final FlowFileRecord flowFile, final Map<String, String> updatedAttribs) {
workingFlowFileRecord = flowFile;
// In the case that the type is CREATE, we know that all attributes are updated attributes, so no need to store them.
if (type == RepositoryRecordType.CREATE) {
return;
}
for (final Map.Entry<String, String> entry : updatedAttribs.entrySet()) {
final String currentValue = originalAttributes.get(entry.getKey());
if (currentValue == null || !currentValue.equals(entry.getValue())) {
updatedAttributes.put(entry.getKey(), entry.getValue());
initializeUpdatedAttributes().put(entry.getKey(), entry.getValue());
}
}
}
@Override
public boolean isAttributesChanged() {
return !updatedAttributes.isEmpty();
return type == RepositoryRecordType.CREATE || (updatedAttributes != null && !updatedAttributes.isEmpty());
}
public void markForAbort() {
@ -196,7 +215,11 @@ public class StandardRepositoryRecord implements RepositoryRecord {
}
Map<String, String> getUpdatedAttributes() {
return updatedAttributes;
if (type == RepositoryRecordType.CREATE) {
return getCurrent().getAttributes();
}
return updatedAttributes == null ? Collections.emptyMap() : updatedAttributes;
}
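Allocating updatedAttributes only when an attribute actually changes avoids one HashMap per repository record on the common path, and CREATE-type records skip the bookkeeping entirely because every attribute is by definition new. A minimal sketch of the lazy-initialization part of that idea, with a hypothetical class name and no NiFi types, might look like this:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: create the delta map on first use and hand out an
// immutable empty map when nothing has changed.
class LazyAttributeDelta {

    private Map<String, String> updatedAttributes; // null until the first update

    void recordUpdate(final String key, final String value) {
        if (updatedAttributes == null) {
            updatedAttributes = new HashMap<>();
        }
        updatedAttributes.put(key, value);
    }

    Map<String, String> getUpdatedAttributes() {
        return updatedAttributes == null ? Collections.emptyMap() : updatedAttributes;
    }

    public static void main(final String[] args) {
        final LazyAttributeDelta delta = new LazyAttributeDelta();
        System.out.println(delta.getUpdatedAttributes()); // {} - no HashMap allocated yet
        delta.recordUpdate("filename", "abc.txt");
        System.out.println(delta.getUpdatedAttributes()); // {filename=abc.txt}
    }
}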
@Override

View File

@ -16,31 +16,14 @@
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -65,6 +48,25 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.util.TextLineDemarcator;
import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@EventDriven
@SideEffectFree
@SupportsBatching
@ -158,19 +160,17 @@ public class SplitText extends AbstractProcessor {
private static final Set<Relationship> relationships;
static {
properties = Collections.unmodifiableList(Arrays.asList(new PropertyDescriptor[]{
LINE_SPLIT_COUNT,
FRAGMENT_MAX_SIZE,
HEADER_LINE_COUNT,
HEADER_MARKER,
REMOVE_TRAILING_NEWLINES
}));
properties = Collections.unmodifiableList(Arrays.asList(
LINE_SPLIT_COUNT,
FRAGMENT_MAX_SIZE,
HEADER_LINE_COUNT,
HEADER_MARKER,
REMOVE_TRAILING_NEWLINES));
relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(new Relationship[]{
REL_ORIGINAL,
REL_SPLITS,
REL_FAILURE
})));
relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_ORIGINAL,
REL_SPLITS,
REL_FAILURE)));
}
private volatile boolean removeTrailingNewLines;
@ -259,9 +259,11 @@ public class SplitText extends AbstractProcessor {
processSession.transfer(sourceFlowFile, REL_FAILURE);
} else {
final String fragmentId = UUID.randomUUID().toString();
List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(fragmentId, sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
final List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(fragmentId, sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
final FlowFile originalFlowFile = FragmentAttributes.copyAttributesToOriginal(processSession, sourceFlowFile, fragmentId, splitFlowFiles.size());
processSession.transfer(originalFlowFile, REL_ORIGINAL);
if (!splitFlowFiles.isEmpty()) {
processSession.transfer(splitFlowFiles, REL_SPLITS);
}
@ -291,6 +293,7 @@ public class SplitText extends AbstractProcessor {
*/
private List<FlowFile> generateSplitFlowFiles(String fragmentId, FlowFile sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
List<FlowFile> splitFlowFiles = new ArrayList<>();
FlowFile headerFlowFile = null;
long headerCrlfLength = 0;
if (splitInfo != null) {
@ -305,7 +308,11 @@ public class SplitText extends AbstractProcessor {
fragmentId, fragmentIndex++, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
splitFlowFiles.add(splitFlowFile);
} else {
for (SplitInfo computedSplitInfo : computedSplitsInfo) {
final Iterator<SplitInfo> itr = computedSplitsInfo.iterator();
while (itr.hasNext()) {
final SplitInfo computedSplitInfo = itr.next();
itr.remove();
long length = this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
boolean proceedWithClone = headerFlowFile != null || length > 0;
if (proceedWithClone) {
@ -326,16 +333,24 @@ public class SplitText extends AbstractProcessor {
splitFlowFiles.add(splitFlowFile);
}
}
// Update fragment.count with real split count (i.e. don't count files for which there was no clone)
for (FlowFile splitFlowFile : splitFlowFiles) {
splitFlowFile = processSession.putAttribute(splitFlowFile, FRAGMENT_COUNT, String.valueOf(fragmentIndex - 1)); // -1 because the index starts at 1 (see above)
final String fragmentCount = String.valueOf(fragmentIndex - 1); // -1 because the index starts at 1 (see above)
final ListIterator<FlowFile> flowFileItr = splitFlowFiles.listIterator();
while (flowFileItr.hasNext()) {
FlowFile splitFlowFile = flowFileItr.next();
final FlowFile updated = processSession.putAttribute(splitFlowFile, FRAGMENT_COUNT, fragmentCount);
flowFileItr.set(updated);
}
}
getLogger().info("Split " + sourceFlowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
getLogger().info("Split {} into {} FlowFiles{}", new Object[] {sourceFlowFile, splitFlowFiles.size(), headerFlowFile == null ? " containing headers." : "."});
if (headerFlowFile != null) {
processSession.remove(headerFlowFile);
}
return splitFlowFiles;
}
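Draining computedSplitsInfo through its iterator and writing each updated split back with ListIterator.set keeps at most one version of each element reachable at a time, instead of holding both the pre-update and post-update FlowFiles until the loop finishes. A small runnable sketch of the in-place replacement pattern, with made-up names and plain Strings standing in for FlowFiles:

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

// Illustrative sketch: replace each element in the slot it was read from so the
// list never references the superseded value and the new value at the same time.
class InPlaceReplacement {

    static List<String> stampAll(final List<String> items, final String suffix) {
        final ListIterator<String> itr = items.listIterator();
        while (itr.hasNext()) {
            final String original = itr.next();
            // set() overwrites the slot that was just read.
            itr.set(original + suffix);
        }
        return items;
    }

    public static void main(final String[] args) {
        final List<String> splits = new ArrayList<>(List.of("a", "b", "c"));
        System.out.println(stampAll(splits, ".count=3")); // [a.count=3, b.count=3, c.count=3]
    }
}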

View File

@ -16,23 +16,6 @@
*/
package org.apache.nifi.processors.attributes;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
@ -75,6 +58,24 @@ import org.apache.nifi.update.attributes.FlowFilePolicy;
import org.apache.nifi.update.attributes.Rule;
import org.apache.nifi.update.attributes.serde.CriteriaSerDe;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@EventDriven
@SideEffectFree
@SupportsBatching
@ -97,6 +98,13 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
private final static Set<Relationship> statelessRelationshipSet;
private final static Set<Relationship> statefulRelationshipSet;
private final Map<String, String> canonicalValueLookup = new LinkedHashMap<String, String>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<String, String> eldest) {
return size() > 100;
}
};
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("All successful FlowFiles are routed to this relationship").name("success").build();
@ -619,6 +627,30 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
}
/**
* This method caches a 'canonical' value for a given attribute value. When this processor is used to update an attribute or add a new
* attribute, if Expression Language is used, we may well end up with a new String object for each attribute for each FlowFile. As a result,
* we will store a different String object for the attribute value of every FlowFile, meaning that we have to keep a lot of String objects
* in heap. By using this 'canonical lookup', we are able to keep only a single String object on the heap.
*
* For example, if we have a property named "abc" and the value is "${abc}${xyz}", and we send through 1,000 FlowFiles with attributes abc="abc"
* and xyz="xyz", then would end up with 1,000 String objects with a value of "abcxyz". By using this canonical representation, we are able to
* instead hold a single String whose value is "abcxyz" instead of holding 1,000 String objects in heap (1,000 String objects may still be created
* when calling PropertyValue.evaluateAttributeExpressions, but this way those values are garbage collected).
*
* @param attributeValue the value whose canonical representation should be returned
* @return the canonical representation of the given attribute value
*/
private synchronized String getCanonicalRepresentation(final String attributeValue) {
final String canonical = this.canonicalValueLookup.get(attributeValue);
if (canonical != null) {
return canonical;
}
this.canonicalValueLookup.put(attributeValue, attributeValue);
return attributeValue;
}
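For reference, the bounded canonical-value lookup above can be exercised in isolation. The sketch below is illustrative rather than a copy of the processor code; the 100-entry bound mirrors the removeEldestEntry check defined earlier, and the usage at the end shows equal values collapsing to one retained String instance.

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: return one shared String instance for equal attribute
// values so many equal-but-distinct Strings are not all retained in heap.
class CanonicalValueCache {

    private final Map<String, String> lookup = new LinkedHashMap<String, String>() {
        @Override
        protected boolean removeEldestEntry(final Map.Entry<String, String> eldest) {
            return size() > 100; // bound the cache so it cannot grow without limit
        }
    };

    synchronized String canonicalize(final String value) {
        final String canonical = lookup.get(value);
        if (canonical != null) {
            return canonical;
        }
        lookup.put(value, value);
        return value;
    }

    public static void main(final String[] args) {
        final CanonicalValueCache cache = new CanonicalValueCache();
        final String first = new String("abcxyz");
        final String second = new String("abcxyz");
        // Both values canonicalize to the same retained instance.
        System.out.println(cache.canonicalize(first) == cache.canonicalize(second)); // true
    }
}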
// Executes the specified action on the specified flowfile.
private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List<Rule> rules, final Map<String, Action> defaultActions, final FlowFile flowfile,
final Map<String, String> stateInitialAttributes, final Map<String, String> stateWorkingAttributes) {
@ -688,7 +720,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
if (notDeleted || setStatefulAttribute) {
try {
final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, stateInitialAttributes).getValue();
String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, stateInitialAttributes).getValue();
newAttributeValue = getCanonicalRepresentation(newAttributeValue);
// log if appropriate
if (debugEnabled) {