mirror of
https://github.com/apache/nifi.git
synced 2025-02-16 06:55:28 +00:00
NIFI-3631: This closes #1613. Do not change Active Directory in IndexDirectoryManager when it becomes full. Instead, wait until it is committed and is removed by the onIndexCommitted method. This resolved a bug where the index can exceed the configured limit but not yet be committed and as a result would no longer be the active index. As a result, this bug causes the IndexWriter never to get closed/removed from the IndexManager, and so a memory leak is created
Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
a5d630672a
commit
778ba3957e
@ -75,7 +75,7 @@ public class IndexDirectoryManager {
|
|||||||
|
|
||||||
final long startTime = DirectoryUtils.getIndexTimestamp(indexDir);
|
final long startTime = DirectoryUtils.getIndexTimestamp(indexDir);
|
||||||
final List<IndexLocation> dirsForTimestamp = indexLocationByTimestamp.computeIfAbsent(startTime, t -> new ArrayList<>());
|
final List<IndexLocation> dirsForTimestamp = indexLocationByTimestamp.computeIfAbsent(startTime, t -> new ArrayList<>());
|
||||||
final IndexLocation indexLoc = new IndexLocation(indexDir, startTime, partitionName, repoConfig.getDesiredIndexSize());
|
final IndexLocation indexLoc = new IndexLocation(indexDir, startTime, partitionName);
|
||||||
dirsForTimestamp.add(indexLoc);
|
dirsForTimestamp.add(indexLoc);
|
||||||
|
|
||||||
final Tuple<Long, IndexLocation> tuple = latestIndexByStorageDir.get(storageDir);
|
final Tuple<Long, IndexLocation> tuple = latestIndexByStorageDir.get(storageDir);
|
||||||
@ -99,8 +99,7 @@ public class IndexDirectoryManager {
|
|||||||
final Map.Entry<Long, List<IndexLocation>> entry = itr.next();
|
final Map.Entry<Long, List<IndexLocation>> entry = itr.next();
|
||||||
final List<IndexLocation> locations = entry.getValue();
|
final List<IndexLocation> locations = entry.getValue();
|
||||||
|
|
||||||
final IndexLocation locToRemove = new IndexLocation(directory, DirectoryUtils.getIndexTimestamp(directory),
|
final IndexLocation locToRemove = new IndexLocation(directory, DirectoryUtils.getIndexTimestamp(directory), directory.getName());
|
||||||
directory.getName(), repoConfig.getDesiredIndexSize());
|
|
||||||
locations.remove(locToRemove);
|
locations.remove(locToRemove);
|
||||||
if (locations.isEmpty()) {
|
if (locations.isEmpty()) {
|
||||||
itr.remove();
|
itr.remove();
|
||||||
@ -334,8 +333,8 @@ public class IndexDirectoryManager {
|
|||||||
*/
|
*/
|
||||||
public synchronized File getWritableIndexingDirectory(final long earliestTimestamp, final String partitionName) {
|
public synchronized File getWritableIndexingDirectory(final long earliestTimestamp, final String partitionName) {
|
||||||
IndexLocation indexLoc = activeIndices.get(partitionName);
|
IndexLocation indexLoc = activeIndices.get(partitionName);
|
||||||
if (indexLoc == null || indexLoc.isIndexFull()) {
|
if (indexLoc == null) {
|
||||||
indexLoc = new IndexLocation(createIndex(earliestTimestamp, partitionName), earliestTimestamp, partitionName, repoConfig.getDesiredIndexSize());
|
indexLoc = new IndexLocation(createIndex(earliestTimestamp, partitionName), earliestTimestamp, partitionName);
|
||||||
logger.debug("Created new Index Directory {}", indexLoc);
|
logger.debug("Created new Index Directory {}", indexLoc);
|
||||||
|
|
||||||
indexLocationByTimestamp.computeIfAbsent(earliestTimestamp, t -> new ArrayList<>()).add(indexLoc);
|
indexLocationByTimestamp.computeIfAbsent(earliestTimestamp, t -> new ArrayList<>()).add(indexLoc);
|
||||||
|
@ -18,24 +18,16 @@
|
|||||||
package org.apache.nifi.provenance.index.lucene;
|
package org.apache.nifi.provenance.index.lucene;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.nifi.provenance.util.DirectoryUtils;
|
|
||||||
|
|
||||||
public class IndexLocation {
|
public class IndexLocation {
|
||||||
private static final long SIZE_CHECK_MILLIS = TimeUnit.SECONDS.toMillis(30L);
|
|
||||||
|
|
||||||
private final File indexDirectory;
|
private final File indexDirectory;
|
||||||
private final long indexStartTimestamp;
|
private final long indexStartTimestamp;
|
||||||
private final String partitionName;
|
private final String partitionName;
|
||||||
private final long desiredIndexSize;
|
|
||||||
private volatile long lastSizeCheckTime = System.currentTimeMillis();
|
|
||||||
|
|
||||||
public IndexLocation(final File indexDirectory, final long indexStartTimestamp, final String partitionName, final long desiredIndexSize) {
|
public IndexLocation(final File indexDirectory, final long indexStartTimestamp, final String partitionName) {
|
||||||
this.indexDirectory = indexDirectory;
|
this.indexDirectory = indexDirectory;
|
||||||
this.indexStartTimestamp = indexStartTimestamp;
|
this.indexStartTimestamp = indexStartTimestamp;
|
||||||
this.partitionName = partitionName;
|
this.partitionName = partitionName;
|
||||||
this.desiredIndexSize = desiredIndexSize;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public File getIndexDirectory() {
|
public File getIndexDirectory() {
|
||||||
@ -50,17 +42,6 @@ public class IndexLocation {
|
|||||||
return partitionName;
|
return partitionName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isIndexFull() {
|
|
||||||
final long now = System.currentTimeMillis();
|
|
||||||
final long millisSinceLastSizeCheck = now - lastSizeCheckTime;
|
|
||||||
if (millisSinceLastSizeCheck < SIZE_CHECK_MILLIS) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
lastSizeCheckTime = now;
|
|
||||||
return DirectoryUtils.getSize(indexDirectory) >= desiredIndexSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return 31 + 41 * indexDirectory.hashCode();
|
return 31 + 41 * indexDirectory.hashCode();
|
||||||
|
@ -21,6 +21,9 @@ import static org.junit.Assert.assertEquals;
|
|||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
@ -82,12 +85,58 @@ public class TestIndexDirectoryManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testActiveIndexNotLostWhenSizeExceeded() throws IOException, InterruptedException {
|
||||||
|
final RepositoryConfiguration config = createConfig(2);
|
||||||
|
config.setDesiredIndexSize(4096 * 128);
|
||||||
|
|
||||||
|
final File storageDir1 = config.getStorageDirectories().get("1");
|
||||||
|
final File storageDir2 = config.getStorageDirectories().get("2");
|
||||||
|
|
||||||
|
final File index1 = new File(storageDir1, "index-1");
|
||||||
|
final File index2 = new File(storageDir1, "index-2");
|
||||||
|
final File index3 = new File(storageDir2, "index-3");
|
||||||
|
final File index4 = new File(storageDir2, "index-4");
|
||||||
|
|
||||||
|
final File[] allIndices = new File[] {index1, index2, index3, index4};
|
||||||
|
for (final File file : allIndices) {
|
||||||
|
assertTrue(file.mkdirs() || file.exists());
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
final IndexDirectoryManager mgr = new IndexDirectoryManager(config);
|
||||||
|
mgr.initialize();
|
||||||
|
|
||||||
|
File indexDir = mgr.getWritableIndexingDirectory(System.currentTimeMillis(), "1");
|
||||||
|
final File newFile = new File(indexDir, "1.bin");
|
||||||
|
try (final OutputStream fos = new FileOutputStream(newFile)) {
|
||||||
|
final byte[] data = new byte[4096];
|
||||||
|
for (int i = 0; i < 1024; i++) {
|
||||||
|
fos.write(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
final File newDir = mgr.getWritableIndexingDirectory(System.currentTimeMillis(), "1");
|
||||||
|
assertEquals(indexDir, newDir);
|
||||||
|
} finally {
|
||||||
|
newFile.delete();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
for (final File file : allIndices) {
|
||||||
|
file.delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private IndexLocation createLocation(final long timestamp) {
|
private IndexLocation createLocation(final long timestamp) {
|
||||||
return createLocation(timestamp, "1");
|
return createLocation(timestamp, "1");
|
||||||
}
|
}
|
||||||
|
|
||||||
private IndexLocation createLocation(final long timestamp, final String partitionName) {
|
private IndexLocation createLocation(final long timestamp, final String partitionName) {
|
||||||
return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName, 1024 * 1024L);
|
return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
private RepositoryConfiguration createConfig(final int partitions) {
|
private RepositoryConfiguration createConfig(final int partitions) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user