mirror of https://github.com/apache/lucene.git
SOLR-559: use Lucene updateDocument, deleteDocuments
git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@655190 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0a58ceee76
commit
14d84045cc
|
@ -253,6 +253,11 @@ New Features
|
||||||
46. SOLR-557: Added SolrCore.getSearchComponents() to return an unmodifiable Map. (gsingers)
|
46. SOLR-557: Added SolrCore.getSearchComponents() to return an unmodifiable Map. (gsingers)
|
||||||
|
|
||||||
Changes in runtime behavior
|
Changes in runtime behavior
|
||||||
|
1. SOLR-559: use Lucene updateDocument, deleteDocuments methods. This
|
||||||
|
removes the maxBufferedDeletes parameter added by SOLR-310 as Lucene
|
||||||
|
now manages the deletes. This provides slightly better indexing
|
||||||
|
performance and makes overwrites atomic, eliminating the possibility of
|
||||||
|
a crash causing duplicates. (yonik)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
1. SOLR-276: improve JSON writer speed. (yonik)
|
1. SOLR-276: improve JSON writer speed. (yonik)
|
||||||
|
|
|
@ -125,14 +125,7 @@
|
||||||
org.apache.solr.(search|update|request|core|analysis)
|
org.apache.solr.(search|update|request|core|analysis)
|
||||||
-->
|
-->
|
||||||
|
|
||||||
<!-- Limit the number of deletions Solr will buffer during doc updating.
|
|
||||||
|
|
||||||
Setting this lower can help bound memory use during indexing.
|
|
||||||
-->
|
|
||||||
<maxPendingDeletes>100000</maxPendingDeletes>
|
|
||||||
|
|
||||||
<!-- Perform a <commit/> automatically under certain conditions:
|
<!-- Perform a <commit/> automatically under certain conditions:
|
||||||
|
|
||||||
maxDocs - number of updates since last commit is greater than this
|
maxDocs - number of updates since last commit is greater than this
|
||||||
maxTime - oldest uncommited update (in ms) is this long ago
|
maxTime - oldest uncommited update (in ms) is this long ago
|
||||||
<autoCommit>
|
<autoCommit>
|
||||||
|
|
|
@ -21,14 +21,9 @@
|
||||||
package org.apache.solr.update;
|
package org.apache.solr.update;
|
||||||
|
|
||||||
import org.apache.lucene.index.IndexWriter;
|
import org.apache.lucene.index.IndexWriter;
|
||||||
import org.apache.lucene.index.Term;
|
|
||||||
import org.apache.lucene.index.IndexReader;
|
|
||||||
import org.apache.lucene.index.TermDocs;
|
|
||||||
import org.apache.lucene.search.Query;
|
import org.apache.lucene.search.Query;
|
||||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||||
|
|
||||||
import java.util.TreeMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
@ -49,7 +44,6 @@ import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.common.util.SimpleOrderedMap;
|
import org.apache.solr.common.util.SimpleOrderedMap;
|
||||||
import org.apache.solr.core.SolrCore;
|
import org.apache.solr.core.SolrCore;
|
||||||
import org.apache.solr.core.SolrConfig;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <code>DirectUpdateHandler2</code> implements an UpdateHandler where documents are added
|
* <code>DirectUpdateHandler2</code> implements an UpdateHandler where documents are added
|
||||||
|
@ -141,15 +135,8 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
||||||
// tracks when auto-commit should occur
|
// tracks when auto-commit should occur
|
||||||
protected final CommitTracker tracker;
|
protected final CommitTracker tracker;
|
||||||
|
|
||||||
// The key is the id, the value (Integer) is the number
|
|
||||||
// of docs to save (delete all except the last "n" added)
|
|
||||||
protected final Map<String,Integer> pset;
|
|
||||||
protected int maxPendingDeletes = -1;
|
protected int maxPendingDeletes = -1;
|
||||||
|
|
||||||
// commonly used constants for the count in the pset
|
|
||||||
protected final static Integer ZERO = 0;
|
|
||||||
protected final static Integer ONE = 1;
|
|
||||||
|
|
||||||
// iwCommit protects internal data and open/close of the IndexWriter and
|
// iwCommit protects internal data and open/close of the IndexWriter and
|
||||||
// is a mutex. Any use of the index writer should be protected by iwAccess,
|
// is a mutex. Any use of the index writer should be protected by iwAccess,
|
||||||
// which admits multiple simultaneous acquisitions. iwAccess is
|
// which admits multiple simultaneous acquisitions. iwAccess is
|
||||||
|
@ -161,10 +148,6 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
||||||
|
|
||||||
public DirectUpdateHandler2(SolrCore core) throws IOException {
|
public DirectUpdateHandler2(SolrCore core) throws IOException {
|
||||||
super(core);
|
super(core);
|
||||||
/* A TreeMap is used to maintain the natural ordering of the document ids,
|
|
||||||
which makes commits more efficient
|
|
||||||
*/
|
|
||||||
pset = new TreeMap<String,Integer>();
|
|
||||||
maxPendingDeletes = core.getSolrConfig().getInt("updateHandler/maxPendingDeletes", -1);
|
maxPendingDeletes = core.getSolrConfig().getInt("updateHandler/maxPendingDeletes", -1);
|
||||||
|
|
||||||
ReadWriteLock rwl = new ReentrantReadWriteLock();
|
ReadWriteLock rwl = new ReentrantReadWriteLock();
|
||||||
|
@ -179,7 +162,6 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
||||||
core.log.info(core.getLogId()+"REMOVING ALL DOCUMENTS FROM INDEX");
|
core.log.info(core.getLogId()+"REMOVING ALL DOCUMENTS FROM INDEX");
|
||||||
closeWriter();
|
closeWriter();
|
||||||
closeSearcher();
|
closeSearcher();
|
||||||
pset.clear(); // ignore docs marked for deletion since we are removing all
|
|
||||||
writer = createMainIndexWriter("DirectUpdateHandler2", true);
|
writer = createMainIndexWriter("DirectUpdateHandler2", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -238,45 +220,26 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
||||||
// state. This is safe as all other state-changing operations are
|
// state. This is safe as all other state-changing operations are
|
||||||
// protected with iwCommit (which iwAccess excludes from this block).
|
// protected with iwCommit (which iwAccess excludes from this block).
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (!cmd.allowDups && !cmd.overwritePending && !cmd.overwriteCommitted) {
|
|
||||||
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"unsupported param combo:" + cmd);
|
|
||||||
// this would need a reader to implement (to be able to check committed
|
|
||||||
// before adding.)
|
|
||||||
// return addNoOverwriteNoDups(cmd);
|
|
||||||
} else if (!cmd.allowDups && !cmd.overwritePending && cmd.overwriteCommitted) {
|
|
||||||
rc = addConditionally(cmd);
|
|
||||||
} else if (!cmd.allowDups && cmd.overwritePending && !cmd.overwriteCommitted) {
|
|
||||||
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"unsupported param combo:" + cmd);
|
|
||||||
} else if (!cmd.allowDups && cmd.overwritePending && cmd.overwriteCommitted) {
|
|
||||||
rc = overwriteBoth(cmd);
|
|
||||||
} else if (cmd.allowDups && !cmd.overwritePending && !cmd.overwriteCommitted) {
|
|
||||||
rc = allowDups(cmd);
|
|
||||||
} else if (cmd.allowDups && !cmd.overwritePending && cmd.overwriteCommitted) {
|
|
||||||
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"unsupported param combo:" + cmd);
|
|
||||||
} else if (cmd.allowDups && cmd.overwritePending && !cmd.overwriteCommitted) {
|
|
||||||
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"unsupported param combo:" + cmd);
|
|
||||||
} else if (cmd.allowDups && cmd.overwritePending && cmd.overwriteCommitted) {
|
|
||||||
rc = overwriteBoth(cmd);
|
|
||||||
}
|
|
||||||
if (rc == -1)
|
|
||||||
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"unsupported param combo:" + cmd);
|
|
||||||
|
|
||||||
if (rc == 1) {
|
|
||||||
// adding document -- prep writer
|
// adding document -- prep writer
|
||||||
closeSearcher();
|
closeSearcher();
|
||||||
openWriter();
|
openWriter();
|
||||||
tracker.addedDocument();
|
tracker.addedDocument();
|
||||||
} else {
|
|
||||||
// exit prematurely
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
} // end synchronized block
|
} // end synchronized block
|
||||||
|
|
||||||
// this is the only unsynchronized code in the iwAccess block, which
|
// this is the only unsynchronized code in the iwAccess block, which
|
||||||
// should account for most of the time
|
// should account for most of the time
|
||||||
assert(rc == 1);
|
|
||||||
writer.addDocument(cmd.doc);
|
|
||||||
|
|
||||||
|
if (cmd.overwriteCommitted || cmd.overwritePending) {
|
||||||
|
if (cmd.indexedId == null) {
|
||||||
|
cmd.indexedId = getIndexedId(cmd.doc);
|
||||||
|
}
|
||||||
|
writer.updateDocument(idTerm.createTerm(cmd.indexedId), cmd.getLuceneDocument(schema));
|
||||||
|
} else {
|
||||||
|
// allow duplicates
|
||||||
|
writer.addDocument(cmd.getLuceneDocument(schema));
|
||||||
|
}
|
||||||
|
|
||||||
|
rc = 1;
|
||||||
} finally {
|
} finally {
|
||||||
iwAccess.unlock();
|
iwAccess.unlock();
|
||||||
if (rc!=1) {
|
if (rc!=1) {
|
||||||
|
@ -286,17 +249,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
||||||
numDocsPending.incrementAndGet();
|
numDocsPending.incrementAndGet();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (maxPendingDeletes > 0 && pset.size() > maxPendingDeletes) {
|
|
||||||
iwCommit.lock();
|
|
||||||
try {
|
|
||||||
// note: this may be entered multiple times since the synchro is
|
|
||||||
// inside the if(), but doDeletions() is a cheap no-op if it has
|
|
||||||
// already executed
|
|
||||||
doDeletions();
|
|
||||||
} finally {
|
|
||||||
iwCommit.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -319,7 +272,9 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
||||||
|
|
||||||
iwCommit.lock();
|
iwCommit.lock();
|
||||||
try {
|
try {
|
||||||
pset.put(idFieldType.toInternal(cmd.id), ZERO);
|
closeSearcher();
|
||||||
|
openWriter();
|
||||||
|
writer.deleteDocuments(idTerm.createTerm(idFieldType.toInternal(cmd.id)));
|
||||||
} finally {
|
} finally {
|
||||||
iwCommit.unlock();
|
iwCommit.unlock();
|
||||||
}
|
}
|
||||||
|
@ -358,10 +313,6 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
||||||
if (delAll) {
|
if (delAll) {
|
||||||
deleteAll();
|
deleteAll();
|
||||||
} else {
|
} else {
|
||||||
// we need to do much of the commit logic (mainly doing queued
|
|
||||||
// deletes since deleteByQuery can throw off our counts.
|
|
||||||
doDeletions();
|
|
||||||
|
|
||||||
closeWriter();
|
closeWriter();
|
||||||
openSearcher();
|
openSearcher();
|
||||||
|
|
||||||
|
@ -396,128 +347,6 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////
|
|
||||||
/////////////////// helper method for each add type ///////////////
|
|
||||||
///////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
// methods return 1 if the document is to be added; 0 otherwise.
|
|
||||||
// methods must be called in synchronized context
|
|
||||||
|
|
||||||
protected int addConditionally(AddUpdateCommand cmd) throws IOException {
|
|
||||||
if (cmd.indexedId ==null) {
|
|
||||||
cmd.indexedId =getIndexedId(cmd.doc);
|
|
||||||
}
|
|
||||||
Integer saveCount = pset.get(cmd.indexedId);
|
|
||||||
if (saveCount!=null && saveCount!=0) {
|
|
||||||
// a doc with this id already exists in the pending set
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
pset.put(cmd.indexedId, ONE);
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// overwrite both pending and committed
|
|
||||||
protected int overwriteBoth(AddUpdateCommand cmd) throws IOException {
|
|
||||||
if (cmd.indexedId ==null) {
|
|
||||||
cmd.indexedId =getIndexedId(cmd.doc);
|
|
||||||
}
|
|
||||||
pset.put(cmd.indexedId, ONE);
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// add without checking
|
|
||||||
protected int allowDups(AddUpdateCommand cmd) throws IOException {
|
|
||||||
if (cmd.indexedId ==null) {
|
|
||||||
cmd.indexedId =getIndexedIdOptional(cmd.doc);
|
|
||||||
}
|
|
||||||
if (cmd.indexedId != null) {
|
|
||||||
Integer saveCount = pset.get(cmd.indexedId);
|
|
||||||
|
|
||||||
// if there weren't any docs marked for deletion before, then don't mark
|
|
||||||
// any for deletion now.
|
|
||||||
if (saveCount == null) return 1;
|
|
||||||
|
|
||||||
// If there were docs marked for deletion, then increment the number of
|
|
||||||
// docs to save at the end.
|
|
||||||
|
|
||||||
// the following line is optional, but it saves an allocation in the common case.
|
|
||||||
if (saveCount == ZERO) saveCount=ONE;
|
|
||||||
else saveCount++;
|
|
||||||
|
|
||||||
pset.put(cmd.indexedId, saveCount);
|
|
||||||
}
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// do all needed deletions.
|
|
||||||
// call with iwCommit lock held
|
|
||||||
//
|
|
||||||
protected void doDeletions() throws IOException {
|
|
||||||
int[] docnums = new int[0];
|
|
||||||
|
|
||||||
if (pset.size() > 0) { // optimization: only open searcher if there is something to delete...
|
|
||||||
log.info("DirectUpdateHandler2 deleting and removing dups for " + pset.size() +" ids");
|
|
||||||
int numDeletes=0;
|
|
||||||
|
|
||||||
closeWriter();
|
|
||||||
openSearcher();
|
|
||||||
IndexReader reader = searcher.getReader();
|
|
||||||
TermDocs tdocs = reader.termDocs();
|
|
||||||
String fieldname = idField.getName();
|
|
||||||
|
|
||||||
for (Map.Entry<String,Integer> entry : pset.entrySet()) {
|
|
||||||
String id = entry.getKey();
|
|
||||||
int saveLast = entry.getValue(); // save the last "saveLast" documents
|
|
||||||
|
|
||||||
//expand our array that keeps track of docs if needed.
|
|
||||||
if (docnums==null || saveLast > docnums.length) {
|
|
||||||
docnums = new int[saveLast];
|
|
||||||
}
|
|
||||||
|
|
||||||
// initialize all docnums in the list to -1 (unused)
|
|
||||||
for (int i=0; i<saveLast; i++) {
|
|
||||||
docnums[i] = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tdocs.seek(new Term(fieldname,id));
|
|
||||||
|
|
||||||
//
|
|
||||||
// record the docs for this term in the "docnums" array and wrap around
|
|
||||||
// at size "saveLast". If we reuse a slot in the array, then we delete
|
|
||||||
// the doc that was there from the index.
|
|
||||||
//
|
|
||||||
int pos=0;
|
|
||||||
while (tdocs.next()) {
|
|
||||||
if (saveLast==0) {
|
|
||||||
// special case - delete all the docs as we see them.
|
|
||||||
reader.deleteDocument(tdocs.doc());
|
|
||||||
numDeletes++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
int prev=docnums[pos];
|
|
||||||
docnums[pos]=tdocs.doc();
|
|
||||||
if (prev != -1) {
|
|
||||||
reader.deleteDocument(prev);
|
|
||||||
numDeletes++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (++pos >= saveLast) pos=0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// should we ever shrink it again, or just clear it?
|
|
||||||
pset.clear();
|
|
||||||
log.info("DirectUpdateHandler2 docs deleted=" + numDeletes);
|
|
||||||
numDocsDeleted.addAndGet(numDeletes);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public void commit(CommitUpdateCommand cmd) throws IOException {
|
public void commit(CommitUpdateCommand cmd) throws IOException {
|
||||||
|
|
||||||
|
@ -536,7 +365,6 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
||||||
iwCommit.lock();
|
iwCommit.lock();
|
||||||
try {
|
try {
|
||||||
log.info("start "+cmd);
|
log.info("start "+cmd);
|
||||||
doDeletions();
|
|
||||||
|
|
||||||
if (cmd.optimize) {
|
if (cmd.optimize) {
|
||||||
closeSearcher();
|
closeSearcher();
|
||||||
|
@ -597,7 +425,6 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
||||||
tracker.pending = null;
|
tracker.pending = null;
|
||||||
}
|
}
|
||||||
tracker.scheduler.shutdown();
|
tracker.scheduler.shutdown();
|
||||||
doDeletions();
|
|
||||||
closeSearcher();
|
closeSearcher();
|
||||||
closeWriter();
|
closeWriter();
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -786,7 +613,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
||||||
lst.add("optimizes", optimizeCommands.get());
|
lst.add("optimizes", optimizeCommands.get());
|
||||||
lst.add("docsPending", numDocsPending.get());
|
lst.add("docsPending", numDocsPending.get());
|
||||||
// pset.size() not synchronized, but it should be fine to access.
|
// pset.size() not synchronized, but it should be fine to access.
|
||||||
lst.add("deletesPending", pset.size());
|
// lst.add("deletesPending", pset.size());
|
||||||
lst.add("adds", addCommands.get());
|
lst.add("adds", addCommands.get());
|
||||||
lst.add("deletesById", deleteByIdCommands.get());
|
lst.add("deletesById", deleteByIdCommands.get());
|
||||||
lst.add("deletesByQuery", deleteByQueryCommands.get());
|
lst.add("deletesByQuery", deleteByQueryCommands.get());
|
||||||
|
|
|
@ -56,6 +56,7 @@ public abstract class UpdateHandler implements SolrInfoMBean {
|
||||||
|
|
||||||
protected final SchemaField idField;
|
protected final SchemaField idField;
|
||||||
protected final FieldType idFieldType;
|
protected final FieldType idFieldType;
|
||||||
|
protected final Term idTerm; // prototype term to avoid interning fieldname
|
||||||
|
|
||||||
protected Vector<SolrEventListener> commitCallbacks = new Vector<SolrEventListener>();
|
protected Vector<SolrEventListener> commitCallbacks = new Vector<SolrEventListener>();
|
||||||
protected Vector<SolrEventListener> optimizeCallbacks = new Vector<SolrEventListener>();
|
protected Vector<SolrEventListener> optimizeCallbacks = new Vector<SolrEventListener>();
|
||||||
|
@ -112,7 +113,7 @@ public abstract class UpdateHandler implements SolrInfoMBean {
|
||||||
schema = core.getSchema();
|
schema = core.getSchema();
|
||||||
idField = schema.getUniqueKeyField();
|
idField = schema.getUniqueKeyField();
|
||||||
idFieldType = idField!=null ? idField.getType() : null;
|
idFieldType = idField!=null ? idField.getType() : null;
|
||||||
|
idTerm = idField!=null ? new Term(idField.getName(),"") : null;
|
||||||
parseEventListeners();
|
parseEventListeners();
|
||||||
core.getInfoRegistry().put("updateHandler", this);
|
core.getInfoRegistry().put("updateHandler", this);
|
||||||
}
|
}
|
||||||
|
|
|
@ -215,8 +215,9 @@ public class AutoCommitTest extends AbstractSolrTestCase {
|
||||||
adoc("id", "A14", "subject", "info" ), null ) );
|
adoc("id", "A14", "subject", "info" ), null ) );
|
||||||
handler.handleRequest( req, rsp );
|
handler.handleRequest( req, rsp );
|
||||||
|
|
||||||
assertEquals(updater.numDocsPending.get(), 0);
|
// Lucene now manages it's own deletes.
|
||||||
assertEquals(updater.commitCommands.get(), 0);
|
// assertEquals(updater.numDocsPending.get(), 0);
|
||||||
|
// assertEquals(updater.commitCommands.get(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue