SOLR-561 update: reserve index commit points for very short durations

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@707239 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2008-10-23 00:57:43 +00:00
parent 0d8d200c20
commit 62fc6816a3
3 changed files with 59 additions and 28 deletions

View File

@ -21,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy { public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy {
private IndexDeletionPolicy deletionPolicy; private IndexDeletionPolicy deletionPolicy;
private Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<Long, IndexCommit>(); private Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<Long, IndexCommit>();
private Map<Long, Long> reserves = new HashMap<Long, Long>(); private Map<Long, Long> reserves = new ConcurrentHashMap<Long,Long>();
private IndexCommit latestCommit; private IndexCommit latestCommit;
public IndexDeletionPolicyWrapper(IndexDeletionPolicy deletionPolicy) { public IndexDeletionPolicyWrapper(IndexDeletionPolicy deletionPolicy) {
@ -51,13 +51,15 @@ public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy {
* @param reserveTime time in milliseconds for which the commit point is to be reserved * @param reserveTime time in milliseconds for which the commit point is to be reserved
*/ */
public void setReserveDuration(Long indexVersion, long reserveTime) { public void setReserveDuration(Long indexVersion, long reserveTime) {
synchronized (reserves) {
reserves.put(indexVersion, System.currentTimeMillis() + reserveTime); reserves.put(indexVersion, System.currentTimeMillis() + reserveTime);
List<Long> removeThese = new ArrayList<Long>();
for (Map.Entry<Long, Long> entry : reserves.entrySet()) {
if (entry.getValue() < System.currentTimeMillis()) removeThese.add(entry.getKey());
} }
for (Long l : removeThese) reserves.remove(l);
private void cleanReserves() {
long currentTime = System.currentTimeMillis();
for (Map.Entry<Long, Long> entry : reserves.entrySet()) {
if (entry.getValue() < currentTime) {
reserves.remove(entry.getKey());
}
} }
} }
@ -74,6 +76,7 @@ public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy {
List<IndexCommitWrapper> wrapperList = wrap(list); List<IndexCommitWrapper> wrapperList = wrap(list);
deletionPolicy.onInit(wrapperList); deletionPolicy.onInit(wrapperList);
updateCommitPoints(wrapperList); updateCommitPoints(wrapperList);
cleanReserves();
} }
/** /**
@ -83,6 +86,7 @@ public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy {
List<IndexCommitWrapper> wrapperList = wrap(list); List<IndexCommitWrapper> wrapperList = wrap(list);
deletionPolicy.onCommit(wrapperList); deletionPolicy.onCommit(wrapperList);
updateCommitPoints(wrapperList); updateCommitPoints(wrapperList);
cleanReserves();
} }
private class IndexCommitWrapper extends IndexCommit { private class IndexCommitWrapper extends IndexCommit {

View File

@ -27,6 +27,7 @@ import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CloseHook; import org.apache.solr.core.CloseHook;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrEventListener; import org.apache.solr.core.SolrEventListener;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.request.BinaryQueryResponseWriter; import org.apache.solr.request.BinaryQueryResponseWriter;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse; import org.apache.solr.request.SolrQueryResponse;
@ -89,15 +90,11 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private boolean replicateOnCommit = false; private boolean replicateOnCommit = false;
//private String masterUrl;
//private String pollInterval;
private int numTimesReplicated = 0; private int numTimesReplicated = 0;
private final Map<String, FileInfo> confFileInfoCache = new HashMap<String, FileInfo>(); private final Map<String, FileInfo> confFileInfoCache = new HashMap<String, FileInfo>();
private Integer reserveCommitDuration = SnapPuller.readInterval("01:00:00"); private Integer reserveCommitDuration = SnapPuller.readInterval("00:00:10");
private IndexCommit indexCommitPoint; private IndexCommit indexCommitPoint;
@ -628,15 +625,29 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
includeConfFiles = Arrays.asList(includeFiles.split(",")); includeConfFiles = Arrays.asList(includeFiles.split(","));
LOG.info("Replication enabled for following config files: " + includeConfFiles); LOG.info("Replication enabled for following config files: " + includeConfFiles);
} }
String snapshot = (String) master.get("snapshot"); List snapshot = master.getAll("snapshot");
if ("optimize".equals(master.get(REPLICATE_AFTER))) { boolean snapshotOnCommit = snapshot.contains("commit");
replicateOnOptimize = true; boolean snapshotOnOptimize = snapshot.contains("optimize");
boolean snapshoot = "optimize".equals(snapshot); List replicateAfter = master.getAll(REPLICATE_AFTER);
core.getUpdateHandler().registerOptimizeCallback(getEventListener(snapshoot)); replicateOnCommit = replicateAfter.contains("commit");
} else if ("commit".equals(master.get(REPLICATE_AFTER))) { replicateOnOptimize = replicateAfter.contains("optimize");
if (replicateOnOptimize || snapshotOnOptimize) {
core.getUpdateHandler().registerOptimizeCallback(getEventListener(snapshotOnOptimize, replicateOnOptimize));
}
if (replicateOnCommit || snapshotOnCommit) {
replicateOnCommit = true; replicateOnCommit = true;
boolean snapshoot = "commit".equals(snapshot); core.getUpdateHandler().registerCommitCallback(getEventListener(snapshotOnCommit, replicateOnCommit));
core.getUpdateHandler().registerCommitCallback(getEventListener(snapshoot)); }
if (replicateAfter.contains("startup")) {
RefCounted<SolrIndexSearcher> s = core.getNewestSearcher(false);
try {
indexCommitPoint = s.get().getReader().getIndexCommit();
} catch (IOException e) {
LOG.warn("Unable to get IndexCommit on startup",e);
} finally {
s.decref();
}
} }
String reserve = (String) master.get(RESERVE); String reserve = (String) master.get(RESERVE);
if (reserve != null && !reserve.trim().equals("")) { if (reserve != null && !reserve.trim().equals("")) {
@ -677,12 +688,14 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
} }
private SolrEventListener getEventListener(final boolean snapshoot) { private SolrEventListener getEventListener(final boolean snapshoot, final boolean getCommit) {
return new SolrEventListener() { return new SolrEventListener() {
public void init(NamedList args) {/*no op*/ } public void init(NamedList args) {/*no op*/ }
public void postCommit() { public void postCommit() {
if(getCommit){
indexCommitPoint = core.getDeletionPolicy().getLatestCommit(); indexCommitPoint = core.getDeletionPolicy().getLatestCommit();
}
if (snapshoot) { if (snapshoot) {
try { try {
SnapShooter snapShooter = new SnapShooter(core); SnapShooter snapShooter = new SnapShooter(core);
@ -709,8 +722,12 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private FastOutputStream fos; private FastOutputStream fos;
private Long indexVersion;
private IndexDeletionPolicyWrapper delPolicy;
public FileStream(SolrParams solrParams) { public FileStream(SolrParams solrParams) {
params = solrParams; params = solrParams;
delPolicy = core.getDeletionPolicy();
} }
public void write(OutputStream out) { public void write(OutputStream out) {
@ -720,7 +737,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
String sOffset = params.get(OFFSET); String sOffset = params.get(OFFSET);
String sLen = params.get(LEN); String sLen = params.get(LEN);
String sChecksum = params.get(CHECKSUM); String sChecksum = params.get(CHECKSUM);
String sindexVersion = params.get(CMD_INDEX_VERSION);
if(sindexVersion != null) indexVersion = Long.parseLong(sindexVersion);
FileInputStream inputStream = null; FileInputStream inputStream = null;
int packetsWritten = 0;
try { try {
long offset = -1; long offset = -1;
int len = -1; int len = -1;
@ -766,6 +786,9 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
} }
fos.write(buf, 0, (int) bytesRead); fos.write(buf, 0, (int) bytesRead);
fos.flush(); fos.flush();
if(indexVersion != null && (packetsWritten % 5 == 0)){
delPolicy.setReserveDuration(indexVersion, reserveCommitDuration);
}
} }
} else { } else {
writeNothing(); writeNothing();

View File

@ -238,11 +238,11 @@ public class SnapPuller {
boolean successfulInstall = false; boolean successfulInstall = false;
try { try {
File indexDir = new File(core.getIndexDir()); File indexDir = new File(core.getIndexDir());
downloadIndexFiles(isSnapNeeded, tmpIndexDir, client); downloadIndexFiles(isSnapNeeded, tmpIndexDir, client, latestVersion);
LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs"); LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");
Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload); Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) { if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) {
downloadConfFiles(client, confFilesToDownload); downloadConfFiles(client, confFilesToDownload, latestVersion);
if (isSnapNeeded) { if (isSnapNeeded) {
modifyIndexProps(tmpIndexDir.getName()); modifyIndexProps(tmpIndexDir.getName());
} else { } else {
@ -373,7 +373,7 @@ public class SnapPuller {
}.start(); }.start();
} }
private void downloadConfFiles(HttpClient client, List<Map<String, Object>> confFilesToDownload) throws Exception { private void downloadConfFiles(HttpClient client, List<Map<String, Object>> confFilesToDownload, long latestVersion) throws Exception {
LOG.info("Starting download of configuration files from master: " + confFilesToDownload); LOG.info("Starting download of configuration files from master: " + confFilesToDownload);
confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>()); confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date())); File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date()));
@ -383,7 +383,7 @@ public class SnapPuller {
"Failed to create temporary config folder: " + tmpconfDir.getName()); "Failed to create temporary config folder: " + tmpconfDir.getName());
} }
for (Map<String, Object> file : confFilesToDownload) { for (Map<String, Object> file : confFilesToDownload) {
fileFetcher = new FileFetcher(tmpconfDir, file, (String) file.get(NAME), client, true); fileFetcher = new FileFetcher(tmpconfDir, file, (String) file.get(NAME), client, true, latestVersion);
currentFile = file; currentFile = file;
fileFetcher.fetchFile(); fileFetcher.fetchFile();
confFilesDownloaded.add(new HashMap<String, Object>(file)); confFilesDownloaded.add(new HashMap<String, Object>(file));
@ -392,12 +392,12 @@ public class SnapPuller {
} }
private void downloadIndexFiles(boolean snapNeeded, File snapDir, private void downloadIndexFiles(boolean snapNeeded, File snapDir,
HttpClient client) throws Exception { HttpClient client, long latestVersion) throws Exception {
for (Map<String, Object> file : filesToDownload) { for (Map<String, Object> file : filesToDownload) {
File localIndexFile = new File(solrCore.getIndexDir(), (String) file.get(NAME)); File localIndexFile = new File(solrCore.getIndexDir(), (String) file.get(NAME));
if (!localIndexFile.exists() || snapNeeded) { if (!localIndexFile.exists() || snapNeeded) {
fileFetcher = new FileFetcher(snapDir, file, (String) file.get(NAME), fileFetcher = new FileFetcher(snapDir, file, (String) file.get(NAME),
client, false); client, false, latestVersion);
currentFile = file; currentFile = file;
fileFetcher.fetchFile(); fileFetcher.fetchFile();
filesDownloaded.add(new HashMap<String, Object>(file)); filesDownloaded.add(new HashMap<String, Object>(file));
@ -662,14 +662,17 @@ public class SnapPuller {
private boolean aborted = false; private boolean aborted = false;
private Long indexVersion;
FileFetcher(File dir, Map<String, Object> fileDetails, String saveAs, FileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
HttpClient client, boolean isConf) throws FileNotFoundException { HttpClient client, boolean isConf, long latestVersion) throws FileNotFoundException {
this.snapDir = dir; this.snapDir = dir;
this.fileName = (String) fileDetails.get(NAME); this.fileName = (String) fileDetails.get(NAME);
this.size = (Long) fileDetails.get(SIZE); this.size = (Long) fileDetails.get(SIZE);
this.client = client; this.client = client;
this.isConf = isConf; this.isConf = isConf;
this.saveAs = saveAs; this.saveAs = saveAs;
indexVersion = latestVersion;
this.file = new File(snapDir, saveAs); this.file = new File(snapDir, saveAs);
this.fileChannel = new FileOutputStream(file).getChannel(); this.fileChannel = new FileOutputStream(file).getChannel();
@ -810,6 +813,7 @@ public class SnapPuller {
FastInputStream getStream() throws IOException { FastInputStream getStream() throws IOException {
post = new PostMethod(masterUrl); post = new PostMethod(masterUrl);
post.addParameter(COMMAND, CMD_GET_FILE); post.addParameter(COMMAND, CMD_GET_FILE);
post.addParameter(CMD_INDEX_VERSION, indexVersion.toString());
if (isConf) { if (isConf) {
post.addParameter(CONF_FILE_SHORT, fileName); post.addParameter(CONF_FILE_SHORT, fileName);
} else { } else {