Removed PerWriterIndexRamManager

This commit is contained in:
Marc D'Mello 2024-11-27 02:01:47 +00:00
parent 80ea482e11
commit 1657e49154
6 changed files with 27 additions and 67 deletions

View File

@ -55,13 +55,13 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
}
@Override
public void flushWriter(
IndexWriterRAMManager ramManager,
IndexWriterRAMManager.PerWriterIndexWriterRAMManager perWriterRamManager)
throws IOException {
long totalBytes = perWriterRamManager.getTotalBufferBytesUsed();
if (totalBytes > ramManager.getRamBufferSizeMB() * 1024 * 1024) {
ramManager.flushRoundRobin();
public void flushRamManager(IndexWriter writer) throws IOException {
IndexWriterRAMManager ramManager = writer.getConfig().indexWriterRAMManager;
if (ramManager.getRamBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
long totalBytes = ramManager.updateAndGetCurrentBytesUsed(writer.ramManagerId);
if (totalBytes > ramManager.getRamBufferSizeMB() * 1024 * 1024) {
ramManager.flushRoundRobin();
}
}
}

View File

@ -59,15 +59,11 @@ abstract class FlushPolicy {
DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread);
/**
* Chooses which writer should be flushed. Default implementation chooses the writer with most RAM
* usage
*
* @param ramManager the {@link IndexWriterRAMManager} being used to actually flush the writers
* Flushed a writer according to the FlushPolicy. NOTE: this doesn't necessarily mean the passed
* in writer will be flushed, and in most cases, this will actually be the case as the default
* policy is a round-robin policy
*/
public abstract void flushWriter(
IndexWriterRAMManager ramManager,
IndexWriterRAMManager.PerWriterIndexWriterRAMManager perWriterRamManager)
throws IOException;
public abstract void flushRamManager(IndexWriter writer) throws IOException;
/** Called by DocumentsWriter to initialize the FlushPolicy */
protected synchronized void init(LiveIndexWriterConfig indexWriterConfig) {

View File

@ -462,7 +462,8 @@ public class IndexWriter
}
};
private final IndexWriterRAMManager.PerWriterIndexWriterRAMManager indexWriterRAMManager;
/** The id that is associated with this writer for {@link IndexWriterRAMManager} */
public final int ramManagerId;
/**
* Expert: returns a readonly reader, covering all committed as well as un-committed changes to
@ -1213,9 +1214,7 @@ public class IndexWriter
writeLock = null;
}
}
this.indexWriterRAMManager =
new IndexWriterRAMManager.PerWriterIndexWriterRAMManager(
this, config.getIndexWriterRAMManager());
this.ramManagerId = config.indexWriterRAMManager.registerWriter(this);
}
/** Confirms that the incoming index sort (if any) matches the existing index sort (if any). */
@ -1370,7 +1369,7 @@ public class IndexWriter
*/
@Override
public void close() throws IOException {
indexWriterRAMManager.removeWriter();
config.indexWriterRAMManager.removeWriter(ramManagerId);
if (config.getCommitOnClose()) {
shutdown();
} else {
@ -2451,7 +2450,7 @@ public class IndexWriter
// Ensure that only one thread actually gets to do the
// closing, and make sure no commit is also in progress:
if (shouldClose(true)) {
indexWriterRAMManager.removeWriter();
config.indexWriterRAMManager.removeWriter(ramManagerId);
rollbackInternal();
}
}
@ -6019,7 +6018,7 @@ public class IndexWriter
seqNo = -seqNo;
processEvents(true);
}
indexWriterRAMManager.flushIfNecessary(config.flushPolicy);
config.flushPolicy.flushRamManager(this);
return seqNo;
}

View File

@ -63,7 +63,7 @@ public class IndexWriterRAMManager {
return idToWriter.flushRoundRobin();
}
/** Registers a writer and returns the associated ID, protected for testing */
/** Registers a writer can returns the associated ID */
protected int registerWriter(IndexWriter writer) {
int id = idGenerator.incrementAndGet();
idToWriter.addWriter(writer, id);
@ -75,42 +75,12 @@ public class IndexWriterRAMManager {
idToWriter.removeWriter(id);
}
private void flushIfNecessary(
FlushPolicy flushPolicy, PerWriterIndexWriterRAMManager perWriterRAMManager)
throws IOException {
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
flushPolicy.flushWriter(this, perWriterRAMManager);
}
}
private long updateAndGetCurrentBytesUsed(int id) {
return idToWriter.getTotalRamTracker(id);
}
/**
* For use in {@link IndexWriter}, manages communication with the {@link IndexWriterRAMManager}
* Will call {@link IndexWriter#ramBytesUsed()} for the writer id passed in, and then updates the
* total ram using that value and returns it
*/
public static class PerWriterIndexWriterRAMManager {
private final int id;
private final IndexWriterRAMManager manager;
PerWriterIndexWriterRAMManager(IndexWriter writer, IndexWriterRAMManager manager) {
id = manager.registerWriter(writer);
this.manager = manager;
}
void removeWriter() {
manager.removeWriter(id);
}
void flushIfNecessary(FlushPolicy flushPolicy) throws IOException {
manager.flushIfNecessary(flushPolicy, this);
}
long getTotalBufferBytesUsed() {
return manager.updateAndGetCurrentBytesUsed(id);
}
public long updateAndGetCurrentBytesUsed(int id) {
return idToWriter.getTotalRamTracker(id);
}
private static class LinkedIdToWriter {

View File

@ -3218,10 +3218,7 @@ public class TestIndexWriter extends LuceneTestCase {
}
@Override
public void flushWriter(
IndexWriterRAMManager ramManager,
IndexWriterRAMManager.PerWriterIndexWriterRAMManager perWriterRamManager)
throws IOException {}
public void flushRamManager(IndexWriter writer) throws IOException {}
});
try (IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed());

View File

@ -369,11 +369,9 @@ public class TestIndexWriterRAMManager extends LuceneTestCase {
public void onChange(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {}
@Override
public void flushWriter(
IndexWriterRAMManager ramManager,
IndexWriterRAMManager.PerWriterIndexWriterRAMManager perWriterRamManager)
throws IOException {
long totalBytes = perWriterRamManager.getTotalBufferBytesUsed();
public void flushRamManager(IndexWriter writer) throws IOException {
IndexWriterRAMManager ramManager = writer.getConfig().indexWriterRAMManager;
long totalBytes = ramManager.updateAndGetCurrentBytesUsed(writer.ramManagerId);
if (totalBytes > ramManager.getRamBufferSizeMB() * 1024 * 1024) {
int flushedId = ramManager.flushRoundRobin();
flushedWriters.add(flushedId);
@ -420,7 +418,7 @@ public class TestIndexWriterRAMManager extends LuceneTestCase {
}
@Override
protected int registerWriter(IndexWriter writer) {
public int registerWriter(IndexWriter writer) {
int id = super.registerWriter(writer);
events.add(new TestEventAndId(TestEvent.ADD, id));
return id;