NIFI-11833 - remove deprecated classes in nifi-commons (#7503)

Signed-off-by: Otto Fowler <ottobackwards@gmail.com>
Pierre Villard 2023-07-20 17:58:51 +02:00 committed by GitHub
parent 0b023a17ec
commit c06bb97e85
14 changed files with 108 additions and 2532 deletions

View File: org/apache/nifi/stream/io/BufferedInputStream.java (deleted)

@@ -1,34 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io;
import java.io.InputStream;
/**
* @deprecated use java.io.BufferedInputStream instead
*/
@Deprecated
public class BufferedInputStream extends java.io.BufferedInputStream {
public BufferedInputStream(InputStream in) {
super(in);
}
public BufferedInputStream(InputStream in, int bufferSize) {
super(in, bufferSize);
}
}

View File: org/apache/nifi/stream/io/BufferedOutputStream.java (deleted)

@@ -1,34 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io;
import java.io.OutputStream;
/**
* @deprecated use java.io.BufferedOutputStream instead
*/
@Deprecated
public class BufferedOutputStream extends java.io.BufferedOutputStream {
public BufferedOutputStream(OutputStream out) {
super(out);
}
public BufferedOutputStream(OutputStream out, int bufferSize) {
super(out, bufferSize);
}
}

View File: org/apache/nifi/stream/io/ByteArrayInputStream.java (deleted)

@@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io;
/**
* @deprecated use java.io.ByteArrayInputStream instead
*/
@Deprecated
public class ByteArrayInputStream extends java.io.ByteArrayInputStream {
public ByteArrayInputStream(byte[] buffer) {
super(buffer);
}
public ByteArrayInputStream(byte[] buffer, int offset, int length) {
super(buffer, offset, length);
}
}

View File: org/apache/nifi/stream/io/ByteArrayOutputStream.java (deleted)

@@ -1,31 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io;
/**
* @deprecated use java.io.ByteArrayOutputStream instead
*/
@Deprecated
public class ByteArrayOutputStream extends java.io.ByteArrayOutputStream {
public ByteArrayOutputStream() {
super();
}
public ByteArrayOutputStream(int initialBufferSize) {
super(initialBufferSize);
}
}

View File: org/apache/nifi/stream/io/DataOutputStream.java (deleted)

@@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io;
import java.io.OutputStream;
/**
* @deprecated use java.io.DataOutputStream instead
*/
@Deprecated
public class DataOutputStream extends java.io.DataOutputStream {
public DataOutputStream(OutputStream out) {
super(out);
}
}
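All five wrappers removed above were thin pass-throughs over their java.io superclasses, so callers migrate with an import change alone. A minimal before/after sketch (the file name is hypothetical, for illustration only):

```java
// Before this commit (deprecated wrapper):
//   import org.apache.nifi.stream.io.BufferedInputStream;
// After: use the JDK class directly.
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class MigrationSketch {
    public static void main(final String[] args) throws IOException {
        // "flow.dat" is a hypothetical file name used only for illustration.
        try (final InputStream in = new BufferedInputStream(new FileInputStream("flow.dat"), 8192)) {
            final byte[] buffer = new byte[1024];
            int len;
            while ((len = in.read(buffer)) != -1) {
                System.out.println("read " + len + " bytes");
            }
        }
    }
}
```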

View File: org/wali/TestMinimalLockingWriteAheadLog.java (deleted)

@@ -1,977 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.wali;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@SuppressWarnings("deprecation")
public class TestMinimalLockingWriteAheadLog {
private static final Logger logger = LoggerFactory.getLogger(TestMinimalLockingWriteAheadLog.class);
@Test
public void testTruncatedPartitionHeader() throws IOException {
final int numPartitions = 4;
final Path path = Paths.get("target/testTruncatedPartitionHeader");
deleteRecursively(path.toFile());
assertTrue(path.toFile().mkdirs());
final AtomicInteger counter = new AtomicInteger(0);
final SerDe<Object> serde = new SerDe<Object>() {
@Override
public void readHeader(DataInputStream in) throws IOException {
if (counter.getAndIncrement() == 1) {
throw new EOFException("Intentionally thrown for unit test");
}
}
@Override
public void serializeEdit(Object previousRecordState, Object newRecordState, DataOutputStream out) throws IOException {
out.write(1);
}
@Override
public void serializeRecord(Object record, DataOutputStream out) throws IOException {
out.write(1);
}
@Override
public Object deserializeEdit(DataInputStream in, Map<Object, Object> currentRecordStates, int version) throws IOException {
final int val = in.read();
return (val == 1) ? new Object() : null;
}
@Override
public Object deserializeRecord(DataInputStream in, int version) throws IOException {
final int val = in.read();
return (val == 1) ? new Object() : null;
}
@Override
public Object getRecordIdentifier(Object record) {
return 1;
}
@Override
public UpdateType getUpdateType(Object record) {
return UpdateType.CREATE;
}
@Override
public String getLocation(Object record) {
return null;
}
@Override
public int getVersion() {
return 0;
}
};
final WriteAheadRepository<Object> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
try {
final Collection<Object> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
repo.update(Collections.singletonList(new Object()), false);
repo.update(Collections.singletonList(new Object()), false);
repo.update(Collections.singletonList(new Object()), false);
} finally {
repo.shutdown();
}
final WriteAheadRepository<Object> secondRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
try {
secondRepo.recoverRecords();
} finally {
secondRepo.shutdown();
}
}
@Test
@Disabled("For manual performance testing")
public void testUpdatePerformance() throws IOException, InterruptedException {
final int numPartitions = 16;
final Path path = Paths.get("target/minimal-locking-repo");
deleteRecursively(path.toFile());
assertTrue(path.toFile().mkdirs());
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
final long updateCountPerThread = 1_000_000;
final int numThreads = 4;
final Thread[] threads = new Thread[numThreads];
final int batchSize = 1;
long previousBytes = 0;
for (int j = 0; j < 2; j++) {
for (int i = 0; i < numThreads; i++) {
final Thread t = new Thread(() -> {
final List<DummyRecord> batch = new ArrayList<>();
for (int i1 = 0; i1 < updateCountPerThread / batchSize; i1++) {
batch.clear();
for (int j1 = 0; j1 < batchSize; j1++) {
final DummyRecord record = new DummyRecord(String.valueOf(i1), UpdateType.CREATE);
batch.add(record);
}
assertDoesNotThrow(() -> repo.update(batch, false));
}
});
threads[i] = t;
}
final long start = System.nanoTime();
for (final Thread t : threads) {
t.start();
}
for (final Thread t : threads) {
t.join();
}
long bytes = 0L;
for (final File file : path.toFile().listFiles()) {
if (file.getName().startsWith("partition-")) {
for (final File journalFile : file.listFiles()) {
bytes += journalFile.length();
}
}
}
bytes -= previousBytes;
previousBytes = bytes;
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
final long eventsPerSecond = (updateCountPerThread * numThreads * 1000) / millis;
final String eps = NumberFormat.getInstance().format(eventsPerSecond);
final long bytesPerSecond = bytes * 1000 / millis;
final String bps = NumberFormat.getInstance().format(bytesPerSecond);
if (j == 0) {
System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads + " threads, *as a warmup!* "
+ eps + " events per second, " + bps + " bytes per second");
} else {
System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numThreads + " threads, "
+ eps + " events per second, " + bps + " bytes per second");
}
}
}
@Test
public void testRepoDoesntContinuallyGrowOnOutOfMemoryError() throws IOException, InterruptedException {
final int numPartitions = 8;
final Path path = Paths.get("target/minimal-locking-repo");
deleteRecursively(path.toFile());
assertTrue(path.toFile().mkdirs());
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
try {
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
serde.setThrowOOMEAfterNSerializeEdits(100);
for (int i = 0; i < 108; i++) {
try {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
repo.update(Collections.singleton(record), false);
} catch (final OutOfMemoryError oome) {
logger.info("Received OOME on record " + i);
}
}
long expectedSize = sizeOf(path.toFile());
for (int i = 0; i < 1000; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
assertThrows(IOException.class, () -> repo.update(Collections.singleton(record), false));
}
long newSize = sizeOf(path.toFile());
assertEquals(expectedSize, newSize);
assertThrows(OutOfMemoryError.class, () -> repo.checkpoint());
expectedSize = sizeOf(path.toFile());
for (int i = 0; i < 100000; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
assertThrows(IOException.class, () -> repo.update(Collections.singleton(record), false));
}
newSize = sizeOf(path.toFile());
assertEquals(expectedSize, newSize);
} finally {
repo.shutdown();
}
}
/**
* This test is intended to continually update the write-ahead log using many threads, then
* stop and restore the repository to check for any corruption. There were reports of potential threading
* issues leading to repository corruption; this test was an attempt to replicate them. It is not really
* meant to run as a unit test, but it is left in place because it can be valuable for exercising the implementation.
*
* @throws IOException if unable to read from/write to the write-ahead log
* @throws InterruptedException if a thread is interrupted
*/
@Test
@Disabled
public void tryToCauseThreadingIssue() throws IOException, InterruptedException {
System.setProperty("org.slf4j.simpleLogger.log.org.wali", "INFO");
final int numThreads = 12;
final long iterationsPerThread = 1000000;
final int numAttempts = 1000;
final Path path = Paths.get("D:/dummy/minimal-locking-repo");
path.toFile().mkdirs();
final AtomicReference<WriteAheadRepository<DummyRecord>> writeRepoRef = new AtomicReference<>();
final AtomicBoolean checkpointing = new AtomicBoolean(false);
final Thread bgThread = new Thread(() -> {
while (true) {
checkpointing.set(true);
final WriteAheadRepository<DummyRecord> repo = writeRepoRef.get();
if (repo != null) {
assertDoesNotThrow(() -> repo.checkpoint());
}
checkpointing.set(false);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
});
bgThread.setDaemon(true);
bgThread.start();
for (int x = 0; x < numAttempts; x++) {
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> writeRepo = new MinimalLockingWriteAheadLog<>(path, 256, serde, null);
final Collection<DummyRecord> writeRecords = writeRepo.recoverRecords();
for (final DummyRecord record : writeRecords) {
assertEquals("B", record.getProperty("A"));
}
writeRepoRef.set(writeRepo);
final Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
final Thread t = new InlineCreationInsertThread(iterationsPerThread, writeRepo);
t.start();
threads[i] = t;
}
for (final Thread t : threads) {
t.join();
}
writeRepoRef.set(null);
writeRepo.shutdown();
boolean cp = checkpointing.get();
while (cp) {
Thread.sleep(100L);
cp = checkpointing.get();
}
final WriteAheadRepository<DummyRecord> readRepo = new MinimalLockingWriteAheadLog<>(path, 256, serde, null);
// ensure that we are able to recover the records properly
final Collection<DummyRecord> readRecords = readRepo.recoverRecords();
for (final DummyRecord record : readRecords) {
assertEquals("B", record.getProperty("A"));
}
readRepo.shutdown();
}
}
@Test
public void testWrite() throws IOException, InterruptedException {
final int numPartitions = 8;
final Path path = Paths.get("target/minimal-locking-repo");
deleteRecursively(path.toFile());
assertTrue(path.toFile().mkdirs());
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
final List<InsertThread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
threads.add(new InsertThread(10000, 1000000 * i, repo));
}
final long start = System.nanoTime();
for (final InsertThread thread : threads) {
thread.start();
}
for (final InsertThread thread : threads) {
thread.join();
}
final long nanos = System.nanoTime() - start;
final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
System.out.println("Took " + millis + " millis to insert 1,000,000 records each in its own transaction");
repo.shutdown();
final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
assertFalse(recoveredRecords.isEmpty());
assertEquals(100000, recoveredRecords.size());
for (final DummyRecord record : recoveredRecords) {
final Map<String, String> recoveredProps = record.getProperties();
assertEquals(1, recoveredProps.size());
assertEquals("B", recoveredProps.get("A"));
}
}
@Test
public void testRecoverAfterIOException() throws IOException {
final int numPartitions = 5;
final Path path = Paths.get("target/minimal-locking-repo-test-recover-after-ioe");
deleteRecursively(path.toFile());
Files.createDirectories(path);
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
serde.setThrowIOEAfterNSerializeEdits(7); // serialize the first two transactions and the first edit of the third transaction, then throw IOException
final List<DummyRecord> firstTransaction = new ArrayList<>();
firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
final List<DummyRecord> secondTransaction = new ArrayList<>();
secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
final List<DummyRecord> thirdTransaction = new ArrayList<>();
thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
repo.update(firstTransaction, true);
repo.update(secondTransaction, true);
assertThrows(IOException.class, () -> repo.update(thirdTransaction, true));
repo.shutdown();
serde.setThrowIOEAfterNSerializeEdits(-1);
final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
assertFalse(recoveredRecords.isEmpty());
assertEquals(3, recoveredRecords.size());
boolean record1 = false, record2 = false, record3 = false;
for (final DummyRecord record : recoveredRecords) {
switch (record.getId()) {
case "1":
record1 = true;
assertEquals("123", record.getProperty("abc"));
break;
case "2":
record2 = true;
assertEquals("123", record.getProperty("cba"));
break;
case "3":
record3 = true;
assertEquals("123", record.getProperty("aaa"));
break;
}
}
assertTrue(record1);
assertTrue(record2);
assertTrue(record3);
}
@Test
public void testRecoverFileThatHasTrailingNULBytesAndTruncation() throws IOException {
final int numPartitions = 5;
final Path path = Paths.get("target/testRecoverFileThatHasTrailingNULBytesAndTruncation");
deleteRecursively(path.toFile());
Files.createDirectories(path);
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
final List<DummyRecord> firstTransaction = new ArrayList<>();
firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
final List<DummyRecord> secondTransaction = new ArrayList<>();
secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
final List<DummyRecord> thirdTransaction = new ArrayList<>();
thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
repo.update(firstTransaction, true);
repo.update(secondTransaction, true);
repo.update(thirdTransaction, true);
repo.shutdown();
final File partition3Dir = path.resolve("partition-2").toFile();
final File journalFile = partition3Dir.listFiles()[0];
final byte[] contents = Files.readAllBytes(journalFile.toPath());
// Truncate the contents of the journal file by 8 bytes. Then replace with 28 trailing NUL bytes,
// as this is what we often see when we have a sudden power loss.
final byte[] truncated = Arrays.copyOfRange(contents, 0, contents.length - 8);
final byte[] withNuls = new byte[truncated.length + 28];
System.arraycopy(truncated, 0, withNuls, 0, truncated.length);
try (final OutputStream fos = new FileOutputStream(journalFile)) {
fos.write(withNuls);
}
final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
assertFalse(recoveredRecords.isEmpty());
assertEquals(3, recoveredRecords.size());
boolean record1 = false, record2 = false, record3 = false;
for (final DummyRecord record : recoveredRecords) {
switch (record.getId()) {
case "1":
record1 = true;
assertEquals("123", record.getProperty("abc"));
break;
case "2":
record2 = true;
assertEquals("123", record.getProperty("cba"));
break;
case "3":
record3 = true;
assertEquals("123", record.getProperty("aaa"));
break;
}
}
assertTrue(record1);
assertTrue(record2);
assertTrue(record3);
}
@Test
public void testRecoverFileThatHasTrailingNULBytesNoTruncation() throws IOException {
final int numPartitions = 5;
final Path path = Paths.get("target/testRecoverFileThatHasTrailingNULBytesNoTruncation");
deleteRecursively(path.toFile());
Files.createDirectories(path);
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
final List<DummyRecord> firstTransaction = new ArrayList<>();
firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
final List<DummyRecord> secondTransaction = new ArrayList<>();
secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
final List<DummyRecord> thirdTransaction = new ArrayList<>();
thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
repo.update(firstTransaction, true);
repo.update(secondTransaction, true);
repo.update(thirdTransaction, true);
repo.shutdown();
final File partition3Dir = path.resolve("partition-2").toFile();
final File journalFile = partition3Dir.listFiles()[0];
// Append 28 trailing NUL bytes to the journal file without truncating it,
// as this is what we often see when we have a sudden power loss.
final byte[] withNuls = new byte[28];
try (final OutputStream fos = new FileOutputStream(journalFile, true)) {
fos.write(withNuls);
}
final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
assertFalse(recoveredRecords.isEmpty());
assertEquals(1, recoveredRecords.size());
boolean record1 = false, record2 = false, record3 = false;
for (final DummyRecord record : recoveredRecords) {
switch (record.getId()) {
case "1":
record1 = record.getUpdateType() != UpdateType.DELETE;
assertEquals("123", record.getProperty("abc"));
break;
case "2":
record2 = record.getUpdateType() != UpdateType.DELETE;
assertEquals("123", record.getProperty("cba"));
break;
case "3":
record3 = true;
assertEquals("123", record.getProperty("aaa"));
break;
}
}
assertFalse(record1);
assertFalse(record2);
assertTrue(record3);
}
@Test
public void testCannotModifyLogAfterAllAreBlackListed() throws IOException {
final int numPartitions = 5;
final Path path = Paths.get("target/minimal-locking-repo-test-cannot-modify-after-all-blacklisted");
deleteRecursively(path.toFile());
Files.createDirectories(path);
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
serde.setThrowIOEAfterNSerializeEdits(3); // serialize the first transaction, then fail on all subsequent transactions
final List<DummyRecord> firstTransaction = new ArrayList<>();
firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
final List<DummyRecord> secondTransaction = new ArrayList<>();
secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
final List<DummyRecord> thirdTransaction = new ArrayList<>();
thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
repo.update(firstTransaction, true);
assertThrows(IOException.class, () -> repo.update(secondTransaction, true));
for (int i = 0; i < 4; i++) {
assertThrows(IOException.class, () -> repo.update(thirdTransaction, true));
}
serde.setThrowIOEAfterNSerializeEdits(-1);
final List<DummyRecord> fourthTransaction = new ArrayList<>();
fourthTransaction.add(new DummyRecord("1", UpdateType.DELETE));
IOException e = assertThrows(IOException.class, () -> repo.update(fourthTransaction, true));
assertTrue(e.getMessage().contains("All Partitions have been blacklisted"));
repo.shutdown();
serde.setThrowIOEAfterNSerializeEdits(-1);
final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
assertFalse(recoveredRecords.isEmpty());
assertEquals(3, recoveredRecords.size());
}
@Test
public void testStriping() throws IOException {
final int numPartitions = 6;
final Path path = Paths.get("target/minimal-locking-repo-striped");
deleteRecursively(path.toFile());
Files.createDirectories(path);
final SortedSet<Path> paths = new TreeSet<>();
paths.add(path.resolve("stripe-1"));
paths.add(path.resolve("stripe-2"));
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(paths, numPartitions, serde, null);
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
final InsertThread inserter = new InsertThread(100000, 0, repo);
inserter.run();
for (final Path partitionPath : paths) {
final File[] files = partitionPath.toFile().listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().startsWith("partition");
}
});
assertEquals(3, files.length);
for (final File file : files) {
final File[] journalFiles = file.listFiles();
assertEquals(1, journalFiles.length);
}
}
repo.checkpoint();
}
@Test
public void testShutdownWhileBlacklisted() throws IOException {
final Path path = Paths.get("target/minimal-locking-repo-shutdown-blacklisted");
deleteRecursively(path.toFile());
Files.createDirectories(path);
final SerDe<SimpleRecord> failOnThirdWriteSerde = new SerDe<SimpleRecord>() {
private int writes = 0;
@Override
public void serializeEdit(SimpleRecord previousRecordState, SimpleRecord newRecordState, DataOutputStream out) throws IOException {
serializeRecord(newRecordState, out);
}
@Override
public void serializeRecord(SimpleRecord record, DataOutputStream out) throws IOException {
int size = (int) record.getSize();
out.writeLong(record.getSize());
for (int i = 0; i < size; i++) {
out.write('A');
}
if (++writes == 3) {
throw new IOException("Intentional Exception for Unit Testing");
}
out.writeLong(record.getId());
}
@Override
public SimpleRecord deserializeEdit(DataInputStream in, Map<Object, SimpleRecord> currentRecordStates, int version) throws IOException {
return deserializeRecord(in, version);
}
@Override
public SimpleRecord deserializeRecord(DataInputStream in, int version) throws IOException {
long size = in.readLong();
for (int i = 0; i < (int) size; i++) {
in.read();
}
long id = in.readLong();
return new SimpleRecord(id, size);
}
@Override
public Object getRecordIdentifier(SimpleRecord record) {
return record.getId();
}
@Override
public UpdateType getUpdateType(SimpleRecord record) {
return UpdateType.CREATE;
}
@Override
public String getLocation(SimpleRecord record) {
return null;
}
@Override
public int getVersion() {
return 0;
}
};
final WriteAheadRepository<SimpleRecord> writeRepo = new MinimalLockingWriteAheadLog<>(path, 1, failOnThirdWriteSerde, null);
final Collection<SimpleRecord> initialRecs = writeRepo.recoverRecords();
assertTrue(initialRecs.isEmpty());
writeRepo.update(Collections.singleton(new SimpleRecord(1L, 1L)), false);
writeRepo.update(Collections.singleton(new SimpleRecord(2L, 2L)), false);
// Use a size of 8194 because the BufferedOutputStream has a buffer size of 8192 and we want
// to exceed this for testing purposes.
assertThrows(IOException.class,
() -> writeRepo.update(Collections.singleton(new SimpleRecord(3L, 8194L)), false));
final Path partitionDir = path.resolve("partition-0");
final File journalFile = partitionDir.toFile().listFiles()[0];
final long journalFileSize = journalFile.length();
verifyBlacklistedJournalContents(journalFile, failOnThirdWriteSerde);
writeRepo.shutdown();
// Ensure that calling shutdown() didn't write anything to the journal file
final long newJournalSize = journalFile.length();
assertEquals(journalFileSize, newJournalSize, "Calling Shutdown wrote " + (newJournalSize - journalFileSize) + " bytes to the journal file");
}
private void verifyBlacklistedJournalContents(final File journalFile, final SerDe<?> serde) throws IOException {
try (final FileInputStream fis = new FileInputStream(journalFile);
final InputStream bis = new BufferedInputStream(fis);
final DataInputStream in = new DataInputStream(bis)) {
// Verify header info.
final String waliClassName = in.readUTF();
assertEquals(MinimalLockingWriteAheadLog.class.getName(), waliClassName);
final int waliVersion = in.readInt();
assertTrue(waliVersion > 0);
final String serdeClassName = in.readUTF();
assertEquals(serde.getClass().getName(), serdeClassName);
final int serdeVersion = in.readInt();
assertEquals(serde.getVersion(), serdeVersion);
for (int i = 0; i < 2; i++) {
long transactionId = in.readLong();
assertEquals(i, transactionId);
// read what serde wrote
long size = in.readLong();
assertEquals((i + 1), size);
for (int j = 0; j < (int) size; j++) {
final int c = in.read();
assertEquals('A', c);
}
long id = in.readLong();
assertEquals((i + 1), id);
int transactionIndicator = in.read();
assertEquals(2, transactionIndicator);
}
// In previous implementations, we would still have a partial record written out.
// In the current version, however, the serde above would result in the data serialization
// failing and as a result no data would be written to the stream, so the stream should
// now be out of data
final int nextByte = in.read();
assertEquals(-1, nextByte);
}
}
@Test
public void testDecreaseNumberOfPartitions() throws IOException {
final Path path = Paths.get("target/minimal-locking-repo-decrease-partitions");
deleteRecursively(path.toFile());
Files.createDirectories(path);
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> writeRepo = new MinimalLockingWriteAheadLog<>(path, 256, serde, null);
final Collection<DummyRecord> initialRecs = writeRepo.recoverRecords();
assertTrue(initialRecs.isEmpty());
final DummyRecord record1 = new DummyRecord("1", UpdateType.CREATE);
writeRepo.update(Collections.singleton(record1), false);
for (int i = 0; i < 8; i++) {
final DummyRecord r = new DummyRecord("1", UpdateType.UPDATE);
r.setProperty("i", String.valueOf(i));
writeRepo.update(Collections.singleton(r), false);
}
writeRepo.shutdown();
final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, 6, serde, null);
final Collection<DummyRecord> records = recoverRepo.recoverRecords();
final List<DummyRecord> list = new ArrayList<>(records);
assertEquals(1, list.size());
final DummyRecord recoveredRecord = list.get(0);
assertEquals("1", recoveredRecord.getId());
assertEquals("7",recoveredRecord.getProperty("i"));
}
private static class InsertThread extends Thread {
private final List<List<DummyRecord>> records;
private final WriteAheadRepository<DummyRecord> repo;
public InsertThread(final int numInsertions, final int startIndex, final WriteAheadRepository<DummyRecord> repo) {
records = new ArrayList<>();
for (int i = 0; i < numInsertions; i++) {
final DummyRecord record = new DummyRecord(String.valueOf(i + startIndex), UpdateType.CREATE);
record.setProperty("A", "B");
final List<DummyRecord> list = new ArrayList<>();
list.add(record);
records.add(list);
}
this.repo = repo;
}
@Override
public void run() {
int counter = 0;
for (final List<DummyRecord> list : records) {
final boolean forceSync = (++counter == records.size());
assertDoesNotThrow(() -> repo.update(list, forceSync));
}
}
}
private static class InlineCreationInsertThread extends Thread {
private final long iterations;
private final WriteAheadRepository<DummyRecord> repo;
public InlineCreationInsertThread(final long numInsertions, final WriteAheadRepository<DummyRecord> repo) {
this.iterations = numInsertions;
this.repo = repo;
}
@Override
public void run() {
final List<DummyRecord> list = new ArrayList<>(1);
list.add(null);
final UpdateType[] updateTypes = new UpdateType[] { UpdateType.CREATE, UpdateType.DELETE, UpdateType.UPDATE };
final Random random = new Random();
for (long i = 0; i < iterations; i++) {
final int updateTypeIndex = random.nextInt(updateTypes.length);
final UpdateType updateType = updateTypes[updateTypeIndex];
final DummyRecord record = new DummyRecord(String.valueOf(i), updateType);
record.setProperty("A", "B");
list.set(0, record);
try {
repo.update(list, false);
} catch (final Throwable t) {
t.printStackTrace();
}
}
}
}
private void deleteRecursively(final File file) {
final File[] children = file.listFiles();
if (children != null) {
for (final File child : children) {
deleteRecursively(child);
}
}
file.delete();
}
private long sizeOf(final File file) {
long size = 0L;
if (file.isDirectory()) {
final File[] children = file.listFiles();
if (children != null) {
for (final File child : children) {
size += sizeOf(child);
}
}
}
size += file.length();
return size;
}
static class SimpleRecord {
private long id;
private long size;
public SimpleRecord(final long id, final long size) {
this.id = id;
this.size = size;
}
public long getId() {
return id;
}
public long getSize() {
return size;
}
}
}
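The deleted tests above exercised the WriteAheadRepository lifecycle: recoverRecords(), update(), checkpoint(), shutdown(). The same lifecycle applies to SequentialAccessWriteAheadLog, which the rest of this commit migrates to. A minimal sketch, assuming the DummyRecord/DummyRecordSerde test helpers used above and a hypothetical DummyRecordSerdeFactory in the style of the SerDeFactory shims added later in this commit:

```java
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;

import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.wali.UpdateType;
import org.wali.WriteAheadRepository;

public class WalLifecycleSketch {
    public static void main(final String[] args) throws Exception {
        // DummyRecordSerdeFactory is hypothetical: a SerDeFactory<DummyRecord> wrapping
        // DummyRecordSerde, like the private SerdeFactory classes added elsewhere in this commit.
        final WriteAheadRepository<DummyRecord> repo = new SequentialAccessWriteAheadLog<>(
                Paths.get("target/wal-sketch").toFile(), new DummyRecordSerdeFactory());

        // Recovery must run before the first update.
        final Collection<DummyRecord> recovered = repo.recoverRecords();
        System.out.println("Recovered " + recovered.size() + " records");

        // Journal one edit without forcing an fsync, then checkpoint and shut down.
        repo.update(Collections.singleton(new DummyRecord("1", UpdateType.CREATE)), false);
        repo.checkpoint();
        repo.shutdown();
    }
}
```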

View File: WriteAheadFlowFileRepository.java (modified)

@@ -31,7 +31,6 @@ import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.apache.nifi.wali.SnapshotCapture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.SyncListener;
import org.wali.WriteAheadRepository;
@@ -47,10 +46,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -94,7 +90,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
static final String SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
static final String ENCRYPTED_SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog";
private static final String MINIMAL_LOCKING_WALI = "org.wali.MinimalLockingWriteAheadLog";
private static final String DEFAULT_WAL_IMPLEMENTATION = SEQUENTIAL_ACCESS_WAL;
private static final int DEFAULT_CACHE_SIZE = 10_000_000;
@@ -110,7 +105,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
private final long checkpointDelayMillis;
private final List<File> flowFileRepositoryPaths = new ArrayList<>();
private final List<File> recoveryFiles = new ArrayList<>();
private final ScheduledExecutorService checkpointExecutor;
private final int maxCharactersToCache;
@@ -176,24 +170,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
this.walImplementation = writeAheadLogImpl;
this.maxCharactersToCache = nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE);
// Because we used to use one implementation (minimal locking) of the write-ahead log but now want to use
// the other (sequential access), we must address this. Since the MinimalLockingWriteAheadLog supports
// multiple partitions, we need to ensure that we recover records from all partitions, so we build up a
// List of Files for the recovery files.
for (final String propertyName : nifiProperties.getPropertyKeys()) {
if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) {
final String dirName = nifiProperties.getProperty(propertyName);
recoveryFiles.add(new File(dirName));
}
}
if (isSequentialAccessWAL(walImplementation)) {
final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
flowFileRepositoryPaths.add(new File(directoryName));
} else {
flowFileRepositoryPaths.addAll(recoveryFiles);
}
final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
flowFileRepositoryPaths.add(new File(directoryName));
checkpointDelayMillis = FormatUtils.getTimeDuration(nifiProperties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS);
@@ -207,16 +185,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
});
}
/**
* Returns true if the provided implementation is a sequential access write ahead log (plaintext or encrypted).
*
* @param walImplementation the implementation to check
* @return true if this implementation is sequential access
*/
private static boolean isSequentialAccessWAL(String walImplementation) {
return walImplementation.equals(SEQUENTIAL_ACCESS_WAL) || walImplementation.equals(ENCRYPTED_SEQUENTIAL_ACCESS_WAL);
}
@Override
public void initialize(final ResourceClaimManager claimManager) throws IOException {
final FieldCache fieldCache = new CaffeineFieldCache(maxCharactersToCache);
@@ -251,15 +219,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
this.serdeFactory = serdeFactory;
// The specified implementation can be plaintext or encrypted; the only difference is the serde factory
if (isSequentialAccessWAL(walImplementation)) {
if (walImplementation.equals(SEQUENTIAL_ACCESS_WAL) || walImplementation.equals(ENCRYPTED_SEQUENTIAL_ACCESS_WAL)) {
// TODO: May need to instantiate ESAWAL for clarity?
wal = new SequentialAccessWriteAheadLog<>(flowFileRepositoryPaths.get(0), serdeFactory, this);
} else if (walImplementation.equals(MINIMAL_LOCKING_WALI)) {
final SortedSet<Path> paths = flowFileRepositoryPaths.stream()
.map(File::toPath)
.collect(Collectors.toCollection(TreeSet::new));
wal = new MinimalLockingWriteAheadLog<>(paths, 1, serdeFactory, this);
} else {
throw new IllegalStateException("Cannot create Write-Ahead Log because the configured property '" + WRITE_AHEAD_LOG_IMPL + "' has an invalid value of '" + walImplementation
+ "'. Please update nifi.properties to indicate a valid value for this property.");
@@ -285,10 +247,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
@Override
public Map<ResourceClaim, Set<ResourceClaimReference>> findResourceClaimReferences(final Set<ResourceClaim> resourceClaims, final FlowFileSwapManager swapManager) {
if (!(isSequentialAccessWAL(walImplementation))) {
return null;
}
final Map<ResourceClaim, Set<ResourceClaimReference>> references = new HashMap<>();
final SnapshotCapture<SerializedRepositoryRecord> snapshot = ((SequentialAccessWriteAheadLog<SerializedRepositoryRecord>) wal).captureSnapshot();
@@ -747,95 +705,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
}
private Optional<Collection<SerializedRepositoryRecord>> migrateFromSequentialAccessLog(final WriteAheadRepository<SerializedRepositoryRecord> toUpdate) throws IOException {
final String recoveryDirName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
final File recoveryDir = new File(recoveryDirName);
if (!recoveryDir.exists()) {
return Optional.empty();
}
final WriteAheadRepository<SerializedRepositoryRecord> recoveryWal = new SequentialAccessWriteAheadLog<>(recoveryDir, serdeFactory, this);
logger.info("Encountered FlowFile Repository that was written using the Sequential Access Write Ahead Log. Will recover from this version.");
final Collection<SerializedRepositoryRecord> recordList;
try {
recordList = recoveryWal.recoverRecords();
} finally {
recoveryWal.shutdown();
}
toUpdate.update(recordList, true);
logger.info("Successfully recovered files from existing Write-Ahead Log and transitioned to new Write-Ahead Log. Will not delete old files.");
final File journalsDir = new File(recoveryDir, "journals");
deleteRecursively(journalsDir);
final File checkpointFile = new File(recoveryDir, "checkpoint");
if (!checkpointFile.delete() && checkpointFile.exists()) {
logger.warn("Failed to delete old file {}; this file should be cleaned up manually", checkpointFile);
}
final File partialFile = new File(recoveryDir, "checkpoint.partial");
if (!partialFile.delete() && partialFile.exists()) {
logger.warn("Failed to delete old file {}; this file should be cleaned up manually", partialFile);
}
return Optional.of(recordList);
}
@SuppressWarnings("deprecation")
private Optional<Collection<SerializedRepositoryRecord>> migrateFromMinimalLockingLog(final WriteAheadRepository<SerializedRepositoryRecord> toUpdate) throws IOException {
final List<File> partitionDirs = new ArrayList<>();
for (final File recoveryFile : recoveryFiles) {
final File[] partitions = recoveryFile.listFiles(file -> file.getName().startsWith("partition-"));
for (final File partition : partitions) {
partitionDirs.add(partition);
}
}
if (partitionDirs == null || partitionDirs.isEmpty()) {
return Optional.empty();
}
logger.info("Encountered FlowFile Repository that was written using the 'Minimal Locking Write-Ahead Log'. "
+ "Will recover from this version and re-write the repository using the new version of the Write-Ahead Log.");
final SortedSet<Path> paths = recoveryFiles.stream()
.map(File::toPath)
.collect(Collectors.toCollection(TreeSet::new));
final Collection<SerializedRepositoryRecord> recordList;
final MinimalLockingWriteAheadLog<SerializedRepositoryRecord> minimalLockingWal = new MinimalLockingWriteAheadLog<>(paths, partitionDirs.size(), serdeFactory, null);
try {
recordList = minimalLockingWal.recoverRecords();
} finally {
minimalLockingWal.shutdown();
}
toUpdate.update(recordList, true);
// Delete the old repository
logger.info("Successfully recovered files from existing Write-Ahead Log and transitioned to new implementation. Will now delete old files.");
for (final File partitionDir : partitionDirs) {
deleteRecursively(partitionDir);
}
for (final File recoveryFile : recoveryFiles) {
final File snapshotFile = new File(recoveryFile, "snapshot");
if (!snapshotFile.delete() && snapshotFile.exists()) {
logger.warn("Failed to delete old file {}; this file should be cleaned up manually", snapshotFile);
}
final File partialFile = new File(recoveryFile, "snapshot.partial");
if (!partialFile.delete() && partialFile.exists()) {
logger.warn("Failed to delete old file {}; this file should be cleaned up manually", partialFile);
}
}
return Optional.of(recordList);
}
@Override
public Set<String> findQueuesWithFlowFiles(final FlowFileSwapManager swapManager) throws IOException {
if (recoveredRecords == null) {
@@ -885,23 +754,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
logger.debug("Recovered {} Swap Files: {}", swapLocationSuffixes.size(), swapLocationSuffixes);
}
// If we didn't recover any records from our write-ahead log, attempt to recover records from the other implementation
// of the write-ahead log. We do this in case the user changed the "nifi.flowfile.repository.wal.impl" property.
// In such a case, we still want to recover the records from the previous FlowFile Repository and write them into the new one.
// Since these implementations do not write to the same files, they will not interfere with one another. If we do recover records,
// then we will update the new WAL (with fsync()) and delete the old repository so that we won't recover it again.
if (recordList == null || recordList.isEmpty()) {
if (isSequentialAccessWAL(walImplementation)) {
// Configured to use Sequential Access WAL but it has no records. Check if there are records in
// a MinimalLockingWriteAheadLog that we can recover.
recordList = migrateFromMinimalLockingLog(wal).orElse(new ArrayList<>());
} else {
// Configured to use Minimal Locking WAL but it has no records. Check if there are records in
// a SequentialAccess Log that we can recover.
recordList = migrateFromSequentialAccessLog(wal).orElse(new ArrayList<>());
}
}
fieldCache.clear();
final Map<String, FlowFileQueue> queueMap = new HashMap<>();

View File: WriteAheadLocalStateProvider.java (modified)

@@ -43,9 +43,11 @@ import org.apache.nifi.controller.state.StateMapSerDe;
import org.apache.nifi.controller.state.StateMapUpdate;
import org.apache.nifi.controller.state.providers.AbstractStateProvider;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.SerDe;
import org.wali.SerDeFactory;
import org.wali.UpdateType;
import org.wali.WriteAheadRepository;
@@ -131,7 +133,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
}
versionGenerator = new AtomicLong(-1L);
writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), numPartitions, serde, null);
writeAheadLog = new SequentialAccessWriteAheadLog<>(basePath, new SerdeFactory(serde));
final Collection<StateMapUpdate> updates = writeAheadLog.recoverRecords();
long maxRecordVersion = EMPTY_VERSION;
@@ -319,4 +321,34 @@
return t;
}
}
private static class SerdeFactory implements SerDeFactory<StateMapUpdate> {
private StateMapSerDe serde;
public SerdeFactory(StateMapSerDe serde) {
this.serde = serde;
}
@Override
public SerDe<StateMapUpdate> createSerDe(String encodingName) {
return this.serde;
}
@Override
public Object getRecordIdentifier(StateMapUpdate record) {
return this.serde.getRecordIdentifier(record);
}
@Override
public UpdateType getUpdateType(StateMapUpdate record) {
return this.serde.getUpdateType(record);
}
@Override
public String getLocation(StateMapUpdate record) {
return this.serde.getLocation(record);
}
}
}
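The private SerdeFactory shim above reappears nearly verbatim in PersistentMapCache and PersistentSetCache below. A generic helper along these lines could de-duplicate the three copies; this is a hypothetical sketch, not part of the commit:

```java
import org.wali.SerDe;
import org.wali.SerDeFactory;
import org.wali.UpdateType;

// Hypothetical generic adapter: exposes one fixed SerDe through the
// SerDeFactory interface that SequentialAccessWriteAheadLog expects.
public class SingleSerDeFactory<T> implements SerDeFactory<T> {
    private final SerDe<T> serde;

    public SingleSerDeFactory(final SerDe<T> serde) {
        this.serde = serde;
    }

    @Override
    public SerDe<T> createSerDe(final String encodingName) {
        return serde; // same serde regardless of the requested encoding
    }

    @Override
    public Object getRecordIdentifier(final T record) {
        return serde.getRecordIdentifier(record);
    }

    @Override
    public UpdateType getUpdateType(final T record) {
        return serde.getUpdateType(record);
    }

    @Override
    public String getLocation(final T record) {
        return serde.getLocation(record);
    }
}
```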

View File: TestWriteAheadFlowFileRepository.java (modified)

@@ -43,13 +43,13 @@ import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.WriteAheadRepository;
import java.io.File;
@@ -359,7 +359,7 @@ public class TestWriteAheadFlowFileRepository {
final ResourceClaimManager claimManager = new StandardResourceClaimManager();
final StandardRepositoryRecordSerdeFactory serdeFactory = new StandardRepositoryRecordSerdeFactory(claimManager);
final WriteAheadRepository<SerializedRepositoryRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serdeFactory, null);
final WriteAheadRepository<SerializedRepositoryRecord> repo = new SequentialAccessWriteAheadLog<>(path.toFile(), serdeFactory);
final Collection<SerializedRepositoryRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());

View File

@@ -33,10 +33,10 @@ import org.apache.nifi.remote.protocol.HandshakeProperties;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.Response;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.StringUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
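Because the removed NiFi stream classes were direct subclasses of their java.io counterparts, this import swap is source-compatible. A small round-trip sketch using only the JDK types imported above (the payload values are arbitrary):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTripSketch {
    public static void main(final String[] args) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (final DataOutputStream dos = new DataOutputStream(baos)) {
            dos.writeUTF("RESPONSE"); // arbitrary example payload
            dos.writeInt(42);
        }
        try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
            System.out.println(dis.readUTF() + " " + dis.readInt());
        }
    }
}
```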

View File: PersistentMapCache.java (modified)

@@ -29,11 +29,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.SerDe;
import org.wali.SerDeFactory;
import org.wali.UpdateType;
import org.wali.WriteAheadRepository;
@@ -48,7 +50,7 @@ public class PersistentMapCache implements MapCache {
public PersistentMapCache(final String serviceIdentifier, final File persistencePath, final MapCache cacheToWrap) throws IOException {
try {
wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null);
wali = new SequentialAccessWriteAheadLog<>(persistencePath, new SerdeFactory());
} catch (OverlappingFileLockException ex) {
logger.error("OverlappingFileLockException thrown: Check lock location - possible duplicate persistencePath conflict in PersistentMapCache.");
// Propagate the exception
@@ -276,4 +278,34 @@
return 1;
}
}
private static class SerdeFactory implements SerDeFactory<MapWaliRecord> {
private Serde serde;
public SerdeFactory() {
this.serde = new Serde();
}
@Override
public SerDe<MapWaliRecord> createSerDe(String encodingName) {
return this.serde;
}
@Override
public Object getRecordIdentifier(MapWaliRecord record) {
return this.serde.getRecordIdentifier(record);
}
@Override
public UpdateType getUpdateType(MapWaliRecord record) {
return this.serde.getUpdateType(record);
}
@Override
public String getLocation(MapWaliRecord record) {
return this.serde.getLocation(record);
}
}
}

View File: PersistentSetCache.java (modified)

@@ -29,8 +29,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.wali.MinimalLockingWriteAheadLog;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.wali.SerDe;
import org.wali.SerDeFactory;
import org.wali.UpdateType;
import org.wali.WriteAheadRepository;
@@ -42,7 +43,7 @@ public class PersistentSetCache implements SetCache {
private final AtomicLong modifications = new AtomicLong(0L);
public PersistentSetCache(final String serviceIdentifier, final File persistencePath, final SetCache cacheToWrap) throws IOException {
wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null);
wali = new SequentialAccessWriteAheadLog<>(persistencePath, new SerdeFactory());
wrapped = cacheToWrap;
}
@@ -192,4 +193,34 @@
return 1;
}
}
private static class SerdeFactory implements SerDeFactory<SetRecord> {
private Serde serde;
public SerdeFactory() {
this.serde = new Serde();
}
@Override
public SerDe<SetRecord> createSerDe(String encodingName) {
return this.serde;
}
@Override
public Object getRecordIdentifier(SetRecord record) {
return this.serde.getRecordIdentifier(record);
}
@Override
public UpdateType getUpdateType(SetRecord record) {
return this.serde.getUpdateType(record);
}
@Override
public String getLocation(SetRecord record) {
return this.serde.getLocation(record);
}
}
}

View File: org/apache/nifi/distributed/cache/server/map/TestPersistentMapCache.java (deleted)

@@ -1,41 -0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.map;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.nio.channels.OverlappingFileLockException;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestPersistentMapCache {
/**
* Test that OverlappingFileLockException is thrown when the persistence path is duplicated.
*/
@Test
public void testDuplicatePersistenceDirectory() {
assertThrows(OverlappingFileLockException.class, () -> {
File duplicatedFilePath = new File("/tmp/path1");
final MapCache cache = new SimpleMapCache("simpleCache", 2, EvictionPolicy.FIFO);
PersistentMapCache pmc1 = new PersistentMapCache("id1", duplicatedFilePath, cache);
PersistentMapCache pmc2 = new PersistentMapCache("id2", duplicatedFilePath, cache);
});
}
}