commit 8755ea2edd (parent 70bc0af0ee)

    SOLR-6500: Refactor FileFetcher in SnapPuller, add debug logging.

    git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1654858 13f79535-47bb-0310-9956-ffa450edef68
CHANGES.txt
@@ -54,7 +54,7 @@ Other Changes
 * SOLR-6895: Deprecated SolrServer classes have been removed (Alan Woodward,
   Erik Hatcher)
 
-* SOLR-6902Use JUnit rules instead of inheritance with distributed Solr
+* SOLR-6902: Use JUnit rules instead of inheritance with distributed Solr
   tests to allow for multiple tests without the same class.
   (Ramkumar Aiyengar, Erick Erickson, Mike McCandless)
 
@@ -87,8 +87,12 @@ New Features
 
 Other Changes
 ----------------------
 
 * SOLR-7014: Collapse identical catch branches in try-catch statements. (shalin)
+
+* SOLR-6500: Refactor FileFetcher in SnapPuller, add debug logging.
+  (Ramkumar Aiyengar via Mark Miller)
+
 ================== 5.0.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
IndexDeletionPolicyWrapper.java
@@ -21,6 +21,8 @@ import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.update.SolrIndexWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
@@ -43,6 +45,8 @@ import java.util.concurrent.atomic.AtomicInteger;
  * @see org.apache.lucene.index.IndexDeletionPolicy
  */
 public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy {
+  private static final Logger LOG = LoggerFactory.getLogger(IndexDeletionPolicyWrapper.class.getName());
+
   private final IndexDeletionPolicy deletionPolicy;
   private volatile Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<>();
   private final Map<Long, Long> reserves = new ConcurrentHashMap<>();
@@ -82,7 +86,11 @@ public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy {
 
       // this is the common success case: the older time didn't exist, or
       // came before the new time.
-      if (previousTime == null || previousTime <= timeToSet) break;
+      if (previousTime == null || previousTime <= timeToSet) {
+        LOG.debug("Commit point reservation for generation {} set to {} (requested reserve time of {})",
+            indexGen, timeToSet, reserveTime);
+        break;
+      }
 
       // At this point, we overwrote a longer reservation, so we want to restore the older one.
       // the problem is that an even longer reservation may come in concurrently
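For context, the debug line above lands inside an optimistic retry loop over the reserves ConcurrentHashMap: a reservation is only ever extended, never shortened, even when callers race. A minimal standalone sketch of that pattern follows; the class and method names are hypothetical, not Solr's API.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch: extend a per-generation reservation, but never shorten one
// that a concurrent caller has already pushed further out.
class ReservationSketch {
  private final ConcurrentMap<Long, Long> reserves = new ConcurrentHashMap<>();

  void reserve(long indexGen, long reserveTimeMillis) {
    long timeToSet = System.currentTimeMillis() + reserveTimeMillis;
    while (true) {
      Long previousTime = reserves.put(indexGen, timeToSet);
      // Common success case: no older reservation, or ours extends it.
      if (previousTime == null || previousTime <= timeToSet) break;
      // We overwrote a longer reservation; put it back and retry, since
      // an even longer one may arrive concurrently.
      timeToSet = previousTime;
    }
  }
}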
ReplicationHandler.java
@@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory;
  * @since solr 1.4
  */
 public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
 
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationHandler.class.getName());
   SolrCore core;
 
@@ -212,7 +212,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
       doSnapShoot(new ModifiableSolrParams(solrParams), rsp, req);
       rsp.add(STATUS, OK_STATUS);
     } else if (command.equalsIgnoreCase(CMD_DELETE_BACKUP)) {
-      deleteSnapshot(new ModifiableSolrParams(solrParams), rsp, req);
+      deleteSnapshot(new ModifiableSolrParams(solrParams));
       rsp.add(STATUS, OK_STATUS);
     } else if (command.equalsIgnoreCase(CMD_FETCH_INDEX)) {
       String masterUrl = solrParams.get(MASTER_URL);
@@ -272,7 +272,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
     }
   }
 
-  private void deleteSnapshot(ModifiableSolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) {
+  private void deleteSnapshot(ModifiableSolrParams params) {
     String name = params.get("name");
     if(name == null) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Missing mandatory param: name");
@@ -793,8 +793,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
       } else if (clzz == List.class) {
         String ss[] = s.split(",");
         List<String> l = new ArrayList<>();
-        for (int i = 0; i < ss.length; i++) {
-          l.add(new Date(Long.valueOf(ss[i])).toString());
+        for (String s1 : ss) {
+          l.add(new Date(Long.valueOf(s1)).toString());
         }
         nl.add(key, l);
       } else {
@@ -1182,6 +1182,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
       offset = offset == -1 ? 0 : offset;
       int read = (int) Math.min(buf.length, filelen - offset);
       in.readBytes(buf, 0, read);
 
       fos.writeInt(read);
       if (useChecksum) {
         checksum.reset();
@@ -1190,6 +1191,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
         }
         fos.write(buf, 0, read);
         fos.flush();
+        LOG.debug("Wrote {} bytes for file {}", offset + read, fileName);
 
         //Pause if necessary
         maxBytesBeforePause += read;
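The surrounding DirectoryFileStream loop writes one packet, flushes, and pauses once a byte budget is used up, so replication cannot saturate disk or network. A rough standalone sketch of that throttled-copy shape; ThrottledCopy and its parameters are assumptions for illustration, not the handler's API.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Sketch: copy a stream packet by packet, sleeping whenever the byte
// budget since the last pause is exhausted ("pause if necessary").
final class ThrottledCopy {
  static void copy(InputStream in, OutputStream out,
                   long bytesPerPause, long pauseMillis)
      throws IOException, InterruptedException {
    byte[] buf = new byte[1024 * 1024];
    long bytesSincePause = 0;
    int read;
    while ((read = in.read(buf)) != -1) {
      out.write(buf, 0, read);
      out.flush();
      bytesSincePause += read;
      if (bytesSincePause >= bytesPerPause) {
        Thread.sleep(pauseMillis);
        bytesSincePause = 0;
      }
    }
  }
}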
@@ -1239,8 +1241,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
       FileInputStream inputStream = null;
       try {
         initWrite();
 
-        //if if is a conf file read from config diectory
+        //if if is a conf file read from config directory
         File file = new File(core.getResourceLoader().getConfigDir(), cfileName);
 
         if (file.exists() && file.canRead()) {
@@ -1364,7 +1366,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
    * Boolean param for tests that can be specified when using
    * {@link #CMD_FETCH_INDEX} to force the current request to block until
    * the fetch is complete. <b>NOTE:</b> This param is not advised for
-   * non-test code, since the the durration of the fetch for non-trivial
+   * non-test code, since the the duration of the fetch for non-trivial
    * indexes will likeley cause the request to time out.
    *
    * @lucene.internal
SnapPuller.java
@@ -179,9 +179,7 @@ public class SnapPuller {
     httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
     httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
 
-    HttpClient httpClient = HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager());
-
-    return httpClient;
+    return HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager());
   }
 
   public SnapPuller(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
@@ -520,7 +518,7 @@ public class SnapPuller {
         solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
       }
 
-      openNewSearcherAndUpdateCommitPoint(isFullCopyNeeded);
+      openNewSearcherAndUpdateCommitPoint();
     }
 
     replicationStartTime = 0;
@@ -699,9 +697,7 @@ public class SnapPuller {
     List<String> l = new ArrayList<>();
     if (str != null && str.length() != 0) {
       String[] ss = str.split(",");
-      for (int i = 0; i < ss.length; i++) {
-        l.add(ss[i]);
-      }
+      Collections.addAll(l, ss);
     }
     sb.append(replicationTime);
     if (!l.isEmpty()) {
@@ -714,7 +710,7 @@ public class SnapPuller {
     return sb;
   }
 
-  private void openNewSearcherAndUpdateCommitPoint(boolean isFullCopyNeeded) throws IOException {
+  private void openNewSearcherAndUpdateCommitPoint() throws IOException {
     SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
         new ModifiableSolrParams());
 
@@ -749,8 +745,7 @@ public class SnapPuller {
   private String createTempindexDir(SolrCore core, String tmpIdxDirName) {
     // TODO: there should probably be a DirectoryFactory#concatPath(parent, name)
     // or something
-    String tmpIdxDir = core.getDataDir() + tmpIdxDirName;
-    return tmpIdxDir;
+    return core.getDataDir() + tmpIdxDirName;
   }
 
   private void reloadCore() {
@@ -777,7 +772,7 @@ public class SnapPuller {
 
   private void downloadConfFiles(List<Map<String, Object>> confFilesToDownload, long latestGeneration) throws Exception {
     LOG.info("Starting download of configuration files from master: " + confFilesToDownload);
-    confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
+    confFilesDownloaded = Collections.synchronizedList(new ArrayList<>());
     File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date()));
     try {
       boolean status = tmpconfDir.mkdirs();
@@ -868,7 +863,7 @@ public class SnapPuller {
    * Copy a file by the File#renameTo() method. If it fails, it is considered a failure
    * <p/>
    */
-  private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname, List<String> copiedfiles) {
+  private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname) {
     LOG.debug("Moving file: {}", fname);
     boolean success = false;
     try {
@@ -902,7 +897,6 @@ public class SnapPuller {
       }
     }
     String segmentsFile = null;
-    List<String> movedfiles = new ArrayList<>();
     for (Map<String, Object> f : filesDownloaded) {
       String fname = (String) f.get(NAME);
       // the segments file must be copied last
@@ -914,12 +908,11 @@ public class SnapPuller {
         segmentsFile = fname;
         continue;
       }
-      if (!moveAFile(tmpIdxDir, indexDir, fname, movedfiles)) return false;
-      movedfiles.add(fname);
+      if (!moveAFile(tmpIdxDir, indexDir, fname)) return false;
     }
     //copy the segments file last
     if (segmentsFile != null) {
-      if (!moveAFile(tmpIdxDir, indexDir, segmentsFile, movedfiles)) return false;
+      if (!moveAFile(tmpIdxDir, indexDir, segmentsFile)) return false;
     }
     return true;
   }
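The ordering above is the point of the loop: every data file is moved before the segments file, so a crash mid-move can never leave a commit point that references missing files. A hedged sketch of the same ordering; the Mover callback stands in for moveAFile, and the segments_ prefix test stands in for the real segments-file check.

import java.util.List;

// Sketch: move index files first, the segments_N commit point last.
final class SegmentsLastMove {
  interface Mover { boolean move(String fileName); }

  static boolean moveAll(List<String> files, Mover mover) {
    String segmentsFile = null;
    for (String fname : files) {
      if (fname.startsWith("segments_")) { // defer the commit point
        segmentsFile = fname;
        continue;
      }
      if (!mover.move(fname)) return false;
    }
    // only after everything else is in place, publish the commit point
    return segmentsFile == null || mover.move(segmentsFile);
  }
}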
@@ -945,7 +938,7 @@ public class SnapPuller {
   private void copyTmpConfFiles2Conf(File tmpconfDir) {
     boolean status = false;
     File confDir = new File(solrCore.getResourceLoader().getConfigDir());
-    for (File file : makeTmpConfDirFileList(tmpconfDir, new ArrayList<File>())) {
+    for (File file : makeTmpConfDirFileList(tmpconfDir, new ArrayList<>())) {
       File oldFile = new File(confDir, file.getPath().substring(tmpconfDir.getPath().length(), file.getPath().length()));
       if (!oldFile.getParentFile().exists()) {
         status = oldFile.getParentFile().mkdirs();
@@ -1157,7 +1150,7 @@ public class SnapPuller {
       return null;
     tmp = new HashMap<>(tmp);
     if (tmpFileFetcher != null)
-      tmp.put("bytesDownloaded", tmpFileFetcher.bytesDownloaded);
+      tmp.put("bytesDownloaded", tmpFileFetcher.getBytesDownloaded());
     return tmp;
   }
 
@@ -1178,58 +1171,53 @@ public class SnapPuller {
     }
   }
 
+  private interface FileInterface {
+    public void sync() throws IOException;
+    public void write(byte[] buf, int packetSize) throws IOException;
+    public void close() throws Exception;
+    public void delete() throws Exception;
+  }
+
   /**
    * The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream
    *
    * @see org.apache.solr.handler.ReplicationHandler.DirectoryFileStream
    */
-  private class DirectoryFileFetcher {
-    boolean includeChecksum = true;
-
-    Directory copy2Dir;
-
-    String fileName;
-
-    String saveAs;
-
-    long size;
-
-    long bytesDownloaded = 0;
-
-    byte[] buf = new byte[1024 * 1024];
-
-    Checksum checksum;
-
-    int errorCount = 0;
-
+  private class FileFetcher {
+    private final FileInterface file;
+    private boolean includeChecksum = true;
+    private String fileName;
+    private String saveAs;
+    private boolean isConf;
 
-    private boolean aborted = false;
-
     private Long indexGen;
 
-    private IndexOutput outStream;
+    private long size;
+    private long bytesDownloaded = 0;
+    private byte[] buf = new byte[1024 * 1024];
+    private Checksum checksum;
+    private int errorCount = 0;
+    private boolean aborted = false;
 
-    DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
+    FileFetcher(FileInterface file, Map<String, Object> fileDetails, String saveAs,
                 boolean isConf, long latestGen) throws IOException {
-      this.copy2Dir = tmpIndexDir;
+      this.file = file;
      this.fileName = (String) fileDetails.get(NAME);
      this.size = (Long) fileDetails.get(SIZE);
      this.isConf = isConf;
      this.saveAs = saveAs;
 
      indexGen = latestGen;
 
-      outStream = copy2Dir.createOutput(saveAs, DirectoryFactory.IOCONTEXT_NO_CACHE);
-
      if (includeChecksum)
        checksum = new Adler32();
    }
 
+    public long getBytesDownloaded() {
+      return bytesDownloaded;
+    }
+
    /**
     * The main method which downloads file
     */
-    void fetchFile() throws Exception {
+    public void fetchFile() throws Exception {
      try {
        while (true) {
          final FastInputStream is = getStream();
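This hunk is the heart of the refactor: the protocol loop (packets, retries, checksums, abort handling) is collapsed into a single FileFetcher, and everything destination-specific hides behind FileInterface. The interface below is the one the patch introduces; the in-memory implementation is purely illustrative, to show how small a destination needs to be.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// The patch's abstraction point: sync/write/close/delete vary by
// destination (Lucene Directory vs. local filesystem); nothing else does.
interface FileInterface {
  void sync() throws IOException;
  void write(byte[] buf, int packetSize) throws IOException;
  void close() throws Exception;
  void delete() throws Exception;
}

// Hypothetical destination, for illustration only: collect the packets
// in memory instead of writing to a Directory or a FileChannel.
class InMemoryFile implements FileInterface {
  private final ByteArrayOutputStream out = new ByteArrayOutputStream();
  public void sync() { /* nothing to fsync in memory */ }
  public void write(byte[] buf, int packetSize) { out.write(buf, 0, packetSize); }
  public void close() { /* nothing to release */ }
  public void delete() { out.reset(); }
}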
@@ -1248,12 +1236,12 @@ public class SnapPuller {
         }
       } finally {
         cleanup();
-        //if cleanup suceeds . The file is downloaded fully. do an fsync
+        //if cleanup succeeds . The file is downloaded fully. do an fsync
         fsyncService.submit(new Runnable(){
           @Override
           public void run() {
             try {
-              copy2Dir.sync(Collections.singleton(saveAs));
+              file.sync();
             } catch (IOException e) {
               fsyncException = e;
             }
@@ -1277,7 +1265,7 @@ public class SnapPuller {
           //read the size of the packet
           int packetSize = readInt(intbytes);
           if (packetSize <= 0) {
-            LOG.warn("No content received for file: " + currentFile);
+            LOG.warn("No content received for file: {}", fileName);
             return NO_CONTENT;
           }
           if (buf.length < packetSize)
@@ -1295,45 +1283,45 @@ public class SnapPuller {
             checksum.update(buf, 0, packetSize);
             long checkSumClient = checksum.getValue();
             if (checkSumClient != checkSumServer) {
-              LOG.error("Checksum not matched between client and server for: " + currentFile);
+              LOG.error("Checksum not matched between client and server for file: {}", fileName);
               //if checksum is wrong it is a problem return for retry
               return 1;
             }
           }
           //if everything is fine, write down the packet to the file
-          writeBytes(packetSize);
+          file.write(buf, packetSize);
           bytesDownloaded += packetSize;
+          LOG.debug("Fetched and wrote {} bytes of file: {}", bytesDownloaded, fileName);
           if (bytesDownloaded >= size)
             return 0;
-          //errorcount is always set to zero after a successful packet
+          //errorCount is always set to zero after a successful packet
           errorCount = 0;
         }
       } catch (ReplicationHandlerException e) {
         throw e;
       } catch (Exception e) {
-        LOG.warn("Error in fetching packets ", e);
-        //for any failure , increment the error count
+        LOG.warn("Error in fetching file: {} (downloaded {} of {} bytes)",
+            fileName, bytesDownloaded, size, e);
+        //for any failure, increment the error count
         errorCount++;
-        //if it fails for the same pacaket for MAX_RETRIES fail and come out
+        //if it fails for the same packet for MAX_RETRIES fail and come out
         if (errorCount > MAX_RETRIES) {
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-              "Fetch failed for file:" + fileName, e);
+              "Failed to fetch file: " + fileName +
+                  " (downloaded " + bytesDownloaded + " of " + size + " bytes" +
+                  ", error count: " + errorCount + " > " + MAX_RETRIES + ")", e);
         }
         return ERR;
       }
     }
 
-    protected void writeBytes(int packetSize) throws IOException {
-      outStream.writeBytes(buf, 0, packetSize);
-    }
-
     /**
      * The webcontainer flushes the data only after it fills the buffer size. So, all data has to be read as readFully()
      * other wise it fails. So read everything as bytes and then extract an integer out of it
      */
     private int readInt(byte[] b) {
       return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
-          | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
+              | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
 
     }
 
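readInt reassembles a big-endian int by hand because the response has to be drained with readFully before any value can be extracted. The shift-and-mask form is equivalent to ByteBuffer's default big-endian decode, which a quick check confirms:

import java.nio.ByteBuffer;

// Sketch: the manual decode above equals ByteBuffer's big-endian getInt.
final class BigEndianCheck {
  static int readInt(byte[] b) {
    return ((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
         | ((b[2] & 0xff) << 8) | (b[3] & 0xff);
  }

  public static void main(String[] args) {
    byte[] b = {0x12, 0x34, 0x56, 0x78};
    // both print 12345678
    System.out.println(Integer.toHexString(readInt(b)));
    System.out.println(Integer.toHexString(ByteBuffer.wrap(b).getInt()));
  }
}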
@@ -1342,9 +1330,9 @@ public class SnapPuller {
      */
     private long readLong(byte[] b) {
       return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
-          | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
-          | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
-          | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
+              | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
+              | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
+              | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
 
     }
 
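Together, readInt and readLong parse the wt=filestream framing: each packet is a 4-byte big-endian length, an optional 8-byte Adler32 checksum, then the payload. A sketch of one packet read, with DataInputStream (also big-endian) standing in for the manual byte decoding; PacketReader is illustrative, not the patch's code.

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.Adler32;

// Sketch of one wt=filestream packet: [int length][long checksum?][payload].
final class PacketReader {
  static byte[] readPacket(InputStream in, boolean withChecksum) throws IOException {
    DataInputStream dis = new DataInputStream(in);
    int packetSize = dis.readInt();            // the readInt(intbytes) step
    if (packetSize <= 0) return null;          // NO_CONTENT
    long expected = withChecksum ? dis.readLong() : -1;
    byte[] buf = new byte[packetSize];
    dis.readFully(buf);                        // container buffers; must readFully
    if (withChecksum) {
      Adler32 adler = new Adler32();
      adler.update(buf, 0, packetSize);
      if (adler.getValue() != expected) {
        throw new IOException("checksum mismatch; caller should retry the packet");
      }
    }
    return buf;
  }
}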
@@ -1353,30 +1341,30 @@ public class SnapPuller {
      */
     private void cleanup() {
       try {
-        outStream.close();
-      } catch (Exception e) {/* noop */
-        LOG.error("Error closing the file stream: "+ this.saveAs ,e);
+        file.close();
+      } catch (Exception e) {/* no-op */
+        LOG.error("Error closing file: {}", this.saveAs, e);
       }
       if (bytesDownloaded != size) {
         //if the download is not complete then
         //delete the file being downloaded
         try {
-          copy2Dir.deleteFile(saveAs);
+          file.delete();
         } catch (Exception e) {
-          LOG.error("Error deleting file in cleanup" + e.getMessage());
+          LOG.error("Error deleting file: {}", this.saveAs, e);
         }
-        //if the failure is due to a user abort it is returned nomally else an exception is thrown
+        //if the failure is due to a user abort it is returned normally else an exception is thrown
         if (!aborted)
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-              "Unable to download " + fileName + " completely. Downloaded "
-                  + bytesDownloaded + "!=" + size);
+                  "Unable to download " + fileName + " completely. Downloaded "
+                          + bytesDownloaded + "!=" + size);
       }
     }
 
     /**
      * Open a new stream using HttpClient
      */
-    FastInputStream getStream() throws IOException {
+    private FastInputStream getStream() throws IOException {
 
       ModifiableSolrParams params = new ModifiableSolrParams();
 
@@ -1392,7 +1380,7 @@ public class SnapPuller {
         params.set(FILE, fileName);
       }
       if (useInternal) {
-        params.set(COMPRESSION, "true");
+        params.set(COMPRESSION, "true");
       }
       //use checksum
       if (this.includeChecksum) {
@@ -1400,16 +1388,16 @@ public class SnapPuller {
       }
       //wt=filestream this is a custom protocol
       params.set(CommonParams.WT, FILE_STREAM);
-      // This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
-      // the server starts from the offset
+      // This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
+      // the server starts from the offset
       if (bytesDownloaded > 0) {
         params.set(OFFSET, Long.toString(bytesDownloaded));
       }
 
 
       NamedList response;
       InputStream is = null;
 
       HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient, null); //XXX use shardhandler
       try {
         client.setSoTimeout(60000);
@@ -1430,274 +1418,91 @@ public class SnapPuller {
       }
     }
   }
 
-  /**
-   * The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream
-   *
-   * @see org.apache.solr.handler.ReplicationHandler.LocalFsFileStream
-   */
-  private class LocalFsFileFetcher {
-    boolean includeChecksum = true;
-
+  private class DirectoryFile implements FileInterface {
+    private final String saveAs;
+    private Directory copy2Dir;
+    private IndexOutput outStream;
+
+    DirectoryFile(Directory tmpIndexDir, String saveAs) throws IOException {
+      this.saveAs = saveAs;
+      this.copy2Dir = tmpIndexDir;
+      outStream = copy2Dir.createOutput(this.saveAs, DirectoryFactory.IOCONTEXT_NO_CACHE);
+    }
+
+    public void sync() throws IOException {
+      copy2Dir.sync(Collections.singleton(saveAs));
+    }
+
+    public void write(byte[] buf, int packetSize) throws IOException {
+      outStream.writeBytes(buf, 0, packetSize);
+    }
+
+    public void close() throws Exception {
+      outStream.close();
+    }
+
+    public void delete() throws Exception {
+      copy2Dir.deleteFile(saveAs);
+    }
+  }
+
+  private class DirectoryFileFetcher extends FileFetcher {
+    DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
+                         boolean isConf, long latestGen) throws IOException {
+      super(new DirectoryFile(tmpIndexDir, saveAs), fileDetails, saveAs, isConf, latestGen);
+    }
+  }
+
+  private class LocalFsFile implements FileInterface {
     private File copy2Dir;
 
-    String fileName;
-
-    String saveAs;
-
-    long size;
-
-    long bytesDownloaded = 0;
-
     FileChannel fileChannel;
 
     private FileOutputStream fileOutputStream;
 
-    byte[] buf = new byte[1024 * 1024];
-
-    Checksum checksum;
-
     File file;
 
-    int errorCount = 0;
-
-    private boolean isConf;
-
-    private boolean aborted = false;
-
-    private Long indexGen;
-
-    // TODO: could do more code sharing with DirectoryFileFetcher
-    LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
-                       boolean isConf, long latestGen) throws IOException {
+    LocalFsFile(File dir, String saveAs) throws IOException {
       this.copy2Dir = dir;
-      this.fileName = (String) fileDetails.get(NAME);
-      this.size = (Long) fileDetails.get(SIZE);
-      this.isConf = isConf;
-      this.saveAs = saveAs;
-
-      indexGen = latestGen;
 
       this.file = new File(copy2Dir, saveAs);
 
       File parentDir = this.file.getParentFile();
       if( ! parentDir.exists() ){
         if ( ! parentDir.mkdirs() ) {
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-              "Failed to create (sub)directory for file: " + saveAs);
+                  "Failed to create (sub)directory for file: " + saveAs);
         }
       }
 
       this.fileOutputStream = new FileOutputStream(file);
       this.fileChannel = this.fileOutputStream.getChannel();
-
-      if (includeChecksum)
-        checksum = new Adler32();
     }
 
-    /**
-     * The main method which downloads file
-     */
-    void fetchFile() throws Exception {
-      try {
-        while (true) {
-          final FastInputStream is = getStream();
-          int result;
-          try {
-            //fetch packets one by one in a single request
-            result = fetchPackets(is);
-            if (result == 0 || result == NO_CONTENT) {
-              return;
-            }
-            //if there is an error continue. But continue from the point where it got broken
-          } finally {
-            IOUtils.closeQuietly(is);
-          }
-        }
-      } finally {
-        cleanup();
-        //if cleanup suceeds . The file is downloaded fully. do an fsync
-        fsyncService.submit(new Runnable(){
-          @Override
-          public void run() {
-            try {
-              FileUtils.sync(file);
-            } catch (IOException e) {
-              fsyncException = e;
-            }
-          }
-        });
-      }
+    public void sync() throws IOException {
+      FileUtils.sync(file);
     }
 
-    private int fetchPackets(FastInputStream fis) throws Exception {
-      byte[] intbytes = new byte[4];
-      byte[] longbytes = new byte[8];
-      try {
-        while (true) {
-          if (stop) {
-            stop = false;
-            aborted = true;
-            throw new ReplicationHandlerException("User aborted replication");
-          }
-          long checkSumServer = -1;
-          fis.readFully(intbytes);
-          //read the size of the packet
-          int packetSize = readInt(intbytes);
-          if (packetSize <= 0) {
-            LOG.warn("No content received for file: " + currentFile);
-            return NO_CONTENT;
-          }
-          if (buf.length < packetSize)
-            buf = new byte[packetSize];
-          if (checksum != null) {
-            //read the checksum
-            fis.readFully(longbytes);
-            checkSumServer = readLong(longbytes);
-          }
-          //then read the packet of bytes
-          fis.readFully(buf, 0, packetSize);
-          //compare the checksum as sent from the master
-          if (includeChecksum) {
-            checksum.reset();
-            checksum.update(buf, 0, packetSize);
-            long checkSumClient = checksum.getValue();
-            if (checkSumClient != checkSumServer) {
-              LOG.error("Checksum not matched between client and server for: " + currentFile);
-              //if checksum is wrong it is a problem return for retry
-              return 1;
-            }
-          }
-          //if everything is fine, write down the packet to the file
-          fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize));
-          bytesDownloaded += packetSize;
-          if (bytesDownloaded >= size)
-            return 0;
-          //errorcount is always set to zero after a successful packet
-          errorCount = 0;
-        }
-      } catch (ReplicationHandlerException e) {
-        throw e;
-      } catch (Exception e) {
-        LOG.warn("Error in fetching packets ", e);
-        //for any failure , increment the error count
-        errorCount++;
-        //if it fails for the same pacaket for MAX_RETRIES fail and come out
-        if (errorCount > MAX_RETRIES) {
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-              "Fetch failed for file:" + fileName, e);
-        }
-        return ERR;
-      }
+    public void write(byte[] buf, int packetSize) throws IOException {
+      fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize));
     }
 
-    /**
-     * The webcontainer flushes the data only after it fills the buffer size. So, all data has to be read as readFully()
-     * other wise it fails. So read everything as bytes and then extract an integer out of it
-     */
-    private int readInt(byte[] b) {
-      return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
-          | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
-
+    public void close() throws Exception {
+      //close the FileOutputStream (which also closes the Channel)
+      fileOutputStream.close();
     }
 
-    /**
-     * Same as above but to read longs from a byte array
-     */
-    private long readLong(byte[] b) {
-      return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
-          | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
-          | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
-          | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
-
-    }
-
-    /**
-     * cleanup everything
-     */
-    private void cleanup() {
-      try {
-        //close the FileOutputStream (which also closes the Channel)
-        fileOutputStream.close();
-      } catch (Exception e) {/* noop */
-        LOG.error("Error closing the file stream: "+ this.saveAs ,e);
-      }
-      if (bytesDownloaded != size) {
-        //if the download is not complete then
-        //delete the file being downloaded
-        try {
-          Files.delete(file.toPath());
-        } catch (SecurityException e) {
-          LOG.error("Error deleting file in cleanup" + e.getMessage());
-        } catch (Throwable other) {
-          // TODO: should this class care if a file couldnt be deleted?
-          // this just emulates previous behavior, where only SecurityException would be handled.
-        }
-        //if the failure is due to a user abort it is returned nomally else an exception is thrown
-        if (!aborted)
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-              "Unable to download " + fileName + " completely. Downloaded "
-                  + bytesDownloaded + "!=" + size);
-      }
-    }
-
-    /**
-     * Open a new stream using HttpClient
-     */
-    FastInputStream getStream() throws IOException {
-
-      ModifiableSolrParams params = new ModifiableSolrParams();
-
-      //    //the method is command=filecontent
-      params.set(COMMAND, CMD_GET_FILE);
-      params.set(GENERATION, Long.toString(indexGen));
-      params.set(CommonParams.QT, "/replication");
-      //add the version to download. This is used to reserve the download
-      if (isConf) {
-        //set cf instead of file for config file
-        params.set(CONF_FILE_SHORT, fileName);
-      } else {
-        params.set(FILE, fileName);
-      }
-      if (useInternal) {
-        params.set(COMPRESSION, "true");
-      }
-      //use checksum
-      if (this.includeChecksum) {
-        params.set(CHECKSUM, true);
-      }
-      //wt=filestream this is a custom protocol
-      params.set(CommonParams.WT, FILE_STREAM);
-      // This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
-      // the server starts from the offset
-      if (bytesDownloaded > 0) {
-        params.set(OFFSET, Long.toString(bytesDownloaded));
-      }
-
-
-      NamedList response;
-      InputStream is = null;
-      HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient, null); //XXX use shardhandler
-      try {
-        client.setSoTimeout(60000);
-        client.setConnectionTimeout(15000);
-        QueryRequest req = new QueryRequest(params);
-        response = client.request(req);
-        is = (InputStream) response.get("stream");
-        if(useInternal) {
-          is = new InflaterInputStream(is);
-        }
-        return new FastInputStream(is);
-      } catch (Exception e) {
-        //close stream on error
-        IOUtils.closeQuietly(is);
-        throw new IOException("Could not download file '" + fileName + "'", e);
-      } finally {
-        client.shutdown();
-      }
+    public void delete() throws Exception {
+      Files.delete(file.toPath());
     }
   }
 
+  private class LocalFsFileFetcher extends FileFetcher {
+    LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
+                       boolean isConf, long latestGen) throws IOException {
+      super(new LocalFsFile(dir, saveAs), fileDetails, saveAs, isConf, latestGen);
+    }
+  }
+
   NamedList getDetails() throws IOException, SolrServerException {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(COMMAND, CMD_DETAILS);
@@ -1720,31 +1525,27 @@ public class SnapPuller {
     if (interval == null)
       return null;
     int result = 0;
-    if (interval != null) {
-      Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
-      if (m.find()) {
-        String hr = m.group(1);
-        String min = m.group(2);
-        String sec = m.group(3);
-        result = 0;
-        try {
-          if (sec != null && sec.length() > 0)
-            result += Integer.parseInt(sec);
-          if (min != null && min.length() > 0)
-            result += (60 * Integer.parseInt(min));
-          if (hr != null && hr.length() > 0)
-            result += (60 * 60 * Integer.parseInt(hr));
-          result *= 1000;
-        } catch (NumberFormatException e) {
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-              INTERVAL_ERR_MSG);
-        }
-      } else {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            INTERVAL_ERR_MSG);
+    Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
+    if (m.find()) {
+      String hr = m.group(1);
+      String min = m.group(2);
+      String sec = m.group(3);
+      result = 0;
+      try {
+        if (sec != null && sec.length() > 0)
+          result += Integer.parseInt(sec);
+        if (min != null && min.length() > 0)
+          result += (60 * Integer.parseInt(min));
+        if (hr != null && hr.length() > 0)
+          result += (60 * 60 * Integer.parseInt(hr));
+        result *= 1000;
+      } catch (NumberFormatException e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
       }
+    } else {
+      throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
     }
+
     return result;
   }
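The method this hunk de-nests converts an hh:mm:ss polling interval into milliseconds. A standalone sketch of the same arithmetic follows; the regex is an assumed stand-in for SnapPuller's INTERVAL_PATTERN, so treat the shape, not the pattern, as authoritative.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: parse "HH:MM:SS" into milliseconds, mirroring the hunk's math.
final class IntervalParse {
  private static final Pattern INTERVAL = Pattern.compile("(\\d*?):(\\d*?):(\\d*)");

  static long toMillis(String interval) {
    Matcher m = INTERVAL.matcher(interval.trim());
    if (!m.find()) throw new IllegalArgumentException("bad interval: " + interval);
    long result = 0;
    String hr = m.group(1), min = m.group(2), sec = m.group(3);
    if (sec != null && !sec.isEmpty()) result += Integer.parseInt(sec);
    if (min != null && !min.isEmpty()) result += 60 * Integer.parseInt(min);
    if (hr != null && !hr.isEmpty()) result += 60 * 60 * Integer.parseInt(hr);
    return result * 1000;   // the hunk's result *= 1000
  }

  public static void main(String[] args) {
    System.out.println(toMillis("00:00:10")); // 10000
  }
}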