SOLR-10114: add _version_ field to child documents, fix reordered-dbq to not drop child docs

This commit is contained in:
yonik 2017-02-15 22:51:21 -05:00
parent 4f29685edb
commit 99188ae00c
5 changed files with 126 additions and 16 deletions

View File

@ -53,6 +53,9 @@ Bug Fixes
* SOLR-9837: Fix 55% performance regression of FieldCache uninvert time of * SOLR-9837: Fix 55% performance regression of FieldCache uninvert time of
numeric fields. (yonik) numeric fields. (yonik)
* SOLR-10114: Reordered delete-by-query causes inconsistenties between shards that have
child documents (Mano Kovacs, Mihaly Toth, yonik)
Optimizations Optimizations
---------------------- ----------------------

View File

@ -189,8 +189,11 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<Document
String idField = getHashableId(); String idField = getHashableId();
boolean isVersion = version != 0;
for (SolrInputDocument sdoc : all) { for (SolrInputDocument sdoc : all) {
sdoc.setField("_root_", idField); // should this be a string or the same type as the ID? sdoc.setField("_root_", idField); // should this be a string or the same type as the ID?
if(isVersion) sdoc.setField(VersionInfo.VERSION_FIELD, version);
// TODO: if possible concurrent modification exception (if SolrInputDocument not cloned and is being forwarded to replicas) // TODO: if possible concurrent modification exception (if SolrInputDocument not cloned and is being forwarded to replicas)
// then we could add this field to the generated lucene document instead. // then we could add this field to the generated lucene document instead.
} }

View File

@ -259,7 +259,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
private void doNormalUpdate(AddUpdateCommand cmd) throws IOException { private void doNormalUpdate(AddUpdateCommand cmd) throws IOException {
Term updateTerm; Term updateTerm;
Term idTerm = new Term(cmd.isBlock() ? "_root_" : idField.getName(), cmd.getIndexedId()); Term idTerm = getIdTerm(cmd);
boolean del = false; boolean del = false;
if (cmd.updateTerm == null) { if (cmd.updateTerm == null) {
updateTerm = idTerm; updateTerm = idTerm;
@ -273,12 +273,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
try { try {
IndexWriter writer = iw.get(); IndexWriter writer = iw.get();
if (cmd.isBlock()) {
writer.updateDocuments(updateTerm, cmd);
} else {
updateDocOrDocValues(cmd, writer, updateTerm); updateDocOrDocValues(cmd, writer, updateTerm);
}
// SolrCore.verbose("updateDocument",updateTerm,"DONE");
if (del) { // ensure id remains unique if (del) { // ensure id remains unique
BooleanQuery.Builder bq = new BooleanQuery.Builder(); BooleanQuery.Builder bq = new BooleanQuery.Builder();
@ -323,7 +318,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
} }
Document luceneDocument = cmd.getLuceneDocument(); Document luceneDocument = cmd.getLuceneDocument();
Term idTerm = new Term(idField.getName(), cmd.getIndexedId()); Term idTerm = getIdTerm(cmd);
RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core); RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
try { try {
@ -344,6 +339,10 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
} }
private Term getIdTerm(AddUpdateCommand cmd) {
return new Term(cmd.isBlock() ? "_root_" : idField.getName(), cmd.getIndexedId());
}
private void updateDeleteTrackers(DeleteUpdateCommand cmd) { private void updateDeleteTrackers(DeleteUpdateCommand cmd) {
if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) { if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
if (commitWithinSoftCommit) { if (commitWithinSoftCommit) {
@ -866,6 +865,15 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
} }
log.debug("updateDocValues({})", cmd); log.debug("updateDocValues({})", cmd);
writer.updateDocValues(updateTerm, fieldsToUpdate.toArray(new Field[fieldsToUpdate.size()])); writer.updateDocValues(updateTerm, fieldsToUpdate.toArray(new Field[fieldsToUpdate.size()]));
} else {
updateDocument(cmd, writer, updateTerm);
}
}
private void updateDocument(AddUpdateCommand cmd, IndexWriter writer, Term updateTerm) throws IOException {
if(cmd.isBlock()){
log.debug("updateDocuments({})", cmd);
writer.updateDocuments(updateTerm, cmd);
}else{ }else{
Document luceneDocument = cmd.getLuceneDocument(false); Document luceneDocument = cmd.getLuceneDocument(false);
log.debug("updateDocument({})", cmd); log.debug("updateDocument({})", cmd);

View File

@ -23,6 +23,7 @@ import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric; import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.metrics.SolrMetricManager; import org.apache.solr.metrics.SolrMetricManager;
import org.noggit.ObjectBuilder; import org.noggit.ObjectBuilder;
@ -69,6 +70,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
// TODO: fix this test to not require FSDirectory // TODO: fix this test to not require FSDirectory
static String savedFactory; static String savedFactory;
private interface RunnableWithException{void run () throws Exception;}
@BeforeClass @BeforeClass
public static void beforeClass() throws Exception { public static void beforeClass() throws Exception {
savedFactory = System.getProperty("solr.DirectoryFactory"); savedFactory = System.getProperty("solr.DirectoryFactory");
@ -290,6 +294,86 @@ public class TestRecovery extends SolrTestCaseJ4 {
@Test @Test
public void testLogReplayWithReorderedDBQ() throws Exception { public void testLogReplayWithReorderedDBQ() throws Exception {
testLogReplayWithReorderedDBQWrapper(() -> {
updateJ(jsonAdd(sdoc("id", "B1", "_version_", "1010")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonDelQ("id:B2"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "-1017")); // This should've arrived after the 1015th update
updateJ(jsonAdd(sdoc("id", "B2", "_version_", "1015")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdoc("id", "B3", "_version_", "1020")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
},
() -> assertJQ(req("q", "*:*"), "/response/numFound==2")
);
}
@Test
public void testLogReplayWithReorderedDBQByAsterixAndChildDocs() throws Exception {
testLogReplayWithReorderedDBQWrapper(() -> {
// 1010 - will be deleted
updateJ(jsonAdd(sdocWithChildren("B1", "1010")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1018 - should be kept, including child docs
updateJ(jsonAdd(sdocWithChildren("B2", "1018")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1017 - delete should affect only 1010
updateJ(jsonDelQ("id:*"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "-1017")); // This should've arrived after the 1015th update
// 1012 - will be deleted
updateJ(jsonAdd(sdoc("id", "B3", "_version_", "1012")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1020 - should be untouched
updateJ(jsonAdd(sdocWithChildren("B4", "1020")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
},
() -> assertJQ(req("q", "*:*"), "/response/numFound==6")
);
}
@Test
public void testLogReplayWithReorderedDBQByIdAndChildDocs() throws Exception {
testLogReplayWithReorderedDBQWrapper(() -> {
// 1010 - will be deleted
updateJ(jsonAdd(sdocWithChildren("B1", "1010")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1018 - should be kept, including child docs
updateJ(jsonAdd(sdocWithChildren("B2", "1018")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1017 - delete should affect only 1010
updateJ(jsonDelQ("id:B1 id:B2 id:B3 id:B4"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "-1017")); // This should've arrived after the 1015th update
// 1012 - will be deleted
updateJ(jsonAdd(sdoc("id", "B3", "_version_", "1012")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1020 - should be untouched
updateJ(jsonAdd(sdocWithChildren("B4", "1020")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
},
() -> assertJQ(req("q", "*:*"), "/response/numFound==8") // B2, B4 and 6 children docs (delete by id does not delete child docs)
);
}
@Test
public void testLogReplayWithReorderedDBQInsertingChildnodes() throws Exception {
testLogReplayWithReorderedDBQWrapper(() -> {
updateJ(jsonDelQ("id:B2"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "-1017"));
// test doc: B1
// 1013 - will be inserted with 3 children
updateJ(jsonAdd(sdocWithChildren("B1", "1013", 3)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
},
() -> assertJQ(req("q", "*:*"), "/response/numFound==4") // B1 and B2, plus 2x 3 children
);
}
@Test
public void testLogReplayWithReorderedDBQUpdateWithDifferentChildCount() throws Exception {
testLogReplayWithReorderedDBQWrapper(() -> {
// control
// 1013 - will be inserted with 3 children
updateJ(jsonAdd(sdocWithChildren("B1", "1011", 2)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1018 - this should be the final
updateJ(jsonAdd(sdocWithChildren("B1", "1012", 3)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1013 - will be inserted with 3 children
updateJ(jsonAdd(sdocWithChildren("B2", "1013", 2)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonDelQ("id:B3"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "-1019"));
// 1018 - this should be the final
updateJ(jsonAdd(sdocWithChildren("B2", "1018", 3)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
},
() -> assertJQ(req("q", "*:*"), "/response/numFound==8") // B1+3children+B2+3children
);
}
private void testLogReplayWithReorderedDBQWrapper(RunnableWithException act, RunnableWithException assrt) throws Exception {
try { try {
DirectUpdateHandler2.commitOnClose = false; DirectUpdateHandler2.commitOnClose = false;
@ -310,10 +394,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
clearIndex(); clearIndex();
assertU(commit()); assertU(commit());
updateJ(jsonAdd(sdoc("id","B1", "_version_","1010")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // Adding some documents
updateJ(jsonDelQ("id:B2"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-1017")); // This should've arrived after the 1015th update act.run();
updateJ(jsonAdd(sdoc("id","B2", "_version_","1015")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B3", "_version_","1020")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertJQ(req("q", "*:*"), "/response/numFound==0"); assertJQ(req("q", "*:*"), "/response/numFound==0");
@ -333,7 +415,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
// wait until recovery has finished // wait until recovery has finished
assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS)); assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
assertJQ(req("q","*:*") ,"/response/numFound==2"); // Asserting
assrt.run();
} finally { } finally {
DirectUpdateHandler2.commitOnClose = true; DirectUpdateHandler2.commitOnClose = true;
UpdateLog.testing_logReplayHook = null; UpdateLog.testing_logReplayHook = null;

View File

@ -1296,6 +1296,18 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
return sd; return sd;
} }
public SolrInputDocument sdocWithChildren(String id, String version) {
return sdocWithChildren(id, version, 2);
}
public SolrInputDocument sdocWithChildren(String id, String version, int childCount) {
SolrInputDocument doc = sdoc("id", id, "_version_", version);
for (int i = 0; i < childCount; i++) {
doc.addChildDocument(sdoc("id", id + "_child" + i));
}
return doc;
}
public static List<SolrInputDocument> sdocs(SolrInputDocument... docs) { public static List<SolrInputDocument> sdocs(SolrInputDocument... docs) {
return Arrays.asList(docs); return Arrays.asList(docs);
} }