mirror of https://github.com/apache/lucene.git
remove TrackingIndexWriter
This commit is contained in:
parent
e4a21330a2
commit
c9aaa77182
|
@ -4960,4 +4960,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nocommit javadocs
|
||||||
|
public long getLastSequenceNumber() {
|
||||||
|
ensureOpen();
|
||||||
|
return docWriter.deleteQueue.seqNo.get();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,153 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.lucene.index;
|
|
||||||
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
import org.apache.lucene.analysis.Analyzer;
|
|
||||||
import org.apache.lucene.search.ControlledRealTimeReopenThread; // javadocs
|
|
||||||
import org.apache.lucene.search.Query;
|
|
||||||
import org.apache.lucene.store.Directory;
|
|
||||||
|
|
||||||
/** Class that tracks changes to a delegated
|
|
||||||
* IndexWriter, used by {@link
|
|
||||||
* ControlledRealTimeReopenThread} to ensure specific
|
|
||||||
* changes are visible. Create this class (passing your
|
|
||||||
* IndexWriter), and then pass this class to {@link
|
|
||||||
* ControlledRealTimeReopenThread}.
|
|
||||||
* Be sure to make all changes via the
|
|
||||||
* TrackingIndexWriter, otherwise {@link
|
|
||||||
* ControlledRealTimeReopenThread} won't know about the changes.
|
|
||||||
*
|
|
||||||
* @lucene.experimental */
|
|
||||||
|
|
||||||
// nocommit removeme
|
|
||||||
public class TrackingIndexWriter {
|
|
||||||
private final IndexWriter writer;
|
|
||||||
private final AtomicLong indexingGen = new AtomicLong(1);
|
|
||||||
|
|
||||||
/** Create a {@code TrackingIndexWriter} wrapping the
|
|
||||||
* provided {@link IndexWriter}. */
|
|
||||||
public TrackingIndexWriter(IndexWriter writer) {
|
|
||||||
this.writer = writer;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Calls {@link
|
|
||||||
* IndexWriter#updateDocument(Term,Iterable)} and
|
|
||||||
* returns the generation that reflects this change. */
|
|
||||||
public long updateDocument(Term t, Iterable<? extends IndexableField> d) throws IOException {
|
|
||||||
writer.updateDocument(t, d);
|
|
||||||
// Return gen as of when indexing finished:
|
|
||||||
return indexingGen.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Calls {@link
|
|
||||||
* IndexWriter#updateDocuments(Term,Iterable)} and returns
|
|
||||||
* the generation that reflects this change. */
|
|
||||||
public long updateDocuments(Term t, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
|
|
||||||
writer.updateDocuments(t, docs);
|
|
||||||
// Return gen as of when indexing finished:
|
|
||||||
return indexingGen.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Calls {@link IndexWriter#deleteDocuments(Term...)} and
|
|
||||||
* returns the generation that reflects this change. */
|
|
||||||
public long deleteDocuments(Term... terms) throws IOException {
|
|
||||||
writer.deleteDocuments(terms);
|
|
||||||
// Return gen as of when indexing finished:
|
|
||||||
return indexingGen.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Calls {@link IndexWriter#deleteDocuments(Query...)}
|
|
||||||
* and returns the generation that reflects this change. */
|
|
||||||
public long deleteDocuments(Query... queries) throws IOException {
|
|
||||||
writer.deleteDocuments(queries);
|
|
||||||
// Return gen as of when indexing finished:
|
|
||||||
return indexingGen.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Calls {@link IndexWriter#deleteAll} and returns the
|
|
||||||
* generation that reflects this change. */
|
|
||||||
public long deleteAll() throws IOException {
|
|
||||||
writer.deleteAll();
|
|
||||||
// Return gen as of when indexing finished:
|
|
||||||
return indexingGen.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Calls {@link IndexWriter#addDocument(Iterable)}
|
|
||||||
* and returns the generation that reflects this change. */
|
|
||||||
public long addDocument(Iterable<? extends IndexableField> d) throws IOException {
|
|
||||||
writer.addDocument(d);
|
|
||||||
// Return gen as of when indexing finished:
|
|
||||||
return indexingGen.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Calls {@link IndexWriter#addDocuments(Iterable)} and
|
|
||||||
* returns the generation that reflects this change. */
|
|
||||||
public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
|
|
||||||
writer.addDocuments(docs);
|
|
||||||
// Return gen as of when indexing finished:
|
|
||||||
return indexingGen.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Calls {@link IndexWriter#addIndexes(Directory...)} and
|
|
||||||
* returns the generation that reflects this change. */
|
|
||||||
public long addIndexes(Directory... dirs) throws IOException {
|
|
||||||
writer.addIndexes(dirs);
|
|
||||||
// Return gen as of when indexing finished:
|
|
||||||
return indexingGen.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Calls {@link IndexWriter#addIndexes(CodecReader...)}
|
|
||||||
* and returns the generation that reflects this change. */
|
|
||||||
public long addIndexes(CodecReader... readers) throws IOException {
|
|
||||||
writer.addIndexes(readers);
|
|
||||||
// Return gen as of when indexing finished:
|
|
||||||
return indexingGen.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Return the current generation being indexed. */
|
|
||||||
public long getGeneration() {
|
|
||||||
return indexingGen.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Return the wrapped {@link IndexWriter}. */
|
|
||||||
public IndexWriter getIndexWriter() {
|
|
||||||
return writer;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Return and increment current gen.
|
|
||||||
*
|
|
||||||
* @lucene.internal */
|
|
||||||
public long getAndIncrementGeneration() {
|
|
||||||
return indexingGen.getAndIncrement();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Cals {@link
|
|
||||||
* IndexWriter#tryDeleteDocument(IndexReader,int)} and
|
|
||||||
* returns the generation that reflects this change. */
|
|
||||||
public long tryDeleteDocument(IndexReader reader, int docID) throws IOException {
|
|
||||||
if (writer.tryDeleteDocument(reader, docID) != -1) {
|
|
||||||
return indexingGen.get();
|
|
||||||
} else {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -23,16 +23,11 @@ import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.apache.lucene.index.IndexWriter;
|
import org.apache.lucene.index.IndexWriter;
|
||||||
import org.apache.lucene.index.TrackingIndexWriter;
|
|
||||||
import org.apache.lucene.util.ThreadInterruptedException;
|
import org.apache.lucene.util.ThreadInterruptedException;
|
||||||
|
|
||||||
/** Utility class that runs a thread to manage periodicc
|
/** Utility class that runs a thread to manage periodicc
|
||||||
* reopens of a {@link ReferenceManager}, with methods to wait for a specific
|
* reopens of a {@link ReferenceManager}, with methods to wait for a specific
|
||||||
* index changes to become visible. To use this class you
|
* index changes to become visible. When a given search request needs to see a specific
|
||||||
* must first wrap your {@link IndexWriter} with a {@link
|
|
||||||
* TrackingIndexWriter} and always use it to make changes
|
|
||||||
* to the index, saving the returned generation. Then,
|
|
||||||
* when a given search request needs to see a specific
|
|
||||||
* index change, call the {#waitForGeneration} to wait for
|
* index change, call the {#waitForGeneration} to wait for
|
||||||
* that change to be visible. Note that this will only
|
* that change to be visible. Note that this will only
|
||||||
* scale well if most searches do not need to wait for a
|
* scale well if most searches do not need to wait for a
|
||||||
|
@ -44,7 +39,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
|
||||||
private final ReferenceManager<T> manager;
|
private final ReferenceManager<T> manager;
|
||||||
private final long targetMaxStaleNS;
|
private final long targetMaxStaleNS;
|
||||||
private final long targetMinStaleNS;
|
private final long targetMinStaleNS;
|
||||||
private final TrackingIndexWriter writer;
|
private final IndexWriter writer;
|
||||||
private volatile boolean finish;
|
private volatile boolean finish;
|
||||||
private volatile long waitingGen;
|
private volatile long waitingGen;
|
||||||
private volatile long searchingGen;
|
private volatile long searchingGen;
|
||||||
|
@ -69,7 +64,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
|
||||||
* is waiting for a specific generation to
|
* is waiting for a specific generation to
|
||||||
* become visible.
|
* become visible.
|
||||||
*/
|
*/
|
||||||
public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
|
public ControlledRealTimeReopenThread(IndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
|
||||||
if (targetMaxStaleSec < targetMinStaleSec) {
|
if (targetMaxStaleSec < targetMinStaleSec) {
|
||||||
throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
|
throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
|
||||||
}
|
}
|
||||||
|
@ -155,7 +150,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
|
||||||
* or false if maxMS wait time was exceeded
|
* or false if maxMS wait time was exceeded
|
||||||
*/
|
*/
|
||||||
public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException {
|
public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException {
|
||||||
final long curGen = writer.getGeneration();
|
final long curGen = writer.getLastSequenceNumber();
|
||||||
if (targetGen > curGen) {
|
if (targetGen > curGen) {
|
||||||
throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by the ReferenceManager instance (current gen=" + curGen + ")");
|
throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by the ReferenceManager instance (current gen=" + curGen + ")");
|
||||||
}
|
}
|
||||||
|
@ -240,7 +235,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
|
||||||
// Save the gen as of when we started the reopen; the
|
// Save the gen as of when we started the reopen; the
|
||||||
// listener (HandleRefresh above) copies this to
|
// listener (HandleRefresh above) copies this to
|
||||||
// searchingGen once the reopen completes:
|
// searchingGen once the reopen completes:
|
||||||
refreshStartGen = writer.getAndIncrementGeneration();
|
refreshStartGen = writer.getLastSequenceNumber();
|
||||||
try {
|
try {
|
||||||
manager.maybeRefreshBlocking();
|
manager.maybeRefreshBlocking();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
|
|
|
@ -79,8 +79,6 @@ public class TestTryDelete extends LuceneTestCase
|
||||||
ReferenceManager<IndexSearcher> mgr = new SearcherManager(writer,
|
ReferenceManager<IndexSearcher> mgr = new SearcherManager(writer,
|
||||||
new SearcherFactory());
|
new SearcherFactory());
|
||||||
|
|
||||||
TrackingIndexWriter mgrWriter = new TrackingIndexWriter(writer);
|
|
||||||
|
|
||||||
IndexSearcher searcher = mgr.acquire();
|
IndexSearcher searcher = mgr.acquire();
|
||||||
|
|
||||||
TopDocs topDocs = searcher.search(new TermQuery(new Term("foo", "0")),
|
TopDocs topDocs = searcher.search(new TermQuery(new Term("foo", "0")),
|
||||||
|
@ -90,10 +88,10 @@ public class TestTryDelete extends LuceneTestCase
|
||||||
long result;
|
long result;
|
||||||
if (random().nextBoolean()) {
|
if (random().nextBoolean()) {
|
||||||
IndexReader r = DirectoryReader.open(writer);
|
IndexReader r = DirectoryReader.open(writer);
|
||||||
result = mgrWriter.tryDeleteDocument(r, 0);
|
result = writer.tryDeleteDocument(r, 0);
|
||||||
r.close();
|
r.close();
|
||||||
} else {
|
} else {
|
||||||
result = mgrWriter.tryDeleteDocument(searcher.getIndexReader(), 0);
|
result = writer.tryDeleteDocument(searcher.getIndexReader(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// The tryDeleteDocument should have succeeded:
|
// The tryDeleteDocument should have succeeded:
|
||||||
|
@ -132,10 +130,9 @@ public class TestTryDelete extends LuceneTestCase
|
||||||
100);
|
100);
|
||||||
assertEquals(1, topDocs.totalHits);
|
assertEquals(1, topDocs.totalHits);
|
||||||
|
|
||||||
TrackingIndexWriter mgrWriter = new TrackingIndexWriter(writer);
|
long result = writer.tryDeleteDocument(DirectoryReader.open(writer), 0);
|
||||||
long result = mgrWriter.tryDeleteDocument(DirectoryReader.open(writer), 0);
|
|
||||||
|
|
||||||
assertEquals(1, result);
|
assertTrue(result != -1);
|
||||||
|
|
||||||
writer.commit();
|
writer.commit();
|
||||||
|
|
||||||
|
@ -175,11 +172,9 @@ public class TestTryDelete extends LuceneTestCase
|
||||||
100);
|
100);
|
||||||
assertEquals(1, topDocs.totalHits);
|
assertEquals(1, topDocs.totalHits);
|
||||||
|
|
||||||
TrackingIndexWriter mgrWriter = new TrackingIndexWriter(writer);
|
long result = writer.deleteDocuments(new TermQuery(new Term("foo", "0")));
|
||||||
long result = mgrWriter.deleteDocuments(new TermQuery(new Term("foo",
|
|
||||||
"0")));
|
|
||||||
|
|
||||||
assertEquals(1, result);
|
assertTrue(result != -1);
|
||||||
|
|
||||||
// writer.commit();
|
// writer.commit();
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.lucene.index.RandomIndexWriter;
|
||||||
import org.apache.lucene.index.SnapshotDeletionPolicy;
|
import org.apache.lucene.index.SnapshotDeletionPolicy;
|
||||||
import org.apache.lucene.index.Term;
|
import org.apache.lucene.index.Term;
|
||||||
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
|
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
|
||||||
import org.apache.lucene.index.TrackingIndexWriter;
|
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.NRTCachingDirectory;
|
import org.apache.lucene.store.NRTCachingDirectory;
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
|
@ -57,7 +56,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
|
||||||
// Is guaranteed to reflect deletes:
|
// Is guaranteed to reflect deletes:
|
||||||
private SearcherManager nrtDeletes;
|
private SearcherManager nrtDeletes;
|
||||||
|
|
||||||
private TrackingIndexWriter genWriter;
|
private IndexWriter genWriter;
|
||||||
|
|
||||||
private ControlledRealTimeReopenThread<IndexSearcher> nrtDeletesThread;
|
private ControlledRealTimeReopenThread<IndexSearcher> nrtDeletesThread;
|
||||||
private ControlledRealTimeReopenThread<IndexSearcher> nrtNoDeletesThread;
|
private ControlledRealTimeReopenThread<IndexSearcher> nrtNoDeletesThread;
|
||||||
|
@ -219,7 +218,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
|
||||||
System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
|
System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
|
||||||
}
|
}
|
||||||
|
|
||||||
genWriter = new TrackingIndexWriter(writer);
|
genWriter = writer;
|
||||||
|
|
||||||
final SearcherFactory sf = new SearcherFactory() {
|
final SearcherFactory sf = new SearcherFactory() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -311,9 +310,8 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
final CountDownLatch signal = new CountDownLatch(1);
|
final CountDownLatch signal = new CountDownLatch(1);
|
||||||
|
|
||||||
LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal);
|
LatchedIndexWriter writer = new LatchedIndexWriter(d, conf, latch, signal);
|
||||||
final TrackingIndexWriter writer = new TrackingIndexWriter(_writer);
|
final SearcherManager manager = new SearcherManager(writer, false, false, null);
|
||||||
final SearcherManager manager = new SearcherManager(_writer, false, false, null);
|
|
||||||
Document doc = new Document();
|
Document doc = new Document();
|
||||||
doc.add(newTextField("test", "test", Field.Store.YES));
|
doc.add(newTextField("test", "test", Field.Store.YES));
|
||||||
writer.addDocument(doc);
|
writer.addDocument(doc);
|
||||||
|
@ -334,7 +332,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
t.start();
|
t.start();
|
||||||
_writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
|
writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
|
||||||
final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen
|
final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen
|
||||||
|
|
||||||
assertFalse(manager.isSearcherCurrent()); // false since there is a delete in the queue
|
assertFalse(manager.isSearcherCurrent()); // false since there is a delete in the queue
|
||||||
|
@ -373,7 +371,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
|
||||||
}
|
}
|
||||||
thread.close();
|
thread.close();
|
||||||
thread.join();
|
thread.join();
|
||||||
_writer.close();
|
writer.close();
|
||||||
IOUtils.close(manager, d);
|
IOUtils.close(manager, d);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -483,9 +481,8 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
|
||||||
config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
|
config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
|
||||||
final IndexWriter iw = new IndexWriter(dir, config);
|
final IndexWriter iw = new IndexWriter(dir, config);
|
||||||
SearcherManager sm = new SearcherManager(iw, new SearcherFactory());
|
SearcherManager sm = new SearcherManager(iw, new SearcherFactory());
|
||||||
final TrackingIndexWriter tiw = new TrackingIndexWriter(iw);
|
|
||||||
ControlledRealTimeReopenThread<IndexSearcher> controlledRealTimeReopenThread =
|
ControlledRealTimeReopenThread<IndexSearcher> controlledRealTimeReopenThread =
|
||||||
new ControlledRealTimeReopenThread<>(tiw, sm, maxStaleSecs, 0);
|
new ControlledRealTimeReopenThread<>(iw, sm, maxStaleSecs, 0);
|
||||||
|
|
||||||
controlledRealTimeReopenThread.setDaemon(true);
|
controlledRealTimeReopenThread.setDaemon(true);
|
||||||
controlledRealTimeReopenThread.start();
|
controlledRealTimeReopenThread.start();
|
||||||
|
@ -517,7 +514,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
|
||||||
d.add(new TextField("count", i + "", Field.Store.NO));
|
d.add(new TextField("count", i + "", Field.Store.NO));
|
||||||
d.add(new TextField("content", content, Field.Store.YES));
|
d.add(new TextField("content", content, Field.Store.YES));
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
long l = tiw.addDocument(d);
|
long l = iw.addDocument(d);
|
||||||
controlledRealTimeReopenThread.waitForGeneration(l);
|
controlledRealTimeReopenThread.waitForGeneration(l);
|
||||||
long wait = System.currentTimeMillis() - start;
|
long wait = System.currentTimeMillis() - start;
|
||||||
assertTrue("waited too long for generation " + wait,
|
assertTrue("waited too long for generation " + wait,
|
||||||
|
|
Loading…
Reference in New Issue