From 8755ea2edd20ef738713a84dd8f24137c3cbcf42 Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Mon, 26 Jan 2015 18:30:45 +0000 Subject: [PATCH] SOLR-6500: Refactor FileFetcher in SnapPuller, add debug logging. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1654858 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 6 +- .../solr/core/IndexDeletionPolicyWrapper.java | 10 +- .../solr/handler/ReplicationHandler.java | 18 +- .../org/apache/solr/handler/SnapPuller.java | 487 ++++++------------ 4 files changed, 168 insertions(+), 353 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 81be6773be6..d2cdef960d8 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -54,7 +54,7 @@ Other Changes * SOLR-6895: Deprecated SolrServer classes have been removed (Alan Woodward, Erik Hatcher) -* SOLR-6902Use JUnit rules instead of inheritance with distributed Solr +* SOLR-6902: Use JUnit rules instead of inheritance with distributed Solr tests to allow for multiple tests without the same class. (Ramkumar Aiyengar, Erick Erickson, Mike McCandless) @@ -87,8 +87,12 @@ New Features Other Changes ---------------------- + * SOLR-7014: Collapse identical catch branches in try-catch statements. (shalin) +* SOLR-6500: Refactor FileFetcher in SnapPuller, add debug logging. + (Ramkumar Aiyengar via Mark Miller) + ================== 5.0.0 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. diff --git a/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java b/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java index f48c9daf41b..036d108c9c1 100644 --- a/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java +++ b/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java @@ -21,6 +21,8 @@ import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.store.Directory; import org.apache.solr.update.SolrIndexWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; @@ -43,6 +45,8 @@ import java.util.concurrent.atomic.AtomicInteger; * @see org.apache.lucene.index.IndexDeletionPolicy */ public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy { + private static final Logger LOG = LoggerFactory.getLogger(IndexDeletionPolicyWrapper.class.getName()); + private final IndexDeletionPolicy deletionPolicy; private volatile Map solrVersionVsCommits = new ConcurrentHashMap<>(); private final Map reserves = new ConcurrentHashMap<>(); @@ -82,7 +86,11 @@ public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy { // this is the common success case: the older time didn't exist, or // came before the new time. - if (previousTime == null || previousTime <= timeToSet) break; + if (previousTime == null || previousTime <= timeToSet) { + LOG.debug("Commit point reservation for generation {} set to {} (requested reserve time of {})", + indexGen, timeToSet, reserveTime); + break; + } // At this point, we overwrote a longer reservation, so we want to restore the older one. // the problem is that an even longer reservation may come in concurrently diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java index 91f281326a1..9b1c9636bb5 100644 --- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java @@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory; * @since solr 1.4 */ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware { - + private static final Logger LOG = LoggerFactory.getLogger(ReplicationHandler.class.getName()); SolrCore core; @@ -212,7 +212,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw doSnapShoot(new ModifiableSolrParams(solrParams), rsp, req); rsp.add(STATUS, OK_STATUS); } else if (command.equalsIgnoreCase(CMD_DELETE_BACKUP)) { - deleteSnapshot(new ModifiableSolrParams(solrParams), rsp, req); + deleteSnapshot(new ModifiableSolrParams(solrParams)); rsp.add(STATUS, OK_STATUS); } else if (command.equalsIgnoreCase(CMD_FETCH_INDEX)) { String masterUrl = solrParams.get(MASTER_URL); @@ -272,7 +272,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw } } - private void deleteSnapshot(ModifiableSolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) { + private void deleteSnapshot(ModifiableSolrParams params) { String name = params.get("name"); if(name == null) { throw new SolrException(ErrorCode.BAD_REQUEST, "Missing mandatory param: name"); @@ -793,8 +793,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw } else if (clzz == List.class) { String ss[] = s.split(","); List l = new ArrayList<>(); - for (int i = 0; i < ss.length; i++) { - l.add(new Date(Long.valueOf(ss[i])).toString()); + for (String s1 : ss) { + l.add(new Date(Long.valueOf(s1)).toString()); } nl.add(key, l); } else { @@ -1182,6 +1182,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw offset = offset == -1 ? 0 : offset; int read = (int) Math.min(buf.length, filelen - offset); in.readBytes(buf, 0, read); + fos.writeInt(read); if (useChecksum) { checksum.reset(); @@ -1190,6 +1191,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw } fos.write(buf, 0, read); fos.flush(); + LOG.debug("Wrote {} bytes for file {}", offset + read, fileName); //Pause if necessary maxBytesBeforePause += read; @@ -1239,8 +1241,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw FileInputStream inputStream = null; try { initWrite(); - - //if if is a conf file read from config diectory + + //if if is a conf file read from config directory File file = new File(core.getResourceLoader().getConfigDir(), cfileName); if (file.exists() && file.canRead()) { @@ -1364,7 +1366,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw * Boolean param for tests that can be specified when using * {@link #CMD_FETCH_INDEX} to force the current request to block until * the fetch is complete. NOTE: This param is not advised for - * non-test code, since the the durration of the fetch for non-trivial + * non-test code, since the the duration of the fetch for non-trivial * indexes will likeley cause the request to time out. * * @lucene.internal diff --git a/solr/core/src/java/org/apache/solr/handler/SnapPuller.java b/solr/core/src/java/org/apache/solr/handler/SnapPuller.java index a36a0420493..6aeb718a555 100644 --- a/solr/core/src/java/org/apache/solr/handler/SnapPuller.java +++ b/solr/core/src/java/org/apache/solr/handler/SnapPuller.java @@ -179,9 +179,7 @@ public class SnapPuller { httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword); httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression); - HttpClient httpClient = HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager()); - - return httpClient; + return HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager()); } public SnapPuller(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) { @@ -520,7 +518,7 @@ public class SnapPuller { solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded); } - openNewSearcherAndUpdateCommitPoint(isFullCopyNeeded); + openNewSearcherAndUpdateCommitPoint(); } replicationStartTime = 0; @@ -699,9 +697,7 @@ public class SnapPuller { List l = new ArrayList<>(); if (str != null && str.length() != 0) { String[] ss = str.split(","); - for (int i = 0; i < ss.length; i++) { - l.add(ss[i]); - } + Collections.addAll(l, ss); } sb.append(replicationTime); if (!l.isEmpty()) { @@ -714,7 +710,7 @@ public class SnapPuller { return sb; } - private void openNewSearcherAndUpdateCommitPoint(boolean isFullCopyNeeded) throws IOException { + private void openNewSearcherAndUpdateCommitPoint() throws IOException { SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams()); @@ -749,8 +745,7 @@ public class SnapPuller { private String createTempindexDir(SolrCore core, String tmpIdxDirName) { // TODO: there should probably be a DirectoryFactory#concatPath(parent, name) // or something - String tmpIdxDir = core.getDataDir() + tmpIdxDirName; - return tmpIdxDir; + return core.getDataDir() + tmpIdxDirName; } private void reloadCore() { @@ -777,7 +772,7 @@ public class SnapPuller { private void downloadConfFiles(List> confFilesToDownload, long latestGeneration) throws Exception { LOG.info("Starting download of configuration files from master: " + confFilesToDownload); - confFilesDownloaded = Collections.synchronizedList(new ArrayList>()); + confFilesDownloaded = Collections.synchronizedList(new ArrayList<>()); File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date())); try { boolean status = tmpconfDir.mkdirs(); @@ -868,7 +863,7 @@ public class SnapPuller { * Copy a file by the File#renameTo() method. If it fails, it is considered a failure *

*/ - private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname, List copiedfiles) { + private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname) { LOG.debug("Moving file: {}", fname); boolean success = false; try { @@ -902,7 +897,6 @@ public class SnapPuller { } } String segmentsFile = null; - List movedfiles = new ArrayList<>(); for (Map f : filesDownloaded) { String fname = (String) f.get(NAME); // the segments file must be copied last @@ -914,12 +908,11 @@ public class SnapPuller { segmentsFile = fname; continue; } - if (!moveAFile(tmpIdxDir, indexDir, fname, movedfiles)) return false; - movedfiles.add(fname); + if (!moveAFile(tmpIdxDir, indexDir, fname)) return false; } //copy the segments file last if (segmentsFile != null) { - if (!moveAFile(tmpIdxDir, indexDir, segmentsFile, movedfiles)) return false; + if (!moveAFile(tmpIdxDir, indexDir, segmentsFile)) return false; } return true; } @@ -945,7 +938,7 @@ public class SnapPuller { private void copyTmpConfFiles2Conf(File tmpconfDir) { boolean status = false; File confDir = new File(solrCore.getResourceLoader().getConfigDir()); - for (File file : makeTmpConfDirFileList(tmpconfDir, new ArrayList())) { + for (File file : makeTmpConfDirFileList(tmpconfDir, new ArrayList<>())) { File oldFile = new File(confDir, file.getPath().substring(tmpconfDir.getPath().length(), file.getPath().length())); if (!oldFile.getParentFile().exists()) { status = oldFile.getParentFile().mkdirs(); @@ -1157,7 +1150,7 @@ public class SnapPuller { return null; tmp = new HashMap<>(tmp); if (tmpFileFetcher != null) - tmp.put("bytesDownloaded", tmpFileFetcher.bytesDownloaded); + tmp.put("bytesDownloaded", tmpFileFetcher.getBytesDownloaded()); return tmp; } @@ -1178,58 +1171,53 @@ public class SnapPuller { } } + private interface FileInterface { + public void sync() throws IOException; + public void write(byte[] buf, int packetSize) throws IOException; + public void close() throws Exception; + public void delete() throws Exception; + } + /** * The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream * * @see org.apache.solr.handler.ReplicationHandler.DirectoryFileStream */ - private class DirectoryFileFetcher { - boolean includeChecksum = true; - - Directory copy2Dir; - - String fileName; - - String saveAs; - - long size; - - long bytesDownloaded = 0; - - byte[] buf = new byte[1024 * 1024]; - - Checksum checksum; - - int errorCount = 0; - + private class FileFetcher { + private final FileInterface file; + private boolean includeChecksum = true; + private String fileName; + private String saveAs; private boolean isConf; - - private boolean aborted = false; - private Long indexGen; - private IndexOutput outStream; + private long size; + private long bytesDownloaded = 0; + private byte[] buf = new byte[1024 * 1024]; + private Checksum checksum; + private int errorCount = 0; + private boolean aborted = false; - DirectoryFileFetcher(Directory tmpIndexDir, Map fileDetails, String saveAs, + FileFetcher(FileInterface file, Map fileDetails, String saveAs, boolean isConf, long latestGen) throws IOException { - this.copy2Dir = tmpIndexDir; + this.file = file; this.fileName = (String) fileDetails.get(NAME); this.size = (Long) fileDetails.get(SIZE); this.isConf = isConf; this.saveAs = saveAs; - indexGen = latestGen; - - outStream = copy2Dir.createOutput(saveAs, DirectoryFactory.IOCONTEXT_NO_CACHE); - if (includeChecksum) checksum = new Adler32(); } + public long getBytesDownloaded() { + return bytesDownloaded; + } + /** * The main method which downloads file */ - void fetchFile() throws Exception { + public void fetchFile() throws Exception { try { while (true) { final FastInputStream is = getStream(); @@ -1248,12 +1236,12 @@ public class SnapPuller { } } finally { cleanup(); - //if cleanup suceeds . The file is downloaded fully. do an fsync + //if cleanup succeeds . The file is downloaded fully. do an fsync fsyncService.submit(new Runnable(){ @Override public void run() { try { - copy2Dir.sync(Collections.singleton(saveAs)); + file.sync(); } catch (IOException e) { fsyncException = e; } @@ -1277,7 +1265,7 @@ public class SnapPuller { //read the size of the packet int packetSize = readInt(intbytes); if (packetSize <= 0) { - LOG.warn("No content received for file: " + currentFile); + LOG.warn("No content received for file: {}", fileName); return NO_CONTENT; } if (buf.length < packetSize) @@ -1295,45 +1283,45 @@ public class SnapPuller { checksum.update(buf, 0, packetSize); long checkSumClient = checksum.getValue(); if (checkSumClient != checkSumServer) { - LOG.error("Checksum not matched between client and server for: " + currentFile); + LOG.error("Checksum not matched between client and server for file: {}", fileName); //if checksum is wrong it is a problem return for retry return 1; } } //if everything is fine, write down the packet to the file - writeBytes(packetSize); + file.write(buf, packetSize); bytesDownloaded += packetSize; + LOG.debug("Fetched and wrote {} bytes of file: {}", bytesDownloaded, fileName); if (bytesDownloaded >= size) return 0; - //errorcount is always set to zero after a successful packet + //errorCount is always set to zero after a successful packet errorCount = 0; } } catch (ReplicationHandlerException e) { throw e; } catch (Exception e) { - LOG.warn("Error in fetching packets ", e); - //for any failure , increment the error count + LOG.warn("Error in fetching file: {} (downloaded {} of {} bytes)", + fileName, bytesDownloaded, size, e); + //for any failure, increment the error count errorCount++; - //if it fails for the same pacaket for MAX_RETRIES fail and come out + //if it fails for the same packet for MAX_RETRIES fail and come out if (errorCount > MAX_RETRIES) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Fetch failed for file:" + fileName, e); + "Failed to fetch file: " + fileName + + " (downloaded " + bytesDownloaded + " of " + size + " bytes" + + ", error count: " + errorCount + " > " + MAX_RETRIES + ")", e); } return ERR; } } - protected void writeBytes(int packetSize) throws IOException { - outStream.writeBytes(buf, 0, packetSize); - } - /** * The webcontainer flushes the data only after it fills the buffer size. So, all data has to be read as readFully() * other wise it fails. So read everything as bytes and then extract an integer out of it */ private int readInt(byte[] b) { return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16) - | ((b[2] & 0xff) << 8) | (b[3] & 0xff)); + | ((b[2] & 0xff) << 8) | (b[3] & 0xff)); } @@ -1342,9 +1330,9 @@ public class SnapPuller { */ private long readLong(byte[] b) { return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48) - | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32) - | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16) - | ((b[6] & 0xff) << 8) | ((b[7] & 0xff)); + | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32) + | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16) + | ((b[6] & 0xff) << 8) | ((b[7] & 0xff)); } @@ -1353,30 +1341,30 @@ public class SnapPuller { */ private void cleanup() { try { - outStream.close(); - } catch (Exception e) {/* noop */ - LOG.error("Error closing the file stream: "+ this.saveAs ,e); + file.close(); + } catch (Exception e) {/* no-op */ + LOG.error("Error closing file: {}", this.saveAs, e); } if (bytesDownloaded != size) { //if the download is not complete then //delete the file being downloaded try { - copy2Dir.deleteFile(saveAs); + file.delete(); } catch (Exception e) { - LOG.error("Error deleting file in cleanup" + e.getMessage()); + LOG.error("Error deleting file: {}", this.saveAs, e); } - //if the failure is due to a user abort it is returned nomally else an exception is thrown + //if the failure is due to a user abort it is returned normally else an exception is thrown if (!aborted) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Unable to download " + fileName + " completely. Downloaded " - + bytesDownloaded + "!=" + size); + "Unable to download " + fileName + " completely. Downloaded " + + bytesDownloaded + "!=" + size); } } /** * Open a new stream using HttpClient */ - FastInputStream getStream() throws IOException { + private FastInputStream getStream() throws IOException { ModifiableSolrParams params = new ModifiableSolrParams(); @@ -1392,7 +1380,7 @@ public class SnapPuller { params.set(FILE, fileName); } if (useInternal) { - params.set(COMPRESSION, "true"); + params.set(COMPRESSION, "true"); } //use checksum if (this.includeChecksum) { @@ -1400,16 +1388,16 @@ public class SnapPuller { } //wt=filestream this is a custom protocol params.set(CommonParams.WT, FILE_STREAM); - // This happen if there is a failure there is a retry. the offset= ensures that - // the server starts from the offset + // This happen if there is a failure there is a retry. the offset= ensures that + // the server starts from the offset if (bytesDownloaded > 0) { params.set(OFFSET, Long.toString(bytesDownloaded)); } - + NamedList response; InputStream is = null; - + HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient, null); //XXX use shardhandler try { client.setSoTimeout(60000); @@ -1430,274 +1418,91 @@ public class SnapPuller { } } } - - /** - * The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream - * - * @see org.apache.solr.handler.ReplicationHandler.LocalFsFileStream - */ - private class LocalFsFileFetcher { - boolean includeChecksum = true; + private class DirectoryFile implements FileInterface { + private final String saveAs; + private Directory copy2Dir; + private IndexOutput outStream; + + DirectoryFile(Directory tmpIndexDir, String saveAs) throws IOException { + this.saveAs = saveAs; + this.copy2Dir = tmpIndexDir; + outStream = copy2Dir.createOutput(this.saveAs, DirectoryFactory.IOCONTEXT_NO_CACHE); + } + + public void sync() throws IOException { + copy2Dir.sync(Collections.singleton(saveAs)); + } + + public void write(byte[] buf, int packetSize) throws IOException { + outStream.writeBytes(buf, 0, packetSize); + } + + public void close() throws Exception { + outStream.close(); + } + + public void delete() throws Exception { + copy2Dir.deleteFile(saveAs); + } + } + + private class DirectoryFileFetcher extends FileFetcher { + DirectoryFileFetcher(Directory tmpIndexDir, Map fileDetails, String saveAs, + boolean isConf, long latestGen) throws IOException { + super(new DirectoryFile(tmpIndexDir, saveAs), fileDetails, saveAs, isConf, latestGen); + } + } + + private class LocalFsFile implements FileInterface { private File copy2Dir; - String fileName; - - String saveAs; - - long size; - - long bytesDownloaded = 0; - FileChannel fileChannel; - private FileOutputStream fileOutputStream; - - byte[] buf = new byte[1024 * 1024]; - - Checksum checksum; - File file; - int errorCount = 0; - - private boolean isConf; - - private boolean aborted = false; - - private Long indexGen; - - // TODO: could do more code sharing with DirectoryFileFetcher - LocalFsFileFetcher(File dir, Map fileDetails, String saveAs, - boolean isConf, long latestGen) throws IOException { + LocalFsFile(File dir, String saveAs) throws IOException { this.copy2Dir = dir; - this.fileName = (String) fileDetails.get(NAME); - this.size = (Long) fileDetails.get(SIZE); - this.isConf = isConf; - this.saveAs = saveAs; - - indexGen = latestGen; this.file = new File(copy2Dir, saveAs); - + File parentDir = this.file.getParentFile(); if( ! parentDir.exists() ){ if ( ! parentDir.mkdirs() ) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Failed to create (sub)directory for file: " + saveAs); + "Failed to create (sub)directory for file: " + saveAs); } } - + this.fileOutputStream = new FileOutputStream(file); this.fileChannel = this.fileOutputStream.getChannel(); - - if (includeChecksum) - checksum = new Adler32(); } - /** - * The main method which downloads file - */ - void fetchFile() throws Exception { - try { - while (true) { - final FastInputStream is = getStream(); - int result; - try { - //fetch packets one by one in a single request - result = fetchPackets(is); - if (result == 0 || result == NO_CONTENT) { - return; - } - //if there is an error continue. But continue from the point where it got broken - } finally { - IOUtils.closeQuietly(is); - } - } - } finally { - cleanup(); - //if cleanup suceeds . The file is downloaded fully. do an fsync - fsyncService.submit(new Runnable(){ - @Override - public void run() { - try { - FileUtils.sync(file); - } catch (IOException e) { - fsyncException = e; - } - } - }); - } + public void sync() throws IOException { + FileUtils.sync(file); } - private int fetchPackets(FastInputStream fis) throws Exception { - byte[] intbytes = new byte[4]; - byte[] longbytes = new byte[8]; - try { - while (true) { - if (stop) { - stop = false; - aborted = true; - throw new ReplicationHandlerException("User aborted replication"); - } - long checkSumServer = -1; - fis.readFully(intbytes); - //read the size of the packet - int packetSize = readInt(intbytes); - if (packetSize <= 0) { - LOG.warn("No content received for file: " + currentFile); - return NO_CONTENT; - } - if (buf.length < packetSize) - buf = new byte[packetSize]; - if (checksum != null) { - //read the checksum - fis.readFully(longbytes); - checkSumServer = readLong(longbytes); - } - //then read the packet of bytes - fis.readFully(buf, 0, packetSize); - //compare the checksum as sent from the master - if (includeChecksum) { - checksum.reset(); - checksum.update(buf, 0, packetSize); - long checkSumClient = checksum.getValue(); - if (checkSumClient != checkSumServer) { - LOG.error("Checksum not matched between client and server for: " + currentFile); - //if checksum is wrong it is a problem return for retry - return 1; - } - } - //if everything is fine, write down the packet to the file - fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize)); - bytesDownloaded += packetSize; - if (bytesDownloaded >= size) - return 0; - //errorcount is always set to zero after a successful packet - errorCount = 0; - } - } catch (ReplicationHandlerException e) { - throw e; - } catch (Exception e) { - LOG.warn("Error in fetching packets ", e); - //for any failure , increment the error count - errorCount++; - //if it fails for the same pacaket for MAX_RETRIES fail and come out - if (errorCount > MAX_RETRIES) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Fetch failed for file:" + fileName, e); - } - return ERR; - } + public void write(byte[] buf, int packetSize) throws IOException { + fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize)); } - /** - * The webcontainer flushes the data only after it fills the buffer size. So, all data has to be read as readFully() - * other wise it fails. So read everything as bytes and then extract an integer out of it - */ - private int readInt(byte[] b) { - return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16) - | ((b[2] & 0xff) << 8) | (b[3] & 0xff)); - + public void close() throws Exception { + //close the FileOutputStream (which also closes the Channel) + fileOutputStream.close(); } - /** - * Same as above but to read longs from a byte array - */ - private long readLong(byte[] b) { - return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48) - | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32) - | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16) - | ((b[6] & 0xff) << 8) | ((b[7] & 0xff)); - - } - - /** - * cleanup everything - */ - private void cleanup() { - try { - //close the FileOutputStream (which also closes the Channel) - fileOutputStream.close(); - } catch (Exception e) {/* noop */ - LOG.error("Error closing the file stream: "+ this.saveAs ,e); - } - if (bytesDownloaded != size) { - //if the download is not complete then - //delete the file being downloaded - try { - Files.delete(file.toPath()); - } catch (SecurityException e) { - LOG.error("Error deleting file in cleanup" + e.getMessage()); - } catch (Throwable other) { - // TODO: should this class care if a file couldnt be deleted? - // this just emulates previous behavior, where only SecurityException would be handled. - } - //if the failure is due to a user abort it is returned nomally else an exception is thrown - if (!aborted) - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Unable to download " + fileName + " completely. Downloaded " - + bytesDownloaded + "!=" + size); - } - } - - /** - * Open a new stream using HttpClient - */ - FastInputStream getStream() throws IOException { - - ModifiableSolrParams params = new ModifiableSolrParams(); - -// //the method is command=filecontent - params.set(COMMAND, CMD_GET_FILE); - params.set(GENERATION, Long.toString(indexGen)); - params.set(CommonParams.QT, "/replication"); - //add the version to download. This is used to reserve the download - if (isConf) { - //set cf instead of file for config file - params.set(CONF_FILE_SHORT, fileName); - } else { - params.set(FILE, fileName); - } - if (useInternal) { - params.set(COMPRESSION, "true"); - } - //use checksum - if (this.includeChecksum) { - params.set(CHECKSUM, true); - } - //wt=filestream this is a custom protocol - params.set(CommonParams.WT, FILE_STREAM); - // This happen if there is a failure there is a retry. the offset= ensures that - // the server starts from the offset - if (bytesDownloaded > 0) { - params.set(OFFSET, Long.toString(bytesDownloaded)); - } - - - NamedList response; - InputStream is = null; - HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient, null); //XXX use shardhandler - try { - client.setSoTimeout(60000); - client.setConnectionTimeout(15000); - QueryRequest req = new QueryRequest(params); - response = client.request(req); - is = (InputStream) response.get("stream"); - if(useInternal) { - is = new InflaterInputStream(is); - } - return new FastInputStream(is); - } catch (Exception e) { - //close stream on error - IOUtils.closeQuietly(is); - throw new IOException("Could not download file '" + fileName + "'", e); - } finally { - client.shutdown(); - } + public void delete() throws Exception { + Files.delete(file.toPath()); } } - + + private class LocalFsFileFetcher extends FileFetcher { + LocalFsFileFetcher(File dir, Map fileDetails, String saveAs, + boolean isConf, long latestGen) throws IOException { + super(new LocalFsFile(dir, saveAs), fileDetails, saveAs, isConf, latestGen); + } + } + NamedList getDetails() throws IOException, SolrServerException { ModifiableSolrParams params = new ModifiableSolrParams(); params.set(COMMAND, CMD_DETAILS); @@ -1720,31 +1525,27 @@ public class SnapPuller { if (interval == null) return null; int result = 0; - if (interval != null) { - Matcher m = INTERVAL_PATTERN.matcher(interval.trim()); - if (m.find()) { - String hr = m.group(1); - String min = m.group(2); - String sec = m.group(3); - result = 0; - try { - if (sec != null && sec.length() > 0) - result += Integer.parseInt(sec); - if (min != null && min.length() > 0) - result += (60 * Integer.parseInt(min)); - if (hr != null && hr.length() > 0) - result += (60 * 60 * Integer.parseInt(hr)); - result *= 1000; - } catch (NumberFormatException e) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - INTERVAL_ERR_MSG); - } - } else { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - INTERVAL_ERR_MSG); + Matcher m = INTERVAL_PATTERN.matcher(interval.trim()); + if (m.find()) { + String hr = m.group(1); + String min = m.group(2); + String sec = m.group(3); + result = 0; + try { + if (sec != null && sec.length() > 0) + result += Integer.parseInt(sec); + if (min != null && min.length() > 0) + result += (60 * Integer.parseInt(min)); + if (hr != null && hr.length() > 0) + result += (60 * 60 * Integer.parseInt(hr)); + result *= 1000; + } catch (NumberFormatException e) { + throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG); } - + } else { + throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG); } + return result; }