diff --git a/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java b/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java index 5b4b208daa8..0f339db8678 100644 --- a/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java +++ b/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java @@ -23,6 +23,7 @@ import org.apache.lucene.store.Directory; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** * A wrapper for an IndexDeletionPolicy instance. @@ -39,6 +40,7 @@ public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy { private volatile Map solrVersionVsCommits = new ConcurrentHashMap(); private final Map reserves = new ConcurrentHashMap(); private volatile IndexCommit latestCommit; + private final ConcurrentHashMap savedCommits = new ConcurrentHashMap(); public IndexDeletionPolicyWrapper(IndexDeletionPolicy deletionPolicy) { this.deletionPolicy = deletionPolicy; @@ -98,6 +100,25 @@ public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy { return result; } + /** Permanently prevent this commit point from being deleted. + * A counter is used to allow a commit point to be correctly saved and released + * multiple times. */ + public synchronized void saveCommitPoint(Long indexCommitVersion) { + AtomicInteger reserveCount = savedCommits.get(indexCommitVersion); + if (reserveCount == null) reserveCount = new AtomicInteger(); + reserveCount.incrementAndGet(); + } + + /** Release a previously saved commit point */ + public synchronized void releaseCommmitPoint(Long indexCommitVersion) { + AtomicInteger reserveCount = savedCommits.get(indexCommitVersion); + if (reserveCount == null) return;// this should not happen + if (reserveCount.decrementAndGet() <= 0) { + savedCommits.remove(indexCommitVersion); + } + } + + /** * Internal use for Lucene... do not explicitly call. */ @@ -121,55 +142,74 @@ public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy { private class IndexCommitWrapper extends IndexCommit { IndexCommit delegate; + IndexCommitWrapper(IndexCommit delegate) { this.delegate = delegate; } + @Override public String getSegmentsFileName() { return delegate.getSegmentsFileName(); } + @Override public Collection getFileNames() throws IOException { return delegate.getFileNames(); } + @Override public Directory getDirectory() { return delegate.getDirectory(); } + @Override public void delete() { - Long reserve = reserves.get(delegate.getVersion()); + Long version = delegate.getVersion(); + Long reserve = reserves.get(version); if (reserve != null && System.currentTimeMillis() < reserve) return; + if(savedCommits.contains(version)) return; delegate.delete(); } + @Override public boolean isOptimized() { return delegate.isOptimized(); } + @Override public boolean equals(Object o) { return delegate.equals(o); } + @Override public int hashCode() { return delegate.hashCode(); } + @Override public long getVersion() { return delegate.getVersion(); } + @Override public long getGeneration() { return delegate.getGeneration(); } + @Override public boolean isDeleted() { return delegate.isDeleted(); } + @Override public long getTimestamp() throws IOException { return delegate.getTimestamp(); } + + @Override + public Map getUserData() throws IOException { + return delegate.getUserData(); + } } /** diff --git a/src/java/org/apache/solr/handler/ReplicationHandler.java b/src/java/org/apache/solr/handler/ReplicationHandler.java index 564ac2a70e0..9dc4ff3a21d 100644 --- a/src/java/org/apache/solr/handler/ReplicationHandler.java +++ b/src/java/org/apache/solr/handler/ReplicationHandler.java @@ -18,6 +18,7 @@ package org.apache.solr.handler; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexDeletionPolicy; +import org.apache.lucene.index.IndexReader; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; @@ -815,15 +816,32 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw replicateOnStart = true; RefCounted s = core.getNewestSearcher(false); try { + IndexReader reader = s==null ? null : s.get().getReader(); + if (reader!=null && reader.getIndexCommit() != null && reader.getIndexCommit().getGeneration() != 1L) { + try { + if(!replicateOnCommit && replicateOnOptimize){ + Collection commits = IndexReader.listCommits(reader.directory()); + for (IndexCommit ic : commits) { + if(ic.isOptimized()){ + if(indexCommitPoint == null || indexCommitPoint.getVersion() < ic.getVersion()) indexCommitPoint = ic; + } + } + } else{ + indexCommitPoint = reader.getIndexCommit(); + } + } finally { + if(indexCommitPoint != null){ + core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getVersion()); + } + } + } if (core.getUpdateHandler() instanceof DirectUpdateHandler2) { ((DirectUpdateHandler2) core.getUpdateHandler()).forceOpenWriter(); } else { LOG.warn("The update handler being used is not an instance or sub-class of DirectUpdateHandler2. " + "Replicate on Startup cannot work."); - } - if (s.get().getReader().getIndexCommit() != null) - if (s.get().getReader().getIndexCommit().getGeneration() != 1L) - indexCommitPoint = s.get().getReader().getIndexCommit(); + } + } catch (IOException e) { LOG.warn("Unable to get IndexCommit on startup", e); } finally { @@ -892,13 +910,18 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw * This refreshes the latest replicateable index commit and optionally can create Snapshots as well */ public void postCommit() { - if (getCommit || snapshoot) { + if (getCommit) { + IndexCommit oldCommitPoint = indexCommitPoint; indexCommitPoint = core.getDeletionPolicy().getLatestCommit(); + core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getVersion()); + if(oldCommitPoint != null){ + core.getDeletionPolicy().releaseCommmitPoint(oldCommitPoint.getVersion()); + } } if (snapshoot) { try { SnapShooter snapShooter = new SnapShooter(core, null); - snapShooter.createSnapAsync(indexCommitPoint.getFileNames(), ReplicationHandler.this); + snapShooter.createSnapAsync(core.getDeletionPolicy().getLatestCommit().getFileNames(), ReplicationHandler.this); } catch (Exception e) { LOG.error("Exception while snapshooting", e); }