mirror of https://github.com/apache/lucene.git
SOLR-847 -- Enhance the snappull command in ReplicationHandler to accept masterUrl
git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@729263 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b35c6a23ff
commit
e59ebad16e
|
@ -125,6 +125,8 @@ New Features
|
|||
26. SOLR-928: SolrDocument and SolrInputDocument now implement the Map<String,?>
|
||||
interface. This should make plugging into other standard tools easier. (ryan)
|
||||
|
||||
27. SOLR-847: Enhance the snappull command in ReplicationHandler to accept masterUrl.
|
||||
(Noble Paul, Preetam Rao via shalin)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
|
|
@ -91,7 +91,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
|
||||
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
|
||||
rsp.setHttpCaching(false);
|
||||
SolrParams solrParams = req.getParams();
|
||||
final SolrParams solrParams = req.getParams();
|
||||
String command = solrParams.get(COMMAND);
|
||||
if (command == null) {
|
||||
rsp.add("status", "OK");
|
||||
|
@ -119,7 +119,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
} else if (command.equals(CMD_SNAP_PULL)) {
|
||||
new Thread() {
|
||||
public void run() {
|
||||
doSnapPull();
|
||||
doSnapPull(solrParams);
|
||||
}
|
||||
}.start();
|
||||
rsp.add("status", "OK");
|
||||
|
@ -202,16 +202,24 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
return null;
|
||||
}
|
||||
|
||||
void doSnapPull() {
|
||||
if (!isSlave)
|
||||
private volatile SnapPuller tempSnapPuller;
|
||||
|
||||
void doSnapPull(SolrParams solrParams) {
|
||||
String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
|
||||
if (!isSlave && masterUrl == null)
|
||||
return;
|
||||
if (!snapPullLock.tryLock())
|
||||
return;
|
||||
try {
|
||||
snapPuller.fetchLatestIndex(core);
|
||||
tempSnapPuller = snapPuller;
|
||||
if (masterUrl != null) {
|
||||
tempSnapPuller = new SnapPuller(solrParams.toNamedList(), this, core);
|
||||
}
|
||||
tempSnapPuller.fetchLatestIndex(core);
|
||||
} catch (Exception e) {
|
||||
LOG.error("SnapPull failed ", e);
|
||||
} finally {
|
||||
tempSnapPuller = snapPuller;
|
||||
snapPullLock.unlock();
|
||||
}
|
||||
}
|
||||
|
@ -437,7 +445,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
list.add("indexPath", core.getIndexDir());
|
||||
list.add("isMaster", String.valueOf(isMaster));
|
||||
|
||||
if (isSlave) {
|
||||
SnapPuller snapPuller = tempSnapPuller;
|
||||
if (snapPuller != null) {
|
||||
list.add(MASTER_URL, snapPuller.getMasterUrl());
|
||||
if (snapPuller.getPollInterval() != null) {
|
||||
list.add(SnapPuller.POLL_INTERVAL, snapPuller.getPollInterval());
|
||||
|
@ -477,8 +486,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
details.add("replicatable" + CMD_INDEX_VERSION, commit.getVersion());
|
||||
details.add("replicatable" + GENERATION, commit.getGeneration());
|
||||
}
|
||||
|
||||
if (isSlave) {
|
||||
SnapPuller snapPuller = tempSnapPuller;
|
||||
if (snapPuller != null) {
|
||||
try {
|
||||
Properties props = new Properties();
|
||||
File f = new File(core.getDataDir(), SnapPuller.REPLICATION_PROPERTIES);
|
||||
|
@ -500,7 +509,6 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
} finally {
|
||||
closeNoExp(inFile);
|
||||
}
|
||||
|
||||
try {
|
||||
NamedList nl = snapPuller.getCommandResponse(CMD_DETAILS);
|
||||
details.add("masterDetails", nl.get(CMD_DETAILS));
|
||||
|
@ -640,7 +648,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
registerCloseHook();
|
||||
NamedList slave = (NamedList) initArgs.get("slave");
|
||||
if (slave != null) {
|
||||
snapPuller = new SnapPuller(slave, this, core);
|
||||
tempSnapPuller = snapPuller = new SnapPuller(slave, this, core);
|
||||
isSlave = true;
|
||||
}
|
||||
NamedList master = (NamedList) initArgs.get("master");
|
||||
|
@ -861,6 +869,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Used to write a marker for EOF
|
||||
*/
|
||||
|
|
|
@ -21,8 +21,8 @@ import org.apache.commons.httpclient.methods.PostMethod;
|
|||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.util.FastInputStream;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.JavaBinCodec;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import static org.apache.solr.handler.ReplicationHandler.*;
|
||||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
|
@ -135,7 +135,7 @@ public class SnapPuller {
|
|||
}
|
||||
try {
|
||||
executorStartTime = System.currentTimeMillis();
|
||||
replicationHandler.doSnapPull();
|
||||
replicationHandler.doSnapPull(null);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception in pulling snapshot", e);
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.solr.common.util.SimpleOrderedMap;
|
|||
import org.apache.solr.util.AbstractSolrTestCase;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.URL;
|
||||
|
||||
/**
|
||||
* Test for ReplicationHandler
|
||||
|
@ -236,6 +237,97 @@ public class TestReplicationHandler extends TestCase {
|
|||
|
||||
}
|
||||
|
||||
|
||||
public void testStopPoll() throws Exception {
|
||||
// Test:
|
||||
// setup master/slave.
|
||||
// stop polling on slave, add a doc to master and verify slave hasn't picked it.
|
||||
|
||||
//add 500 docs to master
|
||||
for (int i = 0; i < 500; i++)
|
||||
index(masterClient, "id", i, "name", "name = " + i);
|
||||
|
||||
masterClient.commit();
|
||||
|
||||
NamedList masterQueryRsp = query("*:*", masterClient);
|
||||
SolrDocumentList masterQueryResult = (SolrDocumentList) masterQueryRsp.get("response");
|
||||
assertEquals(500, masterQueryResult.getNumFound());
|
||||
|
||||
//sleep for pollinterval time 3s, to let slave pull data.
|
||||
Thread.sleep(3000);
|
||||
//get docs from slave and check if number is equal to master
|
||||
NamedList slaveQueryRsp = query("*:*", slaveClient);
|
||||
SolrDocumentList slaveQueryResult = (SolrDocumentList) slaveQueryRsp.get("response");
|
||||
assertEquals(500, slaveQueryResult.getNumFound());
|
||||
|
||||
//compare results
|
||||
String cmp = TestDistributedSearch.compare(masterQueryResult, slaveQueryResult, 0, null);
|
||||
assertEquals(null, cmp);
|
||||
|
||||
// start stop polling test
|
||||
String masterUrl = "http://localhost:" + slaveJetty.getLocalPort() + "/solr/replication?command=disablepoll";
|
||||
URL url = new URL(masterUrl);
|
||||
InputStream stream = url.openStream();
|
||||
try {
|
||||
stream.close();
|
||||
} catch (IOException e) {
|
||||
//e.printStackTrace();
|
||||
}
|
||||
index(masterClient, "id", 501, "name", "name = " + 501);
|
||||
masterClient.commit();
|
||||
//sleep for pollinterval time 3s, to let slave pull data.
|
||||
Thread.sleep(3000);
|
||||
//get docs from slave and check if number is equal to master
|
||||
slaveQueryRsp = query("*:*", slaveClient);
|
||||
slaveQueryResult = (SolrDocumentList) slaveQueryRsp.get("response");
|
||||
assertEquals(500, slaveQueryResult.getNumFound());
|
||||
//get docs from slave and check if number is equal to master
|
||||
slaveQueryRsp = query("*:*", masterClient);
|
||||
slaveQueryResult = (SolrDocumentList) slaveQueryRsp.get("response");
|
||||
assertEquals(501, slaveQueryResult.getNumFound());
|
||||
}
|
||||
|
||||
public void testSnapPullWithMasterUrl() throws Exception {
|
||||
//change solrconfig on slave
|
||||
//this has no entry for pollinginterval
|
||||
copyFile(new File("." + File.separator +
|
||||
"solr" + File.separator +
|
||||
"conf" + File.separator + "solrconfig-slave1.xml"),
|
||||
new File(slave.getConfDir(), "solrconfig.xml"));
|
||||
slaveJetty.stop();
|
||||
slaveJetty = createJetty(slave, 0);
|
||||
slaveClient = createNewSolrServer(slaveJetty.getLocalPort());
|
||||
|
||||
//add 500 docs to master
|
||||
for (int i = 0; i < 500; i++)
|
||||
index(masterClient, "id", i, "name", "name = " + i);
|
||||
|
||||
masterClient.commit();
|
||||
|
||||
NamedList masterQueryRsp = query("*:*", masterClient);
|
||||
SolrDocumentList masterQueryResult = (SolrDocumentList) masterQueryRsp.get("response");
|
||||
assertEquals(500, masterQueryResult.getNumFound());
|
||||
|
||||
// snappull
|
||||
String masterUrl = "http://localhost:" + slaveJetty.getLocalPort() + "/solr/replication?command=snappull&masterUrl=";
|
||||
masterUrl += "http://localhost:" + masterJetty.getLocalPort() + "/solr/replication";
|
||||
URL url = new URL(masterUrl);
|
||||
InputStream stream = url.openStream();
|
||||
try {
|
||||
stream.close();
|
||||
} catch (IOException e) {
|
||||
//e.printStackTrace();
|
||||
}
|
||||
Thread.sleep(3000);
|
||||
//get docs from slave and check if number is equal to master
|
||||
NamedList slaveQueryRsp = query("*:*", slaveClient);
|
||||
SolrDocumentList slaveQueryResult = (SolrDocumentList) slaveQueryRsp.get("response");
|
||||
assertEquals(500, slaveQueryResult.getNumFound());
|
||||
//compare results
|
||||
String cmp = TestDistributedSearch.compare(masterQueryResult, slaveQueryResult, 0, null);
|
||||
assertEquals(null, cmp);
|
||||
}
|
||||
|
||||
void copyFile(File src, File dst) throws IOException {
|
||||
InputStream in = new FileInputStream(src);
|
||||
OutputStream out = new FileOutputStream(dst);
|
||||
|
@ -322,4 +414,4 @@ public class TestReplicationHandler extends TestCase {
|
|||
AbstractSolrTestCase.recurseDelete(homeDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
<?xml version="1.0" ?>
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- $Id$
|
||||
$Source$
|
||||
$Name$
|
||||
-->
|
||||
|
||||
<config>
|
||||
|
||||
<dataDir>${solr.data.dir:./solr/data}</dataDir>
|
||||
|
||||
<indexDefaults>
|
||||
<useCompoundFile>false</useCompoundFile>
|
||||
<mergeFactor>10</mergeFactor>
|
||||
<ramBufferSizeMB>32</ramBufferSizeMB>
|
||||
<maxMergeDocs>2147483647</maxMergeDocs>
|
||||
<maxFieldLength>10000</maxFieldLength>
|
||||
<writeLockTimeout>1000</writeLockTimeout>
|
||||
<commitLockTimeout>10000</commitLockTimeout>
|
||||
|
||||
<writeLockTimeout>1000</writeLockTimeout>
|
||||
<commitLockTimeout>10000</commitLockTimeout>
|
||||
|
||||
<lockType>single</lockType>
|
||||
</indexDefaults>
|
||||
|
||||
<mainIndex>
|
||||
<useCompoundFile>false</useCompoundFile>
|
||||
<mergeFactor>10</mergeFactor>
|
||||
<ramBufferSizeMB>32</ramBufferSizeMB>
|
||||
<maxMergeDocs>2147483647</maxMergeDocs>
|
||||
<maxFieldLength>10000</maxFieldLength>
|
||||
|
||||
<unlockOnStartup>true</unlockOnStartup>
|
||||
</mainIndex>
|
||||
|
||||
<updateHandler class="solr.DirectUpdateHandler2">
|
||||
</updateHandler>
|
||||
|
||||
<requestHandler name="standard" class="solr.StandardRequestHandler">
|
||||
<bool name="httpCaching">true</bool>
|
||||
</requestHandler>
|
||||
|
||||
<!-- test query parameter defaults -->
|
||||
<requestHandler name="defaults" class="solr.StandardRequestHandler">
|
||||
|
||||
</requestHandler>
|
||||
|
||||
<!-- test query parameter defaults -->
|
||||
<requestHandler name="lazy" class="solr.StandardRequestHandler" startup="lazy">
|
||||
</requestHandler>
|
||||
|
||||
<requestHandler name="/update" class="solr.XmlUpdateRequestHandler"/>
|
||||
|
||||
<requestHandler name="/replication" class="solr.ReplicationHandler">
|
||||
|
||||
</requestHandler>
|
||||
|
||||
|
||||
<!-- enable streaming for testing... -->
|
||||
<requestDispatcher handleSelect="true">
|
||||
<requestParsers enableRemoteStreaming="true" multipartUploadLimitInKB="2048"/>
|
||||
<httpCaching lastModifiedFrom="openTime" etagSeed="Solr" never304="false">
|
||||
<cacheControl>max-age=30, public</cacheControl>
|
||||
</httpCaching>
|
||||
</requestDispatcher>
|
||||
|
||||
</config>
|
Loading…
Reference in New Issue