diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/BlockingQueuePool.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/BlockingQueuePool.java new file mode 100644 index 0000000000..1b3346b5f7 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/BlockingQueuePool.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.function.Supplier; + +public class BlockingQueuePool implements ObjectPool { + private final BlockingQueue queue; + private final Supplier creationFunction; + private final Predicate reuseCheck; + private final Consumer returnPreparation; + + public BlockingQueuePool(final int maxSize, final Supplier creationFunction, final Predicate reuseCheck, final Consumer returnPreparation) { + this.queue = new LinkedBlockingQueue<>(maxSize); + this.creationFunction = creationFunction; + this.reuseCheck = reuseCheck; + this.returnPreparation = returnPreparation; + } + + @Override + public T borrowObject() { + final T existing = queue.poll(); + if (existing != null) { + return existing; + } + + return creationFunction.get(); + } + + @Override + public void returnObject(final T somethingBorrowed) { + if (reuseCheck.test(somethingBorrowed)) { + returnPreparation.accept(somethingBorrowed); + queue.offer(somethingBorrowed); + } + } +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java new file mode 100644 index 0000000000..1468d49f9e --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; + +/** + * A wrapper around a DataOutputStream, which wraps a ByteArrayOutputStream. + * This allows us to obtain the DataOutputStream itself so that we can perform + * writeXYZ methods and also allows us to obtain the underlying ByteArrayOutputStream + * for performing methods such as size(), reset(), writeTo() + */ +public class ByteArrayDataOutputStream { + private final ByteArrayOutputStream baos; + private final DataOutputStream dos; + + public ByteArrayDataOutputStream(final int intiialBufferSize) { + this.baos = new ByteArrayOutputStream(intiialBufferSize); + this.dos = new DataOutputStream(baos); + } + + public DataOutputStream getDataOutputStream() { + return dos; + } + + public ByteArrayOutputStream getByteArrayOutputStream() { + return baos; + } +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java new file mode 100644 index 0000000000..0dad62ceee --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.SerDe; +import org.wali.SerDeFactory; +import org.wali.UpdateType; + +public class HashMapSnapshot implements WriteAheadSnapshot, RecordLookup { + private static final Logger logger = LoggerFactory.getLogger(HashMapSnapshot.class); + private static final int ENCODING_VERSION = 1; + + private final ConcurrentMap recordMap = new ConcurrentHashMap<>(); + private final SerDeFactory serdeFactory; + private final Set swapLocations = Collections.synchronizedSet(new HashSet<>()); + private final File storageDirectory; + + public HashMapSnapshot(final File storageDirectory, final SerDeFactory serdeFactory) { + this.serdeFactory = serdeFactory; + this.storageDirectory = storageDirectory; + } + + private SnapshotHeader validateHeader(final DataInputStream dataIn) throws IOException { + final String snapshotClass = dataIn.readUTF(); + logger.debug("Snapshot Class Name for {} is {}", storageDirectory, snapshotClass); + if (!snapshotClass.equals(HashMapSnapshot.class.getName())) { + throw new IOException("Write-Ahead Log Snapshot located at " + storageDirectory + " was written using the " + + snapshotClass + " class; cannot restore using " + getClass().getName()); + } + + final int snapshotVersion = dataIn.readInt(); + logger.debug("Snapshot version for {} is {}", storageDirectory, snapshotVersion); + if (snapshotVersion > getVersion()) { + throw new IOException("Write-Ahead Log Snapshot located at " + storageDirectory + " was written using version " + + snapshotVersion + " of the " + snapshotClass + " class; cannot restore using Version " + getVersion()); + } + + final String serdeEncoding = dataIn.readUTF(); // ignore serde class name for now + logger.debug("Serde encoding for Snapshot at {} is {}", storageDirectory, serdeEncoding); + + final int serdeVersion = dataIn.readInt(); + logger.debug("Serde version for Snapshot at {} is {}", storageDirectory, serdeVersion); + + final long maxTransactionId = dataIn.readLong(); + logger.debug("Max Transaction ID for Snapshot at {} is {}", storageDirectory, maxTransactionId); + + final int numRecords = dataIn.readInt(); + logger.debug("Number of Records for Snapshot at {} is {}", storageDirectory, numRecords); + + final SerDe serde = serdeFactory.createSerDe(serdeEncoding); + serde.readHeader(dataIn); + + return new SnapshotHeader(serde, serdeVersion, maxTransactionId, numRecords); + } + + @Override + public SnapshotRecovery recover() throws IOException { + final File partialFile = getPartialFile(); + final File snapshotFile = getSnapshotFile(); + final boolean partialExists = partialFile.exists(); + final boolean snapshotExists = snapshotFile.exists(); + + // If there is no snapshot (which is the case before the first snapshot is ever created), then just + // return an empty recovery. + if (!partialExists && !snapshotExists) { + return SnapshotRecovery.emptyRecovery(); + } + + if (partialExists && snapshotExists) { + // both files exist -- assume NiFi crashed/died while checkpointing. Delete the partial file. + Files.delete(partialFile.toPath()); + } else if (partialExists) { + // partial exists but snapshot does not -- we must have completed + // creating the partial, deleted the snapshot + // but crashed before renaming the partial to the snapshot. Just + // rename partial to snapshot + Files.move(partialFile.toPath(), snapshotFile.toPath()); + } + + if (snapshotFile.length() == 0) { + logger.warn("{} Found 0-byte Snapshot file; skipping Snapshot file in recovery", this); + return SnapshotRecovery.emptyRecovery(); + } + + // At this point, we know the snapshotPath exists because if it didn't, then we either returned null + // or we renamed partialPath to snapshotPath. So just Recover from snapshotPath. + try (final DataInputStream dataIn = new DataInputStream(new BufferedInputStream(new FileInputStream(snapshotFile)))) { + // Ensure that the header contains the information that we expect and retrieve the relevant information from the header. + final SnapshotHeader header = validateHeader(dataIn); + + final SerDe serde = header.getSerDe(); + final int serdeVersion = header.getSerDeVersion(); + final int numRecords = header.getNumRecords(); + final long maxTransactionId = header.getMaxTransactionId(); + + // Read all of the records that we expect to receive. + for (int i = 0; i < numRecords; i++) { + final T record = serde.deserializeRecord(dataIn, serdeVersion); + if (record == null) { + throw new EOFException(); + } + + final UpdateType updateType = serde.getUpdateType(record); + if (updateType == UpdateType.DELETE) { + logger.warn("While recovering from snapshot, found record with type 'DELETE'; this record will not be restored"); + continue; + } + + logger.trace("Recovered from snapshot: {}", record); + recordMap.put(serde.getRecordIdentifier(record), record); + } + + // Determine the location of any swap files. + final int numSwapRecords = dataIn.readInt(); + final Set swapLocations = new HashSet<>(); + for (int i = 0; i < numSwapRecords; i++) { + swapLocations.add(dataIn.readUTF()); + } + this.swapLocations.addAll(swapLocations); + + logger.info("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}", + new Object[] {this, numRecords, swapLocations.size(), maxTransactionId}); + + return new StandardSnapshotRecovery<>(recordMap, swapLocations, snapshotFile, maxTransactionId); + } + } + + @Override + public void update(final Collection records) { + // This implementation of Snapshot keeps a ConcurrentHashMap of all 'active' records + // (meaning records that have not been removed and are not swapped out), keyed by the + // Record Identifier. It keeps only the most up-to-date version of the Record. This allows + // us to write the snapshot very quickly without having to re-process the journal files. + // For each update, then, we will update the record in the map. + for (final T record : records) { + final Object recordId = serdeFactory.getRecordIdentifier(record); + final UpdateType updateType = serdeFactory.getUpdateType(record); + + switch (updateType) { + case DELETE: + recordMap.remove(recordId); + break; + case SWAP_OUT: + final String location = serdeFactory.getLocation(record); + if (location == null) { + logger.error("Received Record (ID=" + recordId + ") with UpdateType of SWAP_OUT but " + + "no indicator of where the Record is to be Swapped Out to; these records may be " + + "lost when the repository is restored!"); + } else { + recordMap.remove(recordId); + this.swapLocations.add(location); + } + break; + case SWAP_IN: + final String swapLocation = serdeFactory.getLocation(record); + if (swapLocation == null) { + logger.error("Received Record (ID=" + recordId + ") with UpdateType of SWAP_IN but no " + + "indicator of where the Record is to be Swapped In from; these records may be duplicated " + + "when the repository is restored!"); + } else { + swapLocations.remove(swapLocation); + } + recordMap.put(recordId, record); + break; + default: + recordMap.put(recordId, record); + break; + } + } + } + + @Override + public int getRecordCount() { + return recordMap.size(); + } + + @Override + public T lookup(final Object recordId) { + return recordMap.get(recordId); + } + + + @Override + public SnapshotCapture prepareSnapshot(final long maxTransactionId) { + return new Snapshot(new HashMap<>(recordMap), new HashSet<>(swapLocations), maxTransactionId); + } + + private int getVersion() { + return ENCODING_VERSION; + } + + private File getPartialFile() { + return new File(storageDirectory, "checkpoint.partial"); + } + + private File getSnapshotFile() { + return new File(storageDirectory, "checkpoint"); + } + + @Override + public synchronized void writeSnapshot(final SnapshotCapture snapshot) throws IOException { + final SerDe serde = serdeFactory.createSerDe(null); + + final File snapshotFile = getSnapshotFile(); + final File partialFile = getPartialFile(); + + // We must ensure that we do not overwrite the existing Snapshot file directly because if NiFi were + // to be killed or crash when we are partially finished, we'd end up with no viable Snapshot file at all. + // To avoid this, we write to a 'partial' file, then delete the existing Snapshot file, if it exists, and + // rename 'partial' to Snaphsot. That way, if NiFi crashes, we can still restore the Snapshot by first looking + // for a Snapshot file and restoring it, if it exists. If it does not exist, then we restore from the partial file, + // assuming that NiFi crashed after deleting the Snapshot file and before renaming the partial file. + // + // If there is no Snapshot file currently but there is a Partial File, then this indicates + // that we have deleted the Snapshot file and failed to rename the Partial File. We don't want + // to overwrite the Partial file, because doing so could potentially lose data. Instead, we must + // first rename it to Snapshot and then write to the partial file. + if (!snapshotFile.exists() && partialFile.exists()) { + final boolean rename = partialFile.renameTo(snapshotFile); + if (!rename) { + throw new IOException("Failed to rename partial snapshot file " + partialFile + " to " + snapshotFile); + } + } + + // Write to the partial file. + try (final FileOutputStream fileOut = new FileOutputStream(getPartialFile()); + final OutputStream bufferedOut = new BufferedOutputStream(fileOut); + final DataOutputStream dataOut = new DataOutputStream(bufferedOut)) { + + // Write out the header + dataOut.writeUTF(HashMapSnapshot.class.getName()); + dataOut.writeInt(getVersion()); + dataOut.writeUTF(serde.getClass().getName()); + dataOut.writeInt(serde.getVersion()); + dataOut.writeLong(snapshot.getMaxTransactionId()); + dataOut.writeInt(snapshot.getRecords().size()); + serde.writeHeader(dataOut); + + // Serialize each record + for (final T record : snapshot.getRecords().values()) { + logger.trace("Checkpointing {}", record); + serde.serializeRecord(record, dataOut); + } + + // Write out the number of swap locations, followed by the swap locations themselves. + dataOut.writeInt(snapshot.getSwapLocations().size()); + for (final String swapLocation : snapshot.getSwapLocations()) { + dataOut.writeUTF(swapLocation); + } + + // Ensure that we flush the Buffered Output Stream and then perform an fsync(). + // This ensures that the data is fully written to disk before we delete the existing snapshot. + dataOut.flush(); + fileOut.getChannel().force(false); + } + + // If the snapshot file exists, delete it + if (snapshotFile.exists()) { + if (!snapshotFile.delete()) { + logger.warn("Unable to delete existing Snapshot file " + snapshotFile); + } + } + + // Rename the partial file to Snapshot. + final boolean rename = partialFile.renameTo(snapshotFile); + if (!rename) { + throw new IOException("Failed to rename partial snapshot file " + partialFile + " to " + snapshotFile); + } + } + + + public class Snapshot implements SnapshotCapture { + private final Map records; + private final long maxTransactionId; + private final Set swapLocations; + + public Snapshot(final Map records, final Set swapLocations, final long maxTransactionId) { + this.records = records; + this.swapLocations = swapLocations; + this.maxTransactionId = maxTransactionId; + } + + @Override + public final Map getRecords() { + return records; + } + + @Override + public long getMaxTransactionId() { + return maxTransactionId; + } + + @Override + public Set getSwapLocations() { + return swapLocations; + } + } + + private class SnapshotHeader { + private final SerDe serde; + private final int serdeVersion; + private final int numRecords; + private final long maxTransactionId; + + public SnapshotHeader(final SerDe serde, final int serdeVersion, final long maxTransactionId, final int numRecords) { + this.serde = serde; + this.serdeVersion = serdeVersion; + this.maxTransactionId = maxTransactionId; + this.numRecords = numRecords; + } + + public SerDe getSerDe() { + return serde; + } + + public int getSerDeVersion() { + return serdeVersion; + } + + public long getMaxTransactionId() { + return maxTransactionId; + } + + public int getNumRecords() { + return numRecords; + } + } + +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalRecovery.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalRecovery.java new file mode 100644 index 0000000000..2832396d30 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalRecovery.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +public interface JournalRecovery { + + int getUpdateCount(); + + long getMaxTransactionId(); + + boolean isEOFExceptionEncountered(); +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalSummary.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalSummary.java new file mode 100644 index 0000000000..ee21036666 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/JournalSummary.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +public interface JournalSummary { + /** + * @return the Transaction ID of the first transaction written + */ + long getFirstTransactionId(); + + /** + * @return the Transaction ID of the last transaction written + */ + long getLastTransactionId(); + + /** + * @return the number of transactions that were written to the journal + */ + int getTransactionCount(); +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java new file mode 100644 index 0000000000..0b2a8d3db3 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.text.DecimalFormat; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.SerDe; +import org.wali.SerDeFactory; +import org.wali.UpdateType; + +public class LengthDelimitedJournal implements WriteAheadJournal { + private static final Logger logger = LoggerFactory.getLogger(LengthDelimitedJournal.class); + private static final JournalSummary INACTIVE_JOURNAL_SUMMARY = new StandardJournalSummary(-1L, -1L, 0); + private static final int JOURNAL_ENCODING_VERSION = 1; + private static final byte TRANSACTION_FOLLOWS = 64; + private static final byte JOURNAL_COMPLETE = 127; + private static final int NUL_BYTE = 0; + + private final File journalFile; + private final long initialTransactionId; + private final SerDeFactory serdeFactory; + private final ObjectPool streamPool; + + private SerDe serde; + private FileOutputStream fileOut; + private BufferedOutputStream bufferedOut; + + private long currentTransactionId; + private int transactionCount; + private boolean headerWritten = false; + + private volatile boolean poisoned = false; + private volatile boolean closed = false; + private final ByteBuffer transactionPreamble = ByteBuffer.allocate(12); // guarded by synchronized block + + public LengthDelimitedJournal(final File journalFile, final SerDeFactory serdeFactory, final ObjectPool streamPool, final long initialTransactionId) { + this.journalFile = journalFile; + this.serdeFactory = serdeFactory; + this.serde = serdeFactory.createSerDe(null); + this.streamPool = streamPool; + + this.initialTransactionId = initialTransactionId; + this.currentTransactionId = initialTransactionId; + } + + private synchronized OutputStream getOutputStream() throws FileNotFoundException { + if (fileOut == null) { + fileOut = new FileOutputStream(journalFile); + bufferedOut = new BufferedOutputStream(fileOut); + } + + return bufferedOut; + } + + + @Override + public synchronized void writeHeader() throws IOException { + try { + final DataOutputStream outStream = new DataOutputStream(getOutputStream()); + outStream.writeUTF(LengthDelimitedJournal.class.getName()); + outStream.writeInt(JOURNAL_ENCODING_VERSION); + + serde = serdeFactory.createSerDe(null); + outStream.writeUTF(serde.getClass().getName()); + outStream.writeInt(serde.getVersion()); + + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream dos = new DataOutputStream(baos)) { + + serde.writeHeader(dos); + dos.flush(); + + final int serdeHeaderLength = baos.size(); + outStream.writeInt(serdeHeaderLength); + baos.writeTo(outStream); + } + + outStream.flush(); + } catch (final Throwable t) { + poison(t); + + final IOException ioe = (t instanceof IOException) ? (IOException) t : new IOException("Failed to create journal file " + journalFile, t); + logger.error("Failed to create new journal file {} due to {}", journalFile, ioe.toString(), ioe); + throw ioe; + } + + headerWritten = true; + } + + private synchronized SerDeAndVersion validateHeader(final DataInputStream in) throws IOException { + final String journalClassName = in.readUTF(); + logger.debug("Write Ahead Log Class Name for {} is {}", journalFile, journalClassName); + if (!LengthDelimitedJournal.class.getName().equals(journalClassName)) { + throw new IOException("Invalid header information - " + journalFile + " does not appear to be a valid journal file."); + } + + final int encodingVersion = in.readInt(); + logger.debug("Encoding version for {} is {}", journalFile, encodingVersion); + if (encodingVersion > JOURNAL_ENCODING_VERSION) { + throw new IOException("Cannot read journal file " + journalFile + " because it is encoded using veresion " + encodingVersion + + " but this version of the code only understands version " + JOURNAL_ENCODING_VERSION + " and below"); + } + + final String serdeClassName = in.readUTF(); + logger.debug("Serde Class Name for {} is {}", journalFile, serdeClassName); + final SerDe serde; + try { + serde = serdeFactory.createSerDe(serdeClassName); + } catch (final IllegalArgumentException iae) { + throw new IOException("Cannot read journal file " + journalFile + " because the serializer/deserializer used was " + serdeClassName + + " but this repository is configured to use a different type of serializer/deserializer"); + } + + final int serdeVersion = in.readInt(); + logger.debug("Serde version is {}", serdeVersion); + if (serdeVersion > serde.getVersion()) { + throw new IOException("Cannot read journal file " + journalFile + " because it is encoded using veresion " + encodingVersion + + " of the serializer/deserializer but this version of the code only understands version " + serde.getVersion() + " and below"); + } + + final int serdeHeaderLength = in.readInt(); + final InputStream serdeHeaderIn = new LimitingInputStream(in, serdeHeaderLength); + final DataInputStream dis = new DataInputStream(serdeHeaderIn); + serde.readHeader(dis); + + return new SerDeAndVersion(serde, serdeVersion); + } + + + @Override + public void update(final Collection records, final RecordLookup recordLookup) throws IOException { + if (!headerWritten) { + throw new IllegalStateException("Cannot update journal file " + journalFile + " because no header has been written yet."); + } + + if (records.isEmpty()) { + return; + } + + checkState(); + + final ByteArrayDataOutputStream bados = streamPool.borrowObject(); + try { + for (final T record : records) { + final Object recordId = serde.getRecordIdentifier(record); + final T previousRecordState = recordLookup.lookup(recordId); + serde.serializeEdit(previousRecordState, record, bados.getDataOutputStream()); + } + + final ByteArrayOutputStream baos = bados.getByteArrayOutputStream(); + final OutputStream out = getOutputStream(); + + final long transactionId; + synchronized (this) { + transactionId = currentTransactionId++; + transactionCount++; + + transactionPreamble.clear(); + transactionPreamble.putLong(transactionId); + transactionPreamble.putInt(baos.size()); + + out.write(TRANSACTION_FOLLOWS); + out.write(transactionPreamble.array()); + baos.writeTo(out); + out.flush(); + } + + logger.debug("Wrote Transaction {} to journal {} with length {} and {} records", transactionId, journalFile, baos.size(), records.size()); + } catch (final Throwable t) { + poison(t); + throw t; + } finally { + streamPool.returnObject(bados); + } + } + + private void checkState() throws IOException { + if (poisoned) { + throw new IOException("Cannot update journal file " + journalFile + " because this journal has already encountered a failure when attempting to write to the file. " + + "If the repository is able to checkpoint, then this problem will resolve itself. However, if the repository is unable to be checkpointed " + + "(for example, due to being out of storage space or having too many open files), then this issue may require manual intervention."); + } + + if (closed) { + throw new IOException("Cannot update journal file " + journalFile + " because this journal has already been closed"); + } + } + + private void poison(final Throwable t) { + this.poisoned = true; + + try { + if (fileOut != null) { + fileOut.close(); + } + + closed = true; + } catch (final IOException innerIOE) { + t.addSuppressed(innerIOE); + } + } + + @Override + public synchronized void fsync() throws IOException { + checkState(); + + try { + if (fileOut != null) { + fileOut.getChannel().force(false); + } + } catch (final IOException ioe) { + poison(ioe); + } + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + closed = true; + + try { + if (fileOut != null) { + if (!poisoned) { + fileOut.write(JOURNAL_COMPLETE); + } + + fileOut.close(); + } + } catch (final IOException ioe) { + poison(ioe); + } + } + + @Override + public JournalRecovery recoverRecords(final Map recordMap, final Set swapLocations) throws IOException { + long maxTransactionId = -1L; + int updateCount = 0; + + boolean eofException = false; + logger.info("Recovering records from journal {}", journalFile); + final double journalLength = journalFile.length(); + + try (final InputStream fis = new FileInputStream(journalFile); + final InputStream bufferedIn = new BufferedInputStream(fis); + final ByteCountingInputStream byteCountingIn = new ByteCountingInputStream(bufferedIn); + final DataInputStream in = new DataInputStream(byteCountingIn)) { + + try { + // Validate that the header is what we expect and obtain the appropriate SerDe and Version information + final SerDeAndVersion serdeAndVersion = validateHeader(in); + final SerDe serde = serdeAndVersion.getSerDe(); + + // Ensure that we get a valid transaction indicator + int transactionIndicator = in.read(); + if (transactionIndicator != TRANSACTION_FOLLOWS && transactionIndicator != JOURNAL_COMPLETE && transactionIndicator != -1) { + throw new IOException("After reading " + byteCountingIn.getBytesConsumed() + " bytes from " + journalFile + ", encountered unexpected value of " + + transactionIndicator + " for the Transaction Indicator. This journal may have been corrupted."); + } + + long consumedAtLog = 0L; + + // We don't want to apply the updates in a transaction until we've finished recovering the entire + // transaction. Otherwise, we could apply say 8 out of 10 updates and then hit an EOF. In such a case, + // we want to rollback the entire transaction. We handle this by not updating recordMap or swapLocations + // variables directly but instead keeping track of the things that occurred and then once we've read the + // entire transaction, we can apply those updates to the recordMap and swapLocations. + final Map transactionRecordMap = new HashMap<>(); + final Set idsRemoved = new HashSet<>(); + final Set swapLocationsRemoved = new HashSet<>(); + final Set swapLocationsAdded = new HashSet<>(); + int transactionUpdates = 0; + + // While we have a transaction to recover, recover it + while (transactionIndicator == TRANSACTION_FOLLOWS) { + transactionRecordMap.clear(); + idsRemoved.clear(); + swapLocationsRemoved.clear(); + swapLocationsAdded.clear(); + transactionUpdates = 0; + + // Format is + final long transactionId = in.readLong(); + maxTransactionId = Math.max(maxTransactionId, transactionId); + final int transactionLength = in.readInt(); + + // Use SerDe to deserialize the update. We use a LimitingInputStream to ensure that the SerDe is not able to read past its intended + // length, in case there is a bug in the SerDe. We then use a ByteCountingInputStream so that we can ensure that all of the data has + // been read and throw EOFException otherwise. + final InputStream transactionLimitingIn = new LimitingInputStream(in, transactionLength); + final ByteCountingInputStream transactionByteCountingIn = new ByteCountingInputStream(transactionLimitingIn); + final DataInputStream transactionDis = new DataInputStream(transactionByteCountingIn); + + while (transactionByteCountingIn.getBytesConsumed() < transactionLength) { + final T record = serde.deserializeEdit(transactionDis, recordMap, serdeAndVersion.getVersion()); + + // Update our RecordMap so that we have the most up-to-date version of the Record. + final Object recordId = serde.getRecordIdentifier(record); + final UpdateType updateType = serde.getUpdateType(record); + + switch (updateType) { + case DELETE: { + idsRemoved.add(recordId); + transactionRecordMap.remove(recordId); + break; + } + case SWAP_IN: { + final String location = serde.getLocation(record); + if (location == null) { + logger.error("Recovered SWAP_IN record from edit log, but it did not contain a Location; skipping record"); + } else { + swapLocationsRemoved.add(location); + swapLocationsAdded.remove(location); + transactionRecordMap.put(recordId, record); + } + break; + } + case SWAP_OUT: { + final String location = serde.getLocation(record); + if (location == null) { + logger.error("Recovered SWAP_OUT record from edit log, but it did not contain a Location; skipping record"); + } else { + swapLocationsRemoved.remove(location); + swapLocationsAdded.add(location); + idsRemoved.add(recordId); + transactionRecordMap.remove(recordId); + } + + break; + } + default: { + transactionRecordMap.put(recordId, record); + idsRemoved.remove(recordId); + break; + } + } + + transactionUpdates++; + } + + // Apply the transaction + for (final Object id : idsRemoved) { + recordMap.remove(id); + } + recordMap.putAll(transactionRecordMap); + swapLocations.removeAll(swapLocationsRemoved); + swapLocations.addAll(swapLocationsAdded); + updateCount += transactionUpdates; + + // Check if there is another transaction to read + transactionIndicator = in.read(); + if (transactionIndicator != TRANSACTION_FOLLOWS && transactionIndicator != JOURNAL_COMPLETE && transactionIndicator != -1) { + throw new IOException("After reading " + byteCountingIn.getBytesConsumed() + " bytes from " + journalFile + ", encountered unexpected value of " + + transactionIndicator + " for the Transaction Indicator. This journal may have been corrupted."); + } + + // If we have a very large journal (for instance, if checkpoint is not called for a long time, or if there is a problem rolling over + // the journal), then we want to occasionally notify the user that we are, in fact, making progress, so that it doesn't appear that + // NiFi has become "stuck". + final long consumed = byteCountingIn.getBytesConsumed(); + if (consumed - consumedAtLog > 50_000_000) { + final double percentage = consumed / journalLength * 100D; + final String pct = new DecimalFormat("#.00").format(percentage); + logger.info("{}% of the way finished recovering journal {}, having recovered {} updates", pct, journalFile, updateCount); + consumedAtLog = consumed; + } + } + } catch (final EOFException eof) { + eofException = true; + logger.warn("Encountered unexpected End-of-File when reading journal file {}; assuming that NiFi was shutdown unexpectedly and continuing recovery", journalFile); + } catch (final Exception e) { + // If the stream consists solely of NUL bytes, then we want to treat it + // the same as an EOF because we see this happen when we suddenly lose power + // while writing to a file. However, if that is not the case, then something else has gone wrong. + // In such a case, there is not much that we can do but to re-throw the Exception. + if (remainingBytesAllNul(in)) { + logger.warn("Failed to recover some of the data from Write-Ahead Log Journal because encountered trailing NUL bytes. " + + "This will sometimes happen after a sudden power loss. The rest of this journal file will be skipped for recovery purposes." + + "The following Exception was encountered while recovering the updates to the journal:", e); + } else { + throw e; + } + } + } + + logger.info("Successfully recovered {} updates from journal {}", updateCount, journalFile); + return new StandardJournalRecovery(updateCount, maxTransactionId, eofException); + } + + /** + * In the case of a sudden power loss, it is common - at least in a Linux journaling File System - + * that the partition file that is being written to will have many trailing "NUL bytes" (0's). + * If this happens, then on restart we want to treat this as an incomplete transaction, so we detect + * this case explicitly. + * + * @param in the input stream to scan + * @return true if the InputStream contains no data or contains only NUL bytes + * @throws IOException if unable to read from the given InputStream + */ + private boolean remainingBytesAllNul(final InputStream in) throws IOException { + int nextByte; + while ((nextByte = in.read()) != -1) { + if (nextByte != NUL_BYTE) { + return false; + } + } + + return true; + } + + + @Override + public synchronized JournalSummary getSummary() { + if (transactionCount < 1) { + return INACTIVE_JOURNAL_SUMMARY; + } + + return new StandardJournalSummary(initialTransactionId, currentTransactionId - 1, transactionCount); + } + + private class SerDeAndVersion { + private final SerDe serde; + private final int version; + + public SerDeAndVersion(final SerDe serde, final int version) { + this.serde = serde; + this.version = version; + } + + public SerDe getSerDe() { + return serde; + } + + public int getVersion() { + return version; + } + } +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ObjectPool.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ObjectPool.java new file mode 100644 index 0000000000..16f1a36abc --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ObjectPool.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +public interface ObjectPool { + + T borrowObject(); + + void returnObject(T somethingBorrowed); +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/RecordLookup.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/RecordLookup.java new file mode 100644 index 0000000000..4c7b834329 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/RecordLookup.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +public interface RecordLookup { + + /** + * Returns the Record with the given identifier, or null if no such record exists + * + * @param identifier the identifier of the record to lookup + * @return the Record with the given identifier, or null if no such record exists + */ + T lookup(Object identifier); +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java new file mode 100644 index 0000000000..e4a1db7bf9 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.SerDeFactory; +import org.wali.SyncListener; +import org.wali.WriteAheadRepository; + +/** + *

+ * This implementation of WriteAheadRepository provides the ability to write all updates to the + * repository sequentially by writing to a single journal file. Serialization of data into bytes + * happens outside of any lock contention and is done so using recycled byte buffers. As such, + * we occur minimal garbage collection and the theoretical throughput of this repository is equal + * to the throughput of the underlying disk itself. + *

+ * + *

+ * This implementation makes the assumption that only a single thread will ever issue updates for + * a given Record at any one time. I.e., the implementation is thread-safe but cannot guarantee + * that records are recovered correctly if two threads simultaneously update the write-ahead log + * with updates for the same record. + *

+ */ +public class SequentialAccessWriteAheadLog implements WriteAheadRepository { + private static final int PARTITION_INDEX = 0; + private static final Logger logger = LoggerFactory.getLogger(SequentialAccessWriteAheadLog.class); + private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal"); + private static final int MAX_BUFFERS = 64; + private static final int BUFFER_SIZE = 256 * 1024; + + private final File storageDirectory; + private final File journalsDirectory; + private final SerDeFactory serdeFactory; + private final SyncListener syncListener; + + private final ReadWriteLock journalRWLock = new ReentrantReadWriteLock(); + private final Lock journalReadLock = journalRWLock.readLock(); + private final Lock journalWriteLock = journalRWLock.writeLock(); + private final ObjectPool streamPool = new BlockingQueuePool<>(MAX_BUFFERS, + () -> new ByteArrayDataOutputStream(BUFFER_SIZE), + stream -> stream.getByteArrayOutputStream().size() < BUFFER_SIZE, + stream -> stream.getByteArrayOutputStream().reset()); + + private final WriteAheadSnapshot snapshot; + private final RecordLookup recordLookup; + private SnapshotRecovery snapshotRecovery; + + private volatile boolean recovered = false; + private WriteAheadJournal journal; + private volatile long nextTransactionId = 0L; + + public SequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory serdeFactory) throws IOException { + this(storageDirectory, serdeFactory, SyncListener.NOP_SYNC_LISTENER); + } + + public SequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory serdeFactory, final SyncListener syncListener) throws IOException { + if (!storageDirectory.exists() && !storageDirectory.mkdirs()) { + throw new IOException("Directory " + storageDirectory + " does not exist and cannot be created"); + } + if (!storageDirectory.isDirectory()) { + throw new IOException("File " + storageDirectory + " is a regular file and not a directory"); + } + + final HashMapSnapshot hashMapSnapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory); + this.snapshot = hashMapSnapshot; + this.recordLookup = hashMapSnapshot; + + this.storageDirectory = storageDirectory; + this.journalsDirectory = new File(storageDirectory, "journals"); + if (!journalsDirectory.exists() && !journalsDirectory.mkdirs()) { + throw new IOException("Directory " + journalsDirectory + " does not exist and cannot be created"); + } + + recovered = false; + + this.serdeFactory = serdeFactory; + this.syncListener = (syncListener == null) ? SyncListener.NOP_SYNC_LISTENER : syncListener; + } + + @Override + public int update(final Collection records, final boolean forceSync) throws IOException { + if (!recovered) { + throw new IllegalStateException("Cannot update repository until record recovery has been performed"); + } + + journalReadLock.lock(); + try { + journal.update(records, recordLookup); + + if (forceSync) { + journal.fsync(); + syncListener.onSync(PARTITION_INDEX); + } + + snapshot.update(records); + } finally { + journalReadLock.unlock(); + } + + return PARTITION_INDEX; + } + + @Override + public synchronized Collection recoverRecords() throws IOException { + if (recovered) { + throw new IllegalStateException("Cannot recover records from repository because record recovery has already commenced"); + } + + logger.info("Recovering records from Write-Ahead Log at {}", storageDirectory); + + final long recoverStart = System.nanoTime(); + recovered = true; + snapshotRecovery = snapshot.recover(); + + final long snapshotRecoveryMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - recoverStart); + + final Map recoveredRecords = snapshotRecovery.getRecords(); + final Set swapLocations = snapshotRecovery.getRecoveredSwapLocations(); + + final File[] journalFiles = journalsDirectory.listFiles(this::isJournalFile); + if (journalFiles == null) { + throw new IOException("Cannot access the list of files in directory " + journalsDirectory + "; please ensure that appropriate file permissions are set."); + } + + if (snapshotRecovery.getRecoveryFile() == null) { + logger.info("No Snapshot File to recover from at {}. Now recovering records from {} journal files", storageDirectory, journalFiles.length); + } else { + logger.info("Successfully recovered {} records and {} swap files from Snapshot at {} with Max Transaction ID of {} in {} milliseconds. Now recovering records from {} journal files", + recoveredRecords.size(), swapLocations.size(), snapshotRecovery.getRecoveryFile(), snapshotRecovery.getMaxTransactionId(), + snapshotRecoveryMillis, journalFiles.length); + } + + final List orderedJournalFiles = Arrays.asList(journalFiles); + Collections.sort(orderedJournalFiles, new Comparator() { + @Override + public int compare(final File o1, final File o2) { + final long transactionId1 = getMinTransactionId(o1); + final long transactionId2 = getMinTransactionId(o2); + + return Long.compare(transactionId1, transactionId2); + } + }); + + final long snapshotTransactionId = snapshotRecovery.getMaxTransactionId(); + + int totalUpdates = 0; + int journalFilesRecovered = 0; + int journalFilesSkipped = 0; + long maxTransactionId = snapshotTransactionId; + + for (final File journalFile : orderedJournalFiles) { + final long journalMinTransactionId = getMinTransactionId(journalFile); + if (journalMinTransactionId < snapshotTransactionId) { + logger.debug("Will not recover records from journal file {} because the minimum Transaction ID for that journal is {} and the Transaction ID recovered from Snapshot was {}", + journalFile, journalMinTransactionId, snapshotTransactionId); + + journalFilesSkipped++; + continue; + } + + logger.debug("Min Transaction ID for journal {} is {}, so will recover records from journal", journalFile, journalMinTransactionId); + journalFilesRecovered++; + + try (final WriteAheadJournal journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) { + final JournalRecovery journalRecovery = journal.recoverRecords(recoveredRecords, swapLocations); + final int updates = journalRecovery.getUpdateCount(); + + logger.debug("Recovered {} updates from journal {}", updates, journalFile); + totalUpdates += updates; + maxTransactionId = Math.max(maxTransactionId, journalRecovery.getMaxTransactionId()); + } + } + + logger.debug("Recovered {} updates from {} journal files and skipped {} journal files because their data was already encapsulated in the snapshot", + totalUpdates, journalFilesRecovered, journalFilesSkipped); + this.nextTransactionId = maxTransactionId + 1; + + final long recoverNanos = System.nanoTime() - recoverStart; + final long recoveryMillis = TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS); + logger.info("Successfully recovered {} records in {} milliseconds. Now checkpointing to ensure that Write-Ahead Log is in a consistent state", recoveredRecords.size(), recoveryMillis); + + checkpoint(); + + return recoveredRecords.values(); + } + + private long getMinTransactionId(final File journalFile) { + final String filename = journalFile.getName(); + final String numeral = filename.substring(0, filename.indexOf(".")); + return Long.parseLong(numeral); + } + + private boolean isJournalFile(final File file) { + if (!file.isFile()) { + return false; + } + + final String filename = file.getName(); + return JOURNAL_FILENAME_PATTERN.matcher(filename).matches(); + } + + @Override + public synchronized Set getRecoveredSwapLocations() throws IOException { + if (!recovered) { + throw new IllegalStateException("Cannot retrieve the Recovered Swap Locations until record recovery has been performed"); + } + + return snapshotRecovery.getRecoveredSwapLocations(); + } + + @Override + public int checkpoint() throws IOException { + final SnapshotCapture snapshotCapture; + + final long startNanos = System.nanoTime(); + final File[] existingJournals; + journalWriteLock.lock(); + try { + if (journal != null) { + final JournalSummary journalSummary = journal.getSummary(); + if (journalSummary.getTransactionCount() == 0) { + logger.debug("Will not checkpoint Write-Ahead Log because no updates have occurred since last checkpoint"); + return snapshot.getRecordCount(); + } + + journal.fsync(); + journal.close(); + + nextTransactionId = Math.max(nextTransactionId, journalSummary.getLastTransactionId() + 1); + } + + syncListener.onGlobalSync(); + + final File[] existingFiles = journalsDirectory.listFiles(this::isJournalFile); + existingJournals = (existingFiles == null) ? new File[0] : existingFiles; + + snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1); + + // Create a new journal. We name the journal file .journal but it is possible + // that we could have an empty journal file already created. If this happens, we don't want to create + // a new file on top of it because it would get deleted below when we clean up old journals. So we + // will simply increment our transaction ID and try again. + File journalFile = new File(journalsDirectory, String.valueOf(nextTransactionId) + ".journal"); + while (journalFile.exists()) { + nextTransactionId++; + journalFile = new File(journalsDirectory, String.valueOf(nextTransactionId) + ".journal"); + } + + journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, nextTransactionId); + journal.writeHeader(); + + logger.debug("Created new Journal starting with Transaction ID {}", nextTransactionId); + } finally { + journalWriteLock.unlock(); + } + + final long stopTheWorldMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + snapshot.writeSnapshot(snapshotCapture); + + for (final File existingJournal : existingJournals) { + logger.debug("Deleting Journal {} because it is now encapsulated in the latest Snapshot", existingJournal.getName()); + if (!existingJournal.delete() && existingJournal.exists()) { + logger.warn("Unable to delete expired journal file " + existingJournal + "; this file should be deleted manually."); + } + } + + final long totalNanos = System.nanoTime() - startNanos; + final long millis = TimeUnit.NANOSECONDS.toMillis(totalNanos); + logger.info("Checkpointed Write-Ahead Log with {} Records and {} Swap Files in {} milliseconds (Stop-the-world time = {} milliseconds), max Transaction ID {}", + new Object[] {snapshotCapture.getRecords().size(), snapshotCapture.getSwapLocations().size(), millis, stopTheWorldMillis, snapshotCapture.getMaxTransactionId()}); + + return snapshotCapture.getRecords().size(); + } + + + @Override + public void shutdown() throws IOException { + journalWriteLock.lock(); + try { + if (journal != null) { + journal.close(); + } + } finally { + journalWriteLock.unlock(); + } + } +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotCapture.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotCapture.java new file mode 100644 index 0000000000..5a2425839b --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotCapture.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.util.Map; +import java.util.Set; + +public interface SnapshotCapture { + Map getRecords(); + + long getMaxTransactionId(); + + Set getSwapLocations(); +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotRecovery.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotRecovery.java new file mode 100644 index 0000000000..9b6339a205 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SnapshotRecovery.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.io.File; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +public interface SnapshotRecovery { + long getMaxTransactionId(); + + Map getRecords(); + + Set getRecoveredSwapLocations(); + + File getRecoveryFile(); + + + public static SnapshotRecovery emptyRecovery() { + return new SnapshotRecovery() { + @Override + public long getMaxTransactionId() { + return -1L; + } + + @Override + public Map getRecords() { + return Collections.emptyMap(); + } + + @Override + public Set getRecoveredSwapLocations() { + return Collections.emptySet(); + } + + @Override + public File getRecoveryFile() { + return null; + } + }; + } +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalRecovery.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalRecovery.java new file mode 100644 index 0000000000..459e777944 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalRecovery.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +public class StandardJournalRecovery implements JournalRecovery { + private final int updateCount; + private final long maxTransactionId; + private final boolean eofException; + + public StandardJournalRecovery(final int updateCount, final long maxTransactionId, final boolean eofException) { + this.updateCount = updateCount; + this.maxTransactionId = maxTransactionId; + this.eofException = eofException; + } + + @Override + public int getUpdateCount() { + return updateCount; + } + + @Override + public long getMaxTransactionId() { + return maxTransactionId; + } + + @Override + public boolean isEOFExceptionEncountered() { + return eofException; + } +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalSummary.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalSummary.java new file mode 100644 index 0000000000..0f3c60f584 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardJournalSummary.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +public class StandardJournalSummary implements JournalSummary { + private final long firstTransactionId; + private final long lastTransactionId; + private final int transactionCount; + + public StandardJournalSummary(final long firstTransactionId, final long lastTransactionId, final int transactionCount) { + this.firstTransactionId = firstTransactionId; + this.lastTransactionId = lastTransactionId; + this.transactionCount = transactionCount; + } + + @Override + public long getFirstTransactionId() { + return firstTransactionId; + } + + @Override + public long getLastTransactionId() { + return lastTransactionId; + } + + @Override + public int getTransactionCount() { + return transactionCount; + } + +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardSnapshotRecovery.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardSnapshotRecovery.java new file mode 100644 index 0000000000..70588dcf5d --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/StandardSnapshotRecovery.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.io.File; +import java.util.Map; +import java.util.Set; + +public class StandardSnapshotRecovery implements SnapshotRecovery { + private final Map recordMap; + private final Set recoveredSwapLocations; + private final File recoveryFile; + private final long maxTransactionId; + + public StandardSnapshotRecovery(final Map recordMap, final Set recoveredSwapLocations, final File recoveryFile, final long maxTransactionId) { + this.recordMap = recordMap; + this.recoveredSwapLocations = recoveredSwapLocations; + this.recoveryFile = recoveryFile; + this.maxTransactionId = maxTransactionId; + } + + @Override + public long getMaxTransactionId() { + return maxTransactionId; + } + + @Override + public Map getRecords() { + return recordMap; + } + + @Override + public Set getRecoveredSwapLocations() { + return recoveredSwapLocations; + } + + @Override + public File getRecoveryFile() { + return recoveryFile; + } +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java new file mode 100644 index 0000000000..f35d47ab79 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +public interface WriteAheadJournal extends Closeable { + + JournalRecovery recoverRecords(Map recordMap, Set swapLocations) throws IOException; + + /** + * Updates the journal with the given set of records + * + * @param records the records to update + * @param recordLookup a lookup that can be used to access the current value of a record, given its ID + * + * @throws IOException if unable to write to the underlying storage mechanism + */ + void update(Collection records, RecordLookup recordLookup) throws IOException; + + void writeHeader() throws IOException; + + void fsync() throws IOException; + + /** + * Returns information about what was written to the journal + * + * @return A JournalSummary indicating what was written to the journal + * @throws IOException if unable to write to the underlying storage mechanism. + */ + JournalSummary getSummary(); +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java new file mode 100644 index 0000000000..a4cbcd2892 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import java.io.IOException; +import java.util.Collection; + +public interface WriteAheadSnapshot { + SnapshotCapture prepareSnapshot(long maxTransactionId); + + void writeSnapshot(SnapshotCapture snapshot) throws IOException; + + SnapshotRecovery recover() throws IOException; + + void update(Collection records); + + int getRecordCount(); +} diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java index 0914a791df..eabac9dc80 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java @@ -61,6 +61,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; +import org.apache.nifi.wali.SequentialAccessWriteAheadLog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +74,12 @@ import org.slf4j.LoggerFactory; *

* * @param type of record this WAL is for + * + * @deprecated This implementation is now deprecated in favor of {@link SequentialAccessWriteAheadLog}. + * This implementation, when given more than 1 partition, can have issues recovering after a sudden loss + * of power or an operating system crash. */ +@Deprecated public final class MinimalLockingWriteAheadLog implements WriteAheadRepository { private final Path basePath; @@ -105,7 +111,7 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor private volatile boolean recovered = false; public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDe serde, final SyncListener syncListener) throws IOException { - this(new TreeSet<>(Collections.singleton(path)), partitionCount, new SingletonSerDeFactory(serde), syncListener); + this(new TreeSet<>(Collections.singleton(path)), partitionCount, new SingletonSerDeFactory<>(serde), syncListener); } public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDeFactory serdeFactory, final SyncListener syncListener) throws IOException { @@ -113,7 +119,7 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor } public MinimalLockingWriteAheadLog(final SortedSet paths, final int partitionCount, final SerDe serde, final SyncListener syncListener) throws IOException { - this(paths, partitionCount, new SingletonSerDeFactory(serde), syncListener); + this(paths, partitionCount, new SingletonSerDeFactory<>(serde), syncListener); } /** @@ -645,6 +651,9 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor } finally { writeLock.unlock(); lockChannel.close(); + + final File lockFile = new File(basePath.toFile(), "wali.lock"); + lockFile.delete(); } } diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java index ffb11cadfc..7cc4fc0964 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SyncListener.java @@ -59,4 +59,14 @@ public interface SyncListener { * {@link WriteAheadRepository#sync()} method. */ void onGlobalSync(); + + public static final SyncListener NOP_SYNC_LISTENER = new SyncListener() { + @Override + public void onSync(int partitionIndex) { + } + + @Override + public void onGlobalSync() { + } + }; } diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestBlockingQueuePool.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestBlockingQueuePool.java new file mode 100644 index 0000000000..2492283b28 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestBlockingQueuePool.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +import org.junit.Test; + +public class TestBlockingQueuePool { + private static final Consumer DO_NOTHING = ab -> {}; + + @Test + public void testReuse() { + final BlockingQueuePool pool = new BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING); + + final AtomicBoolean firstObject = pool.borrowObject(); + firstObject.set(true); + pool.returnObject(firstObject); + + for (int i = 0; i < 100; i++) { + final AtomicBoolean value = pool.borrowObject(); + assertSame(firstObject, value); + pool.returnObject(value); + } + } + + @Test + public void testCreateOnExhaustion() { + final BlockingQueuePool pool = new BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING); + + final AtomicBoolean firstObject = pool.borrowObject(); + final AtomicBoolean secondObject = pool.borrowObject(); + + assertNotSame(firstObject, secondObject); + } + + @Test + public void testCreateMoreThanMaxCapacity() { + final BlockingQueuePool pool = new BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING); + + for (int i = 0; i < 50; i++) { + final AtomicBoolean value = pool.borrowObject(); + assertNotNull(value); + } + } + + @Test + public void testDoesNotBufferMoreThanCapacity() { + final BlockingQueuePool pool = new BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING); + + final AtomicBoolean[] seen = new AtomicBoolean[50]; + for (int i = 0; i < 50; i++) { + final AtomicBoolean value = pool.borrowObject(); + assertNotNull(value); + value.set(true); + seen[i] = value; + } + + for (final AtomicBoolean value : seen) { + pool.returnObject(value); + } + + for (int i = 0; i < 10; i++) { + final AtomicBoolean value = pool.borrowObject(); + + // verify that the object exists in the 'seen' array + boolean found = false; + for (final AtomicBoolean seenBoolean : seen) { + if (value == seenBoolean) { + found = true; + break; + } + } + + assertTrue(found); + } + + for (int i = 0; i < 40; i++) { + final AtomicBoolean value = pool.borrowObject(); + + // verify that the object does not exist in the 'seen' array + boolean found = false; + for (final AtomicBoolean seenBoolean : seen) { + if (value == seenBoolean) { + found = true; + break; + } + } + + assertFalse(found); + } + } +} diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestHashMapSnapshot.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestHashMapSnapshot.java new file mode 100644 index 0000000000..692500e409 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestHashMapSnapshot.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.wali.DummyRecord; +import org.wali.DummyRecordSerde; +import org.wali.SerDeFactory; +import org.wali.SingletonSerDeFactory; +import org.wali.UpdateType; + +public class TestHashMapSnapshot { + + private final File storageDirectory = new File("target/test-hashmap-snapshot"); + private DummyRecordSerde serde; + private SerDeFactory serdeFactory; + + @Before + public void setup() throws IOException { + if (!storageDirectory.exists()) { + Files.createDirectories(storageDirectory.toPath()); + } + + final File[] childFiles = storageDirectory.listFiles(); + for (final File childFile : childFiles) { + if (childFile.isFile()) { + Files.delete(childFile.toPath()); + } + } + + serde = new DummyRecordSerde(); + serdeFactory = new SingletonSerDeFactory<>(serde); + + } + + @Test + public void testSuccessfulRoundTrip() throws IOException { + final HashMapSnapshot snapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory); + final Map props = new HashMap<>(); + + for (int i = 0; i < 10; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + props.put("key", String.valueOf(i)); + record.setProperties(props); + snapshot.update(Collections.singleton(record)); + } + + for (int i = 2; i < 10; i += 2) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.DELETE); + snapshot.update(Collections.singleton(record)); + } + + for (int i = 1; i < 10; i += 2) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.SWAP_OUT); + record.setSwapLocation("swapFile-" + i); + snapshot.update(Collections.singleton(record)); + } + + final DummyRecord swapIn7 = new DummyRecord("7", UpdateType.SWAP_IN); + swapIn7.setSwapLocation("swapFile-7"); + snapshot.update(Collections.singleton(swapIn7)); + + final Set swappedOutLocations = new HashSet<>(); + swappedOutLocations.add("swapFile-1"); + swappedOutLocations.add("swapFile-3"); + swappedOutLocations.add("swapFile-5"); + swappedOutLocations.add("swapFile-9"); + + final SnapshotCapture capture = snapshot.prepareSnapshot(180L); + assertEquals(180L, capture.getMaxTransactionId()); + assertEquals(swappedOutLocations, capture.getSwapLocations()); + + final Map records = capture.getRecords(); + assertEquals(2, records.size()); + assertTrue(records.containsKey("0")); + assertTrue(records.containsKey("7")); + + snapshot.writeSnapshot(capture); + + final SnapshotRecovery recovery = snapshot.recover(); + assertEquals(180L, recovery.getMaxTransactionId()); + assertEquals(swappedOutLocations, recovery.getRecoveredSwapLocations()); + + final Map recoveredRecords = recovery.getRecords(); + assertEquals(records, recoveredRecords); + } + + @Test + public void testOOMEWhenWritingResultsInPreviousSnapshotStillRecoverable() throws IOException { + final HashMapSnapshot snapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory); + final Map props = new HashMap<>(); + + for (int i = 0; i < 11; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + props.put("key", String.valueOf(i)); + record.setProperties(props); + snapshot.update(Collections.singleton(record)); + } + + final DummyRecord swapOutRecord = new DummyRecord("10", UpdateType.SWAP_OUT); + swapOutRecord.setSwapLocation("SwapLocation-1"); + snapshot.update(Collections.singleton(swapOutRecord)); + + snapshot.writeSnapshot(snapshot.prepareSnapshot(25L)); + + serde.setThrowOOMEAfterNSerializeEdits(3); + + try { + snapshot.writeSnapshot(snapshot.prepareSnapshot(150L)); + Assert.fail("Expected OOME"); + } catch (final OutOfMemoryError oome) { + // expected + } + + final SnapshotRecovery recovery = snapshot.recover(); + assertEquals(25L, recovery.getMaxTransactionId()); + + final Map recordMap = recovery.getRecords(); + assertEquals(10, recordMap.size()); + for (int i = 0; i < 10; i++) { + assertTrue(recordMap.containsKey(String.valueOf(i))); + } + for (final Map.Entry entry : recordMap.entrySet()) { + final DummyRecord record = entry.getValue(); + final Map properties = record.getProperties(); + assertNotNull(properties); + assertEquals(1, properties.size()); + assertEquals(entry.getKey(), properties.get("key")); + } + + final Set swapLocations = recovery.getRecoveredSwapLocations(); + assertEquals(1, swapLocations.size()); + assertTrue(swapLocations.contains("SwapLocation-1")); + } + + @Test + public void testIOExceptionWhenWritingResultsInPreviousSnapshotStillRecoverable() throws IOException { + final HashMapSnapshot snapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory); + final Map props = new HashMap<>(); + + for (int i = 0; i < 11; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + props.put("key", String.valueOf(i)); + record.setProperties(props); + snapshot.update(Collections.singleton(record)); + } + + final DummyRecord swapOutRecord = new DummyRecord("10", UpdateType.SWAP_OUT); + swapOutRecord.setSwapLocation("SwapLocation-1"); + snapshot.update(Collections.singleton(swapOutRecord)); + + snapshot.writeSnapshot(snapshot.prepareSnapshot(25L)); + + serde.setThrowIOEAfterNSerializeEdits(3); + + for (int i = 0; i < 5; i++) { + try { + snapshot.writeSnapshot(snapshot.prepareSnapshot(150L)); + Assert.fail("Expected IOE"); + } catch (final IOException ioe) { + // expected + } + } + + final SnapshotRecovery recovery = snapshot.recover(); + assertEquals(25L, recovery.getMaxTransactionId()); + + final Map recordMap = recovery.getRecords(); + assertEquals(10, recordMap.size()); + for (int i = 0; i < 10; i++) { + assertTrue(recordMap.containsKey(String.valueOf(i))); + } + for (final Map.Entry entry : recordMap.entrySet()) { + final DummyRecord record = entry.getValue(); + final Map properties = record.getProperties(); + assertNotNull(properties); + assertEquals(1, properties.size()); + assertEquals(entry.getKey(), properties.get("key")); + } + + final Set swapLocations = recovery.getRecoveredSwapLocations(); + assertEquals(1, swapLocations.size()); + assertTrue(swapLocations.contains("SwapLocation-1")); + } + +} diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java new file mode 100644 index 0000000000..94df890704 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.wali.DummyRecord; +import org.wali.DummyRecordSerde; +import org.wali.SerDeFactory; +import org.wali.SingletonSerDeFactory; +import org.wali.UpdateType; + +public class TestLengthDelimitedJournal { + private final File journalFile = new File("target/testLengthDelimitedJournal/testJournal.journal"); + private SerDeFactory serdeFactory; + private DummyRecordSerde serde; + private ObjectPool streamPool; + private static final int BUFFER_SIZE = 4096; + + @Before + public void setupJournal() throws IOException { + Files.deleteIfExists(journalFile.toPath()); + + if (!journalFile.getParentFile().exists()) { + Files.createDirectories(journalFile.getParentFile().toPath()); + } + + serde = new DummyRecordSerde(); + serdeFactory = new SingletonSerDeFactory<>(serde); + streamPool = new BlockingQueuePool<>(1, + () -> new ByteArrayDataOutputStream(BUFFER_SIZE), + stream -> stream.getByteArrayOutputStream().size() < BUFFER_SIZE, + stream -> stream.getByteArrayOutputStream().reset()); + } + + @Test + public void testHandlingOfTrailingNulBytes() throws IOException { + try (final LengthDelimitedJournal journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) { + journal.writeHeader(); + + final List firstTransaction = new ArrayList<>(); + firstTransaction.add(new DummyRecord("1", UpdateType.CREATE)); + firstTransaction.add(new DummyRecord("2", UpdateType.CREATE)); + firstTransaction.add(new DummyRecord("3", UpdateType.CREATE)); + + final List secondTransaction = new ArrayList<>(); + secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123")); + secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123")); + secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123")); + + final List thirdTransaction = new ArrayList<>(); + thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE)); + thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE)); + + journal.update(firstTransaction, id -> null); + journal.update(secondTransaction, id -> null); + journal.update(thirdTransaction, id -> null); + } + + // Truncate the contents of the journal file by 8 bytes. Then replace with 28 trailing NUL bytes, + // as this is what we often see when we have a sudden power loss. + final byte[] contents = Files.readAllBytes(journalFile.toPath()); + final byte[] truncated = Arrays.copyOfRange(contents, 0, contents.length - 8); + final byte[] withNuls = new byte[truncated.length + 28]; + System.arraycopy(truncated, 0, withNuls, 0, truncated.length); + + try (final OutputStream fos = new FileOutputStream(journalFile)) { + fos.write(withNuls); + } + + + try (final LengthDelimitedJournal journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) { + final Map recordMap = new HashMap<>(); + final Set swapLocations = new HashSet<>(); + + journal.recoverRecords(recordMap, swapLocations); + + assertFalse(recordMap.isEmpty()); + assertEquals(3, recordMap.size()); + + final DummyRecord record1 = recordMap.get("1"); + assertNotNull(record1); + assertEquals(Collections.singletonMap("abc", "123"), record1.getProperties()); + + final DummyRecord record2 = recordMap.get("2"); + assertNotNull(record2); + assertEquals(Collections.singletonMap("cba", "123"), record2.getProperties()); + + final DummyRecord record3 = recordMap.get("3"); + assertNotNull(record3); + assertEquals(Collections.singletonMap("aaa", "123"), record3.getProperties()); + } + } + + @Test + public void testUpdateOnlyAppliedIfEntireTransactionApplied() throws IOException { + try (final LengthDelimitedJournal journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) { + journal.writeHeader(); + + for (int i = 0; i < 3; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + journal.update(Collections.singleton(record), key -> null); + } + + final DummyRecord swapOut1Record = new DummyRecord("1", UpdateType.SWAP_OUT); + swapOut1Record.setSwapLocation("swap12"); + journal.update(Collections.singleton(swapOut1Record), id -> null); + + final DummyRecord swapOut2Record = new DummyRecord("2", UpdateType.SWAP_OUT); + swapOut2Record.setSwapLocation("swap12"); + journal.update(Collections.singleton(swapOut2Record), id -> null); + + final List records = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final DummyRecord record = new DummyRecord("1" + i, UpdateType.CREATE); + records.add(record); + } + + final DummyRecord swapIn1Record = new DummyRecord("1", UpdateType.SWAP_IN); + swapIn1Record.setSwapLocation("swap12"); + records.add(swapIn1Record); + + final DummyRecord swapOut1AgainRecord = new DummyRecord("1", UpdateType.SWAP_OUT); + swapOut1AgainRecord.setSwapLocation("swap12"); + records.add(swapOut1AgainRecord); + + final DummyRecord swapIn2Record = new DummyRecord("2", UpdateType.SWAP_IN); + swapIn2Record.setSwapLocation("swap12"); + records.add(swapIn2Record); + + final DummyRecord swapOut0Record = new DummyRecord("0", UpdateType.SWAP_OUT); + swapOut0Record.setSwapLocation("swap0"); + records.add(swapOut0Record); + + journal.update(records, id -> null); + } + + // Truncate the last 8 bytes so that we will get an EOFException when reading the last transaction. + try (final FileOutputStream fos = new FileOutputStream(journalFile, true)) { + fos.getChannel().truncate(journalFile.length() - 8); + } + + + try (final LengthDelimitedJournal journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) { + final Map recordMap = new HashMap<>(); + final Set swapLocations = new HashSet<>(); + + final JournalRecovery recovery = journal.recoverRecords(recordMap, swapLocations); + assertEquals(5L, recovery.getMaxTransactionId()); + assertEquals(5, recovery.getUpdateCount()); + + final Set expectedSwap = Collections.singleton("swap12"); + assertEquals(expectedSwap, swapLocations); + + final Map expectedRecordMap = new HashMap<>(); + expectedRecordMap.put("0", new DummyRecord("0", UpdateType.CREATE)); + assertEquals(expectedRecordMap, recordMap); + } + } + + @Test + public void testPoisonedJournalNotWritableAfterIOE() throws IOException { + try (final LengthDelimitedJournal journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) { + journal.writeHeader(); + + serde.setThrowIOEAfterNSerializeEdits(2); + + final DummyRecord firstRecord = new DummyRecord("1", UpdateType.CREATE); + journal.update(Collections.singleton(firstRecord), key -> null); + + final DummyRecord secondRecord = new DummyRecord("1", UpdateType.UPDATE); + journal.update(Collections.singleton(secondRecord), key -> firstRecord); + + final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE); + final RecordLookup lookup = key -> secondRecord; + try { + journal.update(Collections.singleton(thirdRecord), lookup); + Assert.fail("Expected IOException"); + } catch (final IOException ioe) { + // expected + } + + serde.setThrowIOEAfterNSerializeEdits(-1); + + final Collection records = Collections.singleton(thirdRecord); + for (int i = 0; i < 10; i++) { + try { + journal.update(records, lookup); + Assert.fail("Expected IOException"); + } catch (final IOException expected) { + } + + try { + journal.fsync(); + Assert.fail("Expected IOException"); + } catch (final IOException expected) { + } + } + } + } + + @Test + public void testPoisonedJournalNotWritableAfterOOME() throws IOException { + try (final LengthDelimitedJournal journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) { + journal.writeHeader(); + + serde.setThrowOOMEAfterNSerializeEdits(2); + + final DummyRecord firstRecord = new DummyRecord("1", UpdateType.CREATE); + journal.update(Collections.singleton(firstRecord), key -> null); + + final DummyRecord secondRecord = new DummyRecord("1", UpdateType.UPDATE); + journal.update(Collections.singleton(secondRecord), key -> firstRecord); + + final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE); + final RecordLookup lookup = key -> secondRecord; + try { + journal.update(Collections.singleton(thirdRecord), lookup); + Assert.fail("Expected OOME"); + } catch (final OutOfMemoryError oome) { + // expected + } + + serde.setThrowOOMEAfterNSerializeEdits(-1); + + final Collection records = Collections.singleton(thirdRecord); + for (int i = 0; i < 10; i++) { + try { + journal.update(records, lookup); + Assert.fail("Expected IOException"); + } catch (final IOException expected) { + } + + try { + journal.fsync(); + Assert.fail("Expected IOException"); + } catch (final IOException expected) { + } + } + } + } + + @Test + public void testSuccessfulRoundTrip() throws IOException { + try (final LengthDelimitedJournal journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) { + journal.writeHeader(); + + final DummyRecord firstRecord = new DummyRecord("1", UpdateType.CREATE); + journal.update(Collections.singleton(firstRecord), key -> null); + + final DummyRecord secondRecord = new DummyRecord("1", UpdateType.UPDATE); + journal.update(Collections.singleton(secondRecord), key -> firstRecord); + + final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE); + journal.update(Collections.singleton(thirdRecord), key -> secondRecord); + + final Map recordMap = new HashMap<>(); + final Set swapLocations = new HashSet<>(); + final JournalRecovery recovery = journal.recoverRecords(recordMap, swapLocations); + assertFalse(recovery.isEOFExceptionEncountered()); + + assertEquals(2L, recovery.getMaxTransactionId()); + assertEquals(3, recovery.getUpdateCount()); + + assertTrue(swapLocations.isEmpty()); + assertEquals(1, recordMap.size()); + + final DummyRecord retrieved = recordMap.get("1"); + assertNotNull(retrieved); + assertEquals(thirdRecord, retrieved); + } + } + + @Test + public void testTruncatedJournalFile() throws IOException { + final DummyRecord firstRecord, secondRecord; + try (final LengthDelimitedJournal journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) { + journal.writeHeader(); + + firstRecord = new DummyRecord("1", UpdateType.CREATE); + journal.update(Collections.singleton(firstRecord), key -> null); + + secondRecord = new DummyRecord("2", UpdateType.CREATE); + journal.update(Collections.singleton(secondRecord), key -> firstRecord); + + final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE); + journal.update(Collections.singleton(thirdRecord), key -> secondRecord); + } + + // Truncate the file + try (final FileOutputStream fos = new FileOutputStream(journalFile, true)) { + fos.getChannel().truncate(journalFile.length() - 8); + } + + // Ensure that we are able to recover the first two records without an issue but the third is lost. + try (final LengthDelimitedJournal journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) { + final Map recordMap = new HashMap<>(); + final Set swapLocations = new HashSet<>(); + final JournalRecovery recovery = journal.recoverRecords(recordMap, swapLocations); + assertTrue(recovery.isEOFExceptionEncountered()); + + assertEquals(2L, recovery.getMaxTransactionId()); // transaction ID is still 2 because that's what was written to the journal + assertEquals(2, recovery.getUpdateCount()); // only 2 updates because the last update will incur an EOFException and be skipped + + assertTrue(swapLocations.isEmpty()); + assertEquals(2, recordMap.size()); + + final DummyRecord retrieved1 = recordMap.get("1"); + assertNotNull(retrieved1); + assertEquals(firstRecord, retrieved1); + + final DummyRecord retrieved2 = recordMap.get("2"); + assertNotNull(retrieved2); + assertEquals(secondRecord, retrieved2); + } + } +} diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestSequentialAccessWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestSequentialAccessWriteAheadLog.java new file mode 100644 index 0000000000..4fc0fe7945 --- /dev/null +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestSequentialAccessWriteAheadLog.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.wali; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.wali.DummyRecord; +import org.wali.DummyRecordSerde; +import org.wali.SerDeFactory; +import org.wali.SingletonSerDeFactory; +import org.wali.UpdateType; +import org.wali.WriteAheadRepository; + +public class TestSequentialAccessWriteAheadLog { + @Rule + public TestName testName = new TestName(); + + @Test + public void testRecoverWithNoCheckpoint() throws IOException { + final SequentialAccessWriteAheadLog repo = createWriteRepo(); + + final List records = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + records.add(record); + } + + repo.update(records, false); + repo.shutdown(); + + final SequentialAccessWriteAheadLog recoveryRepo = createRecoveryRepo(); + final Collection recovered = recoveryRepo.recoverRecords(); + + // ensure that we get the same records back, but the order may be different, so wrap both collections + // in a HashSet so that we can compare unordered collections of the same type. + assertEquals(new HashSet<>(records), new HashSet<>(recovered)); + } + + @Test + public void testRecoverWithNoJournalUpdates() throws IOException { + final SequentialAccessWriteAheadLog repo = createWriteRepo(); + + final List records = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + records.add(record); + } + + repo.update(records, false); + repo.checkpoint(); + repo.shutdown(); + + final SequentialAccessWriteAheadLog recoveryRepo = createRecoveryRepo(); + final Collection recovered = recoveryRepo.recoverRecords(); + + // ensure that we get the same records back, but the order may be different, so wrap both collections + // in a HashSet so that we can compare unordered collections of the same type. + assertEquals(new HashSet<>(records), new HashSet<>(recovered)); + } + + @Test + public void testRecoverWithMultipleCheckpointsBetweenJournalUpdate() throws IOException { + final SequentialAccessWriteAheadLog repo = createWriteRepo(); + + final List records = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + records.add(record); + } + + repo.update(records, false); + + for (int i = 0; i < 8; i++) { + repo.checkpoint(); + } + + final DummyRecord updateRecord = new DummyRecord("4", UpdateType.UPDATE); + updateRecord.setProperties(Collections.singletonMap("updated", "true")); + repo.update(Collections.singleton(updateRecord), false); + + repo.shutdown(); + + final SequentialAccessWriteAheadLog recoveryRepo = createRecoveryRepo(); + final Collection recovered = recoveryRepo.recoverRecords(); + + // what we expect is the same as what we updated with, except we don't want the DummyRecord for CREATE 4 + // because we will instead recover an UPDATE only for 4. + final Set expected = new HashSet<>(records); + expected.remove(new DummyRecord("4", UpdateType.CREATE)); + expected.add(updateRecord); + + // ensure that we get the same records back, but the order may be different, so wrap both collections + // in a HashSet so that we can compare unordered collections of the same type. + assertEquals(expected, new HashSet<>(recovered)); + } + + private SequentialAccessWriteAheadLog createRecoveryRepo() throws IOException { + final File targetDir = new File("target"); + final File storageDir = new File(targetDir, testName.getMethodName()); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final SerDeFactory serdeFactory = new SingletonSerDeFactory<>(serde); + final SequentialAccessWriteAheadLog repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory); + + return repo; + } + + private SequentialAccessWriteAheadLog createWriteRepo() throws IOException { + final File targetDir = new File("target"); + final File storageDir = new File(targetDir, testName.getMethodName()); + deleteRecursively(storageDir); + assertTrue(storageDir.mkdirs()); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final SerDeFactory serdeFactory = new SingletonSerDeFactory<>(serde); + final SequentialAccessWriteAheadLog repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory); + + final Collection recovered = repo.recoverRecords(); + assertNotNull(recovered); + assertTrue(recovered.isEmpty()); + + return repo; + } + + /** + * This test is designed to update the repository in several different wants, testing CREATE, UPDATE, SWAP IN, SWAP OUT, and DELETE + * update types, as well as testing updates with single records and with multiple records in a transaction. It also verifies that we + * are able to checkpoint, then update journals, and then recover updates to both the checkpoint and the journals. + */ + @Test + public void testUpdateThenRecover() throws IOException { + final SequentialAccessWriteAheadLog repo = createWriteRepo(); + + final DummyRecord firstCreate = new DummyRecord("0", UpdateType.CREATE); + repo.update(Collections.singleton(firstCreate), false); + + final List creations = new ArrayList<>(); + for (int i = 1; i < 11; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + creations.add(record); + } + repo.update(creations, false); + + final DummyRecord deleteRecord3 = new DummyRecord("3", UpdateType.DELETE); + repo.update(Collections.singleton(deleteRecord3), false); + + final DummyRecord swapOutRecord4 = new DummyRecord("4", UpdateType.SWAP_OUT); + swapOutRecord4.setSwapLocation("swap"); + + final DummyRecord swapOutRecord5 = new DummyRecord("5", UpdateType.SWAP_OUT); + swapOutRecord5.setSwapLocation("swap"); + + final List swapOuts = new ArrayList<>(); + swapOuts.add(swapOutRecord4); + swapOuts.add(swapOutRecord5); + repo.update(swapOuts, false); + + final DummyRecord swapInRecord5 = new DummyRecord("5", UpdateType.SWAP_IN); + swapInRecord5.setSwapLocation("swap"); + repo.update(Collections.singleton(swapInRecord5), false); + + final int recordCount = repo.checkpoint(); + assertEquals(9, recordCount); + + final DummyRecord updateRecord6 = new DummyRecord("6", UpdateType.UPDATE); + updateRecord6.setProperties(Collections.singletonMap("greeting", "hello")); + repo.update(Collections.singleton(updateRecord6), false); + + final List updateRecords = new ArrayList<>(); + for (int i = 7; i < 11; i++) { + final DummyRecord updateRecord = new DummyRecord(String.valueOf(i), UpdateType.UPDATE); + updateRecord.setProperties(Collections.singletonMap("greeting", "hi")); + updateRecords.add(updateRecord); + } + + final DummyRecord deleteRecord2 = new DummyRecord("2", UpdateType.DELETE); + updateRecords.add(deleteRecord2); + + repo.update(updateRecords, false); + + repo.shutdown(); + + final SequentialAccessWriteAheadLog recoveryRepo = createRecoveryRepo(); + final Collection recoveredRecords = recoveryRepo.recoverRecords(); + + // We should now have records: + // 0-10 CREATED + // 2 & 3 deleted + // 4 & 5 swapped out + // 5 swapped back in + // 6 updated with greeting = hello + // 7-10 updated with greeting = hi + + assertEquals(8, recoveredRecords.size()); + final Map recordMap = recoveredRecords.stream() + .collect(Collectors.toMap(record -> record.getId(), Function.identity())); + + assertFalse(recordMap.containsKey("2")); + assertFalse(recordMap.containsKey("3")); + assertFalse(recordMap.containsKey("4")); + + assertTrue(recordMap.get("1").getProperties().isEmpty()); + assertTrue(recordMap.get("5").getProperties().isEmpty()); + + assertEquals("hello", recordMap.get("6").getProperties().get("greeting")); + + for (int i = 7; i < 11; i++) { + assertEquals("hi", recordMap.get(String.valueOf(i)).getProperties().get("greeting")); + } + + recoveryRepo.shutdown(); + } + + + @Test + @Ignore("For manual performance testing") + public void testUpdatePerformance() throws IOException, InterruptedException { + final Path path = Paths.get("target/sequential-access-repo"); + deleteRecursively(path.toFile()); + assertTrue(path.toFile().mkdirs()); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final SerDeFactory serdeFactory = new SingletonSerDeFactory<>(serde); + + final WriteAheadRepository repo = new SequentialAccessWriteAheadLog<>(path.toFile(), serdeFactory); + final Collection initialRecs = repo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + final long updateCountPerThread = 1_000_000; + final int numThreads = 4; + + final Thread[] threads = new Thread[numThreads]; + final int batchSize = 1; + + long previousBytes = 0L; + + for (int j = 0; j < 2; j++) { + for (int i = 0; i < numThreads; i++) { + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + final List batch = new ArrayList<>(); + for (int i = 0; i < updateCountPerThread / batchSize; i++) { + batch.clear(); + for (int j = 0; j < batchSize; j++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + batch.add(record); + } + + try { + repo.update(batch, false); + } catch (Throwable t) { + t.printStackTrace(); + Assert.fail(t.toString()); + } + } + } + }); + + threads[i] = t; + } + + final long start = System.nanoTime(); + for (final Thread t : threads) { + t.start(); + } + for (final Thread t : threads) { + t.join(); + } + + long bytes = 0L; + for (final File journalFile : path.resolve("journals").toFile().listFiles()) { + bytes += journalFile.length(); + } + + bytes -= previousBytes; + previousBytes = bytes; + + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + final long eventsPerSecond = (updateCountPerThread * numThreads * 1000) / millis; + final String eps = NumberFormat.getInstance().format(eventsPerSecond); + final long bytesPerSecond = bytes * 1000 / millis; + final String bps = NumberFormat.getInstance().format(bytesPerSecond); + + if (j == 0) { + System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads + + " threads, *as a warmup!* " + eps + " events per second, " + bps + " bytes per second"); + } else { + System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads + + " threads, " + eps + " events per second, " + bps + " bytes per second"); + } + } + } + + private void deleteRecursively(final File file) { + final File[] children = file.listFiles(); + if (children != null) { + for (final File child : children) { + deleteRecursively(child); + } + } + + file.delete(); + } + +} diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java index bf15ba7206..1ae7178a22 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java @@ -19,12 +19,14 @@ package org.wali; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; public class DummyRecord { private final String id; private final Map props; private final UpdateType updateType; + private String swapLocation; public DummyRecord(final String id, final UpdateType updateType) { this.id = id; @@ -59,8 +61,37 @@ public class DummyRecord { return props.get(name); } + public String getSwapLocation() { + return swapLocation; + } + + public void setSwapLocation(String swapLocation) { + this.swapLocation = swapLocation; + } + @Override public String toString() { return "DummyRecord [id=" + id + ", props=" + props + ", updateType=" + updateType + "]"; } + + @Override + public int hashCode() { + return Objects.hash(this.id, this.props, this.updateType, this.swapLocation); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + + if (!(obj instanceof DummyRecord)) { + return false; + } + final DummyRecord other = (DummyRecord) obj; + return Objects.equals(id, other.id) && Objects.equals(props, other.props) && Objects.equals(updateType, other.updateType) && Objects.equals(swapLocation, other.swapLocation); + } } diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java index e9f3b0194f..1f6aede9db 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java @@ -27,6 +27,7 @@ public class DummyRecordSerde implements SerDe { private int throwOOMEAfterNserializeEdits = -1; private int serializeEditCount = 0; + @SuppressWarnings("fallthrough") @Override public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException { if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) { @@ -39,14 +40,28 @@ public class DummyRecordSerde implements SerDe { out.writeUTF(record.getUpdateType().name()); out.writeUTF(record.getId()); - if (record.getUpdateType() != UpdateType.DELETE) { - final Map props = record.getProperties(); - out.writeInt(props.size()); - for (final Map.Entry entry : props.entrySet()) { - out.writeUTF(entry.getKey()); - out.writeUTF(entry.getValue()); + switch (record.getUpdateType()) { + case DELETE: + break; + case SWAP_IN: { + out.writeUTF(record.getSwapLocation()); + // intentionally fall through to CREATE/UPDATE block } + case CREATE: + case UPDATE: { + final Map props = record.getProperties(); + out.writeInt(props.size()); + for (final Map.Entry entry : props.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } + } + break; + case SWAP_OUT: + out.writeUTF(record.getSwapLocation()); + break; } + } @Override @@ -55,20 +70,36 @@ public class DummyRecordSerde implements SerDe { } @Override + @SuppressWarnings("fallthrough") public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { final String updateTypeName = in.readUTF(); final UpdateType updateType = UpdateType.valueOf(updateTypeName); final String id = in.readUTF(); final DummyRecord record = new DummyRecord(id, updateType); - if (record.getUpdateType() != UpdateType.DELETE) { - final int numProps = in.readInt(); - for (int i = 0; i < numProps; i++) { - final String key = in.readUTF(); - final String value = in.readUTF(); - record.setProperty(key, value); + switch (record.getUpdateType()) { + case DELETE: + break; + case SWAP_IN: { + final String swapLocation = in.readUTF(); + record.setSwapLocation(swapLocation); + // intentionally fall through to the CREATE/UPDATE block } + case CREATE: + case UPDATE: + final int numProps = in.readInt(); + for (int i = 0; i < numProps; i++) { + final String key = in.readUTF(); + final String value = in.readUTF(); + record.setProperty(key, value); + } + break; + case SWAP_OUT: + final String swapLocation = in.readUTF(); + record.setSwapLocation(swapLocation); + break; } + return record; } @@ -102,6 +133,6 @@ public class DummyRecordSerde implements SerDe { @Override public String getLocation(final DummyRecord record) { - return null; + return record.getSwapLocation(); } } diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java index ef33f57fe7..20009d12d8 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java @@ -34,6 +34,7 @@ import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.text.NumberFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -139,9 +140,9 @@ public class TestMinimalLockingWriteAheadLog { } @Test - @Ignore("for local testing only") + @Ignore("For manual performance testing") public void testUpdatePerformance() throws IOException, InterruptedException { - final int numPartitions = 4; + final int numPartitions = 16; final Path path = Paths.get("target/minimal-locking-repo"); deleteRecursively(path.toFile()); @@ -152,23 +153,34 @@ public class TestMinimalLockingWriteAheadLog { final Collection initialRecs = repo.recoverRecords(); assertTrue(initialRecs.isEmpty()); - final int updateCountPerThread = 1_000_000; - final int numThreads = 16; + final long updateCountPerThread = 1_000_000; + final int numThreads = 4; final Thread[] threads = new Thread[numThreads]; + final int batchSize = 1; + + long previousBytes = 0; + for (int j = 0; j < 2; j++) { for (int i = 0; i < numThreads; i++) { final Thread t = new Thread(new Runnable() { @Override public void run() { - for (int i = 0; i < updateCountPerThread; i++) { - final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + final List batch = new ArrayList<>(); + + for (int i = 0; i < updateCountPerThread / batchSize; i++) { + batch.clear(); + for (int j = 0; j < batchSize; j++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + batch.add(record); + } + try { - repo.update(Collections.singleton(record), false); - } catch (IOException e) { - e.printStackTrace(); - Assert.fail(e.toString()); + repo.update(batch, false); + } catch (Throwable t) { + t.printStackTrace(); + Assert.fail(t.toString()); } } } @@ -185,11 +197,30 @@ public class TestMinimalLockingWriteAheadLog { t.join(); } + long bytes = 0L; + for (final File file : path.toFile().listFiles()) { + if (file.getName().startsWith("partition-")) { + for (final File journalFile : file.listFiles()) { + bytes += journalFile.length(); + } + } + } + + bytes -= previousBytes; + previousBytes = bytes; + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + final long eventsPerSecond = (updateCountPerThread * numThreads * 1000) / millis; + final String eps = NumberFormat.getInstance().format(eventsPerSecond); + final long bytesPerSecond = bytes * 1000 / millis; + final String bps = NumberFormat.getInstance().format(bytesPerSecond); + if (j == 0) { - System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numPartitions + " partitions and " + numThreads + " threads, *as a warmup!*"); + System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads + " threads, *as a warmup!* " + + eps + " events per second, " + bps + " bytes per second"); } else { - System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numPartitions + " partitions and " + numThreads + " threads"); + System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads + " threads, " + + eps + " events per second, " + bps + " bytes per second"); } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 00dde06b13..3901029695 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -16,10 +16,10 @@ */ package org.apache.nifi.controller.repository; +import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -47,6 +48,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.wali.SequentialAccessWriteAheadLog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.wali.MinimalLockingWriteAheadLog; @@ -86,7 +88,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis private volatile ScheduledFuture checkpointFuture; private final long checkpointDelayMillis; - private final SortedSet flowFileRepositoryPaths = new TreeSet<>(); + private final File flowFileRepositoryPath; + private final List recoveryFiles = new ArrayList<>(); private final int numPartitions; private final ScheduledExecutorService checkpointExecutor; @@ -126,16 +129,23 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis checkpointDelayMillis = 0l; numPartitions = 0; checkpointExecutor = null; + flowFileRepositoryPath = null; } public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) { alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC, "false")); // determine the database file path and ensure it exists + final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX); + flowFileRepositoryPath = new File(directoryName); + + // We used to use the MinimalLockingWriteAheadLog, but we now use the SequentialAccessWriteAheadLog. Since the + // MinimalLockingWriteAheadLog supports multiple partitions, we need to ensure that we recover records from all + // partitions, so we build up a List of Files for the recovery files. for (final String propertyName : nifiProperties.getPropertyKeys()) { if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) { - final String directoryName = nifiProperties.getProperty(propertyName); - flowFileRepositoryPaths.add(Paths.get(directoryName)); + final String dirName = nifiProperties.getProperty(propertyName); + recoveryFiles.add(new File(dirName)); } } @@ -149,16 +159,14 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis public void initialize(final ResourceClaimManager claimManager) throws IOException { this.claimManager = claimManager; - for (final Path path : flowFileRepositoryPaths) { - Files.createDirectories(path); - } + Files.createDirectories(flowFileRepositoryPath.toPath()); // TODO: Should ensure that only 1 instance running and pointing at a particular path // TODO: Allow for backup path that can be used if disk out of space?? Would allow a snapshot to be stored on // backup and then the data deleted from the normal location; then can move backup to normal location and // delete backup. On restore, if no files exist in partition's directory, would have to check backup directory serdeFactory = new RepositoryRecordSerdeFactory(claimManager); - wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPaths, numPartitions, serdeFactory, this); + wal = new SequentialAccessWriteAheadLog<>(flowFileRepositoryPath, serdeFactory, this); logger.info("Initialized FlowFile Repository using {} partitions", numPartitions); } @@ -179,22 +187,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis @Override public long getStorageCapacity() throws IOException { - long capacity = 0L; - for (final Path path : flowFileRepositoryPaths) { - capacity += Files.getFileStore(path).getTotalSpace(); - } - - return capacity; + return Files.getFileStore(flowFileRepositoryPath.toPath()).getTotalSpace(); } @Override public long getUsableStorageSpace() throws IOException { - long usableSpace = 0L; - for (final Path path : flowFileRepositoryPaths) { - usableSpace += Files.getFileStore(path).getUsableSpace(); - } - - return usableSpace; + return Files.getFileStore(flowFileRepositoryPath.toPath()).getUsableSpace(); } @Override @@ -371,6 +369,72 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue}); } + + @SuppressWarnings("deprecation") + private Optional> recoverFromOldWriteAheadLog() throws IOException { + final List partitionDirs = new ArrayList<>(); + for (final File recoveryFile : recoveryFiles) { + final File[] partitions = recoveryFile.listFiles(file -> file.getName().startsWith("partition-")); + for (final File partition : partitions) { + partitionDirs.add(partition); + } + } + + if (partitionDirs == null || partitionDirs.isEmpty()) { + return Optional.empty(); + } + + logger.info("Encountered FlowFile Repository that was written using an old version of the Write-Ahead Log. " + + "Will recover from this version and re-write the repository using the new version of the Write-Ahead Log."); + + final SortedSet paths = recoveryFiles.stream() + .map(File::toPath) + .collect(Collectors.toCollection(TreeSet::new)); + + final Collection recordList; + final MinimalLockingWriteAheadLog minimalLockingWal = new MinimalLockingWriteAheadLog<>(paths, partitionDirs.size(), serdeFactory, null); + try { + recordList = minimalLockingWal.recoverRecords(); + } finally { + minimalLockingWal.shutdown(); + } + + wal.update(recordList, true); + + // Delete the old repository + logger.info("Successfully recovered files from existing Write-Ahead Log and transitioned to new implementation. Will now delete old files."); + for (final File partitionDir : partitionDirs) { + final File[] children = partitionDir.listFiles(); + + if (children != null) { + for (final File child : children) { + final boolean deleted = child.delete(); + if (!deleted) { + logger.warn("Failed to delete old file {}; this file should be cleaned up manually", child); + } + } + } + + if (!partitionDir.delete()) { + logger.warn("Failed to delete old directory {}; this directory should be cleaned up manually", partitionDir); + } + } + + for (final File recoveryFile : recoveryFiles) { + final File snapshotFile = new File(recoveryFile, "snapshot"); + if (!snapshotFile.delete() && snapshotFile.exists()) { + logger.warn("Failed to delete old file {}; this file should be cleaned up manually", snapshotFile); + } + + final File partialFile = new File(recoveryFile, "snapshot.partial"); + if (!partialFile.delete() && partialFile.exists()) { + logger.warn("Failed to delete old file {}; this file should be cleaned up manually", partialFile); + } + } + + return Optional.of(recordList); + } + @Override public long loadFlowFiles(final QueueProvider queueProvider, final long minimumSequenceNumber) throws IOException { final Map queueMap = new HashMap<>(); @@ -378,7 +442,15 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis queueMap.put(queue.getIdentifier(), queue); } serdeFactory.setQueueMap(queueMap); - final Collection recordList = wal.recoverRecords(); + + // Since we used to use the MinimalLockingWriteAheadRepository, we need to ensure that if the FlowFile + // Repo was written using that impl, that we properly recover from the implementation. + Collection recordList = wal.recoverRecords(); + + if (recordList == null || recordList.isEmpty()) { + recordList = recoverFromOldWriteAheadLog().orElse(new ArrayList<>()); + } + serdeFactory.setQueueMap(null); for (final RepositoryRecord record : recordList) {