From 1a11ce5fd4ffd5f087a872c404dbad9503cef207 Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Fri, 25 Jan 2013 21:27:08 +0000 Subject: [PATCH] LUCENE-4695: add LiveFieldValues, to get current (live/real-time) values for fields indexed after the last NRT reopen git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1438721 13f79535-47bb-0310-9956-ffa450edef68 --- lucene/CHANGES.txt | 11 ++ .../apache/lucene/search/LiveFieldValues.java | 147 ++++++++++++++ .../lucene/search/ReferenceManager.java | 27 ++- .../lucene/search/TestLiveFieldValues.java | 180 ++++++++++++++++++ .../apache/lucene/search/TestNRTManager.java | 9 +- .../lucene/search/TestSearcherManager.java | 9 +- 6 files changed, 370 insertions(+), 13 deletions(-) create mode 100644 lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java create mode 100644 lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index cec8a0ce42e..baca0d52ccc 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -73,12 +73,23 @@ New Features compresses term vectors into chunks of documents similarly to CompressingStoredFieldsFormat. (Adrien Grand) +* LUCENE-4695: Added LiveFieldValues utility class, for getting the + current (live, real-time) value for any indexed doc/field. The + class buffers recently indexed doc/field values until a new + near-real-time reader is opened that contains those changes. + (Robert Muir, Mike McCandless) + API Changes * LUCENE-4709: FacetResultNode no longer has a residue field. (Shai Erera) * LUCENE-4716: DrillDown.query now takes Occur, allowing to specify if categories should be OR'ed or AND'ed. (Shai Erera) + +* LUCENE-4695: ReferenceManager.RefreshListener.afterRefresh now takes + a boolean indicating whether a new reference was in fact opened, and + a new beforeRefresh method notifies you when a refresh attempt is + starting. (Robert Muir, Mike McCandless) Bug Fixes diff --git a/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java b/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java new file mode 100644 index 00000000000..8b7b045be2d --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java @@ -0,0 +1,147 @@ +package org.apache.lucene.search; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.AtomicReader; +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.index.IndexDocument; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.util.Counter; + +/** Tracks live field values across NRT reader reopens. + * This holds a map for all updated ids since + * the last reader reopen. Once the NRT reader is reopened, + * it prunes the map. This means you must reopen your NRT + * reader periodically otherwise the RAM consumption of + * this class will grow unbounded! + * + *

NOTE: you must ensure the same id is never updated at + * the same time by two threads, because in this case you + * cannot in general know which thread "won". */ + +public abstract class LiveFieldValues implements ReferenceManager.RefreshListener, Closeable { + + private volatile Map current = new ConcurrentHashMap(); + private volatile Map old = new ConcurrentHashMap(); + private final ReferenceManager mgr; + private final T missingValue; + + public LiveFieldValues(ReferenceManager mgr, T missingValue) { + this.missingValue = missingValue; + this.mgr = mgr; + mgr.addListener(this); + } + + @Override + public void close() { + mgr.removeListener(this); + } + + @Override + public void beforeRefresh() throws IOException { + old = current; + // Start sending all updates after this point to the new + // map. While reopen is running, any lookup will first + // try this new map, then fallback to old, then to the + // current searcher: + current = new ConcurrentHashMap(); + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + // Now drop all the old values because they are now + // visible via the searcher that was just opened; if + // didRefresh is false, it's possible old has some + // entries in it, which is fine: it means they were + // actually already included in the previously opened + // reader. So we can safely clear old here: + old = new ConcurrentHashMap(); + } + + /** Call this after you've successfully added a document + * to the index, to record what value you just set the + * field to. */ + public void add(String id, T value) { + current.put(id, value); + } + + /** Call this after you've successfully deleted a document + * from the index. */ + public void delete(String id) { + current.put(id, missingValue); + } + + /** Returns the [approximate] number of id/value pairs + * buffered in RAM. */ + public int size() { + return current.size() + old.size(); + } + + /** Returns the current value for this id, or null if the + * id isn't in the index or was deleted. */ + public T get(String id) throws IOException { + // First try to get the "live" value: + T value = current.get(id); + if (value == missingValue) { + // Deleted but the deletion is not yet reflected in + // the reader: + return null; + } else if (value != null) { + return value; + } else { + value = old.get(id); + if (value == missingValue) { + // Deleted but the deletion is not yet reflected in + // the reader: + return null; + } else if (value != null) { + return value; + } else { + // It either does not exist in the index, or, it was + // already flushed & NRT reader was opened on the + // segment, so fallback to current searcher: + IndexSearcher s = mgr.acquire(); + try { + return lookupFromSearcher(s, id); + } finally { + mgr.release(s); + } + } + } + } + + /** This is called when the id/value was already flushed & opened + * in an NRT IndexSearcher. You must implement this to + * go look up the value (eg, via doc values, field cache, + * stored fields, etc.). */ + protected abstract T lookupFromSearcher(IndexSearcher s, String id) throws IOException; +} + diff --git a/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java b/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java index 1973f0583cd..62a7fc3d6e0 100755 --- a/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java +++ b/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java @@ -151,6 +151,7 @@ public abstract class ReferenceManager implements Closeable { try { final G reference = acquire(); try { + notifyRefreshListenersBefore(); G newReference = refreshIfNeeded(reference); if (newReference != null) { assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed"; @@ -165,11 +166,9 @@ public abstract class ReferenceManager implements Closeable { } } finally { release(reference); + notifyRefreshListenersRefreshed(refreshed); } afterMaybeRefresh(); - if (refreshed) { - notifyRefreshListeners(); - } } finally { refreshLock.unlock(); } @@ -254,9 +253,15 @@ public abstract class ReferenceManager implements Closeable { decRef(reference); } - private void notifyRefreshListeners() { + private void notifyRefreshListenersBefore() throws IOException { for (RefreshListener refreshListener : refreshListeners) { - refreshListener.afterRefresh(); + refreshListener.beforeRefresh(); + } + } + + private void notifyRefreshListenersRefreshed(boolean didRefresh) throws IOException { + for (RefreshListener refreshListener : refreshListeners) { + refreshListener.afterRefresh(didRefresh); } } @@ -284,9 +289,13 @@ public abstract class ReferenceManager implements Closeable { * finished. See {@link #addListener}. */ public interface RefreshListener { - /** - * Called after a successful refresh and a new reference has been installed. When this is called {@link #acquire()} is guaranteed to return a new instance. - */ - void afterRefresh(); + /** Called right before a refresh attempt starts. */ + void beforeRefresh() throws IOException; + + /** Called after the attempted refresh; if the refresh + * did open a new reference then didRefresh will be true + * and {@link #acquire()} is guaranteed to return the new + * reference. */ + void afterRefresh(boolean didRefresh) throws IOException; } } diff --git a/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java b/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java new file mode 100644 index 00000000000..d13f148ceac --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java @@ -0,0 +1,180 @@ +package org.apache.lucene.search; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.IntField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.index.StoredDocument; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.NRTManager.TrackingIndexWriter; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util._TestUtil; + +public class TestLiveFieldValues extends LuceneTestCase { + public void test() throws Exception { + + Directory dir = newFSDirectory(_TestUtil.getTempDir("livefieldupdates")); + IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + + final IndexWriter _w = new IndexWriter(dir, iwc); + final TrackingIndexWriter w = new TrackingIndexWriter(_w); + + final NRTManager mgr = new NRTManager(w, new SearcherFactory() { + @Override + public IndexSearcher newSearcher(IndexReader r) { + return new IndexSearcher(r); + } + }); + + final Integer missing = -1; + + final LiveFieldValues rt = new LiveFieldValues(mgr, missing) { + @Override + protected Integer lookupFromSearcher(IndexSearcher s, String id) throws IOException { + TermQuery tq = new TermQuery(new Term("id", id)); + TopDocs hits = s.search(tq, 1); + assertTrue(hits.totalHits <= 1); + if (hits.totalHits == 0) { + return null; + } else { + StoredDocument doc = s.doc(hits.scoreDocs[0].doc); + return (Integer) doc.getField("field").numericValue(); + } + } + }; + + int numThreads = _TestUtil.nextInt(random(), 2, 5); + if (VERBOSE) { + System.out.println(numThreads + " threads"); + } + + final CountDownLatch startingGun = new CountDownLatch(1); + List threads = new ArrayList(); + + final int iters = atLeast(1000); + final int idCount = _TestUtil.nextInt(random(), 100, 10000); + + final double reopenChance = random().nextDouble()*0.01; + final double deleteChance = random().nextDouble()*0.25; + final double addChance = random().nextDouble()*0.5; + + for(int t=0;t values = new HashMap(); + List allIDs = Collections.synchronizedList(new ArrayList()); + + startingGun.await(); + for(int iter=0; iter 0 && threadRandom.nextDouble() <= deleteChance) { + String randomID = allIDs.get(threadRandom.nextInt(allIDs.size())); + w.deleteDocuments(new Term("id", randomID)); + rt.delete(randomID); + values.put(randomID, missing); + } + + if (threadRandom.nextDouble() <= reopenChance || rt.size() > 10000) { + //System.out.println("refresh @ " + rt.size()); + mgr.maybeRefresh(); + if (VERBOSE) { + IndexSearcher s = mgr.acquire(); + try { + System.out.println("TEST: reopen " + s); + } finally { + mgr.release(s); + } + System.out.println("TEST: " + values.size() + " values"); + } + } + + if (threadRandom.nextInt(10) == 7) { + assertEquals(null, rt.get("foo")); + } + + if (allIDs.size() > 0) { + String randomID = allIDs.get(threadRandom.nextInt(allIDs.size())); + Integer expected = values.get(randomID); + if (expected == missing) { + expected = null; + } + assertEquals("id=" + randomID, expected, rt.get(randomID)); + } + } + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + }; + threads.add(thread); + thread.start(); + } + + startingGun.countDown(); + + for(Thread thread : threads) { + thread.join(); + } + mgr.maybeRefresh(); + assertEquals(0, rt.size()); + + rt.close(); + mgr.close(); + _w.close(); + dir.close(); + } +} diff --git a/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java b/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java index 38cc749051c..ded7c9d9159 100644 --- a/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java +++ b/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java @@ -423,8 +423,13 @@ public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase { NRTManager sm = new NRTManager(new NRTManager.TrackingIndexWriter(iw),new SearcherFactory()); sm.addListener(new ReferenceManager.RefreshListener() { @Override - public void afterRefresh() { - afterRefreshCalled.set(true); + public void beforeRefresh() { + } + @Override + public void afterRefresh(boolean didRefresh) { + if (didRefresh) { + afterRefreshCalled.set(true); + } } }); iw.addDocument(new Document()); diff --git a/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java b/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java index 5a0cd46e0cc..9306f0a9d9b 100644 --- a/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java +++ b/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java @@ -331,8 +331,13 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase { SearcherManager sm = new SearcherManager(iw, false, new SearcherFactory()); sm.addListener(new ReferenceManager.RefreshListener() { @Override - public void afterRefresh() { - afterRefreshCalled.set(true); + public void beforeRefresh() { + } + @Override + public void afterRefresh(boolean didRefresh) { + if (didRefresh) { + afterRefreshCalled.set(true); + } } }); iw.addDocument(new Document());