SOLR-4511: When a new index is replicated into place, we need to update the most recent replicatable index point without doing a commit. This is important for repeater use cases, as well as when nodes may switch master/slave roles.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1451659 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-03-01 17:37:30 +00:00
parent 153d79ce99
commit 3ce9b7d1ac
5 changed files with 148 additions and 44 deletions

View File

@ -188,6 +188,12 @@ Bug Fixes
* SOLR-4505: Possible deadlock around SolrCoreState update lock.
(Erick Erickson, Mark Miller)
* SOLR-4511: When a new index is replicated into place, we need
to update the most recent replicatable index point without
doing a commit. This is important for repeater use cases, as
well as when nodes may switch master/slave roles.
(Mark Miller, Raúl Grande)
Optimizations
----------------------
@ -238,6 +244,8 @@ Other Changes
* SOLR-3843: Include lucene codecs jar and enable per-field postings and docvalues
support in the schema.xml (Robert Muir, Steve Rowe)
* SOLR-4511: Add new test for 'repeater' replication node. (Mark Miller)
================== 4.1.0 ==================
Versions of Major Components

View File

@ -122,7 +122,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private Integer reserveCommitDuration = SnapPuller.readInterval("00:00:10");
private volatile IndexCommit indexCommitPoint;
volatile IndexCommit indexCommitPoint;
volatile NamedList<Object> snapShootDetails;

View File

@ -463,7 +463,7 @@ public class SnapPuller {
// may be closed
core.getDirectoryFactory().doneWithDirectory(oldDirectory);
}
doCommit(isFullCopyNeeded);
openNewWriterAndSearcher(isFullCopyNeeded);
}
replicationStartTime = 0;
@ -639,17 +639,19 @@ public class SnapPuller {
return sb;
}
private void doCommit(boolean isFullCopyNeeded) throws IOException {
private void openNewWriterAndSearcher(boolean isFullCopyNeeded) throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
new ModifiableSolrParams());
// reboot the writer on the new index and get a new searcher
solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded, false);
RefCounted<SolrIndexSearcher> searcher = null;
IndexCommit commitPoint;
try {
// first try to open an NRT searcher so that the new
// IndexWriter is registered with the reader
Future[] waitSearcher = new Future[1];
solrCore.getSearcher(true, false, waitSearcher, true);
searcher = solrCore.getSearcher(true, true, waitSearcher, true);
if (waitSearcher[0] != null) {
try {
waitSearcher[0].get();
@ -659,10 +661,17 @@ public class SnapPuller {
SolrException.log(LOG, e);
}
}
commitPoint = searcher.get().getIndexReader().getIndexCommit();
} finally {
req.close();
if (searcher != null) {
searcher.decref();
}
}
// update the commit point in replication handler
replicationHandler.indexCommitPoint = commitPoint;
}

View File

@ -51,7 +51,6 @@
</lst>
<lst name="slave">
<str name="masterUrl">http://127.0.0.1:TEST_PORT/solr/replication</str>
<str name="pollInterval">00:00:01</str>
</lst>
</requestHandler>

View File

@ -58,7 +58,6 @@ import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.util.AbstractSolrTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
@ -76,9 +75,9 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
+ File.separator + "collection1" + File.separator + "conf"
+ File.separator;
JettySolrRunner masterJetty, slaveJetty;
SolrServer masterClient, slaveClient;
SolrInstance master = null, slave = null;
JettySolrRunner masterJetty, slaveJetty, repeaterJetty;
SolrServer masterClient, slaveClient, repeaterClient;
SolrInstance master = null, slave = null, repeater = null;
static String context = "/solr";
@ -561,44 +560,42 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
// snappull from the slave to the master
for (int i = 0; i < 3; i++)
for (int i = nDocs; i < nDocs + 3; i++)
index(slaveClient, "id", i, "name", "name = " + i);
slaveClient.commit();
pullFromSlaveToMaster();
rQuery(nDocs, "*:*", masterClient);
rQuery(nDocs + 3, "*:*", masterClient);
//get docs from slave and check if number is equal to master
slaveQueryRsp = rQuery(nDocs, "*:*", slaveClient);
slaveQueryRsp = rQuery(nDocs + 3, "*:*", slaveClient);
slaveQueryResult = (SolrDocumentList) slaveQueryRsp.get("response");
assertEquals(nDocs, slaveQueryResult.getNumFound());
assertEquals(nDocs + 3, slaveQueryResult.getNumFound());
//compare results
masterQueryRsp = rQuery(nDocs, "*:*", masterClient);
masterQueryRsp = rQuery(nDocs + 3, "*:*", masterClient);
masterQueryResult = (SolrDocumentList) masterQueryRsp.get("response");
cmp = BaseDistributedSearchTestCase.compare(masterQueryResult, slaveQueryResult, 0, null);
assertEquals(null, cmp);
// get the details
// just ensures we don't get an exception
assertVersions();
assertVersions(masterClient, slaveClient);
pullFromSlaveToMaster();
//get docs from slave and check if number is equal to master
slaveQueryRsp = rQuery(nDocs, "*:*", slaveClient);
slaveQueryRsp = rQuery(nDocs + 3, "*:*", slaveClient);
slaveQueryResult = (SolrDocumentList) slaveQueryRsp.get("response");
assertEquals(nDocs, slaveQueryResult.getNumFound());
assertEquals(nDocs + 3, slaveQueryResult.getNumFound());
//compare results
masterQueryRsp = rQuery(nDocs, "*:*", masterClient);
masterQueryRsp = rQuery(nDocs + 3, "*:*", masterClient);
masterQueryResult = (SolrDocumentList) masterQueryRsp.get("response");
cmp = BaseDistributedSearchTestCase.compare(masterQueryResult, slaveQueryResult, 0, null);
assertEquals(null, cmp);
assertVersions();
assertVersions(masterClient, slaveClient);
// now force a new index directory
for (int i = 0; i < 4; i++)
for (int i = nDocs + 3; i < nDocs + 7; i++)
index(masterClient, "id", i, "name", "name = " + i);
masterClient.commit();
@ -607,29 +604,29 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
rQuery((int) slaveQueryResult.getNumFound(), "*:*", masterClient);
//get docs from slave and check if number is equal to master
slaveQueryRsp = rQuery(nDocs, "*:*", slaveClient);
slaveQueryRsp = rQuery(nDocs + 3, "*:*", slaveClient);
slaveQueryResult = (SolrDocumentList) slaveQueryRsp.get("response");
assertEquals(nDocs, slaveQueryResult.getNumFound());
assertEquals(nDocs + 3, slaveQueryResult.getNumFound());
//compare results
masterQueryRsp = rQuery(nDocs, "*:*", masterClient);
masterQueryRsp = rQuery(nDocs + 3, "*:*", masterClient);
masterQueryResult = (SolrDocumentList) masterQueryRsp.get("response");
cmp = BaseDistributedSearchTestCase.compare(masterQueryResult, slaveQueryResult, 0, null);
assertEquals(null, cmp);
assertVersions();
assertVersions(masterClient, slaveClient);
pullFromSlaveToMaster();
//get docs from slave and check if number is equal to master
slaveQueryRsp = rQuery(nDocs, "*:*", slaveClient);
slaveQueryRsp = rQuery(nDocs + 3, "*:*", slaveClient);
slaveQueryResult = (SolrDocumentList) slaveQueryRsp.get("response");
assertEquals(nDocs, slaveQueryResult.getNumFound());
assertEquals(nDocs + 3, slaveQueryResult.getNumFound());
//compare results
masterQueryRsp = rQuery(nDocs, "*:*", masterClient);
masterQueryRsp = rQuery(nDocs + 3, "*:*", masterClient);
masterQueryResult = (SolrDocumentList) masterQueryRsp.get("response");
cmp = BaseDistributedSearchTestCase.compare(masterQueryResult, slaveQueryResult, 0, null);
assertEquals(null, cmp);
assertVersions();
assertVersions(masterClient, slaveClient);
NamedList<Object> details = getDetails(masterClient);
@ -642,40 +639,131 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
slaveJetty = createJetty(slave);
slaveClient = createNewSolrServer(slaveJetty.getLocalPort());
}
@Test
public void doTestRepeater() throws Exception {
// no polling
slave.copyConfigFile(CONF_DIR + "solrconfig-slave1.xml", "solrconfig.xml");
slaveJetty.stop();
slaveJetty = createJetty(slave);
slaveClient = createNewSolrServer(slaveJetty.getLocalPort());
private void assertVersions() throws Exception {
NamedList<Object> details = getDetails(masterClient);
ArrayList<NamedList<Object>> commits = (ArrayList<NamedList<Object>>) details.get("commits");
Long maxVersionMaster = 0L;
for(NamedList<Object> commit : commits) {
Long version = (Long) commit.get("indexVersion");
maxVersionMaster = Math.max(version, maxVersionMaster);
try {
repeater = new SolrInstance("repeater", null);
repeater.setUp();
repeater.copyConfigFile(CONF_DIR + "solrconfig-repeater.xml",
"solrconfig.xml");
repeaterJetty = createJetty(repeater);
repeaterClient = createNewSolrServer(repeaterJetty.getLocalPort());
for (int i = 0; i < 3; i++)
index(masterClient, "id", i, "name", "name = " + i);
masterClient.commit();
pullFromTo(masterJetty, repeaterJetty);
rQuery(3, "*:*", repeaterClient);
pullFromTo(repeaterJetty, slaveJetty);
rQuery(3, "*:*", slaveClient);
assertVersions(masterClient, repeaterClient);
assertVersions(repeaterClient, slaveClient);
for (int i = 0; i < 4; i++)
index(repeaterClient, "id", i, "name", "name = " + i);
repeaterClient.commit();
pullFromTo(masterJetty, repeaterJetty);
rQuery(3, "*:*", repeaterClient);
pullFromTo(repeaterJetty, slaveJetty);
rQuery(3, "*:*", slaveClient);
for (int i = 3; i < 6; i++)
index(masterClient, "id", i, "name", "name = " + i);
masterClient.commit();
pullFromTo(masterJetty, repeaterJetty);
rQuery(6, "*:*", repeaterClient);
pullFromTo(repeaterJetty, slaveJetty);
rQuery(6, "*:*", slaveClient);
} finally {
if (repeater != null) {
repeaterJetty.stop();
repeater.tearDown();
repeaterJetty = null;
}
}
details = getDetails(slaveClient);
}
private void assertVersions(SolrServer client1, SolrServer client2) throws Exception {
NamedList<Object> details = getDetails(client1);
ArrayList<NamedList<Object>> commits = (ArrayList<NamedList<Object>>) details.get("commits");
Long maxVersionClient1 = getVersion(client1);
Long maxVersionClient2 = getVersion(client2);
assertEquals(maxVersionClient1, maxVersionClient2);
// check vs /replication?command=indexverion call
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("qt", "/replication");
params.set("command", "indexversion");
QueryRequest req = new QueryRequest(params);
NamedList<Object> resp = client1.request(req);
Long version = (Long) resp.get("indexversion");
assertEquals(maxVersionClient1, version);
// check vs /replication?command=indexversion call
resp = client2.request(req);
version = (Long) resp.get("indexversion");
assertEquals(maxVersionClient2, version);
}
private Long getVersion(SolrServer client) throws Exception {
NamedList<Object> details;
ArrayList<NamedList<Object>> commits;
details = getDetails(client);
commits = (ArrayList<NamedList<Object>>) details.get("commits");
Long maxVersionSlave= 0L;
for(NamedList<Object> commit : commits) {
Long version = (Long) commit.get("indexVersion");
maxVersionSlave = Math.max(version, maxVersionSlave);
}
assertEquals(maxVersionMaster, maxVersionSlave);
return maxVersionSlave;
}
private void pullFromSlaveToMaster() throws MalformedURLException,
IOException {
pullFromTo(slaveJetty, masterJetty);
}
private void pullFromTo(JettySolrRunner from, JettySolrRunner to) throws MalformedURLException, IOException {
String masterUrl;
URL url;
InputStream stream;
masterUrl = "http://127.0.0.1:" + masterJetty.getLocalPort() + "/solr/replication?command=fetchindex&masterUrl=";
masterUrl += "http://127.0.0.1:" + slaveJetty.getLocalPort() + "/solr/replication";
masterUrl = "http://127.0.0.1:" + to.getLocalPort()
+ "/solr/replication?command=fetchindex&masterUrl=";
masterUrl += "http://127.0.0.1:" + from.getLocalPort()
+ "/solr/replication";
url = new URL(masterUrl);
stream = url.openStream();
try {
stream.close();
} catch (IOException e) {
//e.printStackTrace();
// e.printStackTrace();
}
}