SOLR-829 -- Allow slaves to request compressed files from master during replication

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@720502 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2008-11-25 14:19:14 +00:00
parent 156491848a
commit 5e6e0e3258
5 changed files with 105 additions and 70 deletions

View File

@ -86,7 +86,10 @@ New Features
before Tokenizer/TokenFilters. (koji) before Tokenizer/TokenFilters. (koji)
16. SOLR-868: Adding solrjs as a contrib package: contrib/javascript. 16. SOLR-868: Adding solrjs as a contrib package: contrib/javascript.
(Matthias Epheser via ryan) (Matthias Epheser via ryan)
17. SOLR-829: Allow slaves to request compressed files from master during replication
(Simon Collins, Noble Paul, Akshay Ukey via shalin)
Optimizations Optimizations

View File

@ -403,6 +403,7 @@
</lst> </lst>
</requestHandler> </requestHandler>
<!-- Please refer to http://wiki.apache.org/solr/SolrReplication for details on configuring replication -->
<!--Master config--> <!--Master config-->
<!-- <!--
<requestHandler name="/replication" class="solr.ReplicationHandler" > <requestHandler name="/replication" class="solr.ReplicationHandler" >
@ -414,9 +415,9 @@
--> -->
<!-- Slave config--> <!-- Slave config-->
<!-- <!--
<requestHandler name="/replication" class="solr.ReplicationHandler" > <requestHandler name="/replication" class="solr.ReplicationHandler">
<lst name="slave"> <lst name="slave">
<str name="masterUrl">http://localhost:8983/solr/replication</str> <str name="masterUrl">http://localhost:8983/solr/replication</str>
<str name="pollInterval">00:00:60</str> <str name="pollInterval">00:00:60</str>
</lst> </lst>
</requestHandler> </requestHandler>

View File

@ -45,6 +45,7 @@ import java.util.*;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.Adler32; import java.util.zip.Adler32;
import java.util.zip.Checksum; import java.util.zip.Checksum;
import java.util.zip.DeflaterOutputStream;
/** /**
* <p> A Handler which provides a REST API for replication and serves replication requests from Slaves. * <p> A Handler which provides a REST API for replication and serves replication requests from Slaves.
@ -126,7 +127,12 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
} else if (command.equals(CMD_SNAP_SHOOT)) { } else if (command.equals(CMD_SNAP_SHOOT)) {
doSnapShoot(rsp); doSnapShoot(rsp);
} else if (command.equals(CMD_SNAP_PULL)) { } else if (command.equals(CMD_SNAP_PULL)) {
doSnapPull(); new Thread() {
public void run() {
doSnapPull();
}
}.start();
rsp.add("status", "OK");
} else if (command.equals(CMD_DISABLE_POLL)) { } else if (command.equals(CMD_DISABLE_POLL)) {
if (snapPuller != null) if (snapPuller != null)
snapPuller.disablePoll(); snapPuller.disablePoll();
@ -501,16 +507,11 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
closeNoExp(inFile); closeNoExp(inFile);
} }
HttpClient client = null;
try { try {
client = new HttpClient(); NamedList nl = snapPuller.getCommandResponse(CMD_DETAILS);
NamedList nl = snapPuller.getCommandResponse(client, CMD_DETAILS);
details.add("masterDetails", nl.get(CMD_DETAILS)); details.add("masterDetails", nl.get(CMD_DETAILS));
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Exception while invoking a 'details' method on master ", e); LOG.warn("Exception while invoking a 'details' method on master ", e);
} finally {
if (client != null)
client.getHttpConnectionManager().closeIdleConnections(0);
} }
details.add(MASTER_URL, snapPuller.getMasterUrl()); details.add(MASTER_URL, snapPuller.getMasterUrl());
if (snapPuller.getPollInterval() != null) { if (snapPuller.getPollInterval() != null) {
@ -776,15 +777,20 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
delPolicy = core.getDeletionPolicy(); delPolicy = core.getDeletionPolicy();
} }
public void write(OutputStream out) { public void write(OutputStream out) throws IOException {
fos = new FastOutputStream(out);
String fileName = params.get(FILE); String fileName = params.get(FILE);
String cfileName = params.get(CONF_FILE_SHORT); String cfileName = params.get(CONF_FILE_SHORT);
String sOffset = params.get(OFFSET); String sOffset = params.get(OFFSET);
String sLen = params.get(LEN); String sLen = params.get(LEN);
String compress = params.get(COMPRESSION);
String sChecksum = params.get(CHECKSUM); String sChecksum = params.get(CHECKSUM);
String sindexVersion = params.get(CMD_INDEX_VERSION); String sindexVersion = params.get(CMD_INDEX_VERSION);
if (sindexVersion != null) indexVersion = Long.parseLong(sindexVersion); if (sindexVersion != null) indexVersion = Long.parseLong(sindexVersion);
if (Boolean.parseBoolean(compress)) {
fos = new FastOutputStream(new DeflaterOutputStream(out));
} else {
fos = new FastOutputStream(out);
}
FileInputStream inputStream = null; FileInputStream inputStream = null;
int packetsWritten = 0; int packetsWritten = 0;
try { try {
@ -918,4 +924,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
public static final String RESERVE = "commitReserveDuration"; public static final String RESERVE = "commitReserveDuration";
public static final String COMPRESSION = "compression";
public static final String EXTERNAL = "external";
public static final String INTERNAL = "internal";
} }

View File

@ -16,9 +16,7 @@
*/ */
package org.apache.solr.handler; package org.apache.solr.handler;
import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.*;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexCommit;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
@ -46,6 +44,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.zip.Adler32; import java.util.zip.Adler32;
import java.util.zip.Checksum; import java.util.zip.Checksum;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;
/** /**
* <p/> Provides functionality equivalent to the snappull script as well as a * <p/> Provides functionality equivalent to the snappull script as well as a
@ -88,10 +88,15 @@ public class SnapPuller {
private volatile boolean stop = false; private volatile boolean stop = false;
private boolean useInternal = false;
private boolean useExternal = false;
/** /**
* Disable the timer task for polling * Disable the timer task for polling
*/ */
private AtomicBoolean pollDisabled = new AtomicBoolean(false); private AtomicBoolean pollDisabled = new AtomicBoolean(false);
private final HttpClient client = new HttpClient(new MultiThreadedHttpConnectionManager());
public SnapPuller(NamedList initArgs, ReplicationHandler handler, SolrCore sc) { public SnapPuller(NamedList initArgs, ReplicationHandler handler, SolrCore sc) {
solrCore = sc; solrCore = sc;
@ -102,6 +107,9 @@ public class SnapPuller {
this.replicationHandler = handler; this.replicationHandler = handler;
pollIntervalStr = (String) initArgs.get(POLL_INTERVAL); pollIntervalStr = (String) initArgs.get(POLL_INTERVAL);
pollInterval = readInterval(pollIntervalStr); pollInterval = readInterval(pollIntervalStr);
String compress = (String) initArgs.get(COMPRESSION);
useInternal = INTERNAL.equals(compress);
useExternal = EXTERNAL.equals(compress);
if (pollInterval != null && pollInterval > 0) { if (pollInterval != null && pollInterval > 0) {
startExecutorService(); startExecutorService();
} else { } else {
@ -133,21 +141,21 @@ public class SnapPuller {
* Gets the latest commit version and generation from the master * Gets the latest commit version and generation from the master
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
NamedList getLatestVersion(HttpClient client) throws IOException { NamedList getLatestVersion() throws IOException {
PostMethod post = new PostMethod(masterUrl); PostMethod post = new PostMethod(masterUrl);
post.addParameter(COMMAND, CMD_INDEX_VERSION); post.addParameter(COMMAND, CMD_INDEX_VERSION);
post.addParameter("wt", "javabin"); post.addParameter("wt", "javabin");
return getNamedListResponse(client, post); return getNamedListResponse(post);
} }
NamedList getCommandResponse(HttpClient client, String cmd) throws IOException { NamedList getCommandResponse(String cmd) throws IOException {
PostMethod post = new PostMethod(masterUrl); PostMethod post = new PostMethod(masterUrl);
post.addParameter(COMMAND, cmd); post.addParameter(COMMAND, cmd);
post.addParameter("wt", "javabin"); post.addParameter("wt", "javabin");
return getNamedListResponse(client, post); return getNamedListResponse(post);
} }
private NamedList getNamedListResponse(HttpClient client, PostMethod method) throws IOException { private NamedList getNamedListResponse(PostMethod method) throws IOException {
try { try {
int status = client.executeMethod(method); int status = client.executeMethod(method);
if (status != HttpStatus.SC_OK) { if (status != HttpStatus.SC_OK) {
@ -166,12 +174,12 @@ public class SnapPuller {
/** /**
* Fetches the list of files in a given index commit point * Fetches the list of files in a given index commit point
*/ */
void fetchFileList(long version, HttpClient client) throws IOException { void fetchFileList(long version) throws IOException {
PostMethod post = new PostMethod(masterUrl); PostMethod post = new PostMethod(masterUrl);
post.addParameter(COMMAND, CMD_GET_FILE_LIST); post.addParameter(COMMAND, CMD_GET_FILE_LIST);
post.addParameter(CMD_INDEX_VERSION, String.valueOf(version)); post.addParameter(CMD_INDEX_VERSION, String.valueOf(version));
post.addParameter("wt", "javabin"); post.addParameter("wt", "javabin");
NamedList nl = getNamedListResponse(client, post); NamedList nl = getNamedListResponse(post);
List<Map<String, Object>> f = (List<Map<String, Object>>) nl.get(CMD_GET_FILE_LIST); List<Map<String, Object>> f = (List<Map<String, Object>>) nl.get(CMD_GET_FILE_LIST);
if (f != null) if (f != null)
filesToDownload = Collections.synchronizedList(f); filesToDownload = Collections.synchronizedList(f);
@ -191,15 +199,10 @@ public class SnapPuller {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
boolean fetchLatestIndex(SolrCore core) throws IOException { boolean fetchLatestIndex(SolrCore core) throws IOException {
HttpClient client = null;
replicationStartTime = System.currentTimeMillis(); replicationStartTime = System.currentTimeMillis();
try { try {
client = new HttpClient();
// The closing is done in a different thread. So use multiThreaded conn manager
// else it prints out a warning
client.setHttpConnectionManager(new MultiThreadedHttpConnectionManager());
//get the current 'replicateable' index version in the master //get the current 'replicateable' index version in the master
NamedList response = getLatestVersion(client); NamedList response = getLatestVersion();
long latestVersion = (Long) response.get(CMD_INDEX_VERSION); long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION); long latestGeneration = (Long) response.get(GENERATION);
if (latestVersion == 0L) { if (latestVersion == 0L) {
@ -224,7 +227,7 @@ public class SnapPuller {
LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration()); LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());
LOG.info("Starting replication process"); LOG.info("Starting replication process");
// get the list of files first // get the list of files first
fetchFileList(latestVersion, client); fetchFileList(latestVersion);
LOG.info("Number of files in latest snapshot in master: " + filesToDownload.size()); LOG.info("Number of files in latest snapshot in master: " + filesToDownload.size());
// use a synchronized list because the list is read by other threads (to show details) // use a synchronized list because the list is read by other threads (to show details)
@ -238,11 +241,11 @@ public class SnapPuller {
boolean successfulInstall = false; boolean successfulInstall = false;
try { try {
File indexDir = new File(core.getIndexDir()); File indexDir = new File(core.getIndexDir());
downloadIndexFiles(isSnapNeeded, tmpIndexDir, client, latestVersion); downloadIndexFiles(isSnapNeeded, tmpIndexDir, latestVersion);
LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs"); LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");
Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload); Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) { if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) {
downloadConfFiles(client, confFilesToDownload, latestVersion); downloadConfFiles(confFilesToDownload, latestVersion);
if (isSnapNeeded) { if (isSnapNeeded) {
modifyIndexProps(tmpIndexDir.getName()); modifyIndexProps(tmpIndexDir.getName());
} else { } else {
@ -283,7 +286,6 @@ public class SnapPuller {
replicationStartTime = 0; replicationStartTime = 0;
fileFetcher = null; fileFetcher = null;
stop = false; stop = false;
client.getHttpConnectionManager().closeIdleConnections(0);
} }
} }
@ -380,7 +382,7 @@ public class SnapPuller {
}.start(); }.start();
} }
private void downloadConfFiles(HttpClient client, List<Map<String, Object>> confFilesToDownload, long latestVersion) throws Exception { private void downloadConfFiles(List<Map<String, Object>> confFilesToDownload, long latestVersion) throws Exception {
LOG.info("Starting download of configuration files from master: " + confFilesToDownload); LOG.info("Starting download of configuration files from master: " + confFilesToDownload);
confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>()); confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date())); File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date()));
@ -390,7 +392,7 @@ public class SnapPuller {
"Failed to create temporary config folder: " + tmpconfDir.getName()); "Failed to create temporary config folder: " + tmpconfDir.getName());
} }
for (Map<String, Object> file : confFilesToDownload) { for (Map<String, Object> file : confFilesToDownload) {
fileFetcher = new FileFetcher(tmpconfDir, file, (String) file.get(NAME), client, true, latestVersion); fileFetcher = new FileFetcher(tmpconfDir, file, (String) file.get(NAME), true, latestVersion);
currentFile = file; currentFile = file;
fileFetcher.fetchFile(); fileFetcher.fetchFile();
confFilesDownloaded.add(new HashMap<String, Object>(file)); confFilesDownloaded.add(new HashMap<String, Object>(file));
@ -401,18 +403,15 @@ public class SnapPuller {
/** /**
* Download the index files. If a new index is needed, download all the files. * Download the index files. If a new index is needed, download all the files.
* *
* @param downloadCompleteIndex is it a fresh index copy * @param downloadCompleteIndex is it a fresh index copy
* @param snapDir the directory to which files need to be downloadeed to * @param snapDir the directory to which files need to be downloadeed to
* @param client the httpclient instance * @param latestVersion the version number
* @param latestVersion the version number
*/ */
private void downloadIndexFiles(boolean downloadCompleteIndex, File snapDir, private void downloadIndexFiles(boolean downloadCompleteIndex, File snapDir, long latestVersion) throws Exception {
HttpClient client, long latestVersion) throws Exception {
for (Map<String, Object> file : filesToDownload) { for (Map<String, Object> file : filesToDownload) {
File localIndexFile = new File(solrCore.getIndexDir(), (String) file.get(NAME)); File localIndexFile = new File(solrCore.getIndexDir(), (String) file.get(NAME));
if (!localIndexFile.exists() || downloadCompleteIndex) { if (!localIndexFile.exists() || downloadCompleteIndex) {
fileFetcher = new FileFetcher(snapDir, file, (String) file.get(NAME), fileFetcher = new FileFetcher(snapDir, file, (String) file.get(NAME), false, latestVersion);
client, false, latestVersion);
currentFile = file; currentFile = file;
fileFetcher.fetchFile(); fileFetcher.fetchFile();
filesDownloaded.add(new HashMap<String, Object>(file)); filesDownloaded.add(new HashMap<String, Object>(file));
@ -445,7 +444,7 @@ public class SnapPuller {
/** /**
* Copy a file by the File#renameTo() method. If it fails, it is considered * Copy a file by the File#renameTo() method. If it fails, it is considered
* a failure * a failure
* * <p/>
* Todo may be we should try a simple copy if it fails * Todo may be we should try a simple copy if it fails
*/ */
private boolean copyAFile(File snapDir, File indexDir, String fname, List<String> copiedfiles) { private boolean copyAFile(File snapDir, File indexDir, String fname, List<String> copiedfiles) {
@ -688,6 +687,7 @@ public class SnapPuller {
/** /**
* The class acts as a client for ReplicationHandler.FileStream. * The class acts as a client for ReplicationHandler.FileStream.
* It understands the protocol of wt=filestream * It understands the protocol of wt=filestream
*
* @see org.apache.solr.handler.ReplicationHandler.FileStream * @see org.apache.solr.handler.ReplicationHandler.FileStream
*/ */
private class FileFetcher { private class FileFetcher {
@ -703,8 +703,6 @@ public class SnapPuller {
long bytesDownloaded = 0; long bytesDownloaded = 0;
HttpClient client;
FileChannel fileChannel; FileChannel fileChannel;
byte[] buf = new byte[1024 * 1024]; byte[] buf = new byte[1024 * 1024];
@ -724,11 +722,10 @@ public class SnapPuller {
private Long indexVersion; private Long indexVersion;
FileFetcher(File dir, Map<String, Object> fileDetails, String saveAs, FileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
HttpClient client, boolean isConf, long latestVersion) throws FileNotFoundException { boolean isConf, long latestVersion) throws FileNotFoundException {
this.snapDir = dir; this.snapDir = dir;
this.fileName = (String) fileDetails.get(NAME); this.fileName = (String) fileDetails.get(NAME);
this.size = (Long) fileDetails.get(SIZE); this.size = (Long) fileDetails.get(SIZE);
this.client = client;
this.isConf = isConf; this.isConf = isConf;
this.saveAs = saveAs; this.saveAs = saveAs;
indexVersion = latestVersion; indexVersion = latestVersion;
@ -759,18 +756,7 @@ public class SnapPuller {
} }
//if there is an error continue. But continue from the point where it got broken //if there is an error continue. But continue from the point where it got broken
} finally { } finally {
// closing Inputstream and HTTP connection takes a long time, closeNoExp(is);
// so replication status shows as 'replicating' even though it is aborted.
new Thread() {
public void run() {
closeNoExp(is);
try {
if (post != null)
post.releaseConnection();
} catch (Exception e) {
}
}
}.start();
} }
} }
} finally { } finally {
@ -870,6 +856,9 @@ public class SnapPuller {
fileChannel.close(); fileChannel.close();
} catch (Exception e) {/* noop */ } catch (Exception e) {/* noop */
} }
try {
post.releaseConnection();
} catch (Exception e) {}
if (bytesDownloaded != size) { if (bytesDownloaded != size) {
//if the download is not complete then //if the download is not complete then
//delete the file being downloaded //delete the file being downloaded
@ -901,6 +890,12 @@ public class SnapPuller {
} else { } else {
post.addParameter(FILE, fileName); post.addParameter(FILE, fileName);
} }
if (useInternal) {
post.addParameter(COMPRESSION, "true");
}
if (useExternal) {
post.setRequestHeader(new Header("Accept-Encoding", "gzip,deflate"));
}
//use checksum //use checksum
if (this.includeChecksum) if (this.includeChecksum)
post.addParameter(CHECKSUM, "true"); post.addParameter(CHECKSUM, "true");
@ -914,10 +909,42 @@ public class SnapPuller {
client.executeMethod(post); client.executeMethod(post);
InputStream is = post.getResponseBodyAsStream(); InputStream is = post.getResponseBodyAsStream();
//wrap it using FastInputStream //wrap it using FastInputStream
if (useInternal) {
is = new InflaterInputStream(is);
} else if (useExternal) {
is = checkCompressed(post, is);
}
return new FastInputStream(is); return new FastInputStream(is);
} }
} }
/*
* This is copied from CommonsHttpSolrServer
*/
private InputStream checkCompressed(HttpMethod method, InputStream respBody) throws IOException {
Header contentEncodingHeader = method.getResponseHeader("Content-Encoding");
if (contentEncodingHeader != null) {
String contentEncoding = contentEncodingHeader.getValue();
if (contentEncoding.contains("gzip")) {
respBody = new GZIPInputStream(respBody);
} else if (contentEncoding.contains("deflate")) {
respBody = new InflaterInputStream(respBody);
}
} else {
Header contentTypeHeader = method.getResponseHeader("Content-Type");
if (contentTypeHeader != null) {
String contentType = contentTypeHeader.getValue();
if (contentType != null) {
if (contentType.startsWith("application/x-gzip-compressed")) {
respBody = new GZIPInputStream(respBody);
} else if (contentType.startsWith("application/x-deflate")) {
respBody = new InflaterInputStream(respBody);
}
}
}
}
return respBody;
}
static Integer readInterval(String interval) { static Integer readInterval(String interval) {
Pattern pattern = Pattern.compile(INTERVAL_PATTERN); Pattern pattern = Pattern.compile(INTERVAL_PATTERN);
@ -954,6 +981,7 @@ public class SnapPuller {
public void destroy() { public void destroy() {
if (executorService != null) executorService.shutdown(); if (executorService != null) executorService.shutdown();
client.getHttpConnectionManager().closeIdleConnections(0);
} }
String getMasterUrl() { String getMasterUrl() {

View File

@ -311,21 +311,12 @@
String abortParam = request.getParameter("abort"); String abortParam = request.getParameter("abort");
if (replicateParam != null) if (replicateParam != null)
if (replicateParam.equals("now")) { if (replicateParam.equals("now")) {
new Thread() { executeCommand("snappull", solrcore, rh);
public void run() {
executeCommand("snappull", solrcore, rh);
}
}.start();
} }
if (abortParam != null) if (abortParam != null)
if (abortParam.equals("stop")) { if (abortParam.equals("stop")) {
new Thread() { executeCommand("abortsnappull", solrcore, rh);
public void run() {
executeCommand("abortsnappull", solrcore, rh);
}
}.start();
} }
%> %>
</td> </td>