mirror of
https://github.com/apache/lucene.git
synced 2025-02-08 02:58:58 +00:00
LUCENE-4695: add LiveFieldValues, to get current (live/real-time) values for fields indexed after the last NRT reopen
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1438721 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5cb1a812f0
commit
1a11ce5fd4
@ -73,12 +73,23 @@ New Features
|
||||
compresses term vectors into chunks of documents similarly to
|
||||
CompressingStoredFieldsFormat. (Adrien Grand)
|
||||
|
||||
* LUCENE-4695: Added LiveFieldValues utility class, for getting the
|
||||
current (live, real-time) value for any indexed doc/field. The
|
||||
class buffers recently indexed doc/field values until a new
|
||||
near-real-time reader is opened that contains those changes.
|
||||
(Robert Muir, Mike McCandless)
|
||||
|
||||
API Changes
|
||||
|
||||
* LUCENE-4709: FacetResultNode no longer has a residue field. (Shai Erera)
|
||||
|
||||
* LUCENE-4716: DrillDown.query now takes Occur, allowing to specify if
|
||||
categories should be OR'ed or AND'ed. (Shai Erera)
|
||||
|
||||
* LUCENE-4695: ReferenceManager.RefreshListener.afterRefresh now takes
|
||||
a boolean indicating whether a new reference was in fact opened, and
|
||||
a new beforeRefresh method notifies you when a refresh attempt is
|
||||
starting. (Robert Muir, Mike McCandless)
|
||||
|
||||
Bug Fixes
|
||||
|
||||
|
@ -0,0 +1,147 @@
|
||||
package org.apache.lucene.search;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.index.AtomicReader;
|
||||
import org.apache.lucene.index.AtomicReaderContext;
|
||||
import org.apache.lucene.index.IndexDocument;
|
||||
import org.apache.lucene.index.IndexWriterConfig;
|
||||
import org.apache.lucene.index.MergeState;
|
||||
import org.apache.lucene.index.SegmentReader;
|
||||
import org.apache.lucene.index.SegmentWriteState;
|
||||
import org.apache.lucene.util.Counter;
|
||||
|
||||
/** Tracks live field values across NRT reader reopens.
|
||||
* This holds a map for all updated ids since
|
||||
* the last reader reopen. Once the NRT reader is reopened,
|
||||
* it prunes the map. This means you must reopen your NRT
|
||||
* reader periodically otherwise the RAM consumption of
|
||||
* this class will grow unbounded!
|
||||
*
|
||||
* <p>NOTE: you must ensure the same id is never updated at
|
||||
* the same time by two threads, because in this case you
|
||||
* cannot in general know which thread "won". */
|
||||
|
||||
public abstract class LiveFieldValues<T> implements ReferenceManager.RefreshListener, Closeable {
|
||||
|
||||
private volatile Map<String,T> current = new ConcurrentHashMap<String,T>();
|
||||
private volatile Map<String,T> old = new ConcurrentHashMap<String,T>();
|
||||
private final ReferenceManager<IndexSearcher> mgr;
|
||||
private final T missingValue;
|
||||
|
||||
public LiveFieldValues(ReferenceManager<IndexSearcher> mgr, T missingValue) {
|
||||
this.missingValue = missingValue;
|
||||
this.mgr = mgr;
|
||||
mgr.addListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
mgr.removeListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeRefresh() throws IOException {
|
||||
old = current;
|
||||
// Start sending all updates after this point to the new
|
||||
// map. While reopen is running, any lookup will first
|
||||
// try this new map, then fallback to old, then to the
|
||||
// current searcher:
|
||||
current = new ConcurrentHashMap<String,T>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterRefresh(boolean didRefresh) throws IOException {
|
||||
// Now drop all the old values because they are now
|
||||
// visible via the searcher that was just opened; if
|
||||
// didRefresh is false, it's possible old has some
|
||||
// entries in it, which is fine: it means they were
|
||||
// actually already included in the previously opened
|
||||
// reader. So we can safely clear old here:
|
||||
old = new ConcurrentHashMap<String,T>();
|
||||
}
|
||||
|
||||
/** Call this after you've successfully added a document
|
||||
* to the index, to record what value you just set the
|
||||
* field to. */
|
||||
public void add(String id, T value) {
|
||||
current.put(id, value);
|
||||
}
|
||||
|
||||
/** Call this after you've successfully deleted a document
|
||||
* from the index. */
|
||||
public void delete(String id) {
|
||||
current.put(id, missingValue);
|
||||
}
|
||||
|
||||
/** Returns the [approximate] number of id/value pairs
|
||||
* buffered in RAM. */
|
||||
public int size() {
|
||||
return current.size() + old.size();
|
||||
}
|
||||
|
||||
/** Returns the current value for this id, or null if the
|
||||
* id isn't in the index or was deleted. */
|
||||
public T get(String id) throws IOException {
|
||||
// First try to get the "live" value:
|
||||
T value = current.get(id);
|
||||
if (value == missingValue) {
|
||||
// Deleted but the deletion is not yet reflected in
|
||||
// the reader:
|
||||
return null;
|
||||
} else if (value != null) {
|
||||
return value;
|
||||
} else {
|
||||
value = old.get(id);
|
||||
if (value == missingValue) {
|
||||
// Deleted but the deletion is not yet reflected in
|
||||
// the reader:
|
||||
return null;
|
||||
} else if (value != null) {
|
||||
return value;
|
||||
} else {
|
||||
// It either does not exist in the index, or, it was
|
||||
// already flushed & NRT reader was opened on the
|
||||
// segment, so fallback to current searcher:
|
||||
IndexSearcher s = mgr.acquire();
|
||||
try {
|
||||
return lookupFromSearcher(s, id);
|
||||
} finally {
|
||||
mgr.release(s);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** This is called when the id/value was already flushed & opened
|
||||
* in an NRT IndexSearcher. You must implement this to
|
||||
* go look up the value (eg, via doc values, field cache,
|
||||
* stored fields, etc.). */
|
||||
protected abstract T lookupFromSearcher(IndexSearcher s, String id) throws IOException;
|
||||
}
|
||||
|
@ -151,6 +151,7 @@ public abstract class ReferenceManager<G> implements Closeable {
|
||||
try {
|
||||
final G reference = acquire();
|
||||
try {
|
||||
notifyRefreshListenersBefore();
|
||||
G newReference = refreshIfNeeded(reference);
|
||||
if (newReference != null) {
|
||||
assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";
|
||||
@ -165,11 +166,9 @@ public abstract class ReferenceManager<G> implements Closeable {
|
||||
}
|
||||
} finally {
|
||||
release(reference);
|
||||
notifyRefreshListenersRefreshed(refreshed);
|
||||
}
|
||||
afterMaybeRefresh();
|
||||
if (refreshed) {
|
||||
notifyRefreshListeners();
|
||||
}
|
||||
} finally {
|
||||
refreshLock.unlock();
|
||||
}
|
||||
@ -254,9 +253,15 @@ public abstract class ReferenceManager<G> implements Closeable {
|
||||
decRef(reference);
|
||||
}
|
||||
|
||||
private void notifyRefreshListeners() {
|
||||
private void notifyRefreshListenersBefore() throws IOException {
|
||||
for (RefreshListener refreshListener : refreshListeners) {
|
||||
refreshListener.afterRefresh();
|
||||
refreshListener.beforeRefresh();
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyRefreshListenersRefreshed(boolean didRefresh) throws IOException {
|
||||
for (RefreshListener refreshListener : refreshListeners) {
|
||||
refreshListener.afterRefresh(didRefresh);
|
||||
}
|
||||
}
|
||||
|
||||
@ -284,9 +289,13 @@ public abstract class ReferenceManager<G> implements Closeable {
|
||||
* finished. See {@link #addListener}. */
|
||||
public interface RefreshListener {
|
||||
|
||||
/**
|
||||
* Called after a successful refresh and a new reference has been installed. When this is called {@link #acquire()} is guaranteed to return a new instance.
|
||||
*/
|
||||
void afterRefresh();
|
||||
/** Called right before a refresh attempt starts. */
|
||||
void beforeRefresh() throws IOException;
|
||||
|
||||
/** Called after the attempted refresh; if the refresh
|
||||
* did open a new reference then didRefresh will be true
|
||||
* and {@link #acquire()} is guaranteed to return the new
|
||||
* reference. */
|
||||
void afterRefresh(boolean didRefresh) throws IOException;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,180 @@
|
||||
package org.apache.lucene.search;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.lucene.analysis.MockAnalyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.document.IntField;
|
||||
import org.apache.lucene.document.StringField;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.IndexWriterConfig;
|
||||
import org.apache.lucene.index.RandomIndexWriter;
|
||||
import org.apache.lucene.index.StoredDocument;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util._TestUtil;
|
||||
|
||||
public class TestLiveFieldValues extends LuceneTestCase {
|
||||
public void test() throws Exception {
|
||||
|
||||
Directory dir = newFSDirectory(_TestUtil.getTempDir("livefieldupdates"));
|
||||
IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
|
||||
|
||||
final IndexWriter _w = new IndexWriter(dir, iwc);
|
||||
final TrackingIndexWriter w = new TrackingIndexWriter(_w);
|
||||
|
||||
final NRTManager mgr = new NRTManager(w, new SearcherFactory() {
|
||||
@Override
|
||||
public IndexSearcher newSearcher(IndexReader r) {
|
||||
return new IndexSearcher(r);
|
||||
}
|
||||
});
|
||||
|
||||
final Integer missing = -1;
|
||||
|
||||
final LiveFieldValues<Integer> rt = new LiveFieldValues<Integer>(mgr, missing) {
|
||||
@Override
|
||||
protected Integer lookupFromSearcher(IndexSearcher s, String id) throws IOException {
|
||||
TermQuery tq = new TermQuery(new Term("id", id));
|
||||
TopDocs hits = s.search(tq, 1);
|
||||
assertTrue(hits.totalHits <= 1);
|
||||
if (hits.totalHits == 0) {
|
||||
return null;
|
||||
} else {
|
||||
StoredDocument doc = s.doc(hits.scoreDocs[0].doc);
|
||||
return (Integer) doc.getField("field").numericValue();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
int numThreads = _TestUtil.nextInt(random(), 2, 5);
|
||||
if (VERBOSE) {
|
||||
System.out.println(numThreads + " threads");
|
||||
}
|
||||
|
||||
final CountDownLatch startingGun = new CountDownLatch(1);
|
||||
List<Thread> threads = new ArrayList<Thread>();
|
||||
|
||||
final int iters = atLeast(1000);
|
||||
final int idCount = _TestUtil.nextInt(random(), 100, 10000);
|
||||
|
||||
final double reopenChance = random().nextDouble()*0.01;
|
||||
final double deleteChance = random().nextDouble()*0.25;
|
||||
final double addChance = random().nextDouble()*0.5;
|
||||
|
||||
for(int t=0;t<numThreads;t++) {
|
||||
final int threadID = t;
|
||||
final Random threadRandom = new Random(random().nextLong());
|
||||
Thread thread = new Thread() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Map<String,Integer> values = new HashMap<String,Integer>();
|
||||
List<String> allIDs = Collections.synchronizedList(new ArrayList<String>());
|
||||
|
||||
startingGun.await();
|
||||
for(int iter=0; iter<iters;iter++) {
|
||||
// Add/update a document
|
||||
Document doc = new Document();
|
||||
// Threads must not update the same id at the
|
||||
// same time:
|
||||
if (threadRandom.nextDouble() <= addChance) {
|
||||
String id = String.format(Locale.ROOT, "%d_%04x", threadID, threadRandom.nextInt(idCount));
|
||||
Integer field = threadRandom.nextInt(Integer.MAX_VALUE);
|
||||
doc.add(new StringField("id", id, Field.Store.YES));
|
||||
doc.add(new IntField("field", field.intValue(), Field.Store.YES));
|
||||
w.updateDocument(new Term("id", id), doc);
|
||||
rt.add(id, field);
|
||||
if (values.put(id, field) == null) {
|
||||
allIDs.add(id);
|
||||
}
|
||||
}
|
||||
|
||||
if (allIDs.size() > 0 && threadRandom.nextDouble() <= deleteChance) {
|
||||
String randomID = allIDs.get(threadRandom.nextInt(allIDs.size()));
|
||||
w.deleteDocuments(new Term("id", randomID));
|
||||
rt.delete(randomID);
|
||||
values.put(randomID, missing);
|
||||
}
|
||||
|
||||
if (threadRandom.nextDouble() <= reopenChance || rt.size() > 10000) {
|
||||
//System.out.println("refresh @ " + rt.size());
|
||||
mgr.maybeRefresh();
|
||||
if (VERBOSE) {
|
||||
IndexSearcher s = mgr.acquire();
|
||||
try {
|
||||
System.out.println("TEST: reopen " + s);
|
||||
} finally {
|
||||
mgr.release(s);
|
||||
}
|
||||
System.out.println("TEST: " + values.size() + " values");
|
||||
}
|
||||
}
|
||||
|
||||
if (threadRandom.nextInt(10) == 7) {
|
||||
assertEquals(null, rt.get("foo"));
|
||||
}
|
||||
|
||||
if (allIDs.size() > 0) {
|
||||
String randomID = allIDs.get(threadRandom.nextInt(allIDs.size()));
|
||||
Integer expected = values.get(randomID);
|
||||
if (expected == missing) {
|
||||
expected = null;
|
||||
}
|
||||
assertEquals("id=" + randomID, expected, rt.get(randomID));
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
}
|
||||
};
|
||||
threads.add(thread);
|
||||
thread.start();
|
||||
}
|
||||
|
||||
startingGun.countDown();
|
||||
|
||||
for(Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
mgr.maybeRefresh();
|
||||
assertEquals(0, rt.size());
|
||||
|
||||
rt.close();
|
||||
mgr.close();
|
||||
_w.close();
|
||||
dir.close();
|
||||
}
|
||||
}
|
@ -423,8 +423,13 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
|
||||
NRTManager sm = new NRTManager(new NRTManager.TrackingIndexWriter(iw),new SearcherFactory());
|
||||
sm.addListener(new ReferenceManager.RefreshListener() {
|
||||
@Override
|
||||
public void afterRefresh() {
|
||||
afterRefreshCalled.set(true);
|
||||
public void beforeRefresh() {
|
||||
}
|
||||
@Override
|
||||
public void afterRefresh(boolean didRefresh) {
|
||||
if (didRefresh) {
|
||||
afterRefreshCalled.set(true);
|
||||
}
|
||||
}
|
||||
});
|
||||
iw.addDocument(new Document());
|
||||
|
@ -331,8 +331,13 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
|
||||
SearcherManager sm = new SearcherManager(iw, false, new SearcherFactory());
|
||||
sm.addListener(new ReferenceManager.RefreshListener() {
|
||||
@Override
|
||||
public void afterRefresh() {
|
||||
afterRefreshCalled.set(true);
|
||||
public void beforeRefresh() {
|
||||
}
|
||||
@Override
|
||||
public void afterRefresh(boolean didRefresh) {
|
||||
if (didRefresh) {
|
||||
afterRefreshCalled.set(true);
|
||||
}
|
||||
}
|
||||
});
|
||||
iw.addDocument(new Document());
|
||||
|
Loading…
x
Reference in New Issue
Block a user