SOLR-6485

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1627340 13f79535-47bb-0310-9956-ffa450edef68
Noble Paul 2014-09-24 14:47:37 +00:00
parent 83cbdfda3d
commit defaa27e69
5 changed files with 291 additions and 103 deletions

View File: solr/CHANGES.txt

@@ -135,6 +135,9 @@ New Features
of arbitrary functions, ie: stats.field={!func}product(price,popularity)
(hossman)
* SOLR-6485: ReplicationHandler should have an option to throttle the speed of
replication (Varun Thacker, Noble Paul)
Bug Fixes
----------------------
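
The heart of the change is Lucene's RateLimiter. As a primer for the handler diff below, here is a minimal, hedged sketch of the throttling pattern it adopts (assumes Lucene's org.apache.lucene.store.RateLimiter API as of 4.x; ThrottledCopy is an illustrative name, not part of the commit):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.lucene.store.RateLimiter;

public class ThrottledCopy {
  // Copy bytes while holding the average write rate at maxWriteMBPerSec.
  public static void copy(InputStream in, OutputStream out, double maxWriteMBPerSec)
      throws IOException {
    RateLimiter limiter = new RateLimiter.SimpleRateLimiter(maxWriteMBPerSec);
    byte[] buf = new byte[1024 * 1024];
    long bytesSincePause = 0;
    int read;
    while ((read = in.read(buf)) != -1) {
      out.write(buf, 0, read);
      bytesSincePause += read;
      // pause() sleeps just long enough to keep the configured MB/sec average
      if (bytesSincePause >= limiter.getMinPauseCheckBytes()) {
        limiter.pause(bytesSincePause);
        bytesSincePause = 0;
      }
    }
  }
}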

View File: solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java

@@ -53,9 +53,10 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RateLimiter;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -64,8 +65,8 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrDeletionPolicy;
@@ -1066,7 +1067,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware
core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getGeneration());
}
if(oldCommitPoint != null){
core.getDeletionPolicy().releaseCommitPoint(oldCommitPoint.getGeneration());
core.getDeletionPolicy().releaseCommitPointAndExtendReserve(oldCommitPoint.getGeneration());
}
***/
}
@@ -1094,6 +1095,9 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware
};
}
/** This class is used to read and send files in the Lucene index. */
private class DirectoryFileStream {
protected SolrParams params;
@@ -1102,40 +1106,83 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware
protected Long indexGen;
protected IndexDeletionPolicyWrapper delPolicy;
protected String fileName;
protected String cfileName;
protected String sOffset;
protected String sLen;
protected String compress;
protected boolean useChecksum;
protected long offset = -1;
protected int len = -1;
protected Checksum checksum;
private RateLimiter rateLimiter;
byte[] buf;
public DirectoryFileStream(SolrParams solrParams) {
params = solrParams;
delPolicy = core.getDeletionPolicy();
fileName = params.get(FILE);
cfileName = params.get(CONF_FILE_SHORT);
sOffset = params.get(OFFSET);
sLen = params.get(LEN);
compress = params.get(COMPRESSION);
useChecksum = params.getBool(CHECKSUM, false);
indexGen = params.getLong(GENERATION, null);
if (useChecksum) {
checksum = new Adler32();
}
//No throttle if MAX_WRITE_PER_SECOND is not specified
double maxWriteMBPerSec = params.getDouble(MAX_WRITE_PER_SECOND, Double.MAX_VALUE);
rateLimiter = new RateLimiter.SimpleRateLimiter(maxWriteMBPerSec);
}
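A hedged aside on the default: with MAX_WRITE_PER_SECOND absent, the limiter is built with Double.MAX_VALUE MB/sec, so throttling is effectively disabled:

// Sketch: new RateLimiter.SimpleRateLimiter(Double.MAX_VALUE).getMinPauseCheckBytes()
// saturates to Long.MAX_VALUE, so the pause threshold in write() below is never
// reached and replication keeps its old, unthrottled behavior.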
public void write(OutputStream out) throws IOException {
String fileName = params.get(FILE);
String cfileName = params.get(CONF_FILE_SHORT);
String sOffset = params.get(OFFSET);
String sLen = params.get(LEN);
String compress = params.get(COMPRESSION);
String sChecksum = params.get(CHECKSUM);
String sGen = params.get(GENERATION);
if (sGen != null) indexGen = Long.parseLong(sGen);
protected void initWrite() throws IOException {
if (sOffset != null) offset = Long.parseLong(sOffset);
if (sLen != null) len = Integer.parseInt(sLen);
if (fileName == null && cfileName == null) {
// no filename do nothing
writeNothingAndFlush();
}
buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
//reserve commit point till write is complete
if(indexGen != null) {
delPolicy.saveCommitPoint(indexGen);
}
}
protected void createOutputStream(OutputStream out) {
if (Boolean.parseBoolean(compress)) {
fos = new FastOutputStream(new DeflaterOutputStream(out));
} else {
fos = new FastOutputStream(out);
}
}
protected void releaseCommitPointAndExtendReserve() {
if(indexGen != null) {
//release the commit point as the write is complete
delPolicy.releaseCommitPoint(indexGen);
//Reserve the commit point for another 10s for the next file to be fetched.
//We need to keep extending the commit reservation between requests so that the replica can fetch
//all the files correctly.
delPolicy.setReserveDuration(indexGen, reserveCommitDuration);
}
}
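Read together with initWrite() above, the commit-point handling follows a pin/stream/unpin-and-extend cycle. A hedged sketch of that lifecycle (serveOneFile is illustrative, not a method in this commit; reserveCommitDuration defaults to 10 seconds):

void serveOneFile(Long indexGen) throws IOException {
  if (indexGen != null) delPolicy.saveCommitPoint(indexGen);  // pin the commit (initWrite)
  try {
    // ... stream the file in packets, pausing via the RateLimiter ...
  } finally {
    if (indexGen != null) {
      delPolicy.releaseCommitPoint(indexGen);                        // unpin: this write is done
      delPolicy.setReserveDuration(indexGen, reserveCommitDuration); // keep it alive for the next file
    }
  }
}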
public void write(OutputStream out) throws IOException {
createOutputStream(out);
int packetsWritten = 0;
IndexInput in = null;
try {
long offset = -1;
int len = -1;
// check if checksum is requested
boolean useChecksum = Boolean.parseBoolean(sChecksum);
if (sOffset != null) offset = Long.parseLong(sOffset);
if (sLen != null) len = Integer.parseInt(sLen);
if (fileName == null && cfileName == null) {
// no filename do nothing
writeNothing();
}
initWrite();
RefCounted<SolrIndexSearcher> sref = core.getSearcher();
Directory dir;
try {
@@ -1147,17 +1194,15 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware
in = dir.openInput(fileName, IOContext.READONCE);
// if offset is mentioned move the pointer to that point
if (offset != -1) in.seek(offset);
byte[] buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
Checksum checksum = null;
if (useChecksum) checksum = new Adler32();
long filelen = dir.fileLength(fileName);
long maxBytesBeforePause = 0;
while (true) {
offset = offset == -1 ? 0 : offset;
int read = (int) Math.min(buf.length, filelen - offset);
in.readBytes(buf, 0, read);
fos.writeInt((int) read);
fos.writeInt(read);
if (useChecksum) {
checksum.reset();
checksum.update(buf, 0, read);
@@ -1165,13 +1210,15 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware
}
fos.write(buf, 0, read);
fos.flush();
if (indexGen != null && (packetsWritten % 5 == 0)) {
// after every 5 packets reserve the commitpoint for some time
delPolicy.setReserveDuration(indexGen, reserveCommitDuration);
//Pause if necessary
maxBytesBeforePause += read;
if (maxBytesBeforePause >= rateLimiter.getMinPauseCheckBytes()) {
rateLimiter.pause(maxBytesBeforePause);
maxBytesBeforePause = 0;
}
packetsWritten++;
if (read != buf.length) {
writeNothing();
writeNothingAndFlush();
fos.close();
break;
}
@@ -1184,6 +1231,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware
if (in != null) {
in.close();
}
releaseCommitPointAndExtendReserve();
}
}
@@ -1191,12 +1239,14 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware
/**
* Used to write a marker for EOF
*/
protected void writeNothing() throws IOException {
protected void writeNothingAndFlush() throws IOException {
fos.writeInt(0);
fos.flush();
}
}
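On the wire, the stream written above is a sequence of packets: an int length, an optional Adler32 checksum, then the payload, with a zero length marking EOF. A minimal client-side reader, sketched on the assumptions that the elided hunk writes the checksum via fos.writeLong() (as the released handler does) and that FastOutputStream's writeInt/writeLong use a big-endian, DataInputStream-compatible encoding:

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.Adler32;

class FileStreamReader {
  static void read(InputStream response, boolean useChecksum) throws IOException {
    DataInputStream dis = new DataInputStream(response);
    Adler32 adler = new Adler32();
    while (true) {
      int len = dis.readInt();                           // packet size; 0 is the EOF marker
      if (len == 0) break;
      long expected = useChecksum ? dis.readLong() : 0L; // checksum precedes the payload
      byte[] packet = new byte[len];
      dis.readFully(packet);
      if (useChecksum) {
        adler.reset();
        adler.update(packet, 0, len);
        if (adler.getValue() != expected) throw new IOException("checksum mismatch");
      }
      // append packet to the local copy of the file ...
    }
  }
}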
/** This is used to write files in the conf directory. */
private class LocalFsFileStream extends DirectoryFileStream {
public LocalFsFileStream(SolrParams solrParams) {
@@ -1205,39 +1255,13 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware
@Override
public void write(OutputStream out) throws IOException {
String fileName = params.get(FILE);
String cfileName = params.get(CONF_FILE_SHORT);
String sOffset = params.get(OFFSET);
String sLen = params.get(LEN);
String compress = params.get(COMPRESSION);
String sChecksum = params.get(CHECKSUM);
String sGen = params.get(GENERATION);
if (sGen != null) indexGen = Long.parseLong(sGen);
if (Boolean.parseBoolean(compress)) {
fos = new FastOutputStream(new DeflaterOutputStream(out));
} else {
fos = new FastOutputStream(out);
}
createOutputStream(out);
FileInputStream inputStream = null;
int packetsWritten = 0;
try {
long offset = -1;
int len = -1;
//check if checksum is requested
boolean useChecksum = Boolean.parseBoolean(sChecksum);
if (sOffset != null)
offset = Long.parseLong(sOffset);
if (sLen != null)
len = Integer.parseInt(sLen);
if (fileName == null && cfileName == null) {
//no filename do nothing
writeNothing();
}
File file = null;
initWrite();
//if it is a conf file, read it from the config directory
file = new File(core.getResourceLoader().getConfigDir(), cfileName);
File file = new File(core.getResourceLoader().getConfigDir(), cfileName);
if (file.exists() && file.canRead()) {
inputStream = new FileInputStream(file);
@@ -1245,17 +1269,13 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware
//if offset is mentioned move the pointer to that point
if (offset != -1)
channel.position(offset);
byte[] buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
Checksum checksum = null;
if (useChecksum)
checksum = new Adler32();
ByteBuffer bb = ByteBuffer.wrap(buf);
while (true) {
bb.clear();
long bytesRead = channel.read(bb);
if (bytesRead <= 0) {
writeNothing();
writeNothingAndFlush();
fos.close();
break;
}
@@ -1267,19 +1287,15 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware
}
fos.write(buf, 0, (int) bytesRead);
fos.flush();
if (indexGen != null && (packetsWritten % 5 == 0)) {
//after every 5 packets reserve the commitpoint for some time
delPolicy.setReserveDuration(indexGen, reserveCommitDuration);
}
packetsWritten++;
}
} else {
writeNothing();
writeNothingAndFlush();
}
} catch (IOException e) {
LOG.warn("Exception while writing response for params: " + params, e);
} finally {
IOUtils.closeQuietly(inputStream);
releaseCommitPointAndExtendReserve();
}
}
}
@@ -1328,6 +1344,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware
public static final String SIZE = "size";
public static final String MAX_WRITE_PER_SECOND = "maxWriteMBPerSec";
public static final String CONF_FILE_SHORT = "cf";
public static final String CHECKSUM = "checksum";
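For context, a hedged example of a raw file-fetch request a slave would issue against this handler (command=filecontent and wt=filestream come from the handler's existing API, outside this hunk; the file name and generation are hypothetical):

String fetchOneFile = masterUrl + "/replication?command=filecontent"
    + "&file=_0.cfs"       // FILE: which index file to stream
    + "&generation=2"      // GENERATION: commit point to pin while streaming
    + "&checksum=true"     // CHECKSUM: request an Adler32 per packet
    + "&wt=filestream";    // binary file-stream response format
// maxWriteMBPerSec could ride along as a request param as well; in the test
// config below it is supplied through the handler's <lst name="defaults">,
// which Solr merges into the request params.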

View File: solr/core/src/test-files/solr/collection1/conf/solrconfig-master-throttled.xml (new file)

@@ -0,0 +1,68 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<config>
<luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
<xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
<updateHandler class="solr.DirectUpdateHandler2">
</updateHandler>
<requestHandler name="standard" class="solr.StandardRequestHandler">
<bool name="httpCaching">true</bool>
</requestHandler>
<requestHandler name="/replication" class="solr.ReplicationHandler">
<lst name="defaults">
<str name="maxWriteMBPerSec">0.1</str>
</lst>
</requestHandler>
<!-- test query parameter defaults -->
<requestHandler name="defaults" class="solr.StandardRequestHandler">
<lst name="defaults">
<int name="rows">4</int>
<bool name="hl">true</bool>
<str name="hl.fl">text,name,subject,title,whitetok</str>
</lst>
</requestHandler>
<!-- test query parameter defaults -->
<requestHandler name="lazy" class="solr.StandardRequestHandler" startup="lazy">
<lst name="defaults">
<int name="rows">4</int>
<bool name="hl">true</bool>
<str name="hl.fl">text,name,subject,title,whitetok</str>
</lst>
</requestHandler>
<requestHandler name="/update" class="solr.UpdateRequestHandler" />
<!-- enable streaming for testing... -->
<requestDispatcher handleSelect="true">
<requestParsers enableRemoteStreaming="true" multipartUploadLimitInKB="2048"/>
<httpCaching lastModifiedFrom="openTime" etagSeed="Solr" never304="false">
<cacheControl>max-age=30, public</cacheControl>
</httpCaching>
</requestDispatcher>
</config>
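The 0.1 MB/sec ceiling above is deliberately tiny so the test can observe the throttle. A quick sanity check of the arithmetic the test relies on (the index size is hypothetical):

double maxWriteMBPerSec = 0.1;                      // from the config above
double indexSizeMB = 5.0;                           // suppose a ~5 MB test index
double minSeconds = indexSizeMB / maxWriteMBPerSec; // = 50: replication must take at least ~50s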

View File: solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java

@@ -28,12 +28,16 @@ import java.io.Writer;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.TestUtil;
@@ -485,13 +489,9 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
private void invokeReplicationCommand(int pJettyPort, String pCommand) throws IOException
{
String masterUrl = buildUrl(pJettyPort) + "/replication?command=" + pCommand;
try {
URL u = new URL(masterUrl);
InputStream stream = u.openStream();
stream.close();
} catch (IOException e) {
//e.printStackTrace();
}
URL u = new URL(masterUrl);
InputStream stream = u.openStream();
stream.close();
}
@Test
@@ -639,14 +639,6 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
// todo: make SolrJ easier to pass arbitrary params to
// TODO: precommit WILL screw with the rest of this test
String masterUrl = buildUrl(masterJetty.getLocalPort()) + "/update?prepareCommit=true";
URL url = new URL(masterUrl);
// InputStream stream = url.openStream();
// try {
// stream.close();
// } catch (IOException e) {
// //e.printStackTrace();
// }
masterClient.commit();
@@ -655,15 +647,11 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
assertEquals(nDocs, masterQueryResult.getNumFound());
// snappull
masterUrl = buildUrl(slaveJetty.getLocalPort()) + "/replication?command=fetchindex&masterUrl=";
String masterUrl = buildUrl(slaveJetty.getLocalPort()) + "/replication?command=fetchindex&masterUrl=";
masterUrl += buildUrl(masterJetty.getLocalPort()) + "/replication";
url = new URL(masterUrl);
URL url = new URL(masterUrl);
InputStream stream = url.openStream();
try {
stream.close();
} catch (IOException e) {
//e.printStackTrace();
}
stream.close();
//get docs from slave and check if number is equal to master
NamedList slaveQueryRsp = rQuery(nDocs, "*:*", slaveClient);
@@ -1005,7 +993,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
pullFromTo(slaveJetty, masterJetty);
}
private void pullFromTo(JettySolrRunner from, JettySolrRunner to) throws MalformedURLException, IOException {
private void pullFromTo(JettySolrRunner from, JettySolrRunner to) throws IOException {
String masterUrl;
URL url;
InputStream stream;
@@ -1014,11 +1002,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
+ buildUrl(from.getLocalPort()) + "/replication";
url = new URL(masterUrl);
stream = url.openStream();
try {
stream.close();
} catch (IOException e) {
// e.printStackTrace();
}
stream.close();
}
@Test
@@ -1295,6 +1279,110 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
checkForSingleIndex(masterJetty);
checkForSingleIndex(slaveJetty);
}
@Test
public void testRateLimitedReplication() throws Exception {
//clean index
masterClient.deleteByQuery("*:*");
slaveClient.deleteByQuery("*:*");
masterClient.commit();
slaveClient.commit();
masterJetty.stop();
slaveJetty.stop();
//Start master with the new solrconfig
master.copyConfigFile(CONF_DIR + "solrconfig-master-throttled.xml", "solrconfig.xml");
useFactory(null);
masterJetty = createJetty(master);
masterClient.shutdown();
masterClient = createNewSolrServer(masterJetty.getLocalPort());
//index docs
final int totalDocs = TestUtil.nextInt(random(), 50, 100);
for (int i = 0; i < totalDocs; i++)
index(masterClient, "id", i, "name", TestUtil.randomSimpleString(random(), 1000 , 5000));
masterClient.commit();
//Check Index Size
String dataDir = master.getDataDir();
masterClient.shutdown();
masterJetty.stop();
Directory dir = FSDirectory.open(Paths.get(dataDir, "index"));
String[] files = dir.listAll();
long totalBytes = 0;
for(String file : files) {
totalBytes += dir.fileLength(file);
}
float approximateTimeInSeconds = Math.round( totalBytes/1024/1024/0.1 ); // maxWriteMBPerSec=0.1 in solrconfig
//Start again and replicate the data
useFactory(null);
masterJetty = createJetty(master);
masterClient = createNewSolrServer(masterJetty.getLocalPort());
//start slave
slave.setTestPort(masterJetty.getLocalPort());
slave.copyConfigFile(CONF_DIR + "solrconfig-slave1.xml", "solrconfig.xml");
slaveJetty = createJetty(slave);
slaveClient.shutdown();
slaveClient = createNewSolrServer(slaveJetty.getLocalPort());
long startTime = System.nanoTime();
pullFromMasterToSlave();
//Add a few more docs in the master. Just to make sure that we are replicating the correct index point
//These extra docs should not get replicated
new Thread(new AddExtraDocs(masterClient, totalDocs)).start();
//Wait and make sure that it actually replicated correctly.
NamedList slaveQueryRsp = rQuery(totalDocs, "*:*", slaveClient);
SolrDocumentList slaveQueryResult = (SolrDocumentList) slaveQueryRsp.get("response");
assertEquals(totalDocs, slaveQueryResult.getNumFound());
long timeTaken = System.nanoTime() - startTime;
long timeTakenInSeconds = TimeUnit.SECONDS.convert(timeTaken, TimeUnit.NANOSECONDS);
//Let's make sure it took more than approximateTimeInSeconds to make sure that it was throttled
boolean isElapsed = false;
if(timeTakenInSeconds - approximateTimeInSeconds > 0) {
isElapsed = true;
}
assertTrue(isElapsed);
}
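The closing assertion only needs the elapsed time to exceed the computed lower bound; a more direct equivalent of the isElapsed block above would be:

assertTrue("replication completed faster than maxWriteMBPerSec should allow",
    timeTakenInSeconds > approximateTimeInSeconds);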
private class AddExtraDocs implements Runnable {
SolrServer masterClient;
int startId;
public AddExtraDocs(SolrServer masterClient, int startId) {
this.masterClient = masterClient;
this.startId = startId;
}
@Override
public void run() {
final int totalDocs = TestUtil.nextInt(random(), 1, 10);
for (int i = 0; i < totalDocs; i++) {
try {
index(masterClient, "id", i + startId, "name", TestUtil.randomSimpleString(random(), 1000 , 5000));
} catch (Exception e) {
//Do nothing. Wasn't able to add doc.
}
}
try {
masterClient.commit();
} catch (Exception e) {
//Do nothing. No extra doc got committed.
}
}
}
/**
* character copy of file using UTF-8. If port is non-null, will be substituted any time "TEST_PORT" is found.

View File: solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java

@@ -121,6 +121,17 @@ public abstract class SolrParams implements Serializable {
}
}
/** Returns the Long value of the param, or def if not set */
public Long getLong(String param, Long def) {
String val = get(param);
try {
return val== null ? def : Long.parseLong(val);
}
catch( Exception ex ) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, ex.getMessage(), ex );
}
}
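A quick usage sketch of the new overload (ModifiableSolrParams is used only for illustration):

ModifiableSolrParams p = new ModifiableSolrParams();
p.set("generation", "42");
Long gen  = p.getLong("generation", null);   // -> 42
Long none = p.getLong("missing", null);      // -> null, the supplied default
p.set("generation", "oops");
// p.getLong("generation", null) now throws SolrException(BAD_REQUEST)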
/** Returns the int value of the param, or def if not set */
public int getInt(String param, int def) {
String val = get(param);