NIFI-4774: Implementation of SequentialAccessWriteAheadLog and updates to WriteAheadFlowFileRepository to make use of the new implementation instead of MinimalLockingWriteAheadLog.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2416
This commit is contained in:
Mark Payne 2018-01-17 14:24:04 -05:00 committed by Matthew Burgess
parent 14fef2de14
commit 0bcb241db3
26 changed files with 2960 additions and 48 deletions

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
public class BlockingQueuePool<T> implements ObjectPool<T> {
private final BlockingQueue<T> queue;
private final Supplier<T> creationFunction;
private final Predicate<T> reuseCheck;
private final Consumer<T> returnPreparation;
public BlockingQueuePool(final int maxSize, final Supplier<T> creationFunction, final Predicate<T> reuseCheck, final Consumer<T> returnPreparation) {
this.queue = new LinkedBlockingQueue<>(maxSize);
this.creationFunction = creationFunction;
this.reuseCheck = reuseCheck;
this.returnPreparation = returnPreparation;
}
@Override
public T borrowObject() {
final T existing = queue.poll();
if (existing != null) {
return existing;
}
return creationFunction.get();
}
@Override
public void returnObject(final T somethingBorrowed) {
if (reuseCheck.test(somethingBorrowed)) {
returnPreparation.accept(somethingBorrowed);
queue.offer(somethingBorrowed);
}
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
/**
* A wrapper around a DataOutputStream, which wraps a ByteArrayOutputStream.
* This allows us to obtain the DataOutputStream itself so that we can perform
* writeXYZ methods and also allows us to obtain the underlying ByteArrayOutputStream
* for performing methods such as size(), reset(), writeTo()
*/
public class ByteArrayDataOutputStream {
private final ByteArrayOutputStream baos;
private final DataOutputStream dos;
public ByteArrayDataOutputStream(final int intiialBufferSize) {
this.baos = new ByteArrayOutputStream(intiialBufferSize);
this.dos = new DataOutputStream(baos);
}
public DataOutputStream getDataOutputStream() {
return dos;
}
public ByteArrayOutputStream getByteArrayOutputStream() {
return baos;
}
}

View File

@ -0,0 +1,366 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.SerDe;
import org.wali.SerDeFactory;
import org.wali.UpdateType;
public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T> {
private static final Logger logger = LoggerFactory.getLogger(HashMapSnapshot.class);
private static final int ENCODING_VERSION = 1;
private final ConcurrentMap<Object, T> recordMap = new ConcurrentHashMap<>();
private final SerDeFactory<T> serdeFactory;
private final Set<String> swapLocations = Collections.synchronizedSet(new HashSet<>());
private final File storageDirectory;
public HashMapSnapshot(final File storageDirectory, final SerDeFactory<T> serdeFactory) {
this.serdeFactory = serdeFactory;
this.storageDirectory = storageDirectory;
}
private SnapshotHeader validateHeader(final DataInputStream dataIn) throws IOException {
final String snapshotClass = dataIn.readUTF();
logger.debug("Snapshot Class Name for {} is {}", storageDirectory, snapshotClass);
if (!snapshotClass.equals(HashMapSnapshot.class.getName())) {
throw new IOException("Write-Ahead Log Snapshot located at " + storageDirectory + " was written using the "
+ snapshotClass + " class; cannot restore using " + getClass().getName());
}
final int snapshotVersion = dataIn.readInt();
logger.debug("Snapshot version for {} is {}", storageDirectory, snapshotVersion);
if (snapshotVersion > getVersion()) {
throw new IOException("Write-Ahead Log Snapshot located at " + storageDirectory + " was written using version "
+ snapshotVersion + " of the " + snapshotClass + " class; cannot restore using Version " + getVersion());
}
final String serdeEncoding = dataIn.readUTF(); // ignore serde class name for now
logger.debug("Serde encoding for Snapshot at {} is {}", storageDirectory, serdeEncoding);
final int serdeVersion = dataIn.readInt();
logger.debug("Serde version for Snapshot at {} is {}", storageDirectory, serdeVersion);
final long maxTransactionId = dataIn.readLong();
logger.debug("Max Transaction ID for Snapshot at {} is {}", storageDirectory, maxTransactionId);
final int numRecords = dataIn.readInt();
logger.debug("Number of Records for Snapshot at {} is {}", storageDirectory, numRecords);
final SerDe<T> serde = serdeFactory.createSerDe(serdeEncoding);
serde.readHeader(dataIn);
return new SnapshotHeader(serde, serdeVersion, maxTransactionId, numRecords);
}
@Override
public SnapshotRecovery<T> recover() throws IOException {
final File partialFile = getPartialFile();
final File snapshotFile = getSnapshotFile();
final boolean partialExists = partialFile.exists();
final boolean snapshotExists = snapshotFile.exists();
// If there is no snapshot (which is the case before the first snapshot is ever created), then just
// return an empty recovery.
if (!partialExists && !snapshotExists) {
return SnapshotRecovery.emptyRecovery();
}
if (partialExists && snapshotExists) {
// both files exist -- assume NiFi crashed/died while checkpointing. Delete the partial file.
Files.delete(partialFile.toPath());
} else if (partialExists) {
// partial exists but snapshot does not -- we must have completed
// creating the partial, deleted the snapshot
// but crashed before renaming the partial to the snapshot. Just
// rename partial to snapshot
Files.move(partialFile.toPath(), snapshotFile.toPath());
}
if (snapshotFile.length() == 0) {
logger.warn("{} Found 0-byte Snapshot file; skipping Snapshot file in recovery", this);
return SnapshotRecovery.emptyRecovery();
}
// At this point, we know the snapshotPath exists because if it didn't, then we either returned null
// or we renamed partialPath to snapshotPath. So just Recover from snapshotPath.
try (final DataInputStream dataIn = new DataInputStream(new BufferedInputStream(new FileInputStream(snapshotFile)))) {
// Ensure that the header contains the information that we expect and retrieve the relevant information from the header.
final SnapshotHeader header = validateHeader(dataIn);
final SerDe<T> serde = header.getSerDe();
final int serdeVersion = header.getSerDeVersion();
final int numRecords = header.getNumRecords();
final long maxTransactionId = header.getMaxTransactionId();
// Read all of the records that we expect to receive.
for (int i = 0; i < numRecords; i++) {
final T record = serde.deserializeRecord(dataIn, serdeVersion);
if (record == null) {
throw new EOFException();
}
final UpdateType updateType = serde.getUpdateType(record);
if (updateType == UpdateType.DELETE) {
logger.warn("While recovering from snapshot, found record with type 'DELETE'; this record will not be restored");
continue;
}
logger.trace("Recovered from snapshot: {}", record);
recordMap.put(serde.getRecordIdentifier(record), record);
}
// Determine the location of any swap files.
final int numSwapRecords = dataIn.readInt();
final Set<String> swapLocations = new HashSet<>();
for (int i = 0; i < numSwapRecords; i++) {
swapLocations.add(dataIn.readUTF());
}
this.swapLocations.addAll(swapLocations);
logger.info("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}",
new Object[] {this, numRecords, swapLocations.size(), maxTransactionId});
return new StandardSnapshotRecovery<>(recordMap, swapLocations, snapshotFile, maxTransactionId);
}
}
@Override
public void update(final Collection<T> records) {
// This implementation of Snapshot keeps a ConcurrentHashMap of all 'active' records
// (meaning records that have not been removed and are not swapped out), keyed by the
// Record Identifier. It keeps only the most up-to-date version of the Record. This allows
// us to write the snapshot very quickly without having to re-process the journal files.
// For each update, then, we will update the record in the map.
for (final T record : records) {
final Object recordId = serdeFactory.getRecordIdentifier(record);
final UpdateType updateType = serdeFactory.getUpdateType(record);
switch (updateType) {
case DELETE:
recordMap.remove(recordId);
break;
case SWAP_OUT:
final String location = serdeFactory.getLocation(record);
if (location == null) {
logger.error("Received Record (ID=" + recordId + ") with UpdateType of SWAP_OUT but "
+ "no indicator of where the Record is to be Swapped Out to; these records may be "
+ "lost when the repository is restored!");
} else {
recordMap.remove(recordId);
this.swapLocations.add(location);
}
break;
case SWAP_IN:
final String swapLocation = serdeFactory.getLocation(record);
if (swapLocation == null) {
logger.error("Received Record (ID=" + recordId + ") with UpdateType of SWAP_IN but no "
+ "indicator of where the Record is to be Swapped In from; these records may be duplicated "
+ "when the repository is restored!");
} else {
swapLocations.remove(swapLocation);
}
recordMap.put(recordId, record);
break;
default:
recordMap.put(recordId, record);
break;
}
}
}
@Override
public int getRecordCount() {
return recordMap.size();
}
@Override
public T lookup(final Object recordId) {
return recordMap.get(recordId);
}
@Override
public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId) {
return new Snapshot(new HashMap<>(recordMap), new HashSet<>(swapLocations), maxTransactionId);
}
private int getVersion() {
return ENCODING_VERSION;
}
private File getPartialFile() {
return new File(storageDirectory, "checkpoint.partial");
}
private File getSnapshotFile() {
return new File(storageDirectory, "checkpoint");
}
@Override
public synchronized void writeSnapshot(final SnapshotCapture<T> snapshot) throws IOException {
final SerDe<T> serde = serdeFactory.createSerDe(null);
final File snapshotFile = getSnapshotFile();
final File partialFile = getPartialFile();
// We must ensure that we do not overwrite the existing Snapshot file directly because if NiFi were
// to be killed or crash when we are partially finished, we'd end up with no viable Snapshot file at all.
// To avoid this, we write to a 'partial' file, then delete the existing Snapshot file, if it exists, and
// rename 'partial' to Snaphsot. That way, if NiFi crashes, we can still restore the Snapshot by first looking
// for a Snapshot file and restoring it, if it exists. If it does not exist, then we restore from the partial file,
// assuming that NiFi crashed after deleting the Snapshot file and before renaming the partial file.
//
// If there is no Snapshot file currently but there is a Partial File, then this indicates
// that we have deleted the Snapshot file and failed to rename the Partial File. We don't want
// to overwrite the Partial file, because doing so could potentially lose data. Instead, we must
// first rename it to Snapshot and then write to the partial file.
if (!snapshotFile.exists() && partialFile.exists()) {
final boolean rename = partialFile.renameTo(snapshotFile);
if (!rename) {
throw new IOException("Failed to rename partial snapshot file " + partialFile + " to " + snapshotFile);
}
}
// Write to the partial file.
try (final FileOutputStream fileOut = new FileOutputStream(getPartialFile());
final OutputStream bufferedOut = new BufferedOutputStream(fileOut);
final DataOutputStream dataOut = new DataOutputStream(bufferedOut)) {
// Write out the header
dataOut.writeUTF(HashMapSnapshot.class.getName());
dataOut.writeInt(getVersion());
dataOut.writeUTF(serde.getClass().getName());
dataOut.writeInt(serde.getVersion());
dataOut.writeLong(snapshot.getMaxTransactionId());
dataOut.writeInt(snapshot.getRecords().size());
serde.writeHeader(dataOut);
// Serialize each record
for (final T record : snapshot.getRecords().values()) {
logger.trace("Checkpointing {}", record);
serde.serializeRecord(record, dataOut);
}
// Write out the number of swap locations, followed by the swap locations themselves.
dataOut.writeInt(snapshot.getSwapLocations().size());
for (final String swapLocation : snapshot.getSwapLocations()) {
dataOut.writeUTF(swapLocation);
}
// Ensure that we flush the Buffered Output Stream and then perform an fsync().
// This ensures that the data is fully written to disk before we delete the existing snapshot.
dataOut.flush();
fileOut.getChannel().force(false);
}
// If the snapshot file exists, delete it
if (snapshotFile.exists()) {
if (!snapshotFile.delete()) {
logger.warn("Unable to delete existing Snapshot file " + snapshotFile);
}
}
// Rename the partial file to Snapshot.
final boolean rename = partialFile.renameTo(snapshotFile);
if (!rename) {
throw new IOException("Failed to rename partial snapshot file " + partialFile + " to " + snapshotFile);
}
}
public class Snapshot implements SnapshotCapture<T> {
private final Map<Object, T> records;
private final long maxTransactionId;
private final Set<String> swapLocations;
public Snapshot(final Map<Object, T> records, final Set<String> swapLocations, final long maxTransactionId) {
this.records = records;
this.swapLocations = swapLocations;
this.maxTransactionId = maxTransactionId;
}
@Override
public final Map<Object, T> getRecords() {
return records;
}
@Override
public long getMaxTransactionId() {
return maxTransactionId;
}
@Override
public Set<String> getSwapLocations() {
return swapLocations;
}
}
private class SnapshotHeader {
private final SerDe<T> serde;
private final int serdeVersion;
private final int numRecords;
private final long maxTransactionId;
public SnapshotHeader(final SerDe<T> serde, final int serdeVersion, final long maxTransactionId, final int numRecords) {
this.serde = serde;
this.serdeVersion = serdeVersion;
this.maxTransactionId = maxTransactionId;
this.numRecords = numRecords;
}
public SerDe<T> getSerDe() {
return serde;
}
public int getSerDeVersion() {
return serdeVersion;
}
public long getMaxTransactionId() {
return maxTransactionId;
}
public int getNumRecords() {
return numRecords;
}
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
public interface JournalRecovery {
int getUpdateCount();
long getMaxTransactionId();
boolean isEOFExceptionEncountered();
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
public interface JournalSummary {
/**
* @return the Transaction ID of the first transaction written
*/
long getFirstTransactionId();
/**
* @return the Transaction ID of the last transaction written
*/
long getLastTransactionId();
/**
* @return the number of transactions that were written to the journal
*/
int getTransactionCount();
}

View File

@ -0,0 +1,478 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.SerDe;
import org.wali.SerDeFactory;
import org.wali.UpdateType;
public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
private static final Logger logger = LoggerFactory.getLogger(LengthDelimitedJournal.class);
private static final JournalSummary INACTIVE_JOURNAL_SUMMARY = new StandardJournalSummary(-1L, -1L, 0);
private static final int JOURNAL_ENCODING_VERSION = 1;
private static final byte TRANSACTION_FOLLOWS = 64;
private static final byte JOURNAL_COMPLETE = 127;
private static final int NUL_BYTE = 0;
private final File journalFile;
private final long initialTransactionId;
private final SerDeFactory<T> serdeFactory;
private final ObjectPool<ByteArrayDataOutputStream> streamPool;
private SerDe<T> serde;
private FileOutputStream fileOut;
private BufferedOutputStream bufferedOut;
private long currentTransactionId;
private int transactionCount;
private boolean headerWritten = false;
private volatile boolean poisoned = false;
private volatile boolean closed = false;
private final ByteBuffer transactionPreamble = ByteBuffer.allocate(12); // guarded by synchronized block
public LengthDelimitedJournal(final File journalFile, final SerDeFactory<T> serdeFactory, final ObjectPool<ByteArrayDataOutputStream> streamPool, final long initialTransactionId) {
this.journalFile = journalFile;
this.serdeFactory = serdeFactory;
this.serde = serdeFactory.createSerDe(null);
this.streamPool = streamPool;
this.initialTransactionId = initialTransactionId;
this.currentTransactionId = initialTransactionId;
}
private synchronized OutputStream getOutputStream() throws FileNotFoundException {
if (fileOut == null) {
fileOut = new FileOutputStream(journalFile);
bufferedOut = new BufferedOutputStream(fileOut);
}
return bufferedOut;
}
@Override
public synchronized void writeHeader() throws IOException {
try {
final DataOutputStream outStream = new DataOutputStream(getOutputStream());
outStream.writeUTF(LengthDelimitedJournal.class.getName());
outStream.writeInt(JOURNAL_ENCODING_VERSION);
serde = serdeFactory.createSerDe(null);
outStream.writeUTF(serde.getClass().getName());
outStream.writeInt(serde.getVersion());
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos)) {
serde.writeHeader(dos);
dos.flush();
final int serdeHeaderLength = baos.size();
outStream.writeInt(serdeHeaderLength);
baos.writeTo(outStream);
}
outStream.flush();
} catch (final Throwable t) {
poison(t);
final IOException ioe = (t instanceof IOException) ? (IOException) t : new IOException("Failed to create journal file " + journalFile, t);
logger.error("Failed to create new journal file {} due to {}", journalFile, ioe.toString(), ioe);
throw ioe;
}
headerWritten = true;
}
private synchronized SerDeAndVersion validateHeader(final DataInputStream in) throws IOException {
final String journalClassName = in.readUTF();
logger.debug("Write Ahead Log Class Name for {} is {}", journalFile, journalClassName);
if (!LengthDelimitedJournal.class.getName().equals(journalClassName)) {
throw new IOException("Invalid header information - " + journalFile + " does not appear to be a valid journal file.");
}
final int encodingVersion = in.readInt();
logger.debug("Encoding version for {} is {}", journalFile, encodingVersion);
if (encodingVersion > JOURNAL_ENCODING_VERSION) {
throw new IOException("Cannot read journal file " + journalFile + " because it is encoded using veresion " + encodingVersion
+ " but this version of the code only understands version " + JOURNAL_ENCODING_VERSION + " and below");
}
final String serdeClassName = in.readUTF();
logger.debug("Serde Class Name for {} is {}", journalFile, serdeClassName);
final SerDe<T> serde;
try {
serde = serdeFactory.createSerDe(serdeClassName);
} catch (final IllegalArgumentException iae) {
throw new IOException("Cannot read journal file " + journalFile + " because the serializer/deserializer used was " + serdeClassName
+ " but this repository is configured to use a different type of serializer/deserializer");
}
final int serdeVersion = in.readInt();
logger.debug("Serde version is {}", serdeVersion);
if (serdeVersion > serde.getVersion()) {
throw new IOException("Cannot read journal file " + journalFile + " because it is encoded using veresion " + encodingVersion
+ " of the serializer/deserializer but this version of the code only understands version " + serde.getVersion() + " and below");
}
final int serdeHeaderLength = in.readInt();
final InputStream serdeHeaderIn = new LimitingInputStream(in, serdeHeaderLength);
final DataInputStream dis = new DataInputStream(serdeHeaderIn);
serde.readHeader(dis);
return new SerDeAndVersion(serde, serdeVersion);
}
@Override
public void update(final Collection<T> records, final RecordLookup<T> recordLookup) throws IOException {
if (!headerWritten) {
throw new IllegalStateException("Cannot update journal file " + journalFile + " because no header has been written yet.");
}
if (records.isEmpty()) {
return;
}
checkState();
final ByteArrayDataOutputStream bados = streamPool.borrowObject();
try {
for (final T record : records) {
final Object recordId = serde.getRecordIdentifier(record);
final T previousRecordState = recordLookup.lookup(recordId);
serde.serializeEdit(previousRecordState, record, bados.getDataOutputStream());
}
final ByteArrayOutputStream baos = bados.getByteArrayOutputStream();
final OutputStream out = getOutputStream();
final long transactionId;
synchronized (this) {
transactionId = currentTransactionId++;
transactionCount++;
transactionPreamble.clear();
transactionPreamble.putLong(transactionId);
transactionPreamble.putInt(baos.size());
out.write(TRANSACTION_FOLLOWS);
out.write(transactionPreamble.array());
baos.writeTo(out);
out.flush();
}
logger.debug("Wrote Transaction {} to journal {} with length {} and {} records", transactionId, journalFile, baos.size(), records.size());
} catch (final Throwable t) {
poison(t);
throw t;
} finally {
streamPool.returnObject(bados);
}
}
private void checkState() throws IOException {
if (poisoned) {
throw new IOException("Cannot update journal file " + journalFile + " because this journal has already encountered a failure when attempting to write to the file. "
+ "If the repository is able to checkpoint, then this problem will resolve itself. However, if the repository is unable to be checkpointed "
+ "(for example, due to being out of storage space or having too many open files), then this issue may require manual intervention.");
}
if (closed) {
throw new IOException("Cannot update journal file " + journalFile + " because this journal has already been closed");
}
}
private void poison(final Throwable t) {
this.poisoned = true;
try {
if (fileOut != null) {
fileOut.close();
}
closed = true;
} catch (final IOException innerIOE) {
t.addSuppressed(innerIOE);
}
}
@Override
public synchronized void fsync() throws IOException {
checkState();
try {
if (fileOut != null) {
fileOut.getChannel().force(false);
}
} catch (final IOException ioe) {
poison(ioe);
}
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
closed = true;
try {
if (fileOut != null) {
if (!poisoned) {
fileOut.write(JOURNAL_COMPLETE);
}
fileOut.close();
}
} catch (final IOException ioe) {
poison(ioe);
}
}
@Override
public JournalRecovery recoverRecords(final Map<Object, T> recordMap, final Set<String> swapLocations) throws IOException {
long maxTransactionId = -1L;
int updateCount = 0;
boolean eofException = false;
logger.info("Recovering records from journal {}", journalFile);
final double journalLength = journalFile.length();
try (final InputStream fis = new FileInputStream(journalFile);
final InputStream bufferedIn = new BufferedInputStream(fis);
final ByteCountingInputStream byteCountingIn = new ByteCountingInputStream(bufferedIn);
final DataInputStream in = new DataInputStream(byteCountingIn)) {
try {
// Validate that the header is what we expect and obtain the appropriate SerDe and Version information
final SerDeAndVersion serdeAndVersion = validateHeader(in);
final SerDe<T> serde = serdeAndVersion.getSerDe();
// Ensure that we get a valid transaction indicator
int transactionIndicator = in.read();
if (transactionIndicator != TRANSACTION_FOLLOWS && transactionIndicator != JOURNAL_COMPLETE && transactionIndicator != -1) {
throw new IOException("After reading " + byteCountingIn.getBytesConsumed() + " bytes from " + journalFile + ", encountered unexpected value of "
+ transactionIndicator + " for the Transaction Indicator. This journal may have been corrupted.");
}
long consumedAtLog = 0L;
// We don't want to apply the updates in a transaction until we've finished recovering the entire
// transaction. Otherwise, we could apply say 8 out of 10 updates and then hit an EOF. In such a case,
// we want to rollback the entire transaction. We handle this by not updating recordMap or swapLocations
// variables directly but instead keeping track of the things that occurred and then once we've read the
// entire transaction, we can apply those updates to the recordMap and swapLocations.
final Map<Object, T> transactionRecordMap = new HashMap<>();
final Set<Object> idsRemoved = new HashSet<>();
final Set<String> swapLocationsRemoved = new HashSet<>();
final Set<String> swapLocationsAdded = new HashSet<>();
int transactionUpdates = 0;
// While we have a transaction to recover, recover it
while (transactionIndicator == TRANSACTION_FOLLOWS) {
transactionRecordMap.clear();
idsRemoved.clear();
swapLocationsRemoved.clear();
swapLocationsAdded.clear();
transactionUpdates = 0;
// Format is <Transaction ID: 8 bytes> <Transaction Length: 4 bytes> <Transaction data: # of bytes indicated by Transaction Length Field>
final long transactionId = in.readLong();
maxTransactionId = Math.max(maxTransactionId, transactionId);
final int transactionLength = in.readInt();
// Use SerDe to deserialize the update. We use a LimitingInputStream to ensure that the SerDe is not able to read past its intended
// length, in case there is a bug in the SerDe. We then use a ByteCountingInputStream so that we can ensure that all of the data has
// been read and throw EOFException otherwise.
final InputStream transactionLimitingIn = new LimitingInputStream(in, transactionLength);
final ByteCountingInputStream transactionByteCountingIn = new ByteCountingInputStream(transactionLimitingIn);
final DataInputStream transactionDis = new DataInputStream(transactionByteCountingIn);
while (transactionByteCountingIn.getBytesConsumed() < transactionLength) {
final T record = serde.deserializeEdit(transactionDis, recordMap, serdeAndVersion.getVersion());
// Update our RecordMap so that we have the most up-to-date version of the Record.
final Object recordId = serde.getRecordIdentifier(record);
final UpdateType updateType = serde.getUpdateType(record);
switch (updateType) {
case DELETE: {
idsRemoved.add(recordId);
transactionRecordMap.remove(recordId);
break;
}
case SWAP_IN: {
final String location = serde.getLocation(record);
if (location == null) {
logger.error("Recovered SWAP_IN record from edit log, but it did not contain a Location; skipping record");
} else {
swapLocationsRemoved.add(location);
swapLocationsAdded.remove(location);
transactionRecordMap.put(recordId, record);
}
break;
}
case SWAP_OUT: {
final String location = serde.getLocation(record);
if (location == null) {
logger.error("Recovered SWAP_OUT record from edit log, but it did not contain a Location; skipping record");
} else {
swapLocationsRemoved.remove(location);
swapLocationsAdded.add(location);
idsRemoved.add(recordId);
transactionRecordMap.remove(recordId);
}
break;
}
default: {
transactionRecordMap.put(recordId, record);
idsRemoved.remove(recordId);
break;
}
}
transactionUpdates++;
}
// Apply the transaction
for (final Object id : idsRemoved) {
recordMap.remove(id);
}
recordMap.putAll(transactionRecordMap);
swapLocations.removeAll(swapLocationsRemoved);
swapLocations.addAll(swapLocationsAdded);
updateCount += transactionUpdates;
// Check if there is another transaction to read
transactionIndicator = in.read();
if (transactionIndicator != TRANSACTION_FOLLOWS && transactionIndicator != JOURNAL_COMPLETE && transactionIndicator != -1) {
throw new IOException("After reading " + byteCountingIn.getBytesConsumed() + " bytes from " + journalFile + ", encountered unexpected value of "
+ transactionIndicator + " for the Transaction Indicator. This journal may have been corrupted.");
}
// If we have a very large journal (for instance, if checkpoint is not called for a long time, or if there is a problem rolling over
// the journal), then we want to occasionally notify the user that we are, in fact, making progress, so that it doesn't appear that
// NiFi has become "stuck".
final long consumed = byteCountingIn.getBytesConsumed();
if (consumed - consumedAtLog > 50_000_000) {
final double percentage = consumed / journalLength * 100D;
final String pct = new DecimalFormat("#.00").format(percentage);
logger.info("{}% of the way finished recovering journal {}, having recovered {} updates", pct, journalFile, updateCount);
consumedAtLog = consumed;
}
}
} catch (final EOFException eof) {
eofException = true;
logger.warn("Encountered unexpected End-of-File when reading journal file {}; assuming that NiFi was shutdown unexpectedly and continuing recovery", journalFile);
} catch (final Exception e) {
// If the stream consists solely of NUL bytes, then we want to treat it
// the same as an EOF because we see this happen when we suddenly lose power
// while writing to a file. However, if that is not the case, then something else has gone wrong.
// In such a case, there is not much that we can do but to re-throw the Exception.
if (remainingBytesAllNul(in)) {
logger.warn("Failed to recover some of the data from Write-Ahead Log Journal because encountered trailing NUL bytes. "
+ "This will sometimes happen after a sudden power loss. The rest of this journal file will be skipped for recovery purposes."
+ "The following Exception was encountered while recovering the updates to the journal:", e);
} else {
throw e;
}
}
}
logger.info("Successfully recovered {} updates from journal {}", updateCount, journalFile);
return new StandardJournalRecovery(updateCount, maxTransactionId, eofException);
}
/**
* In the case of a sudden power loss, it is common - at least in a Linux journaling File System -
* that the partition file that is being written to will have many trailing "NUL bytes" (0's).
* If this happens, then on restart we want to treat this as an incomplete transaction, so we detect
* this case explicitly.
*
* @param in the input stream to scan
* @return <code>true</code> if the InputStream contains no data or contains only NUL bytes
* @throws IOException if unable to read from the given InputStream
*/
private boolean remainingBytesAllNul(final InputStream in) throws IOException {
int nextByte;
while ((nextByte = in.read()) != -1) {
if (nextByte != NUL_BYTE) {
return false;
}
}
return true;
}
@Override
public synchronized JournalSummary getSummary() {
if (transactionCount < 1) {
return INACTIVE_JOURNAL_SUMMARY;
}
return new StandardJournalSummary(initialTransactionId, currentTransactionId - 1, transactionCount);
}
private class SerDeAndVersion {
private final SerDe<T> serde;
private final int version;
public SerDeAndVersion(final SerDe<T> serde, final int version) {
this.serde = serde;
this.version = version;
}
public SerDe<T> getSerDe() {
return serde;
}
public int getVersion() {
return version;
}
}
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
public interface ObjectPool<T> {
T borrowObject();
void returnObject(T somethingBorrowed);
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
public interface RecordLookup<T> {
/**
* Returns the Record with the given identifier, or <code>null</code> if no such record exists
*
* @param identifier the identifier of the record to lookup
* @return the Record with the given identifier, or <code>null</code> if no such record exists
*/
T lookup(Object identifier);
}

View File

@ -0,0 +1,320 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.SerDeFactory;
import org.wali.SyncListener;
import org.wali.WriteAheadRepository;
/**
* <p>
* This implementation of WriteAheadRepository provides the ability to write all updates to the
* repository sequentially by writing to a single journal file. Serialization of data into bytes
* happens outside of any lock contention and is done so using recycled byte buffers. As such,
* we occur minimal garbage collection and the theoretical throughput of this repository is equal
* to the throughput of the underlying disk itself.
* </p>
*
* <p>
* This implementation makes the assumption that only a single thread will ever issue updates for
* a given Record at any one time. I.e., the implementation is thread-safe but cannot guarantee
* that records are recovered correctly if two threads simultaneously update the write-ahead log
* with updates for the same record.
* </p>
*/
public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T> {
private static final int PARTITION_INDEX = 0;
private static final Logger logger = LoggerFactory.getLogger(SequentialAccessWriteAheadLog.class);
private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal");
private static final int MAX_BUFFERS = 64;
private static final int BUFFER_SIZE = 256 * 1024;
private final File storageDirectory;
private final File journalsDirectory;
private final SerDeFactory<T> serdeFactory;
private final SyncListener syncListener;
private final ReadWriteLock journalRWLock = new ReentrantReadWriteLock();
private final Lock journalReadLock = journalRWLock.readLock();
private final Lock journalWriteLock = journalRWLock.writeLock();
private final ObjectPool<ByteArrayDataOutputStream> streamPool = new BlockingQueuePool<>(MAX_BUFFERS,
() -> new ByteArrayDataOutputStream(BUFFER_SIZE),
stream -> stream.getByteArrayOutputStream().size() < BUFFER_SIZE,
stream -> stream.getByteArrayOutputStream().reset());
private final WriteAheadSnapshot<T> snapshot;
private final RecordLookup<T> recordLookup;
private SnapshotRecovery<T> snapshotRecovery;
private volatile boolean recovered = false;
private WriteAheadJournal<T> journal;
private volatile long nextTransactionId = 0L;
public SequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory<T> serdeFactory) throws IOException {
this(storageDirectory, serdeFactory, SyncListener.NOP_SYNC_LISTENER);
}
public SequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException {
if (!storageDirectory.exists() && !storageDirectory.mkdirs()) {
throw new IOException("Directory " + storageDirectory + " does not exist and cannot be created");
}
if (!storageDirectory.isDirectory()) {
throw new IOException("File " + storageDirectory + " is a regular file and not a directory");
}
final HashMapSnapshot<T> hashMapSnapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory);
this.snapshot = hashMapSnapshot;
this.recordLookup = hashMapSnapshot;
this.storageDirectory = storageDirectory;
this.journalsDirectory = new File(storageDirectory, "journals");
if (!journalsDirectory.exists() && !journalsDirectory.mkdirs()) {
throw new IOException("Directory " + journalsDirectory + " does not exist and cannot be created");
}
recovered = false;
this.serdeFactory = serdeFactory;
this.syncListener = (syncListener == null) ? SyncListener.NOP_SYNC_LISTENER : syncListener;
}
@Override
public int update(final Collection<T> records, final boolean forceSync) throws IOException {
if (!recovered) {
throw new IllegalStateException("Cannot update repository until record recovery has been performed");
}
journalReadLock.lock();
try {
journal.update(records, recordLookup);
if (forceSync) {
journal.fsync();
syncListener.onSync(PARTITION_INDEX);
}
snapshot.update(records);
} finally {
journalReadLock.unlock();
}
return PARTITION_INDEX;
}
@Override
public synchronized Collection<T> recoverRecords() throws IOException {
if (recovered) {
throw new IllegalStateException("Cannot recover records from repository because record recovery has already commenced");
}
logger.info("Recovering records from Write-Ahead Log at {}", storageDirectory);
final long recoverStart = System.nanoTime();
recovered = true;
snapshotRecovery = snapshot.recover();
final long snapshotRecoveryMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - recoverStart);
final Map<Object, T> recoveredRecords = snapshotRecovery.getRecords();
final Set<String> swapLocations = snapshotRecovery.getRecoveredSwapLocations();
final File[] journalFiles = journalsDirectory.listFiles(this::isJournalFile);
if (journalFiles == null) {
throw new IOException("Cannot access the list of files in directory " + journalsDirectory + "; please ensure that appropriate file permissions are set.");
}
if (snapshotRecovery.getRecoveryFile() == null) {
logger.info("No Snapshot File to recover from at {}. Now recovering records from {} journal files", storageDirectory, journalFiles.length);
} else {
logger.info("Successfully recovered {} records and {} swap files from Snapshot at {} with Max Transaction ID of {} in {} milliseconds. Now recovering records from {} journal files",
recoveredRecords.size(), swapLocations.size(), snapshotRecovery.getRecoveryFile(), snapshotRecovery.getMaxTransactionId(),
snapshotRecoveryMillis, journalFiles.length);
}
final List<File> orderedJournalFiles = Arrays.asList(journalFiles);
Collections.sort(orderedJournalFiles, new Comparator<File>() {
@Override
public int compare(final File o1, final File o2) {
final long transactionId1 = getMinTransactionId(o1);
final long transactionId2 = getMinTransactionId(o2);
return Long.compare(transactionId1, transactionId2);
}
});
final long snapshotTransactionId = snapshotRecovery.getMaxTransactionId();
int totalUpdates = 0;
int journalFilesRecovered = 0;
int journalFilesSkipped = 0;
long maxTransactionId = snapshotTransactionId;
for (final File journalFile : orderedJournalFiles) {
final long journalMinTransactionId = getMinTransactionId(journalFile);
if (journalMinTransactionId < snapshotTransactionId) {
logger.debug("Will not recover records from journal file {} because the minimum Transaction ID for that journal is {} and the Transaction ID recovered from Snapshot was {}",
journalFile, journalMinTransactionId, snapshotTransactionId);
journalFilesSkipped++;
continue;
}
logger.debug("Min Transaction ID for journal {} is {}, so will recover records from journal", journalFile, journalMinTransactionId);
journalFilesRecovered++;
try (final WriteAheadJournal<T> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
final JournalRecovery journalRecovery = journal.recoverRecords(recoveredRecords, swapLocations);
final int updates = journalRecovery.getUpdateCount();
logger.debug("Recovered {} updates from journal {}", updates, journalFile);
totalUpdates += updates;
maxTransactionId = Math.max(maxTransactionId, journalRecovery.getMaxTransactionId());
}
}
logger.debug("Recovered {} updates from {} journal files and skipped {} journal files because their data was already encapsulated in the snapshot",
totalUpdates, journalFilesRecovered, journalFilesSkipped);
this.nextTransactionId = maxTransactionId + 1;
final long recoverNanos = System.nanoTime() - recoverStart;
final long recoveryMillis = TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS);
logger.info("Successfully recovered {} records in {} milliseconds. Now checkpointing to ensure that Write-Ahead Log is in a consistent state", recoveredRecords.size(), recoveryMillis);
checkpoint();
return recoveredRecords.values();
}
private long getMinTransactionId(final File journalFile) {
final String filename = journalFile.getName();
final String numeral = filename.substring(0, filename.indexOf("."));
return Long.parseLong(numeral);
}
private boolean isJournalFile(final File file) {
if (!file.isFile()) {
return false;
}
final String filename = file.getName();
return JOURNAL_FILENAME_PATTERN.matcher(filename).matches();
}
@Override
public synchronized Set<String> getRecoveredSwapLocations() throws IOException {
if (!recovered) {
throw new IllegalStateException("Cannot retrieve the Recovered Swap Locations until record recovery has been performed");
}
return snapshotRecovery.getRecoveredSwapLocations();
}
@Override
public int checkpoint() throws IOException {
final SnapshotCapture<T> snapshotCapture;
final long startNanos = System.nanoTime();
final File[] existingJournals;
journalWriteLock.lock();
try {
if (journal != null) {
final JournalSummary journalSummary = journal.getSummary();
if (journalSummary.getTransactionCount() == 0) {
logger.debug("Will not checkpoint Write-Ahead Log because no updates have occurred since last checkpoint");
return snapshot.getRecordCount();
}
journal.fsync();
journal.close();
nextTransactionId = Math.max(nextTransactionId, journalSummary.getLastTransactionId() + 1);
}
syncListener.onGlobalSync();
final File[] existingFiles = journalsDirectory.listFiles(this::isJournalFile);
existingJournals = (existingFiles == null) ? new File[0] : existingFiles;
snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1);
// Create a new journal. We name the journal file <next transaction id>.journal but it is possible
// that we could have an empty journal file already created. If this happens, we don't want to create
// a new file on top of it because it would get deleted below when we clean up old journals. So we
// will simply increment our transaction ID and try again.
File journalFile = new File(journalsDirectory, String.valueOf(nextTransactionId) + ".journal");
while (journalFile.exists()) {
nextTransactionId++;
journalFile = new File(journalsDirectory, String.valueOf(nextTransactionId) + ".journal");
}
journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, nextTransactionId);
journal.writeHeader();
logger.debug("Created new Journal starting with Transaction ID {}", nextTransactionId);
} finally {
journalWriteLock.unlock();
}
final long stopTheWorldMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
snapshot.writeSnapshot(snapshotCapture);
for (final File existingJournal : existingJournals) {
logger.debug("Deleting Journal {} because it is now encapsulated in the latest Snapshot", existingJournal.getName());
if (!existingJournal.delete() && existingJournal.exists()) {
logger.warn("Unable to delete expired journal file " + existingJournal + "; this file should be deleted manually.");
}
}
final long totalNanos = System.nanoTime() - startNanos;
final long millis = TimeUnit.NANOSECONDS.toMillis(totalNanos);
logger.info("Checkpointed Write-Ahead Log with {} Records and {} Swap Files in {} milliseconds (Stop-the-world time = {} milliseconds), max Transaction ID {}",
new Object[] {snapshotCapture.getRecords().size(), snapshotCapture.getSwapLocations().size(), millis, stopTheWorldMillis, snapshotCapture.getMaxTransactionId()});
return snapshotCapture.getRecords().size();
}
@Override
public void shutdown() throws IOException {
journalWriteLock.lock();
try {
if (journal != null) {
journal.close();
}
} finally {
journalWriteLock.unlock();
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import java.util.Map;
import java.util.Set;
public interface SnapshotCapture<T> {
Map<Object, T> getRecords();
long getMaxTransactionId();
Set<String> getSwapLocations();
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import java.io.File;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
public interface SnapshotRecovery<T> {
long getMaxTransactionId();
Map<Object, T> getRecords();
Set<String> getRecoveredSwapLocations();
File getRecoveryFile();
public static <T> SnapshotRecovery<T> emptyRecovery() {
return new SnapshotRecovery<T>() {
@Override
public long getMaxTransactionId() {
return -1L;
}
@Override
public Map<Object, T> getRecords() {
return Collections.emptyMap();
}
@Override
public Set<String> getRecoveredSwapLocations() {
return Collections.emptySet();
}
@Override
public File getRecoveryFile() {
return null;
}
};
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
public class StandardJournalRecovery implements JournalRecovery {
private final int updateCount;
private final long maxTransactionId;
private final boolean eofException;
public StandardJournalRecovery(final int updateCount, final long maxTransactionId, final boolean eofException) {
this.updateCount = updateCount;
this.maxTransactionId = maxTransactionId;
this.eofException = eofException;
}
@Override
public int getUpdateCount() {
return updateCount;
}
@Override
public long getMaxTransactionId() {
return maxTransactionId;
}
@Override
public boolean isEOFExceptionEncountered() {
return eofException;
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
public class StandardJournalSummary implements JournalSummary {
private final long firstTransactionId;
private final long lastTransactionId;
private final int transactionCount;
public StandardJournalSummary(final long firstTransactionId, final long lastTransactionId, final int transactionCount) {
this.firstTransactionId = firstTransactionId;
this.lastTransactionId = lastTransactionId;
this.transactionCount = transactionCount;
}
@Override
public long getFirstTransactionId() {
return firstTransactionId;
}
@Override
public long getLastTransactionId() {
return lastTransactionId;
}
@Override
public int getTransactionCount() {
return transactionCount;
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import java.io.File;
import java.util.Map;
import java.util.Set;
public class StandardSnapshotRecovery<T> implements SnapshotRecovery<T> {
private final Map<Object, T> recordMap;
private final Set<String> recoveredSwapLocations;
private final File recoveryFile;
private final long maxTransactionId;
public StandardSnapshotRecovery(final Map<Object, T> recordMap, final Set<String> recoveredSwapLocations, final File recoveryFile, final long maxTransactionId) {
this.recordMap = recordMap;
this.recoveredSwapLocations = recoveredSwapLocations;
this.recoveryFile = recoveryFile;
this.maxTransactionId = maxTransactionId;
}
@Override
public long getMaxTransactionId() {
return maxTransactionId;
}
@Override
public Map<Object, T> getRecords() {
return recordMap;
}
@Override
public Set<String> getRecoveredSwapLocations() {
return recoveredSwapLocations;
}
@Override
public File getRecoveryFile() {
return recoveryFile;
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
public interface WriteAheadJournal<T> extends Closeable {
JournalRecovery recoverRecords(Map<Object, T> recordMap, Set<String> swapLocations) throws IOException;
/**
* Updates the journal with the given set of records
*
* @param records the records to update
* @param recordLookup a lookup that can be used to access the current value of a record, given its ID
*
* @throws IOException if unable to write to the underlying storage mechanism
*/
void update(Collection<T> records, RecordLookup<T> recordLookup) throws IOException;
void writeHeader() throws IOException;
void fsync() throws IOException;
/**
* Returns information about what was written to the journal
*
* @return A JournalSummary indicating what was written to the journal
* @throws IOException if unable to write to the underlying storage mechanism.
*/
JournalSummary getSummary();
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import java.io.IOException;
import java.util.Collection;
public interface WriteAheadSnapshot<T> {
SnapshotCapture<T> prepareSnapshot(long maxTransactionId);
void writeSnapshot(SnapshotCapture<T> snapshot) throws IOException;
SnapshotRecovery<T> recover() throws IOException;
void update(Collection<T> records);
int getRecordCount();
}

View File

@ -61,6 +61,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -73,7 +74,12 @@ import org.slf4j.LoggerFactory;
* </p> * </p>
* *
* @param <T> type of record this WAL is for * @param <T> type of record this WAL is for
*
* @deprecated This implementation is now deprecated in favor of {@link SequentialAccessWriteAheadLog}.
* This implementation, when given more than 1 partition, can have issues recovering after a sudden loss
* of power or an operating system crash.
*/ */
@Deprecated
public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> { public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> {
private final Path basePath; private final Path basePath;
@ -105,7 +111,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
private volatile boolean recovered = false; private volatile boolean recovered = false;
public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException { public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
this(new TreeSet<>(Collections.singleton(path)), partitionCount, new SingletonSerDeFactory<T>(serde), syncListener); this(new TreeSet<>(Collections.singleton(path)), partitionCount, new SingletonSerDeFactory<>(serde), syncListener);
} }
public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException { public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException {
@ -113,7 +119,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
} }
public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException { public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
this(paths, partitionCount, new SingletonSerDeFactory<T>(serde), syncListener); this(paths, partitionCount, new SingletonSerDeFactory<>(serde), syncListener);
} }
/** /**
@ -645,6 +651,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
lockChannel.close(); lockChannel.close();
final File lockFile = new File(basePath.toFile(), "wali.lock");
lockFile.delete();
} }
} }

View File

@ -59,4 +59,14 @@ public interface SyncListener {
* {@link WriteAheadRepository#sync()} method. * {@link WriteAheadRepository#sync()} method.
*/ */
void onGlobalSync(); void onGlobalSync();
public static final SyncListener NOP_SYNC_LISTENER = new SyncListener() {
@Override
public void onSync(int partitionIndex) {
}
@Override
public void onGlobalSync() {
}
};
} }

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.junit.Test;
public class TestBlockingQueuePool {
private static final Consumer<AtomicBoolean> DO_NOTHING = ab -> {};
@Test
public void testReuse() {
final BlockingQueuePool<AtomicBoolean> pool = new BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING);
final AtomicBoolean firstObject = pool.borrowObject();
firstObject.set(true);
pool.returnObject(firstObject);
for (int i = 0; i < 100; i++) {
final AtomicBoolean value = pool.borrowObject();
assertSame(firstObject, value);
pool.returnObject(value);
}
}
@Test
public void testCreateOnExhaustion() {
final BlockingQueuePool<AtomicBoolean> pool = new BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING);
final AtomicBoolean firstObject = pool.borrowObject();
final AtomicBoolean secondObject = pool.borrowObject();
assertNotSame(firstObject, secondObject);
}
@Test
public void testCreateMoreThanMaxCapacity() {
final BlockingQueuePool<AtomicBoolean> pool = new BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING);
for (int i = 0; i < 50; i++) {
final AtomicBoolean value = pool.borrowObject();
assertNotNull(value);
}
}
@Test
public void testDoesNotBufferMoreThanCapacity() {
final BlockingQueuePool<AtomicBoolean> pool = new BlockingQueuePool<>(10, AtomicBoolean::new, AtomicBoolean::get, DO_NOTHING);
final AtomicBoolean[] seen = new AtomicBoolean[50];
for (int i = 0; i < 50; i++) {
final AtomicBoolean value = pool.borrowObject();
assertNotNull(value);
value.set(true);
seen[i] = value;
}
for (final AtomicBoolean value : seen) {
pool.returnObject(value);
}
for (int i = 0; i < 10; i++) {
final AtomicBoolean value = pool.borrowObject();
// verify that the object exists in the 'seen' array
boolean found = false;
for (final AtomicBoolean seenBoolean : seen) {
if (value == seenBoolean) {
found = true;
break;
}
}
assertTrue(found);
}
for (int i = 0; i < 40; i++) {
final AtomicBoolean value = pool.borrowObject();
// verify that the object does not exist in the 'seen' array
boolean found = false;
for (final AtomicBoolean seenBoolean : seen) {
if (value == seenBoolean) {
found = true;
break;
}
}
assertFalse(found);
}
}
}

View File

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.wali.DummyRecord;
import org.wali.DummyRecordSerde;
import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;
public class TestHashMapSnapshot {
private final File storageDirectory = new File("target/test-hashmap-snapshot");
private DummyRecordSerde serde;
private SerDeFactory<DummyRecord> serdeFactory;
@Before
public void setup() throws IOException {
if (!storageDirectory.exists()) {
Files.createDirectories(storageDirectory.toPath());
}
final File[] childFiles = storageDirectory.listFiles();
for (final File childFile : childFiles) {
if (childFile.isFile()) {
Files.delete(childFile.toPath());
}
}
serde = new DummyRecordSerde();
serdeFactory = new SingletonSerDeFactory<>(serde);
}
@Test
public void testSuccessfulRoundTrip() throws IOException {
final HashMapSnapshot<DummyRecord> snapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory);
final Map<String, String> props = new HashMap<>();
for (int i = 0; i < 10; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
props.put("key", String.valueOf(i));
record.setProperties(props);
snapshot.update(Collections.singleton(record));
}
for (int i = 2; i < 10; i += 2) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.DELETE);
snapshot.update(Collections.singleton(record));
}
for (int i = 1; i < 10; i += 2) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.SWAP_OUT);
record.setSwapLocation("swapFile-" + i);
snapshot.update(Collections.singleton(record));
}
final DummyRecord swapIn7 = new DummyRecord("7", UpdateType.SWAP_IN);
swapIn7.setSwapLocation("swapFile-7");
snapshot.update(Collections.singleton(swapIn7));
final Set<String> swappedOutLocations = new HashSet<>();
swappedOutLocations.add("swapFile-1");
swappedOutLocations.add("swapFile-3");
swappedOutLocations.add("swapFile-5");
swappedOutLocations.add("swapFile-9");
final SnapshotCapture<DummyRecord> capture = snapshot.prepareSnapshot(180L);
assertEquals(180L, capture.getMaxTransactionId());
assertEquals(swappedOutLocations, capture.getSwapLocations());
final Map<Object, DummyRecord> records = capture.getRecords();
assertEquals(2, records.size());
assertTrue(records.containsKey("0"));
assertTrue(records.containsKey("7"));
snapshot.writeSnapshot(capture);
final SnapshotRecovery<DummyRecord> recovery = snapshot.recover();
assertEquals(180L, recovery.getMaxTransactionId());
assertEquals(swappedOutLocations, recovery.getRecoveredSwapLocations());
final Map<Object, DummyRecord> recoveredRecords = recovery.getRecords();
assertEquals(records, recoveredRecords);
}
@Test
public void testOOMEWhenWritingResultsInPreviousSnapshotStillRecoverable() throws IOException {
final HashMapSnapshot<DummyRecord> snapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory);
final Map<String, String> props = new HashMap<>();
for (int i = 0; i < 11; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
props.put("key", String.valueOf(i));
record.setProperties(props);
snapshot.update(Collections.singleton(record));
}
final DummyRecord swapOutRecord = new DummyRecord("10", UpdateType.SWAP_OUT);
swapOutRecord.setSwapLocation("SwapLocation-1");
snapshot.update(Collections.singleton(swapOutRecord));
snapshot.writeSnapshot(snapshot.prepareSnapshot(25L));
serde.setThrowOOMEAfterNSerializeEdits(3);
try {
snapshot.writeSnapshot(snapshot.prepareSnapshot(150L));
Assert.fail("Expected OOME");
} catch (final OutOfMemoryError oome) {
// expected
}
final SnapshotRecovery<DummyRecord> recovery = snapshot.recover();
assertEquals(25L, recovery.getMaxTransactionId());
final Map<Object, DummyRecord> recordMap = recovery.getRecords();
assertEquals(10, recordMap.size());
for (int i = 0; i < 10; i++) {
assertTrue(recordMap.containsKey(String.valueOf(i)));
}
for (final Map.Entry<Object, DummyRecord> entry : recordMap.entrySet()) {
final DummyRecord record = entry.getValue();
final Map<String, String> properties = record.getProperties();
assertNotNull(properties);
assertEquals(1, properties.size());
assertEquals(entry.getKey(), properties.get("key"));
}
final Set<String> swapLocations = recovery.getRecoveredSwapLocations();
assertEquals(1, swapLocations.size());
assertTrue(swapLocations.contains("SwapLocation-1"));
}
@Test
public void testIOExceptionWhenWritingResultsInPreviousSnapshotStillRecoverable() throws IOException {
final HashMapSnapshot<DummyRecord> snapshot = new HashMapSnapshot<>(storageDirectory, serdeFactory);
final Map<String, String> props = new HashMap<>();
for (int i = 0; i < 11; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
props.put("key", String.valueOf(i));
record.setProperties(props);
snapshot.update(Collections.singleton(record));
}
final DummyRecord swapOutRecord = new DummyRecord("10", UpdateType.SWAP_OUT);
swapOutRecord.setSwapLocation("SwapLocation-1");
snapshot.update(Collections.singleton(swapOutRecord));
snapshot.writeSnapshot(snapshot.prepareSnapshot(25L));
serde.setThrowIOEAfterNSerializeEdits(3);
for (int i = 0; i < 5; i++) {
try {
snapshot.writeSnapshot(snapshot.prepareSnapshot(150L));
Assert.fail("Expected IOE");
} catch (final IOException ioe) {
// expected
}
}
final SnapshotRecovery<DummyRecord> recovery = snapshot.recover();
assertEquals(25L, recovery.getMaxTransactionId());
final Map<Object, DummyRecord> recordMap = recovery.getRecords();
assertEquals(10, recordMap.size());
for (int i = 0; i < 10; i++) {
assertTrue(recordMap.containsKey(String.valueOf(i)));
}
for (final Map.Entry<Object, DummyRecord> entry : recordMap.entrySet()) {
final DummyRecord record = entry.getValue();
final Map<String, String> properties = record.getProperties();
assertNotNull(properties);
assertEquals(1, properties.size());
assertEquals(entry.getKey(), properties.get("key"));
}
final Set<String> swapLocations = recovery.getRecoveredSwapLocations();
assertEquals(1, swapLocations.size());
assertTrue(swapLocations.contains("SwapLocation-1"));
}
}

View File

@ -0,0 +1,353 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.wali.DummyRecord;
import org.wali.DummyRecordSerde;
import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;
public class TestLengthDelimitedJournal {
private final File journalFile = new File("target/testLengthDelimitedJournal/testJournal.journal");
private SerDeFactory<DummyRecord> serdeFactory;
private DummyRecordSerde serde;
private ObjectPool<ByteArrayDataOutputStream> streamPool;
private static final int BUFFER_SIZE = 4096;
@Before
public void setupJournal() throws IOException {
Files.deleteIfExists(journalFile.toPath());
if (!journalFile.getParentFile().exists()) {
Files.createDirectories(journalFile.getParentFile().toPath());
}
serde = new DummyRecordSerde();
serdeFactory = new SingletonSerDeFactory<>(serde);
streamPool = new BlockingQueuePool<>(1,
() -> new ByteArrayDataOutputStream(BUFFER_SIZE),
stream -> stream.getByteArrayOutputStream().size() < BUFFER_SIZE,
stream -> stream.getByteArrayOutputStream().reset());
}
@Test
public void testHandlingOfTrailingNulBytes() throws IOException {
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
journal.writeHeader();
final List<DummyRecord> firstTransaction = new ArrayList<>();
firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
final List<DummyRecord> secondTransaction = new ArrayList<>();
secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
final List<DummyRecord> thirdTransaction = new ArrayList<>();
thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
journal.update(firstTransaction, id -> null);
journal.update(secondTransaction, id -> null);
journal.update(thirdTransaction, id -> null);
}
// Truncate the contents of the journal file by 8 bytes. Then replace with 28 trailing NUL bytes,
// as this is what we often see when we have a sudden power loss.
final byte[] contents = Files.readAllBytes(journalFile.toPath());
final byte[] truncated = Arrays.copyOfRange(contents, 0, contents.length - 8);
final byte[] withNuls = new byte[truncated.length + 28];
System.arraycopy(truncated, 0, withNuls, 0, truncated.length);
try (final OutputStream fos = new FileOutputStream(journalFile)) {
fos.write(withNuls);
}
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
final Map<Object, DummyRecord> recordMap = new HashMap<>();
final Set<String> swapLocations = new HashSet<>();
journal.recoverRecords(recordMap, swapLocations);
assertFalse(recordMap.isEmpty());
assertEquals(3, recordMap.size());
final DummyRecord record1 = recordMap.get("1");
assertNotNull(record1);
assertEquals(Collections.singletonMap("abc", "123"), record1.getProperties());
final DummyRecord record2 = recordMap.get("2");
assertNotNull(record2);
assertEquals(Collections.singletonMap("cba", "123"), record2.getProperties());
final DummyRecord record3 = recordMap.get("3");
assertNotNull(record3);
assertEquals(Collections.singletonMap("aaa", "123"), record3.getProperties());
}
}
@Test
public void testUpdateOnlyAppliedIfEntireTransactionApplied() throws IOException {
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
journal.writeHeader();
for (int i = 0; i < 3; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
journal.update(Collections.singleton(record), key -> null);
}
final DummyRecord swapOut1Record = new DummyRecord("1", UpdateType.SWAP_OUT);
swapOut1Record.setSwapLocation("swap12");
journal.update(Collections.singleton(swapOut1Record), id -> null);
final DummyRecord swapOut2Record = new DummyRecord("2", UpdateType.SWAP_OUT);
swapOut2Record.setSwapLocation("swap12");
journal.update(Collections.singleton(swapOut2Record), id -> null);
final List<DummyRecord> records = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final DummyRecord record = new DummyRecord("1" + i, UpdateType.CREATE);
records.add(record);
}
final DummyRecord swapIn1Record = new DummyRecord("1", UpdateType.SWAP_IN);
swapIn1Record.setSwapLocation("swap12");
records.add(swapIn1Record);
final DummyRecord swapOut1AgainRecord = new DummyRecord("1", UpdateType.SWAP_OUT);
swapOut1AgainRecord.setSwapLocation("swap12");
records.add(swapOut1AgainRecord);
final DummyRecord swapIn2Record = new DummyRecord("2", UpdateType.SWAP_IN);
swapIn2Record.setSwapLocation("swap12");
records.add(swapIn2Record);
final DummyRecord swapOut0Record = new DummyRecord("0", UpdateType.SWAP_OUT);
swapOut0Record.setSwapLocation("swap0");
records.add(swapOut0Record);
journal.update(records, id -> null);
}
// Truncate the last 8 bytes so that we will get an EOFException when reading the last transaction.
try (final FileOutputStream fos = new FileOutputStream(journalFile, true)) {
fos.getChannel().truncate(journalFile.length() - 8);
}
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
final Map<Object, DummyRecord> recordMap = new HashMap<>();
final Set<String> swapLocations = new HashSet<>();
final JournalRecovery recovery = journal.recoverRecords(recordMap, swapLocations);
assertEquals(5L, recovery.getMaxTransactionId());
assertEquals(5, recovery.getUpdateCount());
final Set<String> expectedSwap = Collections.singleton("swap12");
assertEquals(expectedSwap, swapLocations);
final Map<Object, DummyRecord> expectedRecordMap = new HashMap<>();
expectedRecordMap.put("0", new DummyRecord("0", UpdateType.CREATE));
assertEquals(expectedRecordMap, recordMap);
}
}
@Test
public void testPoisonedJournalNotWritableAfterIOE() throws IOException {
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
journal.writeHeader();
serde.setThrowIOEAfterNSerializeEdits(2);
final DummyRecord firstRecord = new DummyRecord("1", UpdateType.CREATE);
journal.update(Collections.singleton(firstRecord), key -> null);
final DummyRecord secondRecord = new DummyRecord("1", UpdateType.UPDATE);
journal.update(Collections.singleton(secondRecord), key -> firstRecord);
final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE);
final RecordLookup<DummyRecord> lookup = key -> secondRecord;
try {
journal.update(Collections.singleton(thirdRecord), lookup);
Assert.fail("Expected IOException");
} catch (final IOException ioe) {
// expected
}
serde.setThrowIOEAfterNSerializeEdits(-1);
final Collection<DummyRecord> records = Collections.singleton(thirdRecord);
for (int i = 0; i < 10; i++) {
try {
journal.update(records, lookup);
Assert.fail("Expected IOException");
} catch (final IOException expected) {
}
try {
journal.fsync();
Assert.fail("Expected IOException");
} catch (final IOException expected) {
}
}
}
}
@Test
public void testPoisonedJournalNotWritableAfterOOME() throws IOException {
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
journal.writeHeader();
serde.setThrowOOMEAfterNSerializeEdits(2);
final DummyRecord firstRecord = new DummyRecord("1", UpdateType.CREATE);
journal.update(Collections.singleton(firstRecord), key -> null);
final DummyRecord secondRecord = new DummyRecord("1", UpdateType.UPDATE);
journal.update(Collections.singleton(secondRecord), key -> firstRecord);
final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE);
final RecordLookup<DummyRecord> lookup = key -> secondRecord;
try {
journal.update(Collections.singleton(thirdRecord), lookup);
Assert.fail("Expected OOME");
} catch (final OutOfMemoryError oome) {
// expected
}
serde.setThrowOOMEAfterNSerializeEdits(-1);
final Collection<DummyRecord> records = Collections.singleton(thirdRecord);
for (int i = 0; i < 10; i++) {
try {
journal.update(records, lookup);
Assert.fail("Expected IOException");
} catch (final IOException expected) {
}
try {
journal.fsync();
Assert.fail("Expected IOException");
} catch (final IOException expected) {
}
}
}
}
@Test
public void testSuccessfulRoundTrip() throws IOException {
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
journal.writeHeader();
final DummyRecord firstRecord = new DummyRecord("1", UpdateType.CREATE);
journal.update(Collections.singleton(firstRecord), key -> null);
final DummyRecord secondRecord = new DummyRecord("1", UpdateType.UPDATE);
journal.update(Collections.singleton(secondRecord), key -> firstRecord);
final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE);
journal.update(Collections.singleton(thirdRecord), key -> secondRecord);
final Map<Object, DummyRecord> recordMap = new HashMap<>();
final Set<String> swapLocations = new HashSet<>();
final JournalRecovery recovery = journal.recoverRecords(recordMap, swapLocations);
assertFalse(recovery.isEOFExceptionEncountered());
assertEquals(2L, recovery.getMaxTransactionId());
assertEquals(3, recovery.getUpdateCount());
assertTrue(swapLocations.isEmpty());
assertEquals(1, recordMap.size());
final DummyRecord retrieved = recordMap.get("1");
assertNotNull(retrieved);
assertEquals(thirdRecord, retrieved);
}
}
@Test
public void testTruncatedJournalFile() throws IOException {
final DummyRecord firstRecord, secondRecord;
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
journal.writeHeader();
firstRecord = new DummyRecord("1", UpdateType.CREATE);
journal.update(Collections.singleton(firstRecord), key -> null);
secondRecord = new DummyRecord("2", UpdateType.CREATE);
journal.update(Collections.singleton(secondRecord), key -> firstRecord);
final DummyRecord thirdRecord = new DummyRecord("1", UpdateType.UPDATE);
journal.update(Collections.singleton(thirdRecord), key -> secondRecord);
}
// Truncate the file
try (final FileOutputStream fos = new FileOutputStream(journalFile, true)) {
fos.getChannel().truncate(journalFile.length() - 8);
}
// Ensure that we are able to recover the first two records without an issue but the third is lost.
try (final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, 0L)) {
final Map<Object, DummyRecord> recordMap = new HashMap<>();
final Set<String> swapLocations = new HashSet<>();
final JournalRecovery recovery = journal.recoverRecords(recordMap, swapLocations);
assertTrue(recovery.isEOFExceptionEncountered());
assertEquals(2L, recovery.getMaxTransactionId()); // transaction ID is still 2 because that's what was written to the journal
assertEquals(2, recovery.getUpdateCount()); // only 2 updates because the last update will incur an EOFException and be skipped
assertTrue(swapLocations.isEmpty());
assertEquals(2, recordMap.size());
final DummyRecord retrieved1 = recordMap.get("1");
assertNotNull(retrieved1);
assertEquals(firstRecord, retrieved1);
final DummyRecord retrieved2 = recordMap.get("2");
assertNotNull(retrieved2);
assertEquals(secondRecord, retrieved2);
}
}
}

View File

@ -0,0 +1,345 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.wali;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.wali.DummyRecord;
import org.wali.DummyRecordSerde;
import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;
import org.wali.WriteAheadRepository;
public class TestSequentialAccessWriteAheadLog {
@Rule
public TestName testName = new TestName();
@Test
public void testRecoverWithNoCheckpoint() throws IOException {
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo();
final List<DummyRecord> records = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
records.add(record);
}
repo.update(records, false);
repo.shutdown();
final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords();
// ensure that we get the same records back, but the order may be different, so wrap both collections
// in a HashSet so that we can compare unordered collections of the same type.
assertEquals(new HashSet<>(records), new HashSet<>(recovered));
}
@Test
public void testRecoverWithNoJournalUpdates() throws IOException {
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo();
final List<DummyRecord> records = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
records.add(record);
}
repo.update(records, false);
repo.checkpoint();
repo.shutdown();
final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords();
// ensure that we get the same records back, but the order may be different, so wrap both collections
// in a HashSet so that we can compare unordered collections of the same type.
assertEquals(new HashSet<>(records), new HashSet<>(recovered));
}
@Test
public void testRecoverWithMultipleCheckpointsBetweenJournalUpdate() throws IOException {
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo();
final List<DummyRecord> records = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
records.add(record);
}
repo.update(records, false);
for (int i = 0; i < 8; i++) {
repo.checkpoint();
}
final DummyRecord updateRecord = new DummyRecord("4", UpdateType.UPDATE);
updateRecord.setProperties(Collections.singletonMap("updated", "true"));
repo.update(Collections.singleton(updateRecord), false);
repo.shutdown();
final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords();
// what we expect is the same as what we updated with, except we don't want the DummyRecord for CREATE 4
// because we will instead recover an UPDATE only for 4.
final Set<DummyRecord> expected = new HashSet<>(records);
expected.remove(new DummyRecord("4", UpdateType.CREATE));
expected.add(updateRecord);
// ensure that we get the same records back, but the order may be different, so wrap both collections
// in a HashSet so that we can compare unordered collections of the same type.
assertEquals(expected, new HashSet<>(recovered));
}
private SequentialAccessWriteAheadLog<DummyRecord> createRecoveryRepo() throws IOException {
final File targetDir = new File("target");
final File storageDir = new File(targetDir, testName.getMethodName());
final DummyRecordSerde serde = new DummyRecordSerde();
final SerDeFactory<DummyRecord> serdeFactory = new SingletonSerDeFactory<>(serde);
final SequentialAccessWriteAheadLog<DummyRecord> repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory);
return repo;
}
private SequentialAccessWriteAheadLog<DummyRecord> createWriteRepo() throws IOException {
final File targetDir = new File("target");
final File storageDir = new File(targetDir, testName.getMethodName());
deleteRecursively(storageDir);
assertTrue(storageDir.mkdirs());
final DummyRecordSerde serde = new DummyRecordSerde();
final SerDeFactory<DummyRecord> serdeFactory = new SingletonSerDeFactory<>(serde);
final SequentialAccessWriteAheadLog<DummyRecord> repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory);
final Collection<DummyRecord> recovered = repo.recoverRecords();
assertNotNull(recovered);
assertTrue(recovered.isEmpty());
return repo;
}
/**
* This test is designed to update the repository in several different wants, testing CREATE, UPDATE, SWAP IN, SWAP OUT, and DELETE
* update types, as well as testing updates with single records and with multiple records in a transaction. It also verifies that we
* are able to checkpoint, then update journals, and then recover updates to both the checkpoint and the journals.
*/
@Test
public void testUpdateThenRecover() throws IOException {
final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo();
final DummyRecord firstCreate = new DummyRecord("0", UpdateType.CREATE);
repo.update(Collections.singleton(firstCreate), false);
final List<DummyRecord> creations = new ArrayList<>();
for (int i = 1; i < 11; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
creations.add(record);
}
repo.update(creations, false);
final DummyRecord deleteRecord3 = new DummyRecord("3", UpdateType.DELETE);
repo.update(Collections.singleton(deleteRecord3), false);
final DummyRecord swapOutRecord4 = new DummyRecord("4", UpdateType.SWAP_OUT);
swapOutRecord4.setSwapLocation("swap");
final DummyRecord swapOutRecord5 = new DummyRecord("5", UpdateType.SWAP_OUT);
swapOutRecord5.setSwapLocation("swap");
final List<DummyRecord> swapOuts = new ArrayList<>();
swapOuts.add(swapOutRecord4);
swapOuts.add(swapOutRecord5);
repo.update(swapOuts, false);
final DummyRecord swapInRecord5 = new DummyRecord("5", UpdateType.SWAP_IN);
swapInRecord5.setSwapLocation("swap");
repo.update(Collections.singleton(swapInRecord5), false);
final int recordCount = repo.checkpoint();
assertEquals(9, recordCount);
final DummyRecord updateRecord6 = new DummyRecord("6", UpdateType.UPDATE);
updateRecord6.setProperties(Collections.singletonMap("greeting", "hello"));
repo.update(Collections.singleton(updateRecord6), false);
final List<DummyRecord> updateRecords = new ArrayList<>();
for (int i = 7; i < 11; i++) {
final DummyRecord updateRecord = new DummyRecord(String.valueOf(i), UpdateType.UPDATE);
updateRecord.setProperties(Collections.singletonMap("greeting", "hi"));
updateRecords.add(updateRecord);
}
final DummyRecord deleteRecord2 = new DummyRecord("2", UpdateType.DELETE);
updateRecords.add(deleteRecord2);
repo.update(updateRecords, false);
repo.shutdown();
final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
final Collection<DummyRecord> recoveredRecords = recoveryRepo.recoverRecords();
// We should now have records:
// 0-10 CREATED
// 2 & 3 deleted
// 4 & 5 swapped out
// 5 swapped back in
// 6 updated with greeting = hello
// 7-10 updated with greeting = hi
assertEquals(8, recoveredRecords.size());
final Map<String, DummyRecord> recordMap = recoveredRecords.stream()
.collect(Collectors.toMap(record -> record.getId(), Function.identity()));
assertFalse(recordMap.containsKey("2"));
assertFalse(recordMap.containsKey("3"));
assertFalse(recordMap.containsKey("4"));
assertTrue(recordMap.get("1").getProperties().isEmpty());
assertTrue(recordMap.get("5").getProperties().isEmpty());
assertEquals("hello", recordMap.get("6").getProperties().get("greeting"));
for (int i = 7; i < 11; i++) {
assertEquals("hi", recordMap.get(String.valueOf(i)).getProperties().get("greeting"));
}
recoveryRepo.shutdown();
}
@Test
@Ignore("For manual performance testing")
public void testUpdatePerformance() throws IOException, InterruptedException {
final Path path = Paths.get("target/sequential-access-repo");
deleteRecursively(path.toFile());
assertTrue(path.toFile().mkdirs());
final DummyRecordSerde serde = new DummyRecordSerde();
final SerDeFactory<DummyRecord> serdeFactory = new SingletonSerDeFactory<>(serde);
final WriteAheadRepository<DummyRecord> repo = new SequentialAccessWriteAheadLog<>(path.toFile(), serdeFactory);
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
final long updateCountPerThread = 1_000_000;
final int numThreads = 4;
final Thread[] threads = new Thread[numThreads];
final int batchSize = 1;
long previousBytes = 0L;
for (int j = 0; j < 2; j++) {
for (int i = 0; i < numThreads; i++) {
final Thread t = new Thread(new Runnable() {
@Override
public void run() {
final List<DummyRecord> batch = new ArrayList<>();
for (int i = 0; i < updateCountPerThread / batchSize; i++) {
batch.clear();
for (int j = 0; j < batchSize; j++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
batch.add(record);
}
try {
repo.update(batch, false);
} catch (Throwable t) {
t.printStackTrace();
Assert.fail(t.toString());
}
}
}
});
threads[i] = t;
}
final long start = System.nanoTime();
for (final Thread t : threads) {
t.start();
}
for (final Thread t : threads) {
t.join();
}
long bytes = 0L;
for (final File journalFile : path.resolve("journals").toFile().listFiles()) {
bytes += journalFile.length();
}
bytes -= previousBytes;
previousBytes = bytes;
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
final long eventsPerSecond = (updateCountPerThread * numThreads * 1000) / millis;
final String eps = NumberFormat.getInstance().format(eventsPerSecond);
final long bytesPerSecond = bytes * 1000 / millis;
final String bps = NumberFormat.getInstance().format(bytesPerSecond);
if (j == 0) {
System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads
+ " threads, *as a warmup!* " + eps + " events per second, " + bps + " bytes per second");
} else {
System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads
+ " threads, " + eps + " events per second, " + bps + " bytes per second");
}
}
}
private void deleteRecursively(final File file) {
final File[] children = file.listFiles();
if (children != null) {
for (final File child : children) {
deleteRecursively(child);
}
}
file.delete();
}
}

View File

@ -19,12 +19,14 @@ package org.wali;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
public class DummyRecord { public class DummyRecord {
private final String id; private final String id;
private final Map<String, String> props; private final Map<String, String> props;
private final UpdateType updateType; private final UpdateType updateType;
private String swapLocation;
public DummyRecord(final String id, final UpdateType updateType) { public DummyRecord(final String id, final UpdateType updateType) {
this.id = id; this.id = id;
@ -59,8 +61,37 @@ public class DummyRecord {
return props.get(name); return props.get(name);
} }
public String getSwapLocation() {
return swapLocation;
}
public void setSwapLocation(String swapLocation) {
this.swapLocation = swapLocation;
}
@Override @Override
public String toString() { public String toString() {
return "DummyRecord [id=" + id + ", props=" + props + ", updateType=" + updateType + "]"; return "DummyRecord [id=" + id + ", props=" + props + ", updateType=" + updateType + "]";
} }
@Override
public int hashCode() {
return Objects.hash(this.id, this.props, this.updateType, this.swapLocation);
}
@Override
public boolean equals(final Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (!(obj instanceof DummyRecord)) {
return false;
}
final DummyRecord other = (DummyRecord) obj;
return Objects.equals(id, other.id) && Objects.equals(props, other.props) && Objects.equals(updateType, other.updateType) && Objects.equals(swapLocation, other.swapLocation);
}
} }

View File

@ -27,6 +27,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
private int throwOOMEAfterNserializeEdits = -1; private int throwOOMEAfterNserializeEdits = -1;
private int serializeEditCount = 0; private int serializeEditCount = 0;
@SuppressWarnings("fallthrough")
@Override @Override
public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException { public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException {
if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) { if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) {
@ -39,14 +40,28 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
out.writeUTF(record.getUpdateType().name()); out.writeUTF(record.getUpdateType().name());
out.writeUTF(record.getId()); out.writeUTF(record.getId());
if (record.getUpdateType() != UpdateType.DELETE) { switch (record.getUpdateType()) {
final Map<String, String> props = record.getProperties(); case DELETE:
out.writeInt(props.size()); break;
for (final Map.Entry<String, String> entry : props.entrySet()) { case SWAP_IN: {
out.writeUTF(entry.getKey()); out.writeUTF(record.getSwapLocation());
out.writeUTF(entry.getValue()); // intentionally fall through to CREATE/UPDATE block
} }
case CREATE:
case UPDATE: {
final Map<String, String> props = record.getProperties();
out.writeInt(props.size());
for (final Map.Entry<String, String> entry : props.entrySet()) {
out.writeUTF(entry.getKey());
out.writeUTF(entry.getValue());
}
}
break;
case SWAP_OUT:
out.writeUTF(record.getSwapLocation());
break;
} }
} }
@Override @Override
@ -55,20 +70,36 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
} }
@Override @Override
@SuppressWarnings("fallthrough")
public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
final String updateTypeName = in.readUTF(); final String updateTypeName = in.readUTF();
final UpdateType updateType = UpdateType.valueOf(updateTypeName); final UpdateType updateType = UpdateType.valueOf(updateTypeName);
final String id = in.readUTF(); final String id = in.readUTF();
final DummyRecord record = new DummyRecord(id, updateType); final DummyRecord record = new DummyRecord(id, updateType);
if (record.getUpdateType() != UpdateType.DELETE) { switch (record.getUpdateType()) {
final int numProps = in.readInt(); case DELETE:
for (int i = 0; i < numProps; i++) { break;
final String key = in.readUTF(); case SWAP_IN: {
final String value = in.readUTF(); final String swapLocation = in.readUTF();
record.setProperty(key, value); record.setSwapLocation(swapLocation);
// intentionally fall through to the CREATE/UPDATE block
} }
case CREATE:
case UPDATE:
final int numProps = in.readInt();
for (int i = 0; i < numProps; i++) {
final String key = in.readUTF();
final String value = in.readUTF();
record.setProperty(key, value);
}
break;
case SWAP_OUT:
final String swapLocation = in.readUTF();
record.setSwapLocation(swapLocation);
break;
} }
return record; return record;
} }
@ -102,6 +133,6 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
@Override @Override
public String getLocation(final DummyRecord record) { public String getLocation(final DummyRecord record) {
return null; return record.getSwapLocation();
} }
} }

View File

@ -34,6 +34,7 @@ import java.io.OutputStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.text.NumberFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -139,9 +140,9 @@ public class TestMinimalLockingWriteAheadLog {
} }
@Test @Test
@Ignore("for local testing only") @Ignore("For manual performance testing")
public void testUpdatePerformance() throws IOException, InterruptedException { public void testUpdatePerformance() throws IOException, InterruptedException {
final int numPartitions = 4; final int numPartitions = 16;
final Path path = Paths.get("target/minimal-locking-repo"); final Path path = Paths.get("target/minimal-locking-repo");
deleteRecursively(path.toFile()); deleteRecursively(path.toFile());
@ -152,23 +153,34 @@ public class TestMinimalLockingWriteAheadLog {
final Collection<DummyRecord> initialRecs = repo.recoverRecords(); final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty()); assertTrue(initialRecs.isEmpty());
final int updateCountPerThread = 1_000_000; final long updateCountPerThread = 1_000_000;
final int numThreads = 16; final int numThreads = 4;
final Thread[] threads = new Thread[numThreads]; final Thread[] threads = new Thread[numThreads];
final int batchSize = 1;
long previousBytes = 0;
for (int j = 0; j < 2; j++) { for (int j = 0; j < 2; j++) {
for (int i = 0; i < numThreads; i++) { for (int i = 0; i < numThreads; i++) {
final Thread t = new Thread(new Runnable() { final Thread t = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
for (int i = 0; i < updateCountPerThread; i++) { final List<DummyRecord> batch = new ArrayList<>();
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
for (int i = 0; i < updateCountPerThread / batchSize; i++) {
batch.clear();
for (int j = 0; j < batchSize; j++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
batch.add(record);
}
try { try {
repo.update(Collections.singleton(record), false); repo.update(batch, false);
} catch (IOException e) { } catch (Throwable t) {
e.printStackTrace(); t.printStackTrace();
Assert.fail(e.toString()); Assert.fail(t.toString());
} }
} }
} }
@ -185,11 +197,30 @@ public class TestMinimalLockingWriteAheadLog {
t.join(); t.join();
} }
long bytes = 0L;
for (final File file : path.toFile().listFiles()) {
if (file.getName().startsWith("partition-")) {
for (final File journalFile : file.listFiles()) {
bytes += journalFile.length();
}
}
}
bytes -= previousBytes;
previousBytes = bytes;
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
final long eventsPerSecond = (updateCountPerThread * numThreads * 1000) / millis;
final String eps = NumberFormat.getInstance().format(eventsPerSecond);
final long bytesPerSecond = bytes * 1000 / millis;
final String bps = NumberFormat.getInstance().format(bytesPerSecond);
if (j == 0) { if (j == 0) {
System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numPartitions + " partitions and " + numThreads + " threads, *as a warmup!*"); System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads + " threads, *as a warmup!* "
+ eps + " events per second, " + bps + " bytes per second");
} else { } else {
System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numPartitions + " partitions and " + numThreads + " threads"); System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads + " threads, "
+ eps + " events per second, " + bps + " bytes per second");
} }
} }
} }

View File

@ -16,10 +16,10 @@
*/ */
package org.apache.nifi.controller.repository; package org.apache.nifi.controller.repository;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
@ -47,6 +48,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog; import org.wali.MinimalLockingWriteAheadLog;
@ -86,7 +88,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
private volatile ScheduledFuture<?> checkpointFuture; private volatile ScheduledFuture<?> checkpointFuture;
private final long checkpointDelayMillis; private final long checkpointDelayMillis;
private final SortedSet<Path> flowFileRepositoryPaths = new TreeSet<>(); private final File flowFileRepositoryPath;
private final List<File> recoveryFiles = new ArrayList<>();
private final int numPartitions; private final int numPartitions;
private final ScheduledExecutorService checkpointExecutor; private final ScheduledExecutorService checkpointExecutor;
@ -126,16 +129,23 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
checkpointDelayMillis = 0l; checkpointDelayMillis = 0l;
numPartitions = 0; numPartitions = 0;
checkpointExecutor = null; checkpointExecutor = null;
flowFileRepositoryPath = null;
} }
public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) { public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC, "false")); alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC, "false"));
// determine the database file path and ensure it exists // determine the database file path and ensure it exists
final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
flowFileRepositoryPath = new File(directoryName);
// We used to use the MinimalLockingWriteAheadLog, but we now use the SequentialAccessWriteAheadLog. Since the
// MinimalLockingWriteAheadLog supports multiple partitions, we need to ensure that we recover records from all
// partitions, so we build up a List of Files for the recovery files.
for (final String propertyName : nifiProperties.getPropertyKeys()) { for (final String propertyName : nifiProperties.getPropertyKeys()) {
if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) { if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) {
final String directoryName = nifiProperties.getProperty(propertyName); final String dirName = nifiProperties.getProperty(propertyName);
flowFileRepositoryPaths.add(Paths.get(directoryName)); recoveryFiles.add(new File(dirName));
} }
} }
@ -149,16 +159,14 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
public void initialize(final ResourceClaimManager claimManager) throws IOException { public void initialize(final ResourceClaimManager claimManager) throws IOException {
this.claimManager = claimManager; this.claimManager = claimManager;
for (final Path path : flowFileRepositoryPaths) { Files.createDirectories(flowFileRepositoryPath.toPath());
Files.createDirectories(path);
}
// TODO: Should ensure that only 1 instance running and pointing at a particular path // TODO: Should ensure that only 1 instance running and pointing at a particular path
// TODO: Allow for backup path that can be used if disk out of space?? Would allow a snapshot to be stored on // TODO: Allow for backup path that can be used if disk out of space?? Would allow a snapshot to be stored on
// backup and then the data deleted from the normal location; then can move backup to normal location and // backup and then the data deleted from the normal location; then can move backup to normal location and
// delete backup. On restore, if no files exist in partition's directory, would have to check backup directory // delete backup. On restore, if no files exist in partition's directory, would have to check backup directory
serdeFactory = new RepositoryRecordSerdeFactory(claimManager); serdeFactory = new RepositoryRecordSerdeFactory(claimManager);
wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPaths, numPartitions, serdeFactory, this); wal = new SequentialAccessWriteAheadLog<>(flowFileRepositoryPath, serdeFactory, this);
logger.info("Initialized FlowFile Repository using {} partitions", numPartitions); logger.info("Initialized FlowFile Repository using {} partitions", numPartitions);
} }
@ -179,22 +187,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
@Override @Override
public long getStorageCapacity() throws IOException { public long getStorageCapacity() throws IOException {
long capacity = 0L; return Files.getFileStore(flowFileRepositoryPath.toPath()).getTotalSpace();
for (final Path path : flowFileRepositoryPaths) {
capacity += Files.getFileStore(path).getTotalSpace();
}
return capacity;
} }
@Override @Override
public long getUsableStorageSpace() throws IOException { public long getUsableStorageSpace() throws IOException {
long usableSpace = 0L; return Files.getFileStore(flowFileRepositoryPath.toPath()).getUsableSpace();
for (final Path path : flowFileRepositoryPaths) {
usableSpace += Files.getFileStore(path).getUsableSpace();
}
return usableSpace;
} }
@Override @Override
@ -371,6 +369,72 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue}); logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue});
} }
@SuppressWarnings("deprecation")
private Optional<Collection<RepositoryRecord>> recoverFromOldWriteAheadLog() throws IOException {
final List<File> partitionDirs = new ArrayList<>();
for (final File recoveryFile : recoveryFiles) {
final File[] partitions = recoveryFile.listFiles(file -> file.getName().startsWith("partition-"));
for (final File partition : partitions) {
partitionDirs.add(partition);
}
}
if (partitionDirs == null || partitionDirs.isEmpty()) {
return Optional.empty();
}
logger.info("Encountered FlowFile Repository that was written using an old version of the Write-Ahead Log. "
+ "Will recover from this version and re-write the repository using the new version of the Write-Ahead Log.");
final SortedSet<Path> paths = recoveryFiles.stream()
.map(File::toPath)
.collect(Collectors.toCollection(TreeSet::new));
final Collection<RepositoryRecord> recordList;
final MinimalLockingWriteAheadLog<RepositoryRecord> minimalLockingWal = new MinimalLockingWriteAheadLog<>(paths, partitionDirs.size(), serdeFactory, null);
try {
recordList = minimalLockingWal.recoverRecords();
} finally {
minimalLockingWal.shutdown();
}
wal.update(recordList, true);
// Delete the old repository
logger.info("Successfully recovered files from existing Write-Ahead Log and transitioned to new implementation. Will now delete old files.");
for (final File partitionDir : partitionDirs) {
final File[] children = partitionDir.listFiles();
if (children != null) {
for (final File child : children) {
final boolean deleted = child.delete();
if (!deleted) {
logger.warn("Failed to delete old file {}; this file should be cleaned up manually", child);
}
}
}
if (!partitionDir.delete()) {
logger.warn("Failed to delete old directory {}; this directory should be cleaned up manually", partitionDir);
}
}
for (final File recoveryFile : recoveryFiles) {
final File snapshotFile = new File(recoveryFile, "snapshot");
if (!snapshotFile.delete() && snapshotFile.exists()) {
logger.warn("Failed to delete old file {}; this file should be cleaned up manually", snapshotFile);
}
final File partialFile = new File(recoveryFile, "snapshot.partial");
if (!partialFile.delete() && partialFile.exists()) {
logger.warn("Failed to delete old file {}; this file should be cleaned up manually", partialFile);
}
}
return Optional.of(recordList);
}
@Override @Override
public long loadFlowFiles(final QueueProvider queueProvider, final long minimumSequenceNumber) throws IOException { public long loadFlowFiles(final QueueProvider queueProvider, final long minimumSequenceNumber) throws IOException {
final Map<String, FlowFileQueue> queueMap = new HashMap<>(); final Map<String, FlowFileQueue> queueMap = new HashMap<>();
@ -378,7 +442,15 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
queueMap.put(queue.getIdentifier(), queue); queueMap.put(queue.getIdentifier(), queue);
} }
serdeFactory.setQueueMap(queueMap); serdeFactory.setQueueMap(queueMap);
final Collection<RepositoryRecord> recordList = wal.recoverRecords();
// Since we used to use the MinimalLockingWriteAheadRepository, we need to ensure that if the FlowFile
// Repo was written using that impl, that we properly recover from the implementation.
Collection<RepositoryRecord> recordList = wal.recoverRecords();
if (recordList == null || recordList.isEmpty()) {
recordList = recoverFromOldWriteAheadLog().orElse(new ArrayList<>());
}
serdeFactory.setQueueMap(null); serdeFactory.setQueueMap(null);
for (final RepositoryRecord record : recordList) { for (final RepositoryRecord record : recordList) {