More comments, fixed typos.

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@708837 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2008-10-29 08:54:54 +00:00
parent 2cc8be768a
commit bca3f50107
2 changed files with 110 additions and 82 deletions

View File

@ -25,9 +25,9 @@ import org.apache.solr.common.util.FastOutputStream;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CloseHook; import org.apache.solr.core.CloseHook;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrEventListener; import org.apache.solr.core.SolrEventListener;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.request.BinaryQueryResponseWriter; import org.apache.solr.request.BinaryQueryResponseWriter;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse; import org.apache.solr.request.SolrQueryResponse;
@ -106,15 +106,15 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
rsp.add("status", "OK"); rsp.add("status", "OK");
return; return;
} }
//This command does not give the current index version of the master // This command does not give the current index version of the master
// It gives the current replicateable index version // It gives the current 'replicateable' index version
if (command.equals(CMD_INDEX_VERSION)) { if (command.equals(CMD_INDEX_VERSION)) {
IndexCommit commitPoint = indexCommitPoint; // make a copy so it won't change IndexCommit commitPoint = indexCommitPoint; // make a copy so it won't change
if (commitPoint != null) { if (commitPoint != null) {
rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion()); rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());
rsp.add(GENERATION, commitPoint.getGeneration()); rsp.add(GENERATION, commitPoint.getGeneration());
} else { } else {
// This happens when replicateAfter does not have startup and no commit/optimize // This happens when replication is not configured to happen after startup and no commit/optimize
// has happened yet. // has happened yet.
rsp.add(CMD_INDEX_VERSION, 0L); rsp.add(CMD_INDEX_VERSION, 0L);
rsp.add(GENERATION, 0L); rsp.add(GENERATION, 0L);
@ -164,7 +164,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
return l; return l;
} }
/**Gets the checksum of a file /**
* Gets the checksum of a file
*/ */
private void getFileChecksum(SolrParams solrParams, SolrQueryResponse rsp) { private void getFileChecksum(SolrParams solrParams, SolrQueryResponse rsp) {
Checksum checksum = new Adler32(); Checksum checksum = new Adler32();
@ -231,8 +232,11 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
} }
} }
/**This method adds an Object of FileStream to the resposnse . /**
* The FileStream implements a custom protocol which is also understoop by the SnapPuller * This method adds an Object of FileStream to the resposnse .
* The FileStream implements a custom protocol which is understood by SnapPuller.FileFetcher
*
* @see org.apache.solr.handler.SnapPuller.FileFetcher
*/ */
private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp) { private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp) {
ModifiableSolrParams rawParams = new ModifiableSolrParams(solrParams); ModifiableSolrParams rawParams = new ModifiableSolrParams(solrParams);
@ -279,10 +283,12 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
rsp.add(CONF_FILES, confFiles); rsp.add(CONF_FILES, confFiles);
} }
/** for configuration files checksum of the file also is included /**
* because ,unlike index ,files they may have same content but different timestamps * For configuration files, checksum of the file is included
* because, unlike index files, they may have same content but different timestamps.
* <p/>
* The local conf files information is cached so that everytime it does not have to * The local conf files information is cached so that everytime it does not have to
* read the file content. The cache is refreshed only if the lastmodified of the file changes * compute the checksum. The cache is refreshed only if the lastModified of the file changes
*/ */
List<Map<String, Object>> getConfFileCache(Collection<String> filenames) { List<Map<String, Object>> getConfFileCache(Collection<String> filenames) {
List<Map<String, Object>> confFiles = new ArrayList<Map<String, Object>>(); List<Map<String, Object>> confFiles = new ArrayList<Map<String, Object>>();
@ -291,7 +297,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
Checksum checksum = null; Checksum checksum = null;
for (String cf : filenames) { for (String cf : filenames) {
File f = new File(confDir, cf); File f = new File(confDir, cf);
if (!f.exists() || f.isDirectory()) continue;//must not happen if (!f.exists() || f.isDirectory()) continue; //must not happen
FileInfo info = confFileInfoCache.get(cf); FileInfo info = confFileInfoCache.get(cf);
if (info == null || info.lastmodified != f.lastModified() || info.size != f.length()) { if (info == null || info.lastmodified != f.lastModified() || info.size != f.length()) {
if (checksum == null) checksum = new Adler32(); if (checksum == null) checksum = new Adler32();
@ -305,7 +311,6 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
} }
private static class FileInfo { private static class FileInfo {
long lastmodified; long lastmodified;
String name; String name;
long size; long size;
@ -365,7 +370,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
return size; return size;
} }
/**Collects the details such as name, size ,lasmodified of a file /**
* Collects the details such as name, size ,lastModified of a file
*/ */
private Map<String, Object> getFileInfo(File file) { private Map<String, Object> getFileInfo(File file) {
Map<String, Object> fileMeta = new HashMap<String, Object>(); Map<String, Object> fileMeta = new HashMap<String, Object>();
@ -376,19 +382,19 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
} }
public String getDescription() { public String getDescription() {
return ""; return "ReplicationHandler provides replication of index and configuration files from Master to Slaves";
} }
public String getSourceId() { public String getSourceId() {
return ""; return "$Id$";
} }
public String getSource() { public String getSource() {
return ""; return "$URL$";
} }
public String getVersion() { public String getVersion() {
return "$Id$"; return "$Revision$";
} }
String readableSize(long size) { String readableSize(long size) {
@ -451,6 +457,9 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
return list; return list;
} }
/**
* Used for showing statistics and progress information.
*/
void getReplicationDetails(SolrQueryResponse resp) { void getReplicationDetails(SolrQueryResponse resp) {
String timeLastReplicated = "", confFilesReplicated = "", confFilesReplicatedTime = "", timesIndexReplicated = "", timesConfigReplicated = ""; String timeLastReplicated = "", confFilesReplicated = "", confFilesReplicatedTime = "", timesIndexReplicated = "", timesConfigReplicated = "";
NamedList<Object> details = new SimpleOrderedMap<Object>(); NamedList<Object> details = new SimpleOrderedMap<Object>();
@ -645,9 +654,9 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
LOG.info("Replication enabled for following config files: " + includeConfFiles); LOG.info("Replication enabled for following config files: " + includeConfFiles);
} }
List snapshot = master.getAll("snapshot"); List snapshot = master.getAll("snapshot");
boolean snapshotOnCommit = snapshot.contains("commit"); boolean snapshotOnCommit = snapshot.contains("commit");
boolean snapshotOnOptimize = snapshot.contains("optimize"); boolean snapshotOnOptimize = snapshot.contains("optimize");
List replicateAfter = master.getAll(REPLICATE_AFTER); List replicateAfter = master.getAll(REPLICATE_AFTER);
replicateOnCommit = replicateAfter.contains("commit"); replicateOnCommit = replicateAfter.contains("commit");
replicateOnOptimize = replicateAfter.contains("optimize"); replicateOnOptimize = replicateAfter.contains("optimize");
@ -663,7 +672,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
try { try {
indexCommitPoint = s.get().getReader().getIndexCommit(); indexCommitPoint = s.get().getReader().getIndexCommit();
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Unable to get IndexCommit on startup",e); LOG.warn("Unable to get IndexCommit on startup", e);
} finally { } finally {
s.decref(); s.decref();
} }
@ -677,7 +686,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
} }
} }
/**register a closehook /**
* register a closehook
*/ */
private void registerCloseHook() { private void registerCloseHook() {
core.addCloseHook(new CloseHook() { core.addCloseHook(new CloseHook() {
@ -689,7 +699,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
}); });
} }
/**A responsewriter is registered automatically for wt=filestream /**
* A ResponseWriter is registered automatically for wt=filestream
* This response writer is used to transfer index files in a block-by-block manner within
* the same HTTP response.
*/ */
private void registerFileStreamResponseWriter() { private void registerFileStreamResponseWriter() {
core.registerResponseWriter(FILE_STREAM, new BinaryQueryResponseWriter() { core.registerResponseWriter(FILE_STREAM, new BinaryQueryResponseWriter() {
@ -711,18 +724,22 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
} }
/**Register a listener for postcommit/optimize /**
* Register a listener for postcommit/optimize
*
* @param snapshoot do a snapshoot * @param snapshoot do a snapshoot
* @param getCommit get a commitpoint also * @param getCommit get a commitpoint also
* @return an instance of the eventlistener * @return an instance of the eventlistener
*/ */
private SolrEventListener getEventListener(final boolean snapshoot, final boolean getCommit) { private SolrEventListener getEventListener(final boolean snapshoot, final boolean getCommit) {
return new SolrEventListener() { return new SolrEventListener() {
public void init(NamedList args) {/*no op*/ } public void init(NamedList args) {/*no op*/ }
/**
* This refreshes the latest replicateable index commit and optionally can create Snapshots as well
*/
public void postCommit() { public void postCommit() {
if(getCommit){ if (getCommit) {
indexCommitPoint = core.getDeletionPolicy().getLatestCommit(); indexCommitPoint = core.getDeletionPolicy().getLatestCommit();
} }
if (snapshoot) { if (snapshoot) {
@ -767,7 +784,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
String sLen = params.get(LEN); String sLen = params.get(LEN);
String sChecksum = params.get(CHECKSUM); String sChecksum = params.get(CHECKSUM);
String sindexVersion = params.get(CMD_INDEX_VERSION); String sindexVersion = params.get(CMD_INDEX_VERSION);
if(sindexVersion != null) indexVersion = Long.parseLong(sindexVersion); if (sindexVersion != null) indexVersion = Long.parseLong(sindexVersion);
FileInputStream inputStream = null; FileInputStream inputStream = null;
int packetsWritten = 0; int packetsWritten = 0;
try { try {
@ -820,7 +837,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
} }
fos.write(buf, 0, (int) bytesRead); fos.write(buf, 0, (int) bytesRead);
fos.flush(); fos.flush();
if(indexVersion != null && (packetsWritten % 5 == 0)){ if (indexVersion != null && (packetsWritten % 5 == 0)) {
//after every 5 packets reserve the commitpoint for some time //after every 5 packets reserve the commitpoint for some time
delPolicy.setReserveDuration(indexVersion, reserveCommitDuration); delPolicy.setReserveDuration(indexVersion, reserveCommitDuration);
} }
@ -836,6 +853,9 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
} }
} }
/**
* Used to write a marker for EOF
*/
private void writeNothing() throws IOException { private void writeNothing() throws IOException {
fos.writeInt(0); fos.writeInt(0);
fos.flush(); fos.flush();

View File

@ -131,10 +131,6 @@ public class SnapPuller {
/** /**
* Gets the latest commit version and generation from the master * Gets the latest commit version and generation from the master
*
* @param client
* @return
* @throws IOException
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
NamedList getLatestVersion(HttpClient client) throws IOException { NamedList getLatestVersion(HttpClient client) throws IOException {
@ -151,8 +147,7 @@ public class SnapPuller {
return getNamedListResponse(client, post); return getNamedListResponse(client, post);
} }
private NamedList getNamedListResponse(HttpClient client, PostMethod method) private NamedList getNamedListResponse(HttpClient client, PostMethod method) throws IOException {
throws IOException {
try { try {
int status = client.executeMethod(method); int status = client.executeMethod(method);
if (status != HttpStatus.SC_OK) { if (status != HttpStatus.SC_OK) {
@ -169,12 +164,7 @@ public class SnapPuller {
} }
/** /**
* Fetches the list of files in a given snapshot * Fetches the list of files in a given index commit point
*
* @param version
* @param client
* @return
* @throws IOException
*/ */
void fetchFileList(long version, HttpClient client) throws IOException { void fetchFileList(long version, HttpClient client) throws IOException {
PostMethod post = new PostMethod(masterUrl); PostMethod post = new PostMethod(masterUrl);
@ -192,8 +182,8 @@ public class SnapPuller {
/** /**
* This command downloads all the necessary files from master to install a * This command downloads all the necessary files from master to install a
* snapshot. Only changed files are downloaded. it also downloads the * index commit point. Only changed files are downloaded. It also downloads the
* conf files (if they are modified) * conf files (if they are modified).
* *
* @param core the SolrCore * @param core the SolrCore
* @return true on success, false if slave is already in sync * @return true on success, false if slave is already in sync
@ -297,6 +287,10 @@ public class SnapPuller {
} }
} }
/**
* Helper method to record the last replication's details so that we can show them on the
* statistics page across restarts.
*/
private void logReplicationTimeAndConfFiles(Collection<Map<String, Object>> modifiedConfFiles) { private void logReplicationTimeAndConfFiles(Collection<Map<String, Object>> modifiedConfFiles) {
FileOutputStream outFile = null; FileOutputStream outFile = null;
FileInputStream inFile = null; FileInputStream inFile = null;
@ -364,11 +358,11 @@ public class SnapPuller {
} }
/**All the files are copied to a temp dir first /**
* All the files are copied to a temp dir first
*/ */
private File createTempindexDir(SolrCore core) { private File createTempindexDir(SolrCore core) {
String snapName = "index." String snapName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT).format(new Date());
+ new SimpleDateFormat(SnapShooter.DATE_FMT).format(new Date());
File snapDir = new File(core.getDataDir(), snapName); File snapDir = new File(core.getDataDir(), snapName);
snapDir.mkdirs(); snapDir.mkdirs();
return snapDir; return snapDir;
@ -404,17 +398,19 @@ public class SnapPuller {
copyTmpConfFiles2Conf(tmpconfDir); copyTmpConfFiles2Conf(tmpconfDir);
} }
/** download the index files. if snap needed download all the files . /**
* @param snapNeeded is it a fresh index copy * Download the index files. If a new index is needed, download all the files.
* @param snapDir the directory to which files need to be downloadeed to *
* @param client the httpclient instance * @param downloadCompleteIndex is it a fresh index copy
* @param snapDir the directory to which files need to be downloadeed to
* @param client the httpclient instance
* @param latestVersion the version number * @param latestVersion the version number
*/ */
private void downloadIndexFiles(boolean snapNeeded, File snapDir, private void downloadIndexFiles(boolean downloadCompleteIndex, File snapDir,
HttpClient client, long latestVersion) throws Exception { HttpClient client, long latestVersion) throws Exception {
for (Map<String, Object> file : filesToDownload) { for (Map<String, Object> file : filesToDownload) {
File localIndexFile = new File(solrCore.getIndexDir(), (String) file.get(NAME)); File localIndexFile = new File(solrCore.getIndexDir(), (String) file.get(NAME));
if (!localIndexFile.exists() || snapNeeded) { if (!localIndexFile.exists() || downloadCompleteIndex) {
fileFetcher = new FileFetcher(snapDir, file, (String) file.get(NAME), fileFetcher = new FileFetcher(snapDir, file, (String) file.get(NAME),
client, false, latestVersion); client, false, latestVersion);
currentFile = file; currentFile = file;
@ -426,8 +422,11 @@ public class SnapPuller {
} }
} }
/**All the files which are common between master and slave must have /**
* same timestamp and size else we assume they are not compatible (stale) * All the files which are common between master and slave must have
* same timestamp and size else we assume they are not compatible (stale).
*
* @return true if the index stale and we need to download a fresh copy, false otherwise.
*/ */
private boolean isIndexStale() { private boolean isIndexStale() {
for (Map<String, Object> file : filesToDownload) { for (Map<String, Object> file : filesToDownload) {
@ -443,9 +442,11 @@ public class SnapPuller {
return false; return false;
} }
/**Copy a file by the File#renameTo() method. if it fails , it is considered /**
* Copy a file by the File#renameTo() method. If it fails, it is considered
* a failure * a failure
* todo may be we should try a simple copy if it fails *
* Todo may be we should try a simple copy if it fails
*/ */
private boolean copyAFile(File snapDir, File indexDir, String fname, List<String> copiedfiles) { private boolean copyAFile(File snapDir, File indexDir, String fname, List<String> copiedfiles) {
File indexFileInSnap = new File(snapDir, fname); File indexFileInSnap = new File(snapDir, fname);
@ -465,9 +466,10 @@ public class SnapPuller {
return true; return true;
} }
/**Copy all index files from the temp index dir to the actual index /**
* Copy all index files from the temp index dir to the actual index.
* The segments_N file is copied last.
*/ */
private boolean copyIndexFiles(File snapDir, File indexDir) { private boolean copyIndexFiles(File snapDir, File indexDir) {
String segmentsFile = null; String segmentsFile = null;
List<String> copiedfiles = new ArrayList<String>(); List<String> copiedfiles = new ArrayList<String>();
@ -492,7 +494,8 @@ public class SnapPuller {
return true; return true;
} }
/**The conf files are copied to the tmp dir to the config dir /**
* The conf files are copied to the tmp dir to the conf dir.
* A backup of the old file is maintained * A backup of the old file is maintained
*/ */
private void copyTmpConfFiles2Conf(File tmpconfDir) throws IOException { private void copyTmpConfFiles2Conf(File tmpconfDir) throws IOException {
@ -523,8 +526,8 @@ public class SnapPuller {
return new SimpleDateFormat(SnapShooter.DATE_FMT).format(d); return new SimpleDateFormat(SnapShooter.DATE_FMT).format(d);
} }
/**if the index is stale by any chance. use the new feature of solr to load index /**
* from a different dir in the data dir. * If the index is stale by any chance, load index from a different dir in the data dir.
*/ */
private void modifyIndexProps(String snap) { private void modifyIndexProps(String snap) {
LOG.info("New index installed. Updating index properties..."); LOG.info("New index installed. Updating index properties...");
@ -554,8 +557,11 @@ public class SnapPuller {
} }
} }
/**The local conf files are compared with the conf files in the master. If they are /**
* same (by checksum) do not copy * The local conf files are compared with the conf files in the master. If they are
* same (by checksum) do not copy.
*
* @return a list of configuration files which have changed on the master and need to be downloaded.
*/ */
private Collection<Map<String, Object>> getModifiedConfFiles(List<Map<String, Object>> confFilesToDownload) { private Collection<Map<String, Object>> getModifiedConfFiles(List<Map<String, Object>> confFilesToDownload) {
if (confFilesToDownload == null || confFilesToDownload.isEmpty()) if (confFilesToDownload == null || confFilesToDownload.isEmpty())
@ -576,7 +582,8 @@ public class SnapPuller {
return nameVsFile.isEmpty() ? Collections.EMPTY_LIST : nameVsFile.values(); return nameVsFile.isEmpty() ? Collections.EMPTY_LIST : nameVsFile.values();
} }
/**delete the directree recursively /**
* Delete the directory tree recursively
*/ */
static boolean delTree(File dir) { static boolean delTree(File dir) {
if (dir == null || !dir.exists()) if (dir == null || !dir.exists())
@ -598,7 +605,8 @@ public class SnapPuller {
return dir.delete(); return dir.delete();
} }
/**periodic polling is disabled /**
* Disable periodic polling
*/ */
void disablePoll() { void disablePoll() {
pollDisabled.set(true); pollDisabled.set(true);
@ -613,7 +621,8 @@ public class SnapPuller {
LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled); LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled);
} }
/** Stops the ongoing pull /**
* Stops the ongoing pull
*/ */
void abortPull() { void abortPull() {
stop = true; stop = true;
@ -623,8 +632,6 @@ public class SnapPuller {
return replicationStartTime; return replicationStartTime;
} }
/**used by details page for display.
*/
List<Map<String, Object>> getConfFilesToDownload() { List<Map<String, Object>> getConfFilesToDownload() {
//make a copy first because it can be null later //make a copy first because it can be null later
List<Map<String, Object>> tmp = confFilesToDownload; List<Map<String, Object>> tmp = confFilesToDownload;
@ -673,16 +680,15 @@ public class SnapPuller {
} }
private class ReplicationHandlerException extends InterruptedException { private class ReplicationHandlerException extends InterruptedException {
public ReplicationHandlerException(String message) { public ReplicationHandlerException(String message) {
super(message); super(message);
} }
} }
/**The class acts as a client for ReplicationHandler.FileStream. /**
* It understands the protoolc well * The class acts as a client for ReplicationHandler.FileStream.
* * It understands the protocol of wt=filestream
* @see org.apache.solr.handler.ReplicationHandler.FileStream
*/ */
private class FileFetcher { private class FileFetcher {
boolean includeChecksum = true; boolean includeChecksum = true;
@ -733,8 +739,8 @@ public class SnapPuller {
checksum = new Adler32(); checksum = new Adler32();
} }
/**The main method which downloads file /**
* @throws Exception * The main method which downloads file
*/ */
void fetchFile() throws Exception { void fetchFile() throws Exception {
try { try {
@ -753,7 +759,7 @@ public class SnapPuller {
} }
//if there is an error continue. But continue from the point where it got broken //if there is an error continue. But continue from the point where it got broken
} finally { } finally {
//closing Inputstream and HTTP connection takes a long time, // closing Inputstream and HTTP connection takes a long time,
// so replication status shows as 'replicating' even though it is aborted. // so replication status shows as 'replicating' even though it is aborted.
new Thread() { new Thread() {
public void run() { public void run() {
@ -806,7 +812,7 @@ public class SnapPuller {
long checkSumClient = checksum.getValue(); long checkSumClient = checksum.getValue();
if (checkSumClient != checkSumServer) { if (checkSumClient != checkSumServer) {
LOG.error("Checksum not matched between client and server for: " + currentFile); LOG.error("Checksum not matched between client and server for: " + currentFile);
//if checksum is wrong it is a problem return for retry //if checksum is wrong it is a problem return for retry
return 1; return 1;
} }
} }
@ -836,7 +842,7 @@ public class SnapPuller {
/** /**
* The webcontainer flushes the data only after it fills the buffer size. * The webcontainer flushes the data only after it fills the buffer size.
* So, all data has to be read as readFully() other wise it fails. So read * So, all data has to be read as readFully() other wise it fails. So read
* everything as bytes and then extract int out of it * everything as bytes and then extract an integer out of it
*/ */
private int readInt(byte[] b) { private int readInt(byte[] b) {
return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16) return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
@ -845,7 +851,7 @@ public class SnapPuller {
} }
/** /**
* Same as above but to read long * Same as above but to read longs from a byte array
*/ */
private long readLong(byte[] b) { private long readLong(byte[] b) {
return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48) return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
@ -855,7 +861,8 @@ public class SnapPuller {
} }
/**cleanup everything /**
* cleanup everything
*/ */
private void cleanup() { private void cleanup() {
try { try {
@ -864,7 +871,7 @@ public class SnapPuller {
} catch (Exception e) {/* noop */ } catch (Exception e) {/* noop */
} }
if (bytesDownloaded != size) { if (bytesDownloaded != size) {
//if the download is notcomplete then //if the download is not complete then
//delete the file being downloaded //delete the file being downloaded
try { try {
file.delete(); file.delete();
@ -879,7 +886,8 @@ public class SnapPuller {
} }
} }
/**Open a new stream using HttpClient /**
* Open a new stream using HttpClient
*/ */
FastInputStream getStream() throws IOException { FastInputStream getStream() throws IOException {
post = new PostMethod(masterUrl); post = new PostMethod(masterUrl);
@ -898,7 +906,7 @@ public class SnapPuller {
post.addParameter(CHECKSUM, "true"); post.addParameter(CHECKSUM, "true");
//wt=filestream this is a custom protocol //wt=filestream this is a custom protocol
post.addParameter("wt", FILE_STREAM); post.addParameter("wt", FILE_STREAM);
//This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that // This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
// the server starts from the offset // the server starts from the offset
if (bytesDownloaded > 0) { if (bytesDownloaded > 0) {
post.addParameter(OFFSET, "" + bytesDownloaded); post.addParameter(OFFSET, "" + bytesDownloaded);