mirror of https://github.com/apache/lucene.git
SOLR-561: add more comments
git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@708578 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f12ed3dee6
commit
eb5ae4dcc0
|
@ -106,13 +106,16 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
rsp.add("status", "OK");
|
||||
return;
|
||||
}
|
||||
//This command does not give the current index version of the master
|
||||
// It gives the current replicateable index version
|
||||
if (command.equals(CMD_INDEX_VERSION)) {
|
||||
IndexCommit commitPoint = indexCommitPoint; // make a copy so it won't change
|
||||
if (commitPoint != null) {
|
||||
rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());
|
||||
rsp.add(GENERATION, commitPoint.getGeneration());
|
||||
} else {
|
||||
// must never happen
|
||||
// This happens when replicateAfter does not have startup and no commit/optimize
|
||||
// has happened yet.
|
||||
rsp.add(CMD_INDEX_VERSION, 0L);
|
||||
rsp.add(GENERATION, 0L);
|
||||
}
|
||||
|
@ -134,6 +137,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
if (snapPuller != null)
|
||||
snapPuller.abortPull();
|
||||
} else if (command.equals(CMD_FILE_CHECKSUM)) {
|
||||
// this command is not used by anyone
|
||||
getFileChecksum(solrParams, rsp);
|
||||
} else if (command.equals(CMD_SHOW_COMMITS)) {
|
||||
rsp.add(CMD_SHOW_COMMITS, getCommits());
|
||||
|
@ -160,6 +164,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
return l;
|
||||
}
|
||||
|
||||
/**Gets the checksum of a file
|
||||
*/
|
||||
private void getFileChecksum(SolrParams solrParams, SolrQueryResponse rsp) {
|
||||
Checksum checksum = new Adler32();
|
||||
File dir = new File(core.getIndexDir());
|
||||
|
@ -225,6 +231,9 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
}
|
||||
}
|
||||
|
||||
/**This method adds an Object of FileStream to the resposnse .
|
||||
* The FileStream implements a custom protocol which is also understoop by the SnapPuller
|
||||
*/
|
||||
private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp) {
|
||||
ModifiableSolrParams rawParams = new ModifiableSolrParams(solrParams);
|
||||
rawParams.set(CommonParams.WT, FILE_STREAM);
|
||||
|
@ -244,9 +253,11 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
rsp.add("status", "invalid indexversion");
|
||||
return;
|
||||
}
|
||||
// reserve the indexcommit for sometime
|
||||
core.getDeletionPolicy().setReserveDuration(version, reserveCommitDuration);
|
||||
List<Map<String, Object>> result = new ArrayList<Map<String, Object>>();
|
||||
try {
|
||||
//get all the files in the commit
|
||||
Collection<String> files = commit.getFileNames();
|
||||
for (String fileName : files) {
|
||||
File file = new File(core.getIndexDir(), fileName);
|
||||
|
@ -263,10 +274,16 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
if (includeConfFiles == null)
|
||||
return;
|
||||
LOG.debug("Adding config files to list: " + includeConfFiles);
|
||||
//if configuration files need to be included get their details
|
||||
List<Map<String, Object>> confFiles = getConfFileCache(includeConfFiles);
|
||||
rsp.add(CONF_FILES, confFiles);
|
||||
}
|
||||
|
||||
/** for configuration files checksum of the file also is included
|
||||
* because ,unlike index ,files they may have same content but different timestamps
|
||||
* The local conf files information is cached so that everytime it does not have to
|
||||
* read the file content. The cache is refreshed only if the lastmodified of the file changes
|
||||
*/
|
||||
List<Map<String, Object>> getConfFileCache(Collection<String> filenames) {
|
||||
List<Map<String, Object>> confFiles = new ArrayList<Map<String, Object>>();
|
||||
synchronized (confFileInfoCache) {
|
||||
|
@ -348,6 +365,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
return size;
|
||||
}
|
||||
|
||||
/**Collects the details such as name, size ,lasmodified of a file
|
||||
*/
|
||||
private Map<String, Object> getFileInfo(File file) {
|
||||
Map<String, Object> fileMeta = new HashMap<String, Object>();
|
||||
fileMeta.put(NAME, file.getName());
|
||||
|
@ -658,6 +677,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
}
|
||||
}
|
||||
|
||||
/**register a closehook
|
||||
*/
|
||||
private void registerCloseHook() {
|
||||
core.addCloseHook(new CloseHook() {
|
||||
public void close(SolrCore core) {
|
||||
|
@ -668,6 +689,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
});
|
||||
}
|
||||
|
||||
/**A responsewriter is registered automatically for wt=filestream
|
||||
*/
|
||||
private void registerFileStreamResponseWriter() {
|
||||
core.registerResponseWriter(FILE_STREAM, new BinaryQueryResponseWriter() {
|
||||
public void write(OutputStream out, SolrQueryRequest request, SolrQueryResponse resp) throws IOException {
|
||||
|
@ -688,6 +711,12 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
|
||||
}
|
||||
|
||||
/**Register a listener for postcommit/optimize
|
||||
* @param snapshoot do a snapshoot
|
||||
* @param getCommit get a commitpoint also
|
||||
* @return an instance of the eventlistener
|
||||
*/
|
||||
|
||||
private SolrEventListener getEventListener(final boolean snapshoot, final boolean getCommit) {
|
||||
return new SolrEventListener() {
|
||||
public void init(NamedList args) {/*no op*/ }
|
||||
|
@ -744,24 +773,29 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
try {
|
||||
long offset = -1;
|
||||
int len = -1;
|
||||
//check if checksum is requested
|
||||
boolean useChecksum = Boolean.parseBoolean(sChecksum);
|
||||
if (sOffset != null)
|
||||
offset = Long.parseLong(sOffset);
|
||||
if (sLen != null)
|
||||
len = Integer.parseInt(sLen);
|
||||
if (fileName == null && cfileName == null) {
|
||||
//no filename do nothing
|
||||
writeNothing();
|
||||
}
|
||||
|
||||
File file = null;
|
||||
if (cfileName != null) {
|
||||
//if if is a conf file read from config diectory
|
||||
file = new File(core.getResourceLoader().getConfigDir(), cfileName);
|
||||
} else {
|
||||
//else read from the indexdirectory
|
||||
file = new File(core.getIndexDir(), fileName);
|
||||
}
|
||||
if (file.exists() && file.canRead()) {
|
||||
inputStream = new FileInputStream(file);
|
||||
FileChannel channel = inputStream.getChannel();
|
||||
//if offset is mentioned move the pointer to that point
|
||||
if (offset != -1)
|
||||
channel.position(offset);
|
||||
byte[] buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
|
||||
|
@ -787,6 +821,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
fos.write(buf, 0, (int) bytesRead);
|
||||
fos.flush();
|
||||
if(indexVersion != null && (packetsWritten % 5 == 0)){
|
||||
//after every 5 packets reserve the commitpoint for some time
|
||||
delPolicy.setReserveDuration(indexVersion, reserveCommitDuration);
|
||||
}
|
||||
packetsWritten++;
|
||||
|
|
|
@ -192,7 +192,8 @@ public class SnapPuller {
|
|||
|
||||
/**
|
||||
* This command downloads all the necessary files from master to install a
|
||||
* snapshot. Only changed files are downloaded.
|
||||
* snapshot. Only changed files are downloaded. it also downloads the
|
||||
* conf files (if they are modified)
|
||||
*
|
||||
* @param core the SolrCore
|
||||
* @return true on success, false if slave is already in sync
|
||||
|
@ -204,11 +205,15 @@ public class SnapPuller {
|
|||
replicationStartTime = System.currentTimeMillis();
|
||||
try {
|
||||
client = new HttpClient();
|
||||
// The closing is done in a different thread. So use multiThreaded conn manager
|
||||
// else it prints out a warning
|
||||
client.setHttpConnectionManager(new MultiThreadedHttpConnectionManager());
|
||||
//get the current 'replicateable' index version in the master
|
||||
NamedList response = getLatestVersion(client);
|
||||
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
|
||||
long latestGeneration = (Long) response.get(GENERATION);
|
||||
if (latestVersion == 0L) {
|
||||
//there is nothing to be replicated
|
||||
return false;
|
||||
}
|
||||
IndexCommit commit;
|
||||
|
@ -221,16 +226,21 @@ public class SnapPuller {
|
|||
searcherRefCounted.decref();
|
||||
}
|
||||
if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
|
||||
//master and slave are alsready in sync just return
|
||||
LOG.info("Slave in sync with master.");
|
||||
return false;
|
||||
}
|
||||
LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration);
|
||||
LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());
|
||||
LOG.info("Starting replication process");
|
||||
// get the list of files first
|
||||
fetchFileList(latestVersion, client);
|
||||
LOG.info("Number of files in latest snapshot in master: " + filesToDownload.size());
|
||||
|
||||
// use a synchronized list because the list is read by other threads (to show details)
|
||||
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
|
||||
// if the generateion of master is older than that of the slave , it means they are not compatible to be copied
|
||||
// then a new index direcory to be created and all the files need to be copied
|
||||
boolean isSnapNeeded = commit.getGeneration() >= latestGeneration;
|
||||
File tmpIndexDir = createTempindexDir(core);
|
||||
if (isIndexStale())
|
||||
|
@ -353,6 +363,9 @@ public class SnapPuller {
|
|||
solrCore.getUpdateHandler().commit(cmd);
|
||||
}
|
||||
|
||||
|
||||
/**All the files are copied to a temp dir first
|
||||
*/
|
||||
private File createTempindexDir(SolrCore core) {
|
||||
String snapName = "index."
|
||||
+ new SimpleDateFormat(SnapShooter.DATE_FMT).format(new Date());
|
||||
|
@ -391,6 +404,12 @@ public class SnapPuller {
|
|||
copyTmpConfFiles2Conf(tmpconfDir);
|
||||
}
|
||||
|
||||
/** download the index files. if snap needed download all the files .
|
||||
* @param snapNeeded is it a fresh index copy
|
||||
* @param snapDir the directory to which files need to be downloadeed to
|
||||
* @param client the httpclient instance
|
||||
* @param latestVersion the version number
|
||||
*/
|
||||
private void downloadIndexFiles(boolean snapNeeded, File snapDir,
|
||||
HttpClient client, long latestVersion) throws Exception {
|
||||
for (Map<String, Object> file : filesToDownload) {
|
||||
|
@ -407,6 +426,9 @@ public class SnapPuller {
|
|||
}
|
||||
}
|
||||
|
||||
/**All the files which are common between master and slave must have
|
||||
* same timestamp and size else we assume they are not compatible (stale)
|
||||
*/
|
||||
private boolean isIndexStale() {
|
||||
for (Map<String, Object> file : filesToDownload) {
|
||||
File localIndexFile = new File(solrCore.getIndexDir(), (String) file
|
||||
|
@ -421,6 +443,10 @@ public class SnapPuller {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**Copy a file by the File#renameTo() method. if it fails , it is considered
|
||||
* a failure
|
||||
* todo may be we should try a simple copy if it fails
|
||||
*/
|
||||
private boolean copyAFile(File snapDir, File indexDir, String fname, List<String> copiedfiles) {
|
||||
File indexFileInSnap = new File(snapDir, fname);
|
||||
File indexFileInIndex = new File(indexDir, fname);
|
||||
|
@ -439,11 +465,17 @@ public class SnapPuller {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**Copy all index files from the temp index dir to the actual index
|
||||
*/
|
||||
|
||||
private boolean copyIndexFiles(File snapDir, File indexDir) {
|
||||
String segmentsFile = null;
|
||||
List<String> copiedfiles = new ArrayList<String>();
|
||||
for (Map<String, Object> f : filesDownloaded) {
|
||||
String fname = (String) f.get(NAME);
|
||||
// the segments file must be copied last
|
||||
// or else if there is a failure in between the
|
||||
// index will be corrupted
|
||||
if (fname.startsWith("segments_")) {
|
||||
//The segments file must be copied in the end
|
||||
//Otherwise , if the copy fails index ends up corrupted
|
||||
|
@ -453,12 +485,16 @@ public class SnapPuller {
|
|||
if (!copyAFile(snapDir, indexDir, fname, copiedfiles)) return false;
|
||||
copiedfiles.add(fname);
|
||||
}
|
||||
//copy the segments file last
|
||||
if (segmentsFile != null) {
|
||||
if (!copyAFile(snapDir, indexDir, segmentsFile, copiedfiles)) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**The conf files are copied to the tmp dir to the config dir
|
||||
* A backup of the old file is maintained
|
||||
*/
|
||||
private void copyTmpConfFiles2Conf(File tmpconfDir) throws IOException {
|
||||
File confDir = new File(solrCore.getResourceLoader().getConfigDir());
|
||||
try {
|
||||
|
@ -487,6 +523,9 @@ public class SnapPuller {
|
|||
return new SimpleDateFormat(SnapShooter.DATE_FMT).format(d);
|
||||
}
|
||||
|
||||
/**if the index is stale by any chance. use the new feature of solr to load index
|
||||
* from a different dir in the data dir.
|
||||
*/
|
||||
private void modifyIndexProps(String snap) {
|
||||
LOG.info("New index installed. Updating index properties...");
|
||||
File idxprops = new File(solrCore.getDataDir() + "index.properties");
|
||||
|
@ -515,6 +554,9 @@ public class SnapPuller {
|
|||
}
|
||||
}
|
||||
|
||||
/**The local conf files are compared with the conf files in the master. If they are
|
||||
* same (by checksum) do not copy
|
||||
*/
|
||||
private Collection<Map<String, Object>> getModifiedConfFiles(List<Map<String, Object>> confFilesToDownload) {
|
||||
if (confFilesToDownload == null || confFilesToDownload.isEmpty())
|
||||
return Collections.EMPTY_LIST;
|
||||
|
@ -534,6 +576,8 @@ public class SnapPuller {
|
|||
return nameVsFile.isEmpty() ? Collections.EMPTY_LIST : nameVsFile.values();
|
||||
}
|
||||
|
||||
/**delete the directree recursively
|
||||
*/
|
||||
static boolean delTree(File dir) {
|
||||
if (dir == null || !dir.exists())
|
||||
return false;
|
||||
|
@ -554,21 +598,22 @@ public class SnapPuller {
|
|||
return dir.delete();
|
||||
}
|
||||
|
||||
/**periodic polling is disabled
|
||||
*/
|
||||
void disablePoll() {
|
||||
pollDisabled.set(true);
|
||||
LOG.info("inside disable poll, value of pollDisabled = " + pollDisabled);
|
||||
}
|
||||
|
||||
/**
|
||||
* Enable polling
|
||||
* Enable periodic polling
|
||||
*/
|
||||
void enablePoll() {
|
||||
pollDisabled.set(false);
|
||||
LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the ongoing pull
|
||||
/** Stops the ongoing pull
|
||||
*/
|
||||
void abortPull() {
|
||||
stop = true;
|
||||
|
@ -578,9 +623,12 @@ public class SnapPuller {
|
|||
return replicationStartTime;
|
||||
}
|
||||
|
||||
/**used by details page for display.
|
||||
*/
|
||||
List<Map<String, Object>> getConfFilesToDownload() {
|
||||
//make a copy first because it can be null later
|
||||
List<Map<String, Object>> tmp = confFilesToDownload;
|
||||
//create a new instance. or else iterator may fail
|
||||
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
|
||||
}
|
||||
|
||||
|
@ -632,6 +680,10 @@ public class SnapPuller {
|
|||
|
||||
}
|
||||
|
||||
/**The class acts as a client for ReplicationHandler.FileStream.
|
||||
* It understands the protoolc well
|
||||
*
|
||||
*/
|
||||
private class FileFetcher {
|
||||
boolean includeChecksum = true;
|
||||
|
||||
|
@ -681,18 +733,25 @@ public class SnapPuller {
|
|||
checksum = new Adler32();
|
||||
}
|
||||
|
||||
/**The main method which downloads file
|
||||
* @throws Exception
|
||||
*/
|
||||
void fetchFile() throws Exception {
|
||||
try {
|
||||
while (true) {
|
||||
final FastInputStream is = getStream();
|
||||
int result;
|
||||
try {
|
||||
//fetch packets one by one in a single request
|
||||
result = fetchPackets(is);
|
||||
if (result == 0 || result == NO_CONTENT) {
|
||||
// if the file is downloaded properly set the
|
||||
// timestamp same as that in the server
|
||||
if (file.exists())
|
||||
file.setLastModified(lastmodified);
|
||||
return;
|
||||
}
|
||||
//if there is an error continue. But continue from the point where it got broken
|
||||
} finally {
|
||||
//closing Inputstream and HTTP connection takes a long time,
|
||||
// so replication status shows as 'replicating' even though it is aborted.
|
||||
|
@ -725,6 +784,7 @@ public class SnapPuller {
|
|||
}
|
||||
long checkSumServer = -1;
|
||||
fis.readFully(intbytes);
|
||||
//read the size of the packet
|
||||
int packetSize = readInt(intbytes);
|
||||
if (packetSize <= 0) {
|
||||
LOG.warn("No content recieved for file: " + currentFile);
|
||||
|
@ -733,30 +793,38 @@ public class SnapPuller {
|
|||
if (buf.length < packetSize)
|
||||
buf = new byte[packetSize];
|
||||
if (checksum != null) {
|
||||
//read the checksum
|
||||
fis.readFully(longbytes);
|
||||
checkSumServer = readLong(longbytes);
|
||||
}
|
||||
//then read the packet of bytes
|
||||
fis.readFully(buf, 0, packetSize);
|
||||
//compare the checksum as sent from the master
|
||||
if (includeChecksum) {
|
||||
checksum.reset();
|
||||
checksum.update(buf, 0, packetSize);
|
||||
long checkSumClient = checksum.getValue();
|
||||
if (checkSumClient != checkSumServer) {
|
||||
LOG.error("Checksum not matched between client and server for: " + currentFile);
|
||||
//if checksum is wrong it is a problem return for retry
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
//if everything is fine, write down the packet to the file
|
||||
fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize));
|
||||
bytesDownloaded += packetSize;
|
||||
if (bytesDownloaded >= size)
|
||||
return 0;
|
||||
//errorcount is always set to zero after a successful packet
|
||||
errorCount = 0;
|
||||
}
|
||||
} catch (ReplicationHandlerException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Error in fetching packets ", e);
|
||||
//for any failure , increment the error count
|
||||
errorCount++;
|
||||
//if it fails for the same pacaket for MAX_RETRIES fail and come out
|
||||
if (errorCount > MAX_RETRIES) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Snappull failed for file:" + fileName, e);
|
||||
|
@ -769,9 +837,6 @@ public class SnapPuller {
|
|||
* The webcontainer flushes the data only after it fills the buffer size.
|
||||
* So, all data has to be read as readFully() other wise it fails. So read
|
||||
* everything as bytes and then extract int out of it
|
||||
*
|
||||
* @param b
|
||||
* @return
|
||||
*/
|
||||
private int readInt(byte[] b) {
|
||||
return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
|
||||
|
@ -781,9 +846,6 @@ public class SnapPuller {
|
|||
|
||||
/**
|
||||
* Same as above but to read long
|
||||
*
|
||||
* @param b
|
||||
* @return
|
||||
*/
|
||||
private long readLong(byte[] b) {
|
||||
return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
|
||||
|
@ -793,17 +855,23 @@ public class SnapPuller {
|
|||
|
||||
}
|
||||
|
||||
/**cleanup everything
|
||||
*/
|
||||
private void cleanup() {
|
||||
try {
|
||||
//close the file
|
||||
fileChannel.close();
|
||||
} catch (Exception e) {/* noop */
|
||||
}
|
||||
if (bytesDownloaded != size) {
|
||||
//if the download is notcomplete then
|
||||
//delete the file being downloaded
|
||||
try {
|
||||
file.delete();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error deleting file in cleanup" + e.getMessage());
|
||||
}
|
||||
//if the failure is due to a user abort it is returned nomally else an exception is thrown
|
||||
if (!aborted)
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Unable to download " + fileName + " completely. Downloaded "
|
||||
|
@ -811,23 +879,33 @@ public class SnapPuller {
|
|||
}
|
||||
}
|
||||
|
||||
/**Open a new stream using HttpClient
|
||||
*/
|
||||
FastInputStream getStream() throws IOException {
|
||||
post = new PostMethod(masterUrl);
|
||||
//the method is command=filecontent
|
||||
post.addParameter(COMMAND, CMD_GET_FILE);
|
||||
//add the version to download. This is used to reserve the download
|
||||
post.addParameter(CMD_INDEX_VERSION, indexVersion.toString());
|
||||
if (isConf) {
|
||||
//set cf instead of file for config file
|
||||
post.addParameter(CONF_FILE_SHORT, fileName);
|
||||
} else {
|
||||
post.addParameter(FILE, fileName);
|
||||
}
|
||||
//use checksum
|
||||
if (this.includeChecksum)
|
||||
post.addParameter(CHECKSUM, "true");
|
||||
//wt=filestream this is a custom protocol
|
||||
post.addParameter("wt", FILE_STREAM);
|
||||
//This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
|
||||
// the server starts from the offset
|
||||
if (bytesDownloaded > 0) {
|
||||
post.addParameter(OFFSET, "" + bytesDownloaded);
|
||||
}
|
||||
client.executeMethod(post);
|
||||
InputStream is = post.getResponseBodyAsStream();
|
||||
//wrap it using FastInputStream
|
||||
return new FastInputStream(is);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue