Fixed some bugs

This commit is contained in:
Marc D'Mello 2024-10-31 22:43:55 +00:00
parent 7ebccc6b9b
commit da6d13af31
3 changed files with 65 additions and 52 deletions

View File

@ -2451,6 +2451,7 @@ public class IndexWriter
// Ensure that only one thread actually gets to do the
// closing, and make sure no commit is also in progress:
if (shouldClose(true)) {
indexWriterRAMManager.removeWriter();
rollbackInternal();
}
}

View File

@ -20,7 +20,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
/**
@ -31,7 +30,6 @@ public class IndexWriterRAMManager {
private final LinkedIdToWriter idToWriter = new LinkedIdToWriter();
private final AtomicInteger idGenerator = new AtomicInteger();
private double ramBufferSizeMB;
private final AtomicLong totalRamTracker;
/**
* Default constructor
@ -44,7 +42,6 @@ public class IndexWriterRAMManager {
throw new IllegalArgumentException("ramBufferSize should be > 0.0 MB when enabled");
}
this.ramBufferSizeMB = ramBufferSizeMB;
this.totalRamTracker = new AtomicLong(0);
}
/** Set the buffer size for this manager */
@ -79,11 +76,13 @@ public class IndexWriterRAMManager {
private void flushIfNecessary(
FlushPolicy flushPolicy, PerWriterIndexWriterRAMManager perWriterRAMManager)
throws IOException {
flushPolicy.flushWriter(this, perWriterRAMManager);
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
flushPolicy.flushWriter(this, perWriterRAMManager);
}
}
private long updateAndGetCurrentBytesUsed(int id) {
return totalRamTracker.addAndGet(idToWriter.updateRAMAndGetDifference(id));
return idToWriter.getTotalRamTracker(id);
}
/**
@ -116,6 +115,7 @@ public class IndexWriterRAMManager {
private final Map<Integer, IndexWriterNode> idToWriterNode = new HashMap<>();
private IndexWriterNode first;
private IndexWriterNode last;
private long totalRamTracker;
private final ReentrantLock lock = new ReentrantLock();
@ -123,65 +123,76 @@ public class IndexWriterRAMManager {
private int lastIdFlushed = -1;
void addWriter(IndexWriter writer, int id) {
IndexWriterNode node = new IndexWriterNode(writer, id);
lock.lock();
if (idToWriterNode.isEmpty()) {
first = node;
synchronized (lock) {
IndexWriterNode node = new IndexWriterNode(writer, id);
if (idToWriterNode.isEmpty()) {
first = node;
last = node;
}
node.next = first;
last.next = node;
node.prev = last;
last = node;
idToWriterNode.put(id, node);
}
node.next = first;
last.next = node;
node.prev = last;
last = node;
idToWriterNode.put(id, node);
lock.unlock();
}
void removeWriter(int id) {
lock.lock();
if (idToWriterNode.containsKey(id) == false) {
throw new IllegalArgumentException(
"Writer " + id + " has not been registered or has been removed already");
synchronized (lock) {
if (idToWriterNode.containsKey(id)) {
IndexWriterNode nodeToRemove = idToWriterNode.remove(id);
totalRamTracker -= nodeToRemove.ram;
if (idToWriterNode.isEmpty()) {
first = null;
last = null;
lastIdFlushed = -1;
return;
}
if (id == lastIdFlushed) {
lastIdFlushed = nodeToRemove.prev.id;
}
nodeToRemove.prev.next = nodeToRemove.next;
nodeToRemove.next.prev = nodeToRemove.prev;
if (nodeToRemove == first) {
first = nodeToRemove.next;
}
if (nodeToRemove == last) {
last = nodeToRemove.prev;
}
}
}
IndexWriterNode nodeToRemove = idToWriterNode.remove(id);
if (idToWriterNode.isEmpty()) {
first = null;
last = null;
return;
}
nodeToRemove.prev.next = nodeToRemove.next;
nodeToRemove.next.prev = nodeToRemove.prev;
if (nodeToRemove == first) {
first = nodeToRemove.next;
}
if (nodeToRemove == last) {
last = nodeToRemove.prev;
}
lock.unlock();
}
void flushRoundRobin() throws IOException {
lock.lock();
if (idToWriterNode.isEmpty()) {
throw new IllegalCallerException("No registered writers");
synchronized (lock) {
if (idToWriterNode.isEmpty()) {
return;
}
int idToFlush;
if (lastIdFlushed == -1) {
idToFlush = first.id;
} else {
idToFlush = idToWriterNode.get(lastIdFlushed).next.id;
}
idToWriterNode.get(idToFlush).writer.flushNextBuffer();
lastIdFlushed = idToFlush;
}
int idToFlush;
if (lastIdFlushed == -1) {
idToFlush = first.id;
} else {
idToFlush = idToWriterNode.get(lastIdFlushed).next.id;
}
idToWriterNode.get(idToFlush).writer.flushNextBuffer();
lastIdFlushed = idToFlush;
lock.unlock();
}
long updateRAMAndGetDifference(int id) {
lock.lock();
long oldRAMBytesUsed = idToWriterNode.get(id).ram;
long newRAMBytesUsed = idToWriterNode.get(id).writer.ramBytesUsed();
lock.unlock();
return newRAMBytesUsed - oldRAMBytesUsed;
long getTotalRamTracker(int id) {
synchronized (lock) {
if (idToWriterNode.isEmpty()) {
return 0;
}
if (idToWriterNode.containsKey(id) == false) {
return totalRamTracker;
}
long oldRAMBytesUsed = idToWriterNode.get(id).ram;
long newRAMBytesUsed = idToWriterNode.get(id).writer.ramBytesUsed();
idToWriterNode.get(id).ram = newRAMBytesUsed;
totalRamTracker += newRAMBytesUsed - oldRAMBytesUsed;
return totalRamTracker;
}
}
private static class IndexWriterNode {

View File

@ -88,6 +88,7 @@ public class TestIndexWriterConfig extends LuceneTestCase {
getters.add("getUseCompoundFile");
getters.add("isCheckPendingFlushOnUpdate");
getters.add("getSoftDeletesField");
getters.add("getIndexWriterRAMManager");
for (Method m : IndexWriterConfig.class.getDeclaredMethods()) {
if (m.getDeclaringClass() == IndexWriterConfig.class && m.getName().startsWith("get")) {