SOLR-10151: Use monotonically incrementing counter for doc ids in TestRecovery.

This commit is contained in:
markrmiller 2017-04-14 01:17:03 -04:00
parent 6d948debc6
commit 4dc367414f
2 changed files with 226 additions and 122 deletions

View File

@ -236,6 +236,8 @@ Other Changes
* SOLR-10007: Clean up references to CoreContainer and CoreDescriptors (Erick Erickson)
* SOLR-10151: Use monotonically incrementing counter for doc ids in TestRecovery. (Peter Szantai-Kis, Mano Kovacs via Mark Miller)
================== 6.5.1 ==================
Bug Fixes

View File

@ -17,6 +17,7 @@
package org.apache.solr.search;
import static org.apache.solr.search.TestRecovery.VersionProvider.*;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import com.codahale.metrics.Gauge;
@ -292,10 +293,15 @@ public class TestRecovery extends SolrTestCaseJ4 {
@Test
public void testLogReplayWithReorderedDBQ() throws Exception {
testLogReplayWithReorderedDBQWrapper(() -> {
updateJ(jsonAdd(sdoc("id", "RDBQ1_1", "_version_", "1010")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonDelQ("id:RDBQ1_2"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "-1017")); // This should've arrived after the 1015th update
updateJ(jsonAdd(sdoc("id", "RDBQ1_2", "_version_", "1015")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdoc("id", "RDBQ1_3", "_version_", "1020")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
String v1010 = getNextVersion();
String v1015 = getNextVersion();
String v1017_del = "-" + getNextVersion();
String v1020 = getNextVersion();
updateJ(jsonAdd(sdoc("id", "RDBQ1_1", "_version_", v1010)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonDelQ("id:RDBQ1_2"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", v1017_del)); // This should've arrived after the ver2 update
updateJ(jsonAdd(sdoc("id", "RDBQ1_2", "_version_", v1015)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdoc("id", "RDBQ1_3", "_version_", v1020)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
},
() -> assertJQ(req("q", "*:*"), "/response/numFound==2")
);
@ -304,16 +310,22 @@ public class TestRecovery extends SolrTestCaseJ4 {
@Test
public void testLogReplayWithReorderedDBQByAsterixAndChildDocs() throws Exception {
testLogReplayWithReorderedDBQWrapper(() -> {
String v1010 = getNextVersion();
String v1012 = getNextVersion();
String v1017_del = "-" + getNextVersion();
String v1018 = getNextVersion();
String v1020 = getNextVersion();
// 1010 - will be deleted
updateJ(jsonAdd(sdocWithChildren("RDBQ2_1", "1010")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdocWithChildren("RDBQ2_1", v1010)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1018 - should be kept, including child docs
updateJ(jsonAdd(sdocWithChildren("RDBQ2_2", "1018")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdocWithChildren("RDBQ2_2", v1018)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1017 - delete should affect only 1010
updateJ(jsonDelQ("_root_:RDBQ2_1 _root_:RDBQ2_2 id:RDBQ2_3 _root_:RDBQ2_4"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "-1017")); // This should've arrived after the 1015th update
updateJ(jsonDelQ("_root_:RDBQ2_1 _root_:RDBQ2_2 id:RDBQ2_3 _root_:RDBQ2_4"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", v1017_del)); // This should've arrived after the ver2 update
// 1012 - will be deleted
updateJ(jsonAdd(sdoc("id", "RDBQ2_3", "_version_", "1012")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdoc("id", "RDBQ2_3", "_version_", v1012)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1020 - should be untouched
updateJ(jsonAdd(sdocWithChildren("RDBQ2_4", "1020")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdocWithChildren("RDBQ2_4", v1020)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
},
() -> assertJQ(req("q", "*:*"), "/response/numFound==6")
);
@ -322,16 +334,22 @@ public class TestRecovery extends SolrTestCaseJ4 {
@Test
public void testLogReplayWithReorderedDBQByIdAndChildDocs() throws Exception {
testLogReplayWithReorderedDBQWrapper(() -> {
String v1010 = getNextVersion();
String v1012 = getNextVersion();
String v1017_del = "-" + getNextVersion();
String v1018 = getNextVersion();
String v1020 = getNextVersion();
// 1010 - will be deleted
updateJ(jsonAdd(sdocWithChildren("RDBQ3_1", "1010")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdocWithChildren("RDBQ3_1", v1010)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1018 - should be kept, including child docs
updateJ(jsonAdd(sdocWithChildren("RDBQ3_2", "1018")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdocWithChildren("RDBQ3_2", v1018)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1017 - delete should affect only 1010
updateJ(jsonDelQ("id:RDBQ3_1 id:RDBQ3_2 id:RDBQ3_3 id:RDBQ3_4"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "-1017")); // This should've arrived after the 1015th update
updateJ(jsonDelQ("id:RDBQ3_1 id:RDBQ3_2 id:RDBQ3_3 id:RDBQ3_4"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", v1017_del)); // This should've arrived after the ver2 update
// 1012 - will be deleted
updateJ(jsonAdd(sdoc("id", "RDBQ3_3", "_version_", "1012")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdoc("id", "RDBQ3_3", "_version_", v1012)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1020 - should be untouched
updateJ(jsonAdd(sdocWithChildren("RDBQ3_4", "1020")), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdocWithChildren("RDBQ3_4", v1020)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
},
() -> assertJQ(req("q", "*:*"), "/response/numFound==8") // RDBQ3_2, RDBQ3_4 and 6 children docs (delete by id does not delete child docs)
);
@ -340,10 +358,13 @@ public class TestRecovery extends SolrTestCaseJ4 {
@Test
public void testLogReplayWithReorderedDBQInsertingChildnodes() throws Exception {
testLogReplayWithReorderedDBQWrapper(() -> {
updateJ(jsonDelQ("id:RDBQ4_2"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "-1017"));
String v1013 = getNextVersion();
String v1017_del = "-" + getNextVersion();
updateJ(jsonDelQ("id:RDBQ4_2"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", v1017_del));
// test doc: B1
// 1013 - will be inserted with 3 children
updateJ(jsonAdd(sdocWithChildren("RDBQ4_1", "1013", 3)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdocWithChildren("RDBQ4_1", v1013, 3)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
},
() -> assertJQ(req("q", "*:*"), "/response/numFound==4") // RDBQ4_1 and RDBQ4_2, plus 2x 3 children
);
@ -353,17 +374,23 @@ public class TestRecovery extends SolrTestCaseJ4 {
@Test
public void testLogReplayWithReorderedDBQUpdateWithDifferentChildCount() throws Exception {
testLogReplayWithReorderedDBQWrapper(() -> {
String v1011 = getNextVersion();
String v1012 = getNextVersion();
String v1013 = getNextVersion();
String v1018 = getNextVersion();
String v1019_del = "-" + getNextVersion();
// control
// 1013 - will be inserted with 3 children
updateJ(jsonAdd(sdocWithChildren("RDBQ5_1", "1011", 2)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1018 - this should be the final
updateJ(jsonAdd(sdocWithChildren("RDBQ5_1", "1012", 3)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1011 - will be inserted with 3 children as 1012
updateJ(jsonAdd(sdocWithChildren("RDBQ5_1", v1011, 2)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1012 - this should be the final
updateJ(jsonAdd(sdocWithChildren("RDBQ5_1", v1012, 3)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
// 1013 - will be inserted with 3 children
updateJ(jsonAdd(sdocWithChildren("RDBQ5_2", "1013", 2)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonDelQ("id:RDBQ5_3"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "-1019"));
// 1013 - will be inserted with 3 children as 1018
updateJ(jsonAdd(sdocWithChildren("RDBQ5_2", v1013, 2)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonDelQ("id:RDBQ5_3"), params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", v1019_del));
// 1018 - this should be the final
updateJ(jsonAdd(sdocWithChildren("RDBQ5_2", "1018", 3)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
updateJ(jsonAdd(sdocWithChildren("RDBQ5_2", v1018, 3)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
},
() -> assertJQ(req("q", "*:*"), "/response/numFound==8") // RDBQ5_1+3children+RDBQ5_2+3children
);
@ -469,23 +496,43 @@ public class TestRecovery extends SolrTestCaseJ4 {
int initialOps = bufferedOps.getValue();
Meter applyingBuffered = (Meter)metrics.get("TLOG.applyingBuffered.ops");
long initialApplyingOps = applyingBuffered.getCount();
String v3 = getNextVersion();
String v940_del = "-" + getNextVersion();
String v950_del = "-" + getNextVersion();
String v1010 = getNextVersion();
String v1015 = getNextVersion();
String v1017_del = "-" + getNextVersion();
String v1020 = getNextVersion();
String v1030 = getNextVersion();
String v1040 = getNextVersion();
String v1050 = getNextVersion();
String v1060 = getNextVersion();
String v1070 = getNextVersion();
String v1080 = getNextVersion();
String v2010_del = "-" + getNextVersion();
String v2060_del = "-" + getNextVersion();
String v3000_del = "-" + getNextVersion();
String versionListFirstCheck = String.join(",", v2010_del, v1030, v1020, v1017_del, v1015, v1010);
String versionListSecondCheck = String.join(",", v3000_del, v1080, v1050, v1060, v940_del, v1040 ,v3, v2010_del, v1030, v1020, v1017_del, v1015, v1010);
// simulate updates from a leader
updateJ(jsonAdd(sdoc("id","B1", "_version_","1010")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B11", "_version_","1015")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonDelQ("id:B1 id:B11 id:B2 id:B3"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-1017"));
updateJ(jsonAdd(sdoc("id","B2", "_version_","1020")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B3", "_version_","1030")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
deleteAndGetVersion("B1", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-2010"));
updateJ(jsonAdd(sdoc("id","B1", "_version_",v1010)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B11", "_version_",v1015)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonDelQ("id:B1 id:B11 id:B2 id:B3"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",v1017_del));
updateJ(jsonAdd(sdoc("id","B2", "_version_",v1020)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B3", "_version_",v1030)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
deleteAndGetVersion("B1", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",v2010_del));
assertJQ(req("qt","/get", "getVersions","6")
,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
,"=={'versions':["+versionListFirstCheck+"]}"
);
assertU(commit());
assertJQ(req("qt","/get", "getVersions","6")
,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
,"=={'versions':["+versionListFirstCheck+"]}"
);
// updates should be buffered, so we should not see any results yet.
@ -515,7 +562,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertEquals(6L, applyingBuffered.getCount() - initialApplyingOps);
assertJQ(req("qt","/get", "getVersions","6")
,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
,"=={'versions':["+versionListFirstCheck+"]}"
);
@ -528,24 +575,24 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
Long ver = getVer(req("qt","/get", "id","B3"));
assertEquals(1030L, ver.longValue());
assertEquals(Long.valueOf(v1030), ver);
// add a reordered doc that shouldn't overwrite one in the index
updateJ(jsonAdd(sdoc("id","B3", "_version_","3")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B3", "_version_",v3)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// reorder two buffered updates
updateJ(jsonAdd(sdoc("id","B4", "_version_","1040")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
deleteAndGetVersion("B4", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-940")); // this update should not take affect
updateJ(jsonAdd(sdoc("id","B6", "_version_","1060")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B5", "_version_","1050")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B8", "_version_","1080")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B4", "_version_",v1040)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
deleteAndGetVersion("B4", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",v940_del)); // this update should not take affect
updateJ(jsonAdd(sdoc("id","B6", "_version_",v1060)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B5", "_version_",v1050)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B8", "_version_",v1080)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// test that delete by query is at least buffered along with everything else so it will delete the
// currently buffered id:8 (even if it doesn't currently support versioning)
updateJ("{\"delete\": { \"query\":\"id:B2 OR id:B8\" }}", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-3000"));
updateJ("{\"delete\": { \"query\":\"id:B2 OR id:B8\" }}", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",v3000_del));
assertJQ(req("qt","/get", "getVersions","13")
,"=={'versions':[-3000,1080,1050,1060,-940,1040,3,-2010,1030,1020,-1017,1015,1010]}" // the "3" appears because versions aren't checked while buffering
,"=={'versions':[" + versionListSecondCheck + "]}" // the "3" appears because versions aren't checked while buffering
);
logReplay.drainPermits();
@ -557,22 +604,22 @@ public class TestRecovery extends SolrTestCaseJ4 {
logReplay.release(1);
// now add another update
updateJ(jsonAdd(sdoc("id","B7", "_version_","1070")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B7", "_version_",v1070)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// a reordered update that should be dropped
deleteAndGetVersion("B5", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-950"));
deleteAndGetVersion("B5", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",v950_del));
deleteAndGetVersion("B6", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-2060"));
deleteAndGetVersion("B6", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",v2060_del));
logReplay.release(1000);
UpdateLog.RecoveryInfo recInfo = rinfoFuture.get();
assertJQ(req("q", "*:*", "sort","id asc", "fl","id,_version_")
, "/response/docs==["
+ "{'id':'B3','_version_':1030}"
+ ",{'id':'B4','_version_':1040}"
+ ",{'id':'B5','_version_':1050}"
+ ",{'id':'B7','_version_':1070}"
+ "{'id':'B3','_version_':"+v1030+"}"
+ ",{'id':'B4','_version_':"+v1040+"}"
+ ",{'id':'B5','_version_':"+v1050+"}"
+ ",{'id':'B7','_version_':"+v1070+"}"
+"]"
);
@ -615,6 +662,22 @@ public class TestRecovery extends SolrTestCaseJ4 {
UpdateLog ulog = uhandler.getUpdateLog();
try {
String v101 = getNextVersion();
String v102 = getNextVersion();
String v103 = getNextVersion();
String v104 = getNextVersion();
String v105 = getNextVersion();
String v200 = getNextVersion();
String v201 = getNextVersion();
String v203 = getNextVersion();
String v204 = getNextVersion();
String v205 = getNextVersion();
String v206 = getNextVersion();
String v301 = getNextVersion();
String v302 = getNextVersion();
String v998 = getNextVersion();
String v999 = getNextVersion();
clearIndex();
assertU(commit());
@ -629,14 +692,14 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
// simulate updates from a leader
updateJ(jsonAdd(sdoc("id","C1", "_version_","101")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C2", "_version_","102")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C3", "_version_","103")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C1", "_version_",v101)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C2", "_version_",v102)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C3", "_version_",v103)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertTrue(ulog.dropBufferedUpdates());
ulog.bufferUpdates();
updateJ(jsonAdd(sdoc("id", "C4", "_version_","104")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id", "C5", "_version_","105")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id", "C4", "_version_",v104)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id", "C5", "_version_",v105)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
logReplay.release(1000);
rinfoFuture = ulog.applyBufferedUpdates();
@ -644,21 +707,21 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertEquals(2, rinfo.adds);
assertJQ(req("qt","/get", "getVersions","2")
,"=={'versions':[105,104]}"
,"=={'versions':["+v105+","+v104+"]}"
);
// this time add some docs first before buffering starts (so tlog won't be at pos 0)
updateJ(jsonAdd(sdoc("id","C100", "_version_","200")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C101", "_version_","201")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C100", "_version_",v200)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C101", "_version_",v201)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
ulog.bufferUpdates();
updateJ(jsonAdd(sdoc("id","C103", "_version_","203")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C104", "_version_","204")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C103", "_version_",v203)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C104", "_version_",v204)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertTrue(ulog.dropBufferedUpdates());
ulog.bufferUpdates();
updateJ(jsonAdd(sdoc("id","C105", "_version_","205")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C106", "_version_","206")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C105", "_version_",v205)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C106", "_version_",v206)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
rinfoFuture = ulog.applyBufferedUpdates();
rinfo = rinfoFuture.get();
@ -666,45 +729,45 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertJQ(req("q", "*:*", "sort","_version_ asc", "fl","id,_version_")
, "/response/docs==["
+ "{'id':'C4','_version_':104}"
+ ",{'id':'C5','_version_':105}"
+ ",{'id':'C100','_version_':200}"
+ ",{'id':'C101','_version_':201}"
+ ",{'id':'C105','_version_':205}"
+ ",{'id':'C106','_version_':206}"
+ "{'id':'C4','_version_':"+v104+"}"
+ ",{'id':'C5','_version_':"+v105+"}"
+ ",{'id':'C100','_version_':"+v200+"}"
+ ",{'id':'C101','_version_':"+v201+"}"
+ ",{'id':'C105','_version_':"+v205+"}"
+ ",{'id':'C106','_version_':"+v206+"}"
+"]"
);
assertJQ(req("qt","/get", "getVersions","6")
,"=={'versions':[206,205,201,200,105,104]}"
,"=={'versions':["+String.join(",",v206,v205,v201,v200,v105,v104)+"]}"
);
ulog.bufferUpdates();
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
updateJ(jsonAdd(sdoc("id","C301", "_version_","998")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C302", "_version_","999")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C301", "_version_",v998)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C302", "_version_",v999)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertTrue(ulog.dropBufferedUpdates());
// make sure we can overwrite with a lower version
// TODO: is this functionality needed?
updateJ(jsonAdd(sdoc("id","C301", "_version_","301")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C302", "_version_","302")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C301", "_version_",v301)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C302", "_version_",v302)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertU(commit());
assertJQ(req("qt","/get", "getVersions","2")
,"=={'versions':[302,301]}"
,"=={'versions':["+v302+","+v301+"]}"
);
assertJQ(req("q", "*:*", "sort","_version_ desc", "fl","id,_version_", "rows","2")
, "/response/docs==["
+ "{'id':'C302','_version_':302}"
+ ",{'id':'C301','_version_':301}"
+ "{'id':'C302','_version_':"+v302+"}"
+ ",{'id':'C301','_version_':"+v301+"}"
+"]"
);
updateJ(jsonAdd(sdoc("id","C2", "_version_","302")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C2", "_version_",v302)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
@ -744,6 +807,18 @@ public class TestRecovery extends SolrTestCaseJ4 {
Future<UpdateLog.RecoveryInfo> rinfoFuture;
try {
String v101 = getNextVersion();
String v102 = getNextVersion();
String v103 = getNextVersion();
String v104 = getNextVersion();
String v105 = getNextVersion();
String v200 = getNextVersion();
String v201 = getNextVersion();
String v203 = getNextVersion();
String v204 = getNextVersion();
String v205 = getNextVersion();
String v206 = getNextVersion();
clearIndex();
assertU(commit());
assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
@ -752,16 +827,16 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
// simulate updates from a leader
updateJ(jsonAdd(sdoc("id","c1", "_version_","101")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c2", "_version_","102")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c3", "_version_","103")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c1", "_version_",v101)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c2", "_version_",v102)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c3", "_version_",v103)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// call bufferUpdates again (this currently happens when recovery fails)... we should get a new starting point
ulog.bufferUpdates();
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
updateJ(jsonAdd(sdoc("id", "c4", "_version_","104")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id", "c5", "_version_","105")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id", "c4", "_version_",v104)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id", "c5", "_version_",v105)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
logReplay.release(1000);
rinfoFuture = ulog.applyBufferedUpdates();
@ -769,21 +844,21 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertEquals(2, rinfo.adds);
assertJQ(req("qt","/get", "getVersions","2")
,"=={'versions':[105,104]}"
,"=={'versions':["+v105+","+v104+"]}"
);
// this time add some docs first before buffering starts (so tlog won't be at pos 0)
updateJ(jsonAdd(sdoc("id","c100", "_version_","200")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c101", "_version_","201")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c100", "_version_",v200)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c101", "_version_",v201)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
ulog.bufferUpdates();
updateJ(jsonAdd(sdoc("id","c103", "_version_","203")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c104", "_version_","204")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c103", "_version_",v203)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c104", "_version_",v204)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// call bufferUpdates again (this currently happens when recovery fails)... we should get a new starting point
ulog.bufferUpdates();
updateJ(jsonAdd(sdoc("id","c105", "_version_","205")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c106", "_version_","206")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c105", "_version_",v205)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","c106", "_version_",v206)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
rinfoFuture = ulog.applyBufferedUpdates();
rinfo = rinfoFuture.get();
@ -791,19 +866,19 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertJQ(req("q", "*:*", "sort","_version_ asc", "fl","id,_version_")
, "/response/docs==["
+ "{'id':'c4','_version_':104}"
+ ",{'id':'c5','_version_':105}"
+ ",{'id':'c100','_version_':200}"
+ ",{'id':'c101','_version_':201}"
+ ",{'id':'c105','_version_':205}"
+ ",{'id':'c106','_version_':206}"
+"]"
+ "{'id':'c4','_version_':"+v104+"}"
+ ",{'id':'c5','_version_':"+v105+"}"
+ ",{'id':'c100','_version_':"+v200+"}"
+ ",{'id':'c101','_version_':"+v201+"}"
+ ",{'id':'c105','_version_':"+v205+"}"
+ ",{'id':'c106','_version_':"+v206+"}"
+"" +"]"
);
// The updates that were buffered (but never applied) still appear in recent versions!
// This is good for some uses, but may not be good for others.
assertJQ(req("qt","/get", "getVersions","11")
,"=={'versions':[206,205,204,203,201,200,105,104,103,102,101]}"
,"=={'versions':["+String.join(",",v206,v205,v204,v203,v201,v200,v105,v104,v103,v102,v101)+"]}"
);
assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
@ -864,6 +939,14 @@ public class TestRecovery extends SolrTestCaseJ4 {
UpdateLog ulog = uhandler.getUpdateLog();
try {
String v101 = getNextVersion();
String v102 = getNextVersion();
String v103 = getNextVersion();
String v114 = getNextVersion();
String v115 = getNextVersion();
String v116 = getNextVersion();
String v117 = getNextVersion();
clearIndex();
assertU(commit());
@ -871,9 +954,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
ulog.bufferUpdates();
// simulate updates from a leader
updateJ(jsonAdd(sdoc("id","Q1", "_version_","101")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q2", "_version_","102")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q3", "_version_","103")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q1", "_version_",v101)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q2", "_version_",v102)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q3", "_version_",v103)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
req.close();
@ -903,9 +986,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);
// now do some normal non-buffered adds
updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q4", "_version_",v114)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q5", "_version_",v115)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q6", "_version_",v116)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertU(commit());
req.close();
@ -921,7 +1004,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
ulog.bufferUpdates();
// simulate receiving no updates
ulog.applyBufferedUpdates();
updateJ(jsonAdd(sdoc("id","Q7", "_version_","117")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // do another add to make sure flags are back to normal
updateJ(jsonAdd(sdoc("id","Q7", "_version_",v117)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // do another add to make sure flags are back to normal
req.close();
h.close();
@ -950,26 +1033,29 @@ public class TestRecovery extends SolrTestCaseJ4 {
// make sure that on a restart, versions don't start too low
@Test
public void testVersionsOnRestart() throws Exception {
String v1 = getNextVersion();
String v2 = getNextVersion();
clearIndex();
assertU(commit());
assertU(adoc("id","D1", "val_i","1"));
assertU(adoc("id","D2", "val_i","1"));
assertU(adoc("id","D1", "val_i",v1));
assertU(adoc("id","D2", "val_i",v1));
assertU(commit());
long v1 = getVer(req("q","id:D1"));
long v1a = getVer(req("q","id:D2"));
long D1Version1 = getVer(req("q","id:D1"));
long D2Version1 = getVer(req("q","id:D2"));
h.close();
createCore();
assertU(adoc("id","D1", "val_i","2"));
assertU(adoc("id","D1", "val_i",v2));
assertU(commit());
long v2 = getVer(req("q","id:D1"));
long D1Version2 = getVer(req("q","id:D1"));
assert(v2 > v1);
assert(D1Version2 > D1Version1);
assertJQ(req("qt","/get", "getVersions","2")
,"/versions==[" + v2 + "," + v1a + "]"
,"/versions==[" + D1Version2 + "," + D2Version1 + "]"
);
}
@ -997,11 +1083,13 @@ public class TestRecovery extends SolrTestCaseJ4 {
UpdateLog ulog = uhandler.getUpdateLog();
try {
String v1 = getNextVersion();
clearIndex();
assertU(commit());
assertU(adoc("id","E1", "val_i","1"));
assertU(adoc("id","E2", "val_i","1"));
assertU(adoc("id","E1", "val_i",v1));
assertU(adoc("id","E2", "val_i",v1));
// set to a high enough number so this test won't hang on a bug
logReplay.release(10);
@ -1203,13 +1291,17 @@ public class TestRecovery extends SolrTestCaseJ4 {
// Now test that the bad log file doesn't mess up retrieving latest versions
//
updateJ(jsonAdd(sdoc("id","F4", "_version_","104")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","F5", "_version_","105")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","F6", "_version_","106")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
String v104 = getNextVersion();
String v105 = getNextVersion();
String v106 = getNextVersion();
updateJ(jsonAdd(sdoc("id","F4", "_version_",v104)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","F5", "_version_",v105)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","F6", "_version_",v106)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// This currently skips the bad log file and also returns the version of the clearIndex (del *:*)
// assertJQ(req("qt","/get", "getVersions","6"), "/versions==[106,105,104]");
assertJQ(req("qt","/get", "getVersions","3"), "/versions==[106,105,104]");
assertJQ(req("qt","/get", "getVersions","3"), "/versions==["+v106+","+v105+","+v104+"]");
} finally {
DirectUpdateHandler2.commitOnClose = true;
@ -1259,14 +1351,16 @@ public class TestRecovery extends SolrTestCaseJ4 {
//
// Now test that the bad log file doesn't mess up retrieving latest versions
//
String v104 = getNextVersion();
String v105 = getNextVersion();
String v106 = getNextVersion();
updateJ(jsonAdd(sdoc("id","G4", "_version_","104")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","G5", "_version_","105")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","G6", "_version_","106")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","G4", "_version_",v104)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","G5", "_version_",v105)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","G6", "_version_",v106)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// This currently skips the bad log file and also returns the version of the clearIndex (del *:*)
// assertJQ(req("qt","/get", "getVersions","6"), "/versions==[106,105,104]");
assertJQ(req("qt","/get", "getVersions","3"), "/versions==[106,105,104]");
assertJQ(req("qt","/get", "getVersions","3"), "/versions==["+v106+","+v105+","+v104+"]");
assertU(commit());
@ -1554,5 +1648,13 @@ public class TestRecovery extends SolrTestCaseJ4 {
return (Long)doc.get("_version_");
}
static class VersionProvider{
private static long version = 0;
static String getNextVersion() {
return Long.toString(version++);
}
}
}