NIFI-892: If nifi.flowfile.repository.partitions property is changed, but repository already exists, just previous value

This commit is contained in:
Mark Payne 2015-08-24 16:30:30 -04:00
parent 310347fd66
commit 4baffacc42
2 changed files with 38 additions and 2 deletions

View File

@ -130,6 +130,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
throw new IllegalArgumentException("Paths must be non-empty");
}
int resolvedPartitionCount = partitionCount;
int existingPartitions = 0;
for (final Path path : paths) {
if (!Files.exists(path)) {
@ -162,6 +163,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
logger.warn("Constructing MinimalLockingWriteAheadLog with partitionCount={}, but the repository currently has "
+ "{} partitions; ignoring argument and proceeding with {} partitions",
new Object[]{partitionCount, existingPartitions, existingPartitions});
resolvedPartitionCount = existingPartitions;
}
}
}
@ -175,10 +177,10 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
lockChannel = new FileOutputStream(lockPath.toFile()).getChannel();
lockChannel.lock();
partitions = new Partition[partitionCount];
partitions = new Partition[resolvedPartitionCount];
Iterator<Path> pathIterator = paths.iterator();
for (int i = 0; i < partitionCount; i++) {
for (int i = 0; i < resolvedPartitionCount; i++) {
// If we're out of paths, create a new iterator to start over.
if (!pathIterator.hasNext()) {
pathIterator = paths.iterator();

View File

@ -417,6 +417,40 @@ public class TestMinimalLockingWriteAheadLog {
}
@Test
public void testDecreaseNumberOfPartitions() throws IOException {
final Path path = Paths.get("target/minimal-locking-repo-decrease-partitions");
deleteRecursively(path.toFile());
Files.createDirectories(path);
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> writeRepo = new MinimalLockingWriteAheadLog<>(path, 256, serde, null);
final Collection<DummyRecord> initialRecs = writeRepo.recoverRecords();
assertTrue(initialRecs.isEmpty());
final DummyRecord record1 = new DummyRecord("1", UpdateType.CREATE);
writeRepo.update(Collections.singleton(record1), false);
for (int i=0; i < 8; i++) {
final DummyRecord r = new DummyRecord("1", UpdateType.UPDATE);
r.setProperty("i", String.valueOf(i));
writeRepo.update(Collections.singleton(r), false);
}
writeRepo.shutdown();
final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, 6, serde, null);
final Collection<DummyRecord> records = recoverRepo.recoverRecords();
final List<DummyRecord> list = new ArrayList<>(records);
assertEquals(1, list.size());
final DummyRecord recoveredRecord = list.get(0);
assertEquals("1", recoveredRecord.getId());
assertEquals("7",recoveredRecord.getProperty("i"));
}
private static class InsertThread extends Thread {
private final List<List<DummyRecord>> records;