From eb5ae4dcc0a0eb153d3e5d0c0d7e7326fe527a99 Mon Sep 17 00:00:00 2001 From: Yonik Seeley Date: Tue, 28 Oct 2008 14:05:52 +0000 Subject: [PATCH] SOLR-561: add more comments git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@708578 13f79535-47bb-0310-9956-ffa450edef68 --- .../solr/handler/ReplicationHandler.java | 37 ++++++- .../org/apache/solr/handler/SnapPuller.java | 98 +++++++++++++++++-- 2 files changed, 124 insertions(+), 11 deletions(-) diff --git a/src/java/org/apache/solr/handler/ReplicationHandler.java b/src/java/org/apache/solr/handler/ReplicationHandler.java index 8b38e2c6f99..c361f7a5dae 100644 --- a/src/java/org/apache/solr/handler/ReplicationHandler.java +++ b/src/java/org/apache/solr/handler/ReplicationHandler.java @@ -106,13 +106,16 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw rsp.add("status", "OK"); return; } + //This command does not give the current index version of the master + // It gives the current replicateable index version if (command.equals(CMD_INDEX_VERSION)) { IndexCommit commitPoint = indexCommitPoint; // make a copy so it won't change if (commitPoint != null) { rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion()); rsp.add(GENERATION, commitPoint.getGeneration()); } else { - // must never happen + // This happens when replicateAfter does not have startup and no commit/optimize + // has happened yet. rsp.add(CMD_INDEX_VERSION, 0L); rsp.add(GENERATION, 0L); } @@ -134,6 +137,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw if (snapPuller != null) snapPuller.abortPull(); } else if (command.equals(CMD_FILE_CHECKSUM)) { + // this command is not used by anyone getFileChecksum(solrParams, rsp); } else if (command.equals(CMD_SHOW_COMMITS)) { rsp.add(CMD_SHOW_COMMITS, getCommits()); @@ -160,6 +164,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw return l; } + /**Gets the checksum of a file + */ private void getFileChecksum(SolrParams solrParams, SolrQueryResponse rsp) { Checksum checksum = new Adler32(); File dir = new File(core.getIndexDir()); @@ -225,6 +231,9 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw } } + /**This method adds an Object of FileStream to the resposnse . + * The FileStream implements a custom protocol which is also understoop by the SnapPuller + */ private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp) { ModifiableSolrParams rawParams = new ModifiableSolrParams(solrParams); rawParams.set(CommonParams.WT, FILE_STREAM); @@ -244,9 +253,11 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw rsp.add("status", "invalid indexversion"); return; } + // reserve the indexcommit for sometime core.getDeletionPolicy().setReserveDuration(version, reserveCommitDuration); List> result = new ArrayList>(); try { + //get all the files in the commit Collection files = commit.getFileNames(); for (String fileName : files) { File file = new File(core.getIndexDir(), fileName); @@ -263,10 +274,16 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw if (includeConfFiles == null) return; LOG.debug("Adding config files to list: " + includeConfFiles); + //if configuration files need to be included get their details List> confFiles = getConfFileCache(includeConfFiles); rsp.add(CONF_FILES, confFiles); } + /** for configuration files checksum of the file also is included + * because ,unlike index ,files they may have same content but different timestamps + * The local conf files information is cached so that everytime it does not have to + * read the file content. The cache is refreshed only if the lastmodified of the file changes + */ List> getConfFileCache(Collection filenames) { List> confFiles = new ArrayList>(); synchronized (confFileInfoCache) { @@ -348,6 +365,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw return size; } + /**Collects the details such as name, size ,lasmodified of a file + */ private Map getFileInfo(File file) { Map fileMeta = new HashMap(); fileMeta.put(NAME, file.getName()); @@ -658,6 +677,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw } } + /**register a closehook + */ private void registerCloseHook() { core.addCloseHook(new CloseHook() { public void close(SolrCore core) { @@ -668,6 +689,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw }); } + /**A responsewriter is registered automatically for wt=filestream + */ private void registerFileStreamResponseWriter() { core.registerResponseWriter(FILE_STREAM, new BinaryQueryResponseWriter() { public void write(OutputStream out, SolrQueryRequest request, SolrQueryResponse resp) throws IOException { @@ -688,6 +711,12 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw } + /**Register a listener for postcommit/optimize + * @param snapshoot do a snapshoot + * @param getCommit get a commitpoint also + * @return an instance of the eventlistener + */ + private SolrEventListener getEventListener(final boolean snapshoot, final boolean getCommit) { return new SolrEventListener() { public void init(NamedList args) {/*no op*/ } @@ -744,24 +773,29 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw try { long offset = -1; int len = -1; + //check if checksum is requested boolean useChecksum = Boolean.parseBoolean(sChecksum); if (sOffset != null) offset = Long.parseLong(sOffset); if (sLen != null) len = Integer.parseInt(sLen); if (fileName == null && cfileName == null) { + //no filename do nothing writeNothing(); } File file = null; if (cfileName != null) { + //if if is a conf file read from config diectory file = new File(core.getResourceLoader().getConfigDir(), cfileName); } else { + //else read from the indexdirectory file = new File(core.getIndexDir(), fileName); } if (file.exists() && file.canRead()) { inputStream = new FileInputStream(file); FileChannel channel = inputStream.getChannel(); + //if offset is mentioned move the pointer to that point if (offset != -1) channel.position(offset); byte[] buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len]; @@ -787,6 +821,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw fos.write(buf, 0, (int) bytesRead); fos.flush(); if(indexVersion != null && (packetsWritten % 5 == 0)){ + //after every 5 packets reserve the commitpoint for some time delPolicy.setReserveDuration(indexVersion, reserveCommitDuration); } packetsWritten++; diff --git a/src/java/org/apache/solr/handler/SnapPuller.java b/src/java/org/apache/solr/handler/SnapPuller.java index 23c3010c8ef..b213f9165f5 100644 --- a/src/java/org/apache/solr/handler/SnapPuller.java +++ b/src/java/org/apache/solr/handler/SnapPuller.java @@ -192,7 +192,8 @@ public class SnapPuller { /** * This command downloads all the necessary files from master to install a - * snapshot. Only changed files are downloaded. + * snapshot. Only changed files are downloaded. it also downloads the + * conf files (if they are modified) * * @param core the SolrCore * @return true on success, false if slave is already in sync @@ -204,11 +205,15 @@ public class SnapPuller { replicationStartTime = System.currentTimeMillis(); try { client = new HttpClient(); + // The closing is done in a different thread. So use multiThreaded conn manager + // else it prints out a warning client.setHttpConnectionManager(new MultiThreadedHttpConnectionManager()); + //get the current 'replicateable' index version in the master NamedList response = getLatestVersion(client); long latestVersion = (Long) response.get(CMD_INDEX_VERSION); long latestGeneration = (Long) response.get(GENERATION); if (latestVersion == 0L) { + //there is nothing to be replicated return false; } IndexCommit commit; @@ -221,16 +226,21 @@ public class SnapPuller { searcherRefCounted.decref(); } if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) { + //master and slave are alsready in sync just return LOG.info("Slave in sync with master."); return false; } LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration); LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration()); LOG.info("Starting replication process"); + // get the list of files first fetchFileList(latestVersion, client); LOG.info("Number of files in latest snapshot in master: " + filesToDownload.size()); + // use a synchronized list because the list is read by other threads (to show details) filesDownloaded = Collections.synchronizedList(new ArrayList>()); + // if the generateion of master is older than that of the slave , it means they are not compatible to be copied + // then a new index direcory to be created and all the files need to be copied boolean isSnapNeeded = commit.getGeneration() >= latestGeneration; File tmpIndexDir = createTempindexDir(core); if (isIndexStale()) @@ -353,6 +363,9 @@ public class SnapPuller { solrCore.getUpdateHandler().commit(cmd); } + + /**All the files are copied to a temp dir first + */ private File createTempindexDir(SolrCore core) { String snapName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT).format(new Date()); @@ -391,6 +404,12 @@ public class SnapPuller { copyTmpConfFiles2Conf(tmpconfDir); } + /** download the index files. if snap needed download all the files . + * @param snapNeeded is it a fresh index copy + * @param snapDir the directory to which files need to be downloadeed to + * @param client the httpclient instance + * @param latestVersion the version number + */ private void downloadIndexFiles(boolean snapNeeded, File snapDir, HttpClient client, long latestVersion) throws Exception { for (Map file : filesToDownload) { @@ -407,6 +426,9 @@ public class SnapPuller { } } + /**All the files which are common between master and slave must have + * same timestamp and size else we assume they are not compatible (stale) + */ private boolean isIndexStale() { for (Map file : filesToDownload) { File localIndexFile = new File(solrCore.getIndexDir(), (String) file @@ -421,6 +443,10 @@ public class SnapPuller { return false; } + /**Copy a file by the File#renameTo() method. if it fails , it is considered + * a failure + * todo may be we should try a simple copy if it fails + */ private boolean copyAFile(File snapDir, File indexDir, String fname, List copiedfiles) { File indexFileInSnap = new File(snapDir, fname); File indexFileInIndex = new File(indexDir, fname); @@ -439,11 +465,17 @@ public class SnapPuller { return true; } + /**Copy all index files from the temp index dir to the actual index + */ + private boolean copyIndexFiles(File snapDir, File indexDir) { String segmentsFile = null; List copiedfiles = new ArrayList(); for (Map f : filesDownloaded) { String fname = (String) f.get(NAME); + // the segments file must be copied last + // or else if there is a failure in between the + // index will be corrupted if (fname.startsWith("segments_")) { //The segments file must be copied in the end //Otherwise , if the copy fails index ends up corrupted @@ -453,12 +485,16 @@ public class SnapPuller { if (!copyAFile(snapDir, indexDir, fname, copiedfiles)) return false; copiedfiles.add(fname); } + //copy the segments file last if (segmentsFile != null) { if (!copyAFile(snapDir, indexDir, segmentsFile, copiedfiles)) return false; } return true; } + /**The conf files are copied to the tmp dir to the config dir + * A backup of the old file is maintained + */ private void copyTmpConfFiles2Conf(File tmpconfDir) throws IOException { File confDir = new File(solrCore.getResourceLoader().getConfigDir()); try { @@ -487,6 +523,9 @@ public class SnapPuller { return new SimpleDateFormat(SnapShooter.DATE_FMT).format(d); } + /**if the index is stale by any chance. use the new feature of solr to load index + * from a different dir in the data dir. + */ private void modifyIndexProps(String snap) { LOG.info("New index installed. Updating index properties..."); File idxprops = new File(solrCore.getDataDir() + "index.properties"); @@ -515,6 +554,9 @@ public class SnapPuller { } } + /**The local conf files are compared with the conf files in the master. If they are + * same (by checksum) do not copy + */ private Collection> getModifiedConfFiles(List> confFilesToDownload) { if (confFilesToDownload == null || confFilesToDownload.isEmpty()) return Collections.EMPTY_LIST; @@ -534,6 +576,8 @@ public class SnapPuller { return nameVsFile.isEmpty() ? Collections.EMPTY_LIST : nameVsFile.values(); } + /**delete the directree recursively + */ static boolean delTree(File dir) { if (dir == null || !dir.exists()) return false; @@ -554,21 +598,22 @@ public class SnapPuller { return dir.delete(); } + /**periodic polling is disabled + */ void disablePoll() { pollDisabled.set(true); LOG.info("inside disable poll, value of pollDisabled = " + pollDisabled); } /** - * Enable polling + * Enable periodic polling */ void enablePoll() { pollDisabled.set(false); LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled); } - /** - * Stops the ongoing pull + /** Stops the ongoing pull */ void abortPull() { stop = true; @@ -578,9 +623,12 @@ public class SnapPuller { return replicationStartTime; } + /**used by details page for display. + */ List> getConfFilesToDownload() { //make a copy first because it can be null later List> tmp = confFilesToDownload; + //create a new instance. or else iterator may fail return tmp == null ? Collections.EMPTY_LIST : new ArrayList>(tmp); } @@ -632,6 +680,10 @@ public class SnapPuller { } + /**The class acts as a client for ReplicationHandler.FileStream. + * It understands the protoolc well + * + */ private class FileFetcher { boolean includeChecksum = true; @@ -681,18 +733,25 @@ public class SnapPuller { checksum = new Adler32(); } + /**The main method which downloads file + * @throws Exception + */ void fetchFile() throws Exception { try { while (true) { final FastInputStream is = getStream(); int result; try { + //fetch packets one by one in a single request result = fetchPackets(is); if (result == 0 || result == NO_CONTENT) { + // if the file is downloaded properly set the + // timestamp same as that in the server if (file.exists()) file.setLastModified(lastmodified); return; } + //if there is an error continue. But continue from the point where it got broken } finally { //closing Inputstream and HTTP connection takes a long time, // so replication status shows as 'replicating' even though it is aborted. @@ -725,6 +784,7 @@ public class SnapPuller { } long checkSumServer = -1; fis.readFully(intbytes); + //read the size of the packet int packetSize = readInt(intbytes); if (packetSize <= 0) { LOG.warn("No content recieved for file: " + currentFile); @@ -733,30 +793,38 @@ public class SnapPuller { if (buf.length < packetSize) buf = new byte[packetSize]; if (checksum != null) { + //read the checksum fis.readFully(longbytes); checkSumServer = readLong(longbytes); } + //then read the packet of bytes fis.readFully(buf, 0, packetSize); + //compare the checksum as sent from the master if (includeChecksum) { checksum.reset(); checksum.update(buf, 0, packetSize); long checkSumClient = checksum.getValue(); if (checkSumClient != checkSumServer) { LOG.error("Checksum not matched between client and server for: " + currentFile); + //if checksum is wrong it is a problem return for retry return 1; } } + //if everything is fine, write down the packet to the file fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize)); bytesDownloaded += packetSize; if (bytesDownloaded >= size) return 0; + //errorcount is always set to zero after a successful packet errorCount = 0; } } catch (ReplicationHandlerException e) { throw e; } catch (Exception e) { LOG.warn("Error in fetching packets ", e); + //for any failure , increment the error count errorCount++; + //if it fails for the same pacaket for MAX_RETRIES fail and come out if (errorCount > MAX_RETRIES) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Snappull failed for file:" + fileName, e); @@ -769,9 +837,6 @@ public class SnapPuller { * The webcontainer flushes the data only after it fills the buffer size. * So, all data has to be read as readFully() other wise it fails. So read * everything as bytes and then extract int out of it - * - * @param b - * @return */ private int readInt(byte[] b) { return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16) @@ -781,9 +846,6 @@ public class SnapPuller { /** * Same as above but to read long - * - * @param b - * @return */ private long readLong(byte[] b) { return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48) @@ -793,17 +855,23 @@ public class SnapPuller { } + /**cleanup everything + */ private void cleanup() { try { + //close the file fileChannel.close(); } catch (Exception e) {/* noop */ } if (bytesDownloaded != size) { + //if the download is notcomplete then + //delete the file being downloaded try { file.delete(); } catch (Exception e) { LOG.error("Error deleting file in cleanup" + e.getMessage()); } + //if the failure is due to a user abort it is returned nomally else an exception is thrown if (!aborted) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to download " + fileName + " completely. Downloaded " @@ -811,23 +879,33 @@ public class SnapPuller { } } + /**Open a new stream using HttpClient + */ FastInputStream getStream() throws IOException { post = new PostMethod(masterUrl); + //the method is command=filecontent post.addParameter(COMMAND, CMD_GET_FILE); + //add the version to download. This is used to reserve the download post.addParameter(CMD_INDEX_VERSION, indexVersion.toString()); if (isConf) { + //set cf instead of file for config file post.addParameter(CONF_FILE_SHORT, fileName); } else { post.addParameter(FILE, fileName); } + //use checksum if (this.includeChecksum) post.addParameter(CHECKSUM, "true"); + //wt=filestream this is a custom protocol post.addParameter("wt", FILE_STREAM); + //This happen if there is a failure there is a retry. the offset= ensures that + // the server starts from the offset if (bytesDownloaded > 0) { post.addParameter(OFFSET, "" + bytesDownloaded); } client.executeMethod(post); InputStream is = post.getResponseBodyAsStream(); + //wrap it using FastInputStream return new FastInputStream(is); } }