SOLR-1458: use saved commit points when replciating optimized indexes

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@823654 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2009-10-09 18:36:28 +00:00
parent 678d377917
commit b2c0be12a6
2 changed files with 70 additions and 7 deletions

View File

@ -23,6 +23,7 @@ import org.apache.lucene.store.Directory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A wrapper for an IndexDeletionPolicy instance.
@ -39,6 +40,7 @@ public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy {
private volatile Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<Long, IndexCommit>();
private final Map<Long, Long> reserves = new ConcurrentHashMap<Long,Long>();
private volatile IndexCommit latestCommit;
private final ConcurrentHashMap<Long, AtomicInteger> savedCommits = new ConcurrentHashMap<Long, AtomicInteger>();
public IndexDeletionPolicyWrapper(IndexDeletionPolicy deletionPolicy) {
this.deletionPolicy = deletionPolicy;
@ -98,6 +100,25 @@ public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy {
return result;
}
/** Permanently prevent this commit point from being deleted.
* A counter is used to allow a commit point to be correctly saved and released
* multiple times. */
public synchronized void saveCommitPoint(Long indexCommitVersion) {
AtomicInteger reserveCount = savedCommits.get(indexCommitVersion);
if (reserveCount == null) reserveCount = new AtomicInteger();
reserveCount.incrementAndGet();
}
/** Release a previously saved commit point */
public synchronized void releaseCommmitPoint(Long indexCommitVersion) {
AtomicInteger reserveCount = savedCommits.get(indexCommitVersion);
if (reserveCount == null) return;// this should not happen
if (reserveCount.decrementAndGet() <= 0) {
savedCommits.remove(indexCommitVersion);
}
}
/**
* Internal use for Lucene... do not explicitly call.
*/
@ -121,55 +142,74 @@ public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy {
private class IndexCommitWrapper extends IndexCommit {
IndexCommit delegate;
IndexCommitWrapper(IndexCommit delegate) {
this.delegate = delegate;
}
@Override
public String getSegmentsFileName() {
return delegate.getSegmentsFileName();
}
@Override
public Collection getFileNames() throws IOException {
return delegate.getFileNames();
}
@Override
public Directory getDirectory() {
return delegate.getDirectory();
}
@Override
public void delete() {
Long reserve = reserves.get(delegate.getVersion());
Long version = delegate.getVersion();
Long reserve = reserves.get(version);
if (reserve != null && System.currentTimeMillis() < reserve) return;
if(savedCommits.contains(version)) return;
delegate.delete();
}
@Override
public boolean isOptimized() {
return delegate.isOptimized();
}
@Override
public boolean equals(Object o) {
return delegate.equals(o);
}
@Override
public int hashCode() {
return delegate.hashCode();
}
@Override
public long getVersion() {
return delegate.getVersion();
}
@Override
public long getGeneration() {
return delegate.getGeneration();
}
@Override
public boolean isDeleted() {
return delegate.isDeleted();
}
@Override
public long getTimestamp() throws IOException {
return delegate.getTimestamp();
}
@Override
public Map getUserData() throws IOException {
return delegate.getUserData();
}
}
/**

View File

@ -18,6 +18,7 @@ package org.apache.solr.handler;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@ -815,15 +816,32 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
replicateOnStart = true;
RefCounted<SolrIndexSearcher> s = core.getNewestSearcher(false);
try {
IndexReader reader = s==null ? null : s.get().getReader();
if (reader!=null && reader.getIndexCommit() != null && reader.getIndexCommit().getGeneration() != 1L) {
try {
if(!replicateOnCommit && replicateOnOptimize){
Collection<IndexCommit> commits = IndexReader.listCommits(reader.directory());
for (IndexCommit ic : commits) {
if(ic.isOptimized()){
if(indexCommitPoint == null || indexCommitPoint.getVersion() < ic.getVersion()) indexCommitPoint = ic;
}
}
} else{
indexCommitPoint = reader.getIndexCommit();
}
} finally {
if(indexCommitPoint != null){
core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getVersion());
}
}
}
if (core.getUpdateHandler() instanceof DirectUpdateHandler2) {
((DirectUpdateHandler2) core.getUpdateHandler()).forceOpenWriter();
} else {
LOG.warn("The update handler being used is not an instance or sub-class of DirectUpdateHandler2. " +
"Replicate on Startup cannot work.");
}
if (s.get().getReader().getIndexCommit() != null)
if (s.get().getReader().getIndexCommit().getGeneration() != 1L)
indexCommitPoint = s.get().getReader().getIndexCommit();
}
} catch (IOException e) {
LOG.warn("Unable to get IndexCommit on startup", e);
} finally {
@ -892,13 +910,18 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
* This refreshes the latest replicateable index commit and optionally can create Snapshots as well
*/
public void postCommit() {
if (getCommit || snapshoot) {
if (getCommit) {
IndexCommit oldCommitPoint = indexCommitPoint;
indexCommitPoint = core.getDeletionPolicy().getLatestCommit();
core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getVersion());
if(oldCommitPoint != null){
core.getDeletionPolicy().releaseCommmitPoint(oldCommitPoint.getVersion());
}
}
if (snapshoot) {
try {
SnapShooter snapShooter = new SnapShooter(core, null);
snapShooter.createSnapAsync(indexCommitPoint.getFileNames(), ReplicationHandler.this);
snapShooter.createSnapAsync(core.getDeletionPolicy().getLatestCommit().getFileNames(), ReplicationHandler.this);
} catch (Exception e) {
LOG.error("Exception while snapshooting", e);
}