SOLR-1068 -- Use a single threaded executor to fsync replicated index and configuration files before moving them

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@761577 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2009-04-03 09:08:37 +00:00
parent c7c9ef6a63
commit 6639925275
3 changed files with 73 additions and 37 deletions

View File

@ -369,7 +369,7 @@ Other Changes
21. Upgraded to Lucene 2.9-dev r752164 (shalin) 21. Upgraded to Lucene 2.9-dev r752164 (shalin)
22. SOLR-1068: Use fsync on replicated index and configuration files (yonik, shalin) 22. SOLR-1068: Use fsync on replicated index and configuration files (yonik, Noble Paul, shalin)
23. SOLR-952: Cleanup duplicated code in deprecated HighlightingUtils (hossman) 23. SOLR-952: Cleanup duplicated code in deprecated HighlightingUtils (hossman)

View File

@ -20,6 +20,7 @@ package org.apache.solr.common.util;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.io.FileNotFoundException;
/** /**
* @version $Id$ * @version $Id$
@ -50,6 +51,9 @@ public class FileUtils {
* @throws IOException if the file could not be synced * @throws IOException if the file could not be synced
*/ */
public static void sync(File fullFile) throws IOException { public static void sync(File fullFile) throws IOException {
if (fullFile == null || !fullFile.exists())
throw new FileNotFoundException("File does not exist " + fullFile);
boolean success = false; boolean success = false;
int retryCount = 0; int retryCount = 0;
IOException exc = null; IOException exc = null;

View File

@ -38,9 +38,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
import java.util.concurrent.Executors; import java.util.concurrent.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -87,6 +85,8 @@ public class SnapPuller {
private volatile FileFetcher fileFetcher; private volatile FileFetcher fileFetcher;
private volatile ExecutorService fsyncService;
private volatile boolean stop = false; private volatile boolean stop = false;
private boolean useInternal = false; private boolean useInternal = false;
@ -247,6 +247,8 @@ public class SnapPuller {
fetchFileList(latestVersion); fetchFileList(latestVersion);
LOG.info("Number of files in latest snapshot in master: " + filesToDownload.size()); LOG.info("Number of files in latest snapshot in master: " + filesToDownload.size());
// Create the sync service
fsyncService = Executors.newSingleThreadExecutor();
// use a synchronized list because the list is read by other threads (to show details) // use a synchronized list because the list is read by other threads (to show details)
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>()); filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// if the generateion of master is older than that of the slave , it means they are not compatible to be copied // if the generateion of master is older than that of the slave , it means they are not compatible to be copied
@ -261,7 +263,7 @@ public class SnapPuller {
downloadIndexFiles(isSnapNeeded, tmpIndexDir, latestVersion); downloadIndexFiles(isSnapNeeded, tmpIndexDir, latestVersion);
LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs"); LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");
Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload); Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) { if (!modifiedConfFiles.isEmpty()) {
downloadConfFiles(confFilesToDownload, latestVersion); downloadConfFiles(confFilesToDownload, latestVersion);
if (isSnapNeeded) { if (isSnapNeeded) {
modifyIndexProps(tmpIndexDir.getName()); modifyIndexProps(tmpIndexDir.getName());
@ -274,6 +276,7 @@ public class SnapPuller {
reloadCore(); reloadCore();
} }
} else { } else {
terminateAndWaitFsyncService();
LOG.info("Conf files are not downloaded or are in sync"); LOG.info("Conf files are not downloaded or are in sync");
if (isSnapNeeded) { if (isSnapNeeded) {
modifyIndexProps(tmpIndexDir.getName()); modifyIndexProps(tmpIndexDir.getName());
@ -302,10 +305,29 @@ public class SnapPuller {
filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null; filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
replicationStartTime = 0; replicationStartTime = 0;
fileFetcher = null; fileFetcher = null;
fsyncService = null;
stop = false; stop = false;
fsyncException = null;
} }
} }
private volatile Exception fsyncException;
/**
* terminate the fsync service and wait for all the tasks to complete. If it is already terminated
*
* @throws Exception
*/
private void terminateAndWaitFsyncService() throws Exception {
if (fsyncService.isTerminated()) return;
fsyncService.shutdown();
// give a long wait say 1 hr
fsyncService.awaitTermination(3600, TimeUnit.SECONDS);
// if any fsync failed, throw that exception back
Exception fsyncExceptionCopy = fsyncException;
if (fsyncExceptionCopy != null) throw fsyncExceptionCopy;
}
/** /**
* Helper method to record the last replication's details so that we can show them on the statistics page across * Helper method to record the last replication's details so that we can show them on the statistics page across
* restarts. * restarts.
@ -394,6 +416,7 @@ public class SnapPuller {
LOG.info("Starting download of configuration files from master: " + confFilesToDownload); LOG.info("Starting download of configuration files from master: " + confFilesToDownload);
confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>()); confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date())); File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date()));
try {
boolean status = tmpconfDir.mkdirs(); boolean status = tmpconfDir.mkdirs();
if (!status) { if (!status) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@ -406,7 +429,13 @@ public class SnapPuller {
fileFetcher.fetchFile(); fileFetcher.fetchFile();
confFilesDownloaded.add(new HashMap<String, Object>(file)); confFilesDownloaded.add(new HashMap<String, Object>(file));
} }
// this is called before copying the files to the original conf dir
// so that if there is an exception avoid corrupting the original files.
terminateAndWaitFsyncService();
copyTmpConfFiles2Conf(tmpconfDir); copyTmpConfFiles2Conf(tmpconfDir);
} finally {
delTree(tmpconfDir);
}
} }
/** /**
@ -491,13 +520,11 @@ public class SnapPuller {
continue; continue;
} }
if (!copyAFile(snapDir, indexDir, fname, copiedfiles)) return false; if (!copyAFile(snapDir, indexDir, fname, copiedfiles)) return false;
FileUtils.sync(new File(indexDir, fname));
copiedfiles.add(fname); copiedfiles.add(fname);
} }
//copy the segments file last //copy the segments file last
if (segmentsFile != null) { if (segmentsFile != null) {
if (!copyAFile(snapDir, indexDir, segmentsFile, copiedfiles)) return false; if (!copyAFile(snapDir, indexDir, segmentsFile, copiedfiles)) return false;
FileUtils.sync(new File(indexDir, segmentsFile));
} }
return true; return true;
} }
@ -507,7 +534,6 @@ public class SnapPuller {
*/ */
private void copyTmpConfFiles2Conf(File tmpconfDir) throws IOException { private void copyTmpConfFiles2Conf(File tmpconfDir) throws IOException {
File confDir = new File(solrCore.getResourceLoader().getConfigDir()); File confDir = new File(solrCore.getResourceLoader().getConfigDir());
try {
for (File file : tmpconfDir.listFiles()) { for (File file : tmpconfDir.listFiles()) {
File oldFile = new File(confDir, file.getName()); File oldFile = new File(confDir, file.getName());
if (oldFile.exists()) { if (oldFile.exists()) {
@ -520,15 +546,11 @@ public class SnapPuller {
} }
boolean status = file.renameTo(oldFile); boolean status = file.renameTo(oldFile);
if (status) { if (status) {
FileUtils.sync(oldFile);
} else { } else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to rename: " + file + " to: " + oldFile); "Unable to rename: " + file + " to: " + oldFile);
} }
} }
} finally {
delTree(tmpconfDir);
}
} }
private String getDateAsStr(Date d) { private String getDateAsStr(Date d) {
@ -786,6 +808,16 @@ public class SnapPuller {
} }
} finally { } finally {
cleanup(); cleanup();
//if cleanup suceeds . The file is downloaded fully. do an fsync
fsyncService.submit(new Runnable(){
public void run() {
try {
FileUtils.sync(file);
} catch (IOException e) {
fsyncException = e;
}
}
});
} }
} }