diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java index 09878ff9ab..53f74e070d 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java @@ -75,7 +75,7 @@ public class IndexDirectoryManager { final long startTime = DirectoryUtils.getIndexTimestamp(indexDir); final List dirsForTimestamp = indexLocationByTimestamp.computeIfAbsent(startTime, t -> new ArrayList<>()); - final IndexLocation indexLoc = new IndexLocation(indexDir, startTime, partitionName, repoConfig.getDesiredIndexSize()); + final IndexLocation indexLoc = new IndexLocation(indexDir, startTime, partitionName); dirsForTimestamp.add(indexLoc); final Tuple tuple = latestIndexByStorageDir.get(storageDir); @@ -99,8 +99,7 @@ public class IndexDirectoryManager { final Map.Entry> entry = itr.next(); final List locations = entry.getValue(); - final IndexLocation locToRemove = new IndexLocation(directory, DirectoryUtils.getIndexTimestamp(directory), - directory.getName(), repoConfig.getDesiredIndexSize()); + final IndexLocation locToRemove = new IndexLocation(directory, DirectoryUtils.getIndexTimestamp(directory), directory.getName()); locations.remove(locToRemove); if (locations.isEmpty()) { itr.remove(); @@ -334,8 +333,8 @@ public class IndexDirectoryManager { */ public synchronized File getWritableIndexingDirectory(final long earliestTimestamp, final String partitionName) { IndexLocation indexLoc = activeIndices.get(partitionName); - if (indexLoc == null || indexLoc.isIndexFull()) { - indexLoc = new IndexLocation(createIndex(earliestTimestamp, partitionName), earliestTimestamp, partitionName, repoConfig.getDesiredIndexSize()); + if (indexLoc == null) { + indexLoc = new IndexLocation(createIndex(earliestTimestamp, partitionName), earliestTimestamp, partitionName); logger.debug("Created new Index Directory {}", indexLoc); indexLocationByTimestamp.computeIfAbsent(earliestTimestamp, t -> new ArrayList<>()).add(indexLoc); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java index 33867c6d6a..f7de84fada 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java @@ -18,24 +18,16 @@ package org.apache.nifi.provenance.index.lucene; import java.io.File; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.provenance.util.DirectoryUtils; public class IndexLocation { - private static final long SIZE_CHECK_MILLIS = TimeUnit.SECONDS.toMillis(30L); - private final File indexDirectory; private final long indexStartTimestamp; private final String partitionName; - private final long desiredIndexSize; - private volatile long lastSizeCheckTime = System.currentTimeMillis(); - public IndexLocation(final File indexDirectory, final long indexStartTimestamp, final String partitionName, final long desiredIndexSize) { + public IndexLocation(final File indexDirectory, final long indexStartTimestamp, final String partitionName) { this.indexDirectory = indexDirectory; this.indexStartTimestamp = indexStartTimestamp; this.partitionName = partitionName; - this.desiredIndexSize = desiredIndexSize; } public File getIndexDirectory() { @@ -50,17 +42,6 @@ public class IndexLocation { return partitionName; } - public boolean isIndexFull() { - final long now = System.currentTimeMillis(); - final long millisSinceLastSizeCheck = now - lastSizeCheckTime; - if (millisSinceLastSizeCheck < SIZE_CHECK_MILLIS) { - return false; - } - - lastSizeCheckTime = now; - return DirectoryUtils.getSize(indexDirectory) >= desiredIndexSize; - } - @Override public int hashCode() { return 31 + 41 * indexDirectory.hashCode(); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java index 3f3c42296c..efcb601dbb 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java @@ -21,6 +21,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -82,12 +85,58 @@ public class TestIndexDirectoryManager { } + @Test + public void testActiveIndexNotLostWhenSizeExceeded() throws IOException, InterruptedException { + final RepositoryConfiguration config = createConfig(2); + config.setDesiredIndexSize(4096 * 128); + + final File storageDir1 = config.getStorageDirectories().get("1"); + final File storageDir2 = config.getStorageDirectories().get("2"); + + final File index1 = new File(storageDir1, "index-1"); + final File index2 = new File(storageDir1, "index-2"); + final File index3 = new File(storageDir2, "index-3"); + final File index4 = new File(storageDir2, "index-4"); + + final File[] allIndices = new File[] {index1, index2, index3, index4}; + for (final File file : allIndices) { + assertTrue(file.mkdirs() || file.exists()); + } + + try { + final IndexDirectoryManager mgr = new IndexDirectoryManager(config); + mgr.initialize(); + + File indexDir = mgr.getWritableIndexingDirectory(System.currentTimeMillis(), "1"); + final File newFile = new File(indexDir, "1.bin"); + try (final OutputStream fos = new FileOutputStream(newFile)) { + final byte[] data = new byte[4096]; + for (int i = 0; i < 1024; i++) { + fos.write(data); + } + } + + try { + final File newDir = mgr.getWritableIndexingDirectory(System.currentTimeMillis(), "1"); + assertEquals(indexDir, newDir); + } finally { + newFile.delete(); + } + } finally { + for (final File file : allIndices) { + file.delete(); + } + } + } + + + private IndexLocation createLocation(final long timestamp) { return createLocation(timestamp, "1"); } private IndexLocation createLocation(final long timestamp, final String partitionName) { - return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName, 1024 * 1024L); + return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName); } private RepositoryConfiguration createConfig(final int partitions) {