diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java index a5889ed5ad..5a7656d378 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java @@ -130,6 +130,7 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor throw new IllegalArgumentException("Paths must be non-empty"); } + int resolvedPartitionCount = partitionCount; int existingPartitions = 0; for (final Path path : paths) { if (!Files.exists(path)) { @@ -162,6 +163,7 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor logger.warn("Constructing MinimalLockingWriteAheadLog with partitionCount={}, but the repository currently has " + "{} partitions; ignoring argument and proceeding with {} partitions", new Object[]{partitionCount, existingPartitions, existingPartitions}); + resolvedPartitionCount = existingPartitions; } } } @@ -175,10 +177,10 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor lockChannel = new FileOutputStream(lockPath.toFile()).getChannel(); lockChannel.lock(); - partitions = new Partition[partitionCount]; + partitions = new Partition[resolvedPartitionCount]; Iterator pathIterator = paths.iterator(); - for (int i = 0; i < partitionCount; i++) { + for (int i = 0; i < resolvedPartitionCount; i++) { // If we're out of paths, create a new iterator to start over. if (!pathIterator.hasNext()) { pathIterator = paths.iterator(); diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java index 29d2e7f4bc..03e6581131 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java @@ -417,6 +417,40 @@ public class TestMinimalLockingWriteAheadLog { } + + @Test + public void testDecreaseNumberOfPartitions() throws IOException { + final Path path = Paths.get("target/minimal-locking-repo-decrease-partitions"); + deleteRecursively(path.toFile()); + Files.createDirectories(path); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final WriteAheadRepository writeRepo = new MinimalLockingWriteAheadLog<>(path, 256, serde, null); + final Collection initialRecs = writeRepo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + final DummyRecord record1 = new DummyRecord("1", UpdateType.CREATE); + writeRepo.update(Collections.singleton(record1), false); + + for (int i=0; i < 8; i++) { + final DummyRecord r = new DummyRecord("1", UpdateType.UPDATE); + r.setProperty("i", String.valueOf(i)); + writeRepo.update(Collections.singleton(r), false); + } + + writeRepo.shutdown(); + + final WriteAheadRepository recoverRepo = new MinimalLockingWriteAheadLog<>(path, 6, serde, null); + final Collection records = recoverRepo.recoverRecords(); + final List list = new ArrayList<>(records); + assertEquals(1, list.size()); + + final DummyRecord recoveredRecord = list.get(0); + assertEquals("1", recoveredRecord.getId()); + assertEquals("7",recoveredRecord.getProperty("i")); + } + + private static class InsertThread extends Thread { private final List> records;