mirror of https://github.com/apache/lucene.git

SOLR-13872: Fixed Backup failures due to race conditions in saving/reserving commit points

(cherry picked from commit 30e55e2b6e)

This commit is contained in:
parent eeea9fe2c7
commit 8c12979fdd
@@ -86,6 +86,9 @@ Bug Fixes

 * SOLR-13882: Collections API COLSTATUS does not check live_nodes when reporting replica's status.
   (Erick Erickson, Andrzej Białecki)

+* SOLR-13872: Fixed Backup failures - typically manifesting as NoSuchFileException - due to race conditions
+  in saving/reserving commit points (hossman)
+
 Other Changes
 ---------------------

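For context on the fix this entry describes: the races happened because callers looked up a commit point and then used it while the deletion policy remained free to delete it concurrently. The diffs below replace that pattern with reference-counted "save" and "release" calls. A minimal sketch of the calling discipline, assuming only the method names introduced in this commit (the copy step is a hypothetical placeholder):

import java.io.IOException;
import org.apache.lucene.index.IndexCommit;
import org.apache.solr.core.IndexDeletionPolicyWrapper;

class BackupSketch {
  // Hypothetical caller illustrating the save -> use -> release discipline:
  // between getAndSaveLatestCommit() and releaseCommitPoint() the commit's
  // files cannot be deleted out from under us.
  static void backup(IndexDeletionPolicyWrapper delPol) throws IOException {
    final IndexCommit commit = delPol.getAndSaveLatestCommit(); // increments a ref count
    if (null == commit) {
      return; // index has no commits yet
    }
    try {
      for (String file : commit.getFileNames()) {
        // copy 'file' somewhere safe; it is guaranteed to still exist here
      }
    } finally {
      delPol.releaseCommitPoint(commit); // decrement; commit becomes deletable again
    }
  }
}
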
@@ -27,6 +27,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;

 import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.client.solrj.SolrServerException;

@@ -261,6 +262,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
     SolrIndexSearcher searcher = searchHolder.get();
     Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null);
     try {
+      final IndexCommit commit = core.getDeletionPolicy().getLatestCommit();
       log.debug(core.getCoreContainer()
           .getZkController().getNodeName()
           + " replicated "

@@ -268,8 +270,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           + " from "
           + leaderUrl
           + " gen:"
-          + (core.getDeletionPolicy().getLatestCommit() != null ? "null"
-              : core.getDeletionPolicy().getLatestCommit().getGeneration())
+          + (null == commit ? "null" : commit.getGeneration())
           + " data:" + core.getDataDir()
           + " index:" + core.getIndexDir()
           + " newIndex:" + core.getNewIndexDir()

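The logging fix in the last hunk is easy to miss: the old ternary tested `!= null` but printed "null" in that branch, and dereferenced the commit in the branch where it could actually be null. A distilled illustration of the corrected form (the class and method here are illustrative, not part of the patch):

import org.apache.lucene.index.IndexCommit;

class TernarySketch {
  static Object generationForLog(IndexCommit commit) {
    // Old (buggy) form: 'commit != null ? "null" : commit.getGeneration()'
    // printed "null" for a real commit and threw NPE for a missing one.
    // The fix reads the commit once into a local and tests it the right way round:
    return (null == commit) ? "null" : commit.getGeneration();
  }
}
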
@@ -20,8 +20,11 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BinaryOperator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;

@@ -53,67 +56,165 @@ public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

   private final IndexDeletionPolicy deletionPolicy;
-  private volatile Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<>();
-  private final Map<Long, Long> reserves = new ConcurrentHashMap<>();
-  private volatile IndexCommit latestCommit;
-  private final ConcurrentHashMap<Long, AtomicInteger> savedCommits = new ConcurrentHashMap<>();
   private final SolrSnapshotMetaDataManager snapshotMgr;

+  /**
+   * The set of all known commits <em>after</em> the last completed call to {@link #onInit} or
+   * {@link #onCommit} on our {@link #getWrappedDeletionPolicy()}.
+   *
+   * <p>
+   * This map is atomically replaced by {@link #updateKnownCommitPoints}.
+   * The keys are the {@link IndexCommit#getGeneration} of each commit.
+   * </p>
+   *
+   * @see #getAndSaveCommitPoint
+   * @see #getCommits
+   * @see #updateKnownCommitPoints
+   */
+  private volatile Map<Long, IndexCommit> knownCommits = new ConcurrentHashMap<>();
+
+  /**
+   * The most recent commit included in a call to {@link #onInit} or
+   * {@link #onCommit} <em>before</em> delegating to our {@link #getWrappedDeletionPolicy()}.
+   *
+   * <p>
+   * <b>NOTE:</b> This may be null if there is not yet a single commit to our index.
+   * </p>
+   *
+   * <p>
+   * This commit is implicitly protected from deletion in {@link IndexCommitWrapper#delete}.
+   * </p>
+   *
+   * @see #getLatestCommit
+   * @see #updateLatestCommit
+   */
+  private volatile IndexCommit latestCommit;
+
+  /**
+   * The set of all commit generations that have been reserved for some amount of time.
+   * <p>
+   * The keys are the {@link IndexCommit#getGeneration} of a commit, the values are the
+   * {@link System#nanoTime} that the commit should be reserved until.
+   * </p>
+   *
+   * @see #setReserveDuration
+   * @see #cleanReserves
+   */
+  private final Map<Long, Long> reserves = new ConcurrentHashMap<>();
+
+  /**
+   * The set of all commit generations that have been saved until explicitly released.
+   * <p>
+   * The keys are the {@link IndexCommit#getGeneration} of a commit, the values are
+   * a reference count of the number of callers who have "saved" this commit.
+   * {@link #releaseCommitPoint} automatically removes mappings once the ref count reaches 0.
+   * </p>
+   *
+   * @see #getAndSaveLatestCommit
+   * @see #saveCommitPoint
+   * @see #releaseCommitPoint
+   */
+  private final Map<Long, AtomicInteger> savedCommits = new ConcurrentHashMap<>();

   public IndexDeletionPolicyWrapper(IndexDeletionPolicy deletionPolicy, SolrSnapshotMetaDataManager snapshotMgr) {
     this.deletionPolicy = deletionPolicy;
     this.snapshotMgr = snapshotMgr;
   }

   /**
-   * Gets the most recent commit point
+   * Returns the most recent commit point.
    * <p>
-   * It is recommended to reserve a commit point for the duration of usage so that
-   * it is not deleted by the underlying deletion policy
+   * <b>NOTE:</b> This method makes no guarantee that the commit returned still exists at the
+   * moment this method completes. Callers are encouraged to use {@link #getAndSaveLatestCommit} instead.
+   * </p>
    *
-   * @return the most recent commit point
+   * @return the most recent commit point, or null if there have not been any commits
+   * @see #getAndSaveLatestCommit
    */
   public IndexCommit getLatestCommit() {
     return latestCommit;
   }

+  /**
+   * Atomically saves (via reference counting) and returns the most recent commit point.
+   * <p>
+   * If the return value is non-null, then the caller <em>MUST</em> call {@link #releaseCommitPoint}
+   * when finished using it in order to decrement the reference count, or the commit will be preserved
+   * in the Directory forever.
+   * </p>
+   *
+   * @return the most recent commit point, or null if there have not been any commits
+   * @see #saveCommitPoint
+   * @see #releaseCommitPoint
+   */
+  public synchronized IndexCommit getAndSaveLatestCommit() {
+    final IndexCommit commit = getLatestCommit();
+    if (null != commit) {
+      saveCommitPoint(commit.getGeneration());
+    }
+    return commit;
+  }
+
+  /**
+   * Atomically saves (via reference counting) and returns the specified commit if available.
+   * <p>
+   * If the return value is non-null, then the caller <em>MUST</em> call {@link #releaseCommitPoint}
+   * when finished using it in order to decrement the reference count, or the commit will be preserved
+   * in the Directory forever.
+   * </p>
+   *
+   * @return the commit point with the specified generation, or null if not available
+   * @see #saveCommitPoint
+   * @see #releaseCommitPoint
+   */
+  public synchronized IndexCommit getAndSaveCommitPoint(Long generation) {
+    if (null == generation) {
+      throw new NullPointerException("generation to get and save must not be null");
+    }
+    final IndexCommit commit = knownCommits.get(generation);
+    if ( (null != commit && false != commit.isDeleted())
+         || (null == commit && null != latestCommit && generation < latestCommit.getGeneration()) ) {
+      throw new IllegalStateException
+        ("Specified index generation is too old to be saved: " + generation);
+    }
+    final AtomicInteger refCount
+      = savedCommits.computeIfAbsent(generation, s -> { return new AtomicInteger(); });
+    final int currentCount = refCount.incrementAndGet();
+    log.debug("Saving generation={}, refCount={}", generation, currentCount);
+    return commit;
+  }

   public IndexDeletionPolicy getWrappedDeletionPolicy() {
     return deletionPolicy;
   }

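The save path above rests on two ConcurrentHashMap idioms: computeIfAbsent to create a counter exactly once, and AtomicInteger to bump it. A self-contained sketch of that reference counting, with illustrative class and method names:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class RefCountSketch {
  private final Map<Long, AtomicInteger> savedCommits = new ConcurrentHashMap<>();

  // Mirrors the diff's computeIfAbsent + incrementAndGet idiom: the counter is
  // created on the first save and bumped on every later save of the same generation.
  int save(long generation) {
    return savedCommits.computeIfAbsent(generation, g -> new AtomicInteger()).incrementAndGet();
  }

  // Mirrors releaseCommitPoint: decrement, and drop the counter once it hits zero.
  void release(long generation) {
    final AtomicInteger refCount = savedCommits.get(generation);
    if (refCount != null && refCount.decrementAndGet() <= 0) {
      savedCommits.remove(generation);
    }
  }
}
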
   /**
    * Set the duration for which a commit point is to be reserved by the deletion policy.
+   * <p>
+   * <b>NOTE:</b> This method does not make any guarantees that the specified index generation exists,
+   * or that the specified generation has not already been deleted. The only guarantee is that
+   * <em>if</em> the specified generation exists now, or is created at some point in the future, then
+   * it will be reserved for <em>at least</em> the specified <code>reserveTime</code>.
+   * </p>
    *
    * @param indexGen gen of the commit point to be reserved
-   * @param reserveTime time in milliseconds for which the commit point is to be reserved
+   * @param reserveTime duration in milliseconds (relative to 'now') for which the commit point is to be reserved
    */
   public void setReserveDuration(Long indexGen, long reserveTime) {
-    long timeToSet = System.nanoTime() + TimeUnit.NANOSECONDS.convert(reserveTime, TimeUnit.MILLISECONDS);
-    for(;;) {
-      Long previousTime = reserves.put(indexGen, timeToSet);
-
-      // this is the common success case: the older time didn't exist, or
-      // came before the new time.
-      if (previousTime == null || previousTime <= timeToSet) {
-        log.debug("Commit point reservation for generation {} set to {} (requested reserve time of {})",
-            indexGen, timeToSet, reserveTime);
-        break;
-      }
-
-      // At this point, we overwrote a longer reservation, so we want to restore the older one.
-      // the problem is that an even longer reservation may come in concurrently
-      // and we don't want to overwrite that one too. We simply keep retrying in a loop
-      // with the maximum time value we have seen.
-      timeToSet = previousTime;
-    }
+    // since 'reserves' is a concurrent HashMap, we don't need to synchronize this method as long as all
+    // operations on 'reserves' are done atomically.
+    //
+    // Here we'll use Map.merge to ensure that we atomically replace any existing timestamp if
+    // and only if our new reservation timestamp is larger.
+    final long reserveAsNanoTime
+      = System.nanoTime() + TimeUnit.NANOSECONDS.convert(reserveTime, TimeUnit.MILLISECONDS);
+    reserves.merge(indexGen, reserveAsNanoTime, BinaryOperator.maxBy(Comparator.naturalOrder()));
   }

   private void cleanReserves() {
-    long currentTime = System.nanoTime();
-    for (Map.Entry<Long, Long> entry : reserves.entrySet()) {
-      if (entry.getValue() < currentTime) {
-        reserves.remove(entry.getKey());
-      }
-    }
+    final long currentNanoTime = System.nanoTime();
+    // use removeIf to ensure we're removing "old" entries atomically
+    reserves.entrySet().removeIf(e -> e.getValue() < currentNanoTime);
   }

   private List<IndexCommitWrapper> wrap(List<? extends IndexCommit> list) {

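The replacement of the retry loop with Map.merge is the heart of this hunk: maxBy(naturalOrder()) makes the stored deadline grow monotonically no matter how many threads reserve the same generation at once. A stripped-down sketch (class and method names are illustrative):

import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BinaryOperator;

class ReserveSketch {
  private final Map<Long, Long> reserves = new ConcurrentHashMap<>();

  // One atomic merge replaces the old compare-and-retry loop: the existing
  // deadline is only ever replaced by a larger one.
  void reserve(long generation, long deadlineNanos) {
    reserves.merge(generation, deadlineNanos, BinaryOperator.maxBy(Comparator.naturalOrder()));
  }

  // Expired entries are swept with removeIf, which removes each matching
  // entry atomically on a ConcurrentHashMap.
  void clean(long nowNanos) {
    reserves.entrySet().removeIf(e -> e.getValue() < nowNanos);
  }
}
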
@@ -122,50 +223,95 @@ public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy {
     return result;
   }

-  /** Permanently prevent this commit point from being deleted.
-   * A counter is used to allow a commit point to be correctly saved and released
-   * multiple times. */
-  public synchronized void saveCommitPoint(Long indexCommitGen) {
-    AtomicInteger reserveCount = savedCommits.get(indexCommitGen);
-    if (reserveCount == null) reserveCount = new AtomicInteger();
-    reserveCount.incrementAndGet();
-    savedCommits.put(indexCommitGen, reserveCount);
+  /**
+   * Permanently prevent this commit point from being deleted (if it has not been already) using a reference count.
+   * <p>
+   * <b>NOTE:</b> Callers <em>MUST</em> call {@link #releaseCommitPoint} when finished using it
+   * in order to decrement the reference count, or the commit will be preserved in the Directory forever.
+   * </p>
+   *
+   * @param generation the generation of the IndexCommit to save until released
+   * @see #getAndSaveLatestCommit
+   * @see #getAndSaveCommitPoint
+   * @see #releaseCommitPoint
+   * @throws IllegalStateException if generation is already too old to be saved
+   */
+  public synchronized void saveCommitPoint(Long generation) {
+    getAndSaveCommitPoint(generation); // will handle the logic for us, just ignore the results
   }

-  /** Release a previously saved commit point */
-  public synchronized void releaseCommitPoint(Long indexCommitGen) {
-    AtomicInteger reserveCount = savedCommits.get(indexCommitGen);
-    if (reserveCount == null) return; // this should not happen
-    if (reserveCount.decrementAndGet() <= 0) {
-      savedCommits.remove(indexCommitGen);
+  /**
+   * Release a previously saved commit point.
+   * <p>
+   * This is a convenience wrapper around {@link #releaseCommitPoint(Long)} that will ignore null input.
+   * </p>
+   */
+  public synchronized void releaseCommitPoint(IndexCommit commit) {
+    if (null != commit) {
+      releaseCommitPoint(commit.getGeneration());
+    }
+  }
+
+  /**
+   * Release a previously saved commit point.
+   * <p>
+   * This method does not enforce that the specified generation has previously been saved,
+   * or even that it's 'non-null'. But if both are true then it will decrement the reference
+   * count for the specified generation.
+   * </p>
+   */
+  public synchronized void releaseCommitPoint(Long generation) {
+    if (null == generation) {
+      return;
+    }
+    final AtomicInteger refCount = savedCommits.get(generation);
+    if (null != refCount) { // shouldn't happen if save/release calls are balanced in callers
+      final int currentCount = refCount.decrementAndGet();
+      log.debug("Released generation={}, refCount={}", generation, currentCount);
+      if (currentCount <= 0) {
+        savedCommits.remove(generation); // counter no longer needed
+      }
     }
   }

+  /**
+   * Internal use for Lucene... do not explicitly call.
+   * <p>
+   * This impl passes the list of commits to the delegate Policy <em>AFTER</em> wrapping each
+   * commit in a proxy class that only proxies {@link IndexCommit#delete} if they are not already saved.
+   * </p>
+   */
   @Override
   public void onInit(List<? extends IndexCommit> list) throws IOException {
     List<IndexCommitWrapper> wrapperList = wrap(list);
+    updateLatestCommit(wrapperList);
     deletionPolicy.onInit(wrapperList);
-    updateCommitPoints(wrapperList);
+    updateKnownCommitPoints(wrapperList);
     cleanReserves();
   }

+  /**
+   * Internal use for Lucene... do not explicitly call.
+   * <p>
+   * This impl passes the list of commits to the delegate Policy <em>AFTER</em> wrapping each
+   * commit in a proxy class that only proxies {@link IndexCommit#delete} if they are not already saved.
+   * </p>
+   */
   @Override
   public void onCommit(List<? extends IndexCommit> list) throws IOException {
     List<IndexCommitWrapper> wrapperList = wrap(list);
+    updateLatestCommit(wrapperList);
     deletionPolicy.onCommit(wrapperList);
-    updateCommitPoints(wrapperList);
+    updateKnownCommitPoints(wrapperList);
     cleanReserves();
   }

+  /**
+   * Wrapper class that synchronizes the {@link IndexCommit#delete} calls and only passes
+   * them to the wrapped commit if they should not be saved or reserved.
+   */
   private class IndexCommitWrapper extends IndexCommit {
-    IndexCommit delegate;
+    final IndexCommit delegate;

     IndexCommitWrapper(IndexCommit delegate) {
       this.delegate = delegate;
@@ -188,12 +334,21 @@ public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy {

     @Override
     public void delete() {
-      Long gen = delegate.getGeneration();
-      Long reserve = reserves.get(gen);
-      if (reserve != null && System.nanoTime() < reserve) return;
-      if (savedCommits.containsKey(gen)) return;
-      if (snapshotMgr.isSnapshotted(gen)) return;
-      delegate.delete();
+      // Box it now to prevent multiple autoboxing when doing multiple map lookups
+      final Long gen = delegate.getGeneration();
+
+      // synchronize on the policy wrapper so that we don't delegate the delete call
+      // concurrently with another thread trying to save this commit
+      synchronized (IndexDeletionPolicyWrapper.this) {
+        if ( (System.nanoTime() < reserves.getOrDefault(gen, 0L)) ||
+             savedCommits.containsKey(gen) ||
+             snapshotMgr.isSnapshotted(gen) ||
+             (null != latestCommit && gen.longValue() == latestCommit.getGeneration()) ) {
+          return; // skip deletion
+        }
+        log.debug("Deleting generation={}", gen);
+        delegate.delete(); // delegate deletion
+      }
     }

     @Override
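The synchronized block above exists because "check the saved/reserved maps, then delete" is a check-then-act sequence; without a lock shared with the save path, a save can slip between the check and the act. A distilled sketch of the locking pattern (all names illustrative):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class DeleteRaceSketch {
  private final Object policyLock = new Object();
  private final Set<Long> saved = ConcurrentHashMap.newKeySet();

  void save(long gen) {
    synchronized (policyLock) { // same lock as delete()
      saved.add(gen);
    }
  }

  void delete(long gen, Runnable doDelete) {
    synchronized (policyLock) { // a concurrent save() either fully wins or fully loses
      if (!saved.contains(gen)) {
        doDelete.run();
      }
    }
  }
}
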
@@ -218,7 +373,9 @@ public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy {

     @Override
     public boolean isDeleted() {
-      return delegate.isDeleted();
+      synchronized (IndexDeletionPolicyWrapper.this) {
+        return delegate.isDeleted();
+      }
     }

     @Override
@@ -228,36 +385,121 @@ public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy {
   }

   /**
-   * @param gen the gen of the commit point
-   * @return a commit point corresponding to the given version
+   * Returns the commit with the specified generation <em>if it is known</em>.
+   * <p>
+   * <b>NOTE:</b> This method makes no guarantee that the commit returned still exists at the
+   * moment this method completes. Callers are encouraged to use {@link #getAndSaveLatestCommit} instead.
+   * </p>
+   *
+   * @param gen the generation of the commit point requested
+   * @return a commit point corresponding to the given version if available, or null if not yet created or already deleted
+   * @deprecated use {@link #getAndSaveCommitPoint} instead
    */
+  @Deprecated
   public IndexCommit getCommitPoint(Long gen) {
-    return solrVersionVsCommits.get(gen);
+    return knownCommits.get(gen);
   }

   /**
-   * Gets the commit points for the index.
-   * This map instance may change between commits and commit points may be deleted.
-   * It is recommended to reserve a commit point for the duration of usage
+   * Returns a Map of all currently known commits, keyed by their generation.
+   * <p>
+   * <b>NOTE:</b> This map instance may change between commits and commit points may be deleted.
+   * This API is intended for "informational purposes" only, to provide an "at the moment" view of
+   * the current list of known commits. Callers that need to ensure commits exist for an extended period
+   * must wrap this call and all subsequent usage of the results in a synchronization block.
+   * </p>
    *
-   * @return a Map of version to commit points
+   * @return a Map of generation to commit points
    */
   public Map<Long, IndexCommit> getCommits() {
-    return solrVersionVsCommits;
+    return Collections.unmodifiableMap(knownCommits);
   }

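A sketch of the synchronization the new getCommits() javadoc asks of callers; it works because, as the delete() hunk above shows, IndexCommitWrapper.delete() synchronizes on the wrapper itself (class and method here are illustrative):

import org.apache.lucene.index.IndexCommit;
import org.apache.solr.core.IndexDeletionPolicyWrapper;

class CommitsViewSketch {
  // Holding the wrapper's monitor blocks IndexCommitWrapper.delete(),
  // so the commit we look up cannot vanish while we inspect it.
  static void inspect(IndexDeletionPolicyWrapper delPol, long gen) {
    synchronized (delPol) {
      final IndexCommit c = delPol.getCommits().get(gen);
      if (c != null && !c.isDeleted()) {
        // safe to read commit metadata here
      }
    }
  }
}
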
-  private void updateCommitPoints(List<IndexCommitWrapper> list) {
-    Map<Long, IndexCommit> map = new ConcurrentHashMap<>();
+  /**
+   * Updates {@link #latestCommit}.
+   * <p>
+   * This is handled specially, and not included in {@link #updateKnownCommitPoints}, because we need to
+   * ensure this happens <em>before</em> delegating calls to {@link #onInit} or {@link #onCommit} to our
+   * inner Policy. Doing this ensures that we can always protect {@link #latestCommit} from being deleted.
+   * </p>
+   * <p>
+   * If we did not do this, and waited to update <code>latestCommit</code> in
+   * <code>updateKnownCommitPoints()</code>, then we would need to wrap synchronization completely around
+   * the (delegated) <code>onInit()</code> and <code>onCommit()</code> calls, to ensure there was no
+   * window of time when {@link #getAndSaveLatestCommit} might return the "old" latest commit, after our
+   * delegate Policy had already deleted it.
+   * </p>
+   * <p>
+   * (Since saving/reserving (other) commits is handled indirectly ("by reference") via the generation,
+   * callers can still safely (try to) reserve "old" commits using an explicit generation, since
+   * {@link IndexCommitWrapper#delete} is synchronized on <code>this</code>.)
+   *
+   * @see #latestCommit
+   * @see #updateKnownCommitPoints
+   */
+  private synchronized void updateLatestCommit(final List<IndexCommitWrapper> list) {
+    // NOTE: There's a hypothetical, not necessarily possible/plausible, situation that
+    // could lead to this combination of updateLatestCommit + updateKnownCommitPoints not
+    // being as thread safe as completely synchronizing in onInit/onCommit...
+    //  - knownCommits==(1, 2, 3, 4), latestCommit==4
+    //  - onCommit(1, 2, 3, 4, 5, 6, 7) - we immediately update latestCommit=7
+    //  - before knownCommits is updated, some client calls getAndSaveCommitPoint(6)
+    //  - call fails "too old to be saved" even though it's in flight
+    //    (this assumes some future caller/use-case that doesn't currently exist)
+    //
+    // The upside of this current approach, and of not completely synchronizing onInit/onCommit,
+    // is that we have no control over what delegate is used, or how long those calls might take.
+    //
+    // If the hypothetical situation above ever becomes problematic, then an alternative approach might be
+    // to *add* to the Set/Map of all known commits *before* delegating, then *remove* everything except
+    // the new (non-deleted) commits *after* delegating.
+
+    assert null != list;
+    if (list.isEmpty()) {
+      return;
+    }
+    final IndexCommitWrapper newLast = list.get(list.size() - 1);
+    assert ! newLast.isDeleted()
+      : "Code flaw: Last commit already deleted, call this method before delegating onCommit/onInit";
+
+    latestCommit = newLast.delegate;
+  }
+
+  /**
+   * Updates the state of all "current" commits.
+   * <p>
+   * This method is safe to call <em>after</em> delegating to our inner <code>IndexDeletionPolicy</code>
+   * (w/o synchronizing the delegate calls) because even if the delegate decides to
+   * {@link IndexCommit#delete} a commit that a concurrent thread may wish to reserve/save,
+   * that {@link IndexCommitWrapper} will ensure that call is synchronized.
+   * </p>
+   *
+   * @see #updateLatestCommit
+   */
+  private synchronized void updateKnownCommitPoints(final List<IndexCommitWrapper> list) {
+    assert null != list;
+    assert (list.isEmpty() || null != latestCommit) : "Code flaw: How is latestCommit not set yet?";
+    assert (null == latestCommit || ! latestCommit.isDeleted())
+      : "Code flaw: How did the latestCommit get set but deleted?";
+    assert (list.isEmpty() || latestCommit == list.get(list.size() - 1).delegate)
+      : "Code flaw, updateLatestCommit() should have already been called";
+
+    final Map<Long, IndexCommit> map = new ConcurrentHashMap<>();
     for (IndexCommitWrapper wrapper : list) {
-      if (!wrapper.isDeleted())
+      if (!wrapper.isDeleted()) {
         map.put(wrapper.delegate.getGeneration(), wrapper.delegate);
+      }
     }
-    solrVersionVsCommits = map;
-    if (!list.isEmpty()) {
-      latestCommit = ((list.get(list.size() - 1)).delegate);
-    }
+    knownCommits = map;
   }

+  /**
+   * Helper method for unpacking the timestamp info from the user data
+   * @see SolrIndexWriter#COMMIT_TIME_MSEC_KEY
+   * @see IndexCommit#getUserData
+   */
   public static long getCommitTimestamp(IndexCommit commit) throws IOException {
     final Map<String,String> commitData = commit.getUserData();
     String commitTime = commitData.get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);

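For reference, the helper above unpacks the commit's user data roughly as follows. This is a sketch: it assumes SolrIndexWriter.COMMIT_TIME_MSEC_KEY holds the literal "commitTimeMSec", and the fallback to 0 for missing data is an assumption about the surrounding code, not shown in this diff:

import java.io.IOException;
import java.util.Map;
import org.apache.lucene.index.IndexCommit;

class CommitTimestampSketch {
  // Reads the commit-time entry a SolrIndexWriter stores in the commit's user data.
  static long commitTimeMillis(IndexCommit commit) throws IOException {
    final Map<String, String> userData = commit.getUserData();
    final String commitTime = userData.get("commitTimeMSec"); // assumed key literal
    return (commitTime == null) ? 0L : Long.parseLong(commitTime);
  }
}
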
@@ -616,104 +616,121 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw

   @SuppressWarnings("unchecked")
   private void getFileList(SolrParams solrParams, SolrQueryResponse rsp) {
-    String v = solrParams.required().get(GENERATION);
-    long gen = Long.parseLong(v);
-    if (gen == -1) {
-      IndexCommit commitPoint = core.getDeletionPolicy().getLatestCommit();
-      if(commitPoint == null) {
-        rsp.add(CMD_GET_FILE_LIST, Collections.EMPTY_LIST);
-        return;
-      }
-      gen = commitPoint.getGeneration();
-    }
-    IndexCommit commit = core.getDeletionPolicy().getCommitPoint(gen);
-
-    if (commit == null) {
-      reportErrorOnResponse(rsp, "invalid index generation", null);
-      return;
-    }
-
-    // reserve the indexcommit for sometime
-    core.getDeletionPolicy().setReserveDuration(gen, reserveCommitDuration);
-    List<Map<String, Object>> result = new ArrayList<>();
-    Directory dir = null;
-    try {
-      dir = core.getDirectoryFactory().get(core.getNewIndexDir(), DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
-      SegmentInfos infos = SegmentInfos.readCommit(dir, commit.getSegmentsFileName());
-      for (SegmentCommitInfo commitInfo : infos) {
-        for (String file : commitInfo.files()) {
-          Map<String, Object> fileMeta = new HashMap<>();
-          fileMeta.put(NAME, file);
-          fileMeta.put(SIZE, dir.fileLength(file));
-
-          try (final IndexInput in = dir.openInput(file, IOContext.READONCE)) {
-            try {
-              long checksum = CodecUtil.retrieveChecksum(in);
-              fileMeta.put(CHECKSUM, checksum);
-            } catch (Exception e) {
-              //TODO Should this trigger a larger error?
-              log.warn("Could not read checksum from index file: " + file, e);
-            }
-          }
-
-          result.add(fileMeta);
-        }
-      }
-
-      // add the segments_N file
-
-      Map<String, Object> fileMeta = new HashMap<>();
-      fileMeta.put(NAME, infos.getSegmentsFileName());
-      fileMeta.put(SIZE, dir.fileLength(infos.getSegmentsFileName()));
-      if (infos.getId() != null) {
-        try (final IndexInput in = dir.openInput(infos.getSegmentsFileName(), IOContext.READONCE)) {
-          try {
-            long checksum = CodecUtil.retrieveChecksum(in);
-            fileMeta.put(CHECKSUM, checksum);
-          } catch (Exception e) {
-            //TODO Should this trigger a larger error?
-            log.warn("Could not read checksum from index file: " + file, e);
-          }
-        }
-      }
-      result.add(fileMeta);
-    } catch (IOException e) {
-      log.error("Unable to get file names for indexCommit generation: " + gen, e);
-      reportErrorOnResponse(rsp, "unable to get file names for given index generation", e);
-      return;
-    } finally {
-      if (dir != null) {
-        try {
-          core.getDirectoryFactory().release(dir);
-        } catch (IOException e) {
-          SolrException.log(log, "Could not release directory after fetching file list", e);
-        }
-      }
-    }
-    rsp.add(CMD_GET_FILE_LIST, result);
-
-    if (solrParams.getBool(TLOG_FILES, false)) {
-      try {
-        List<Map<String, Object>> tlogfiles = getTlogFileList(commit);
-        log.info("Adding tlog files to list: " + tlogfiles);
-        rsp.add(TLOG_FILES, tlogfiles);
-      }
-      catch (IOException e) {
-        log.error("Unable to get tlog file names for indexCommit generation: " + gen, e);
-        reportErrorOnResponse(rsp, "unable to get tlog file names for given index generation", e);
-        return;
-      }
-    }
-
-    if (confFileNameAlias.size() < 1 || core.getCoreContainer().isZooKeeperAware())
-      return;
-    log.debug("Adding config files to list: " + includeConfFiles);
-    //if configuration files need to be included get their details
-    rsp.add(CONF_FILES, getConfFileInfoFromCache(confFileNameAlias, confFileInfoCache));
-    rsp.add(STATUS, OK_STATUS);
-  }
+    final IndexDeletionPolicyWrapper delPol = core.getDeletionPolicy();
+    final long gen = Long.parseLong(solrParams.required().get(GENERATION));
+
+    IndexCommit commit = null;
+    try {
+      if (gen == -1) {
+        commit = delPol.getAndSaveLatestCommit();
+        if (null == commit) {
+          rsp.add(CMD_GET_FILE_LIST, Collections.EMPTY_LIST);
+          return;
+        }
+      } else {
+        try {
+          commit = delPol.getAndSaveCommitPoint(gen);
+        } catch (IllegalStateException ignored) {
+          /* handle this below the same way we handle a return value of null... */
+        }
+        if (null == commit) {
+          // The gen they asked for either doesn't exist or has already been deleted
+          reportErrorOnResponse(rsp, "invalid index generation", null);
+          return;
+        }
+      }
+      assert null != commit;
+
+      List<Map<String, Object>> result = new ArrayList<>();
+      Directory dir = null;
+      try {
+        dir = core.getDirectoryFactory().get(core.getNewIndexDir(), DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
+        SegmentInfos infos = SegmentInfos.readCommit(dir, commit.getSegmentsFileName());
+        for (SegmentCommitInfo commitInfo : infos) {
+          for (String file : commitInfo.files()) {
+            Map<String, Object> fileMeta = new HashMap<>();
+            fileMeta.put(NAME, file);
+            fileMeta.put(SIZE, dir.fileLength(file));
+
+            try (final IndexInput in = dir.openInput(file, IOContext.READONCE)) {
+              try {
+                long checksum = CodecUtil.retrieveChecksum(in);
+                fileMeta.put(CHECKSUM, checksum);
+              } catch (Exception e) {
+                //TODO Should this trigger a larger error?
+                log.warn("Could not read checksum from index file: " + file, e);
+              }
+            }
+
+            result.add(fileMeta);
+          }
+        }
+
+        // add the segments_N file
+
+        Map<String, Object> fileMeta = new HashMap<>();
+        fileMeta.put(NAME, infos.getSegmentsFileName());
+        fileMeta.put(SIZE, dir.fileLength(infos.getSegmentsFileName()));
+        if (infos.getId() != null) {
+          try (final IndexInput in = dir.openInput(infos.getSegmentsFileName(), IOContext.READONCE)) {
+            try {
+              fileMeta.put(CHECKSUM, CodecUtil.retrieveChecksum(in));
+            } catch (Exception e) {
+              //TODO Should this trigger a larger error?
+              log.warn("Could not read checksum from index file: " + infos.getSegmentsFileName(), e);
+            }
+          }
+        }
+        result.add(fileMeta);
+      } catch (IOException e) {
+        log.error("Unable to get file names for indexCommit generation: " + commit.getGeneration(), e);
+        reportErrorOnResponse(rsp, "unable to get file names for given index generation", e);
+        return;
+      } finally {
+        if (dir != null) {
+          try {
+            core.getDirectoryFactory().release(dir);
+          } catch (IOException e) {
+            SolrException.log(log, "Could not release directory after fetching file list", e);
+          }
+        }
+      }
+      rsp.add(CMD_GET_FILE_LIST, result);
+
+      if (solrParams.getBool(TLOG_FILES, false)) {
+        try {
+          List<Map<String, Object>> tlogfiles = getTlogFileList(commit);
+          log.info("Adding tlog files to list: " + tlogfiles);
+          rsp.add(TLOG_FILES, tlogfiles);
+        }
+        catch (IOException e) {
+          log.error("Unable to get tlog file names for indexCommit generation: " + commit.getGeneration(), e);
+          reportErrorOnResponse(rsp, "unable to get tlog file names for given index generation", e);
+          return;
+        }
+      }
+
+      if (confFileNameAlias.size() < 1 || core.getCoreContainer().isZooKeeperAware())
+        return;
+      log.debug("Adding config files to list: " + includeConfFiles);
+      //if configuration files need to be included get their details
+      rsp.add(CONF_FILES, getConfFileInfoFromCache(confFileNameAlias, confFileInfoCache));
+      rsp.add(STATUS, OK_STATUS);
+
+    } finally {
+      if (null != commit) {
+        // before releasing the save on our commit point, set a short reserve duration since
+        // the main reason remote nodes will ask for the file list is because they are preparing to
+        // replicate from us...
+        delPol.setReserveDuration(commit.getGeneration(), reserveCommitDuration);
+        delPol.releaseCommitPoint(commit);
+      }
+    }
+  }

   /**
    * Retrieves the list of tlog files associated to a commit point.
+   * NOTE: The commit <b>MUST</b> be reserved before calling this method
    */
   List<Map<String, Object>> getTlogFileList(IndexCommit commit) throws IOException {
     long maxVersion = this.getMaxVersion(commit);

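The per-file checksum handling that getFileList() applies can be isolated into a small helper. A sketch under the same assumptions the diff shows: CodecUtil.retrieveChecksum reads and validates the codec footer, and a failure is treated as non-fatal because the file list is still useful without checksums.

import java.io.IOException;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

class ChecksumSketch {
  // Open the file read-once and return its footer checksum, or null on failure.
  static Long checksumOrNull(Directory dir, String file) throws IOException {
    try (IndexInput in = dir.openInput(file, IOContext.READONCE)) {
      try {
        return CodecUtil.retrieveChecksum(in);
      } catch (Exception e) {
        return null; // caller may log and continue without a checksum
      }
    }
  }
}
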
@@ -736,6 +753,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw

   /**
    * Retrieves the maximum version number from an index commit.
+   * NOTE: The commit <b>MUST</b> be reserved before calling this method
    */
   private long getMaxVersion(IndexCommit commit) throws IOException {
     try (DirectoryReader reader = DirectoryReader.open(commit)) {

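The reason both new NOTE lines insist on reservation: DirectoryReader.open(commit) materializes a point-in-time view from exactly that commit's files, so those files must survive until the reader is open, or the caller sees NoSuchFileException. A minimal sketch (the doc-count body is illustrative; the real getMaxVersion reads version data instead):

import java.io.IOException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;

class CommitReaderSketch {
  // Opens a reader pinned to one specific commit; the commit must be
  // saved/reserved by the caller for the duration of this call.
  static int docCountAt(IndexCommit commit) throws IOException {
    try (DirectoryReader reader = DirectoryReader.open(commit)) {
      return reader.numDocs();
    }
  }
}
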
@@ -149,67 +149,71 @@ public class SnapShooter {
   }

   public NamedList createSnapshot() throws Exception {
-    IndexCommit indexCommit;
-    if (commitName != null) {
-      indexCommit = getIndexCommitFromName();
-      return createSnapshot(indexCommit);
-    } else {
-      indexCommit = getIndexCommit();
-      IndexDeletionPolicyWrapper deletionPolicy = solrCore.getDeletionPolicy();
-      deletionPolicy.saveCommitPoint(indexCommit.getGeneration());
-      try {
-        return createSnapshot(indexCommit);
-      } finally {
-        deletionPolicy.releaseCommitPoint(indexCommit.getGeneration());
-      }
-    }
+    final IndexCommit indexCommit = getAndSaveIndexCommit();
+    try {
+      return createSnapshot(indexCommit);
+    } finally {
+      solrCore.getDeletionPolicy().releaseCommitPoint(indexCommit.getGeneration());
+    }
   }

-  private IndexCommit getIndexCommit() throws IOException {
-    IndexDeletionPolicyWrapper delPolicy = solrCore.getDeletionPolicy();
-    IndexCommit indexCommit = delPolicy.getLatestCommit();
-    if (indexCommit != null) {
-      return indexCommit;
-    }
-    return solrCore.withSearcher(searcher -> searcher.getIndexReader().getIndexCommit());
-  }
-
-  private IndexCommit getIndexCommitFromName() throws IOException {
-    assert commitName != null;
-    IndexCommit indexCommit;
-    SolrSnapshotMetaDataManager snapshotMgr = solrCore.getSnapshotMetaDataManager();
-    Optional<IndexCommit> commit = snapshotMgr.getIndexCommitByName(commitName);
-    if (commit.isPresent()) {
-      indexCommit = commit.get();
-    } else {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an index commit with name " + commitName +
-                              " for core " + solrCore.getName());
-    }
-    return indexCommit;
-  }
+  /**
+   * If {@link #commitName} is non-null, then fetches the generation from the
+   * {@link SolrSnapshotMetaDataManager} and then returns
+   * {@link IndexDeletionPolicyWrapper#getAndSaveCommitPoint}, otherwise it returns
+   * {@link IndexDeletionPolicyWrapper#getAndSaveLatestCommit}.
+   * <p>
+   * Either way:
+   * <ul>
+   *  <li>This method does error handling for all cases where the commit can't be found
+   *      and wraps them in {@link SolrException}
+   *  </li>
+   *  <li>If this method returns, the result will be non null, and the caller <em>MUST</em>
+   *      call {@link IndexDeletionPolicyWrapper#releaseCommitPoint} when finished
+   *  </li>
+   * </ul>
+   */
+  private IndexCommit getAndSaveIndexCommit() throws IOException {
+    final IndexDeletionPolicyWrapper delPolicy = solrCore.getDeletionPolicy();
+    if (null != commitName) {
+      final SolrSnapshotMetaDataManager snapshotMgr = solrCore.getSnapshotMetaDataManager();
+      // We're going to tell the delPolicy to "save" this commit -- even though it's a named snapshot
+      // that will already be protected -- just in case another thread deletes the name.
+      // Because of this, we want to sync on the delPolicy to ensure there is no window of time after
+      // snapshotMgr confirms commitName exists, but before we have a chance to 'save' it, when
+      // the commitName might be deleted *and* the IndexWriter might call onCommit()
+      synchronized (delPolicy) {
+        final Optional<IndexCommit> namedCommit = snapshotMgr.getIndexCommitByName(commitName);
+        if (namedCommit.isPresent()) {
+          final IndexCommit commit = namedCommit.get();
+          log.debug("Using named commit: name={}, generation={}", commitName, commit.getGeneration());
+          delPolicy.saveCommitPoint(commit.getGeneration());
+          return commit;
+        }
+      } // else...
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an index commit with name " +
+                              commitName + " for core " + solrCore.getName());
+    }
+    // else: not a named commit...
+    final IndexCommit commit = delPolicy.getAndSaveLatestCommit();
+    if (null == commit) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Index does not yet have any commits for core " +
+                              solrCore.getName());
+    }
+    log.debug("Using latest commit: generation={}", commit.getGeneration());
+    return commit;
+  }

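The synchronized block in getAndSaveIndexCommit() is the named-snapshot half of the fix. Condensed into a standalone sketch (returning null where the real code throws SolrException is a simplification):

import java.io.IOException;
import java.util.Optional;
import org.apache.lucene.index.IndexCommit;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;

class NamedSnapshotSketch {
  // Between "the name exists" and "its generation is saved", another thread could
  // delete the named snapshot and the IndexWriter could then drop the commit.
  // Doing both steps under the policy's monitor removes that window.
  static IndexCommit saveNamed(IndexDeletionPolicyWrapper delPol,
                               SolrSnapshotMetaDataManager snapshotMgr,
                               String commitName) throws IOException {
    synchronized (delPol) {
      final Optional<IndexCommit> named = snapshotMgr.getIndexCommitByName(commitName);
      if (!named.isPresent()) {
        return null; // caller turns this into an error response
      }
      delPol.saveCommitPoint(named.get().getGeneration());
      return named.get();
    }
  }
}
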
   public void createSnapAsync(final int numberToKeep, Consumer<NamedList> result) throws IOException {
-    IndexCommit indexCommit;
-    if (commitName != null) {
-      indexCommit = getIndexCommitFromName();
-    } else {
-      indexCommit = getIndexCommit();
-    }
-    createSnapAsync(indexCommit, numberToKeep, result);
-  }
-
-  private void createSnapAsync(final IndexCommit indexCommit, final int numberToKeep, Consumer<NamedList> result) {
     //TODO should use Solr's ExecutorUtil
     new Thread(() -> {
       try {
-        result.accept(createSnapshot(indexCommit));
+        result.accept(createSnapshot());
       } catch (Exception e) {
         log.error("Exception while creating snapshot", e);
         NamedList snapShootDetails = new NamedList<>();
         snapShootDetails.add("exception", e.getMessage());
         result.accept(snapShootDetails);
-      } finally {
-        solrCore.getDeletionPolicy().releaseCommitPoint(indexCommit.getGeneration());
       }
       if (snapshotName == null) {
         try {

@@ -222,7 +226,17 @@ public class SnapShooter {
   }

-  // note: remember to reserve the indexCommit first so it won't get deleted concurrently
+  /**
+   * Handles the logic of creating a snapshot.
+   * <p>
+   * <b>NOTE:</b> The caller <em>MUST</em> ensure that the {@link IndexCommit} is saved prior to
+   * calling this method, and released after calling this method, or there is no guarantee that the
+   * method will function correctly.
+   * </p>
+   *
+   * @see IndexDeletionPolicyWrapper#saveCommitPoint
+   * @see IndexDeletionPolicyWrapper#releaseCommitPoint
+   */
   protected NamedList createSnapshot(final IndexCommit indexCommit) throws Exception {
     assert indexCommit != null;
     log.info("Creating backup snapshot " + (snapshotName == null ? "<not named>" : snapshotName) + " at " + baseSnapDirPath);

@@ -235,6 +249,7 @@ public class SnapShooter {
     Directory dir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(), DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
     try {
       for(String fileName : files) {
+        log.debug("Copying fileName={} from dir={} to snapshot={}", fileName, dir, snapshotDirPath);
         backupRepo.copyFileFrom(dir, fileName, snapshotDirPath);
       }
     } finally {

@@ -22,6 +22,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.IndexDeletionPolicyWrapper;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.snapshots.SolrSnapshotManager;
 import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;

@@ -40,19 +41,25 @@ class CreateSnapshotOp implements CoreAdminHandler.CoreAdminOp {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unable to locate core " + cname);
     }

-    String indexDirPath = core.getIndexDir();
-    IndexCommit ic = core.getDeletionPolicy().getLatestCommit();
-    if (ic == null) {
-      ic = core.withSearcher(searcher -> searcher.getIndexReader().getIndexCommit());
-    }
-    SolrSnapshotMetaDataManager mgr = core.getSnapshotMetaDataManager();
-    mgr.snapshot(commitName, indexDirPath, ic.getGeneration());
-
-    it.rsp.add(CoreAdminParams.CORE, core.getName());
-    it.rsp.add(CoreAdminParams.COMMIT_NAME, commitName);
-    it.rsp.add(SolrSnapshotManager.INDEX_DIR_PATH, indexDirPath);
-    it.rsp.add(SolrSnapshotManager.GENERATION_NUM, ic.getGeneration());
-    it.rsp.add(SolrSnapshotManager.FILE_LIST, ic.getFileNames());
+    final String indexDirPath = core.getIndexDir();
+    final IndexDeletionPolicyWrapper delPol = core.getDeletionPolicy();
+    final IndexCommit ic = delPol.getAndSaveLatestCommit();
+    try {
+      if (null == ic) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+                                "No index commits to snapshot in core " + cname);
+      }
+      final SolrSnapshotMetaDataManager mgr = core.getSnapshotMetaDataManager();
+      mgr.snapshot(commitName, indexDirPath, ic.getGeneration());
+
+      it.rsp.add(CoreAdminParams.CORE, core.getName());
+      it.rsp.add(CoreAdminParams.COMMIT_NAME, commitName);
+      it.rsp.add(SolrSnapshotManager.INDEX_DIR_PATH, indexDirPath);
+      it.rsp.add(SolrSnapshotManager.GENERATION_NUM, ic.getGeneration());
+      it.rsp.add(SolrSnapshotManager.FILE_LIST, ic.getFileNames());
+    } finally {
+      delPol.releaseCommitPoint(ic);
+    }
   }
 }

@@ -247,6 +247,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI

     if (directoryFactory.searchersReserveCommitPoints()) {
       // reserve commit point for life of searcher
+      // TODO: This may not be safe w/softCommit, see SOLR-13908
      core.getDeletionPolicy().saveCommitPoint(reader.getIndexCommit().getGeneration());
     }

@@ -16,23 +16,35 @@
  */
 package org.apache.solr.handler;

+import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.TestUtil;

 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.core.CoreContainer;
 import org.apache.solr.handler.admin.CoreAdminHandler;
 import org.apache.solr.response.SolrQueryResponse;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;

 public class TestCoreBackup extends SolrTestCaseJ4 {

-  @BeforeClass
-  public static void beforeClass() throws Exception {
+  @Before // unique core per test
+  public void coreInit() throws Exception {
     initCore("solrconfig.xml", "schema.xml");
   }
+  @After // unique core per test
+  public void coreDestroy() throws Exception {
+    deleteCore();
+  }

   @Test
   public void testBackupWithDocsNotSearchable() throws Exception {
     //See SOLR-11616 to see when this issue can be triggered

@@ -43,6 +55,8 @@ public class TestCoreBackup extends SolrTestCaseJ4 {

     assertU(commit("openSearcher", "false"));
     assertQ(req("q", "*:*"), "//result[@numFound='1']");
+    assertQ(req("q", "id:1"), "//result[@numFound='1']");
+    assertQ(req("q", "id:2"), "//result[@numFound='0']");

     //call backup
     String location = createTempDir().toFile().getAbsolutePath();

@@ -56,6 +70,313 @@ public class TestCoreBackup extends SolrTestCaseJ4 {
         "core", DEFAULT_TEST_COLLECTION_NAME, "name", snapshotName, "location", location)
         , resp);
     assertNull("Backup should have succeeded", resp.getException());
+
+    simpleBackupCheck(new File(location, "snapshot." + snapshotName), 2);
   }

+  public void testBackupBeforeFirstCommit() throws Exception {
+
+    // even w/o a user sending any data, the SolrCore initialization logic should have automatically created
+    // an "empty" commit point that can be backed up...
+    final IndexCommit empty = h.getCore().getDeletionPolicy().getLatestCommit();
+    assertNotNull(empty);
+
+    // white box sanity check that the commit point of the "reader" available from SolrIndexSearcher
+    // matches the commit point that IDPW claims is the "latest"
+    //
+    // this is important to ensure that backup/snapshot behavior is consistent with user expectation
+    // when using typical commit + openSearcher
+    assertEquals(empty, h.getCore().withSearcher(s -> s.getIndexReader().getIndexCommit()));
+
+    assertEquals(1L, empty.getGeneration());
+    assertNotNull(empty.getSegmentsFileName());
+    final String initialEmptyIndexSegmentFileName = empty.getSegmentsFileName();
+
+    final CoreContainer cores = h.getCoreContainer();
+    final CoreAdminHandler admin = new CoreAdminHandler(cores);
+
+    final File backupDir = createTempDir().toFile();
+
+    { // first a backup before we've ever done *anything*...
+      SolrQueryResponse resp = new SolrQueryResponse();
+      admin.handleRequestBody
+        (req(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.BACKUPCORE.toString(),
+             "core", DEFAULT_TEST_COLLECTION_NAME,
+             "name", "empty_backup1",
+             "location", backupDir.getAbsolutePath()),
+         resp);
+      assertNull("Backup should have succeeded", resp.getException());
+      simpleBackupCheck(new File(backupDir, "snapshot.empty_backup1"),
+                        0, initialEmptyIndexSegmentFileName);
+    }
+
+    { // Empty (named) snapshot..
+      SolrQueryResponse resp = new SolrQueryResponse();
+      admin.handleRequestBody
+        (req(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATESNAPSHOT.toString(),
+             "core", DEFAULT_TEST_COLLECTION_NAME,
+             "commitName", "empty_snapshotA"),
+         resp);
+      assertNull("Snapshot A should have succeeded", resp.getException());
+    }
+
+    assertU(adoc("id", "1")); // uncommitted
+
+    { // second backup w/uncommitted docs
+      SolrQueryResponse resp = new SolrQueryResponse();
+      admin.handleRequestBody
+        (req(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.BACKUPCORE.toString(),
+             "core", DEFAULT_TEST_COLLECTION_NAME,
+             "name", "empty_backup2",
+             "location", backupDir.getAbsolutePath()),
+         resp);
+      assertNull("Backup should have succeeded", resp.getException());
+      simpleBackupCheck(new File(backupDir, "snapshot.empty_backup2"),
+                        0, initialEmptyIndexSegmentFileName);
+    }
+
+    { // Second empty (named) snapshot..
+      SolrQueryResponse resp = new SolrQueryResponse();
+      admin.handleRequestBody
+        (req(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATESNAPSHOT.toString(),
+             "core", DEFAULT_TEST_COLLECTION_NAME,
+             "commitName", "empty_snapshotB"),
+         resp);
+      assertNull("Snapshot B should have succeeded", resp.getException());
+    }
+
+    // Committing the doc now should not affect the existing backups or snapshots...
+    assertU(commit());
+
+    for (String name : Arrays.asList("empty_backup1", "empty_backup2")) {
+      simpleBackupCheck(new File(backupDir, "snapshot." + name),
+                        0, initialEmptyIndexSegmentFileName);
+    }
+
+    // Make backups from each of the snapshots and check they are still empty as well...
+    for (String snapName : Arrays.asList("empty_snapshotA", "empty_snapshotB")) {
+      String name = "empty_backup_from_" + snapName;
+      SolrQueryResponse resp = new SolrQueryResponse();
+
+      admin.handleRequestBody
+        (req(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.BACKUPCORE.toString(),
+             "core", DEFAULT_TEST_COLLECTION_NAME,
+             "name", name,
+             "commitName", snapName,
+             "location", backupDir.getAbsolutePath()),
+         resp);
+      assertNull("Backup " + name + " should have succeeded", resp.getException());
+      simpleBackupCheck(new File(backupDir, "snapshot." + name),
+                        0, initialEmptyIndexSegmentFileName);
+    }
+  }
+
+  /**
+   * Tests that a softCommit does not affect what data is in a backup
+   */
+  public void testBackupAfterSoftCommit() throws Exception {
+
+    // sanity check empty index...
+    assertQ(req("q", "id:42"), "//result[@numFound='0']");
+    assertQ(req("q", "id:99"), "//result[@numFound='0']");
+    assertQ(req("q", "*:*"), "//result[@numFound='0']");
+
+    // hard commit one doc...
+    assertU(adoc("id", "99"));
+    assertU(commit());
+    assertQ(req("q", "id:99"), "//result[@numFound='1']");
+    assertQ(req("q", "*:*"), "//result[@numFound='1']");
+
+    final IndexCommit oneDocCommit = h.getCore().getDeletionPolicy().getLatestCommit();
+    assertNotNull(oneDocCommit);
+    final String oneDocSegmentFile = oneDocCommit.getSegmentsFileName();
+
+    final CoreContainer cores = h.getCoreContainer();
+    final CoreAdminHandler admin = new CoreAdminHandler(cores);
+
+    final File backupDir = createTempDir().toFile();
+
+    { // take an initial 'backup1a' containing our 1 document
+      final SolrQueryResponse resp = new SolrQueryResponse();
+      admin.handleRequestBody
+        (req(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.BACKUPCORE.toString(),
+             "core", DEFAULT_TEST_COLLECTION_NAME,
+             "name", "backup1a",
+             "location", backupDir.getAbsolutePath()),
+         resp);
+      assertNull("Backup should have succeeded", resp.getException());
+      simpleBackupCheck(new File(backupDir, "snapshot.backup1a"),
+                        1, oneDocSegmentFile);
+    }
+
+    { // and an initial 'snapshot1a' that should eventually match
+      SolrQueryResponse resp = new SolrQueryResponse();
+      admin.handleRequestBody
+        (req(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATESNAPSHOT.toString(),
+             "core", DEFAULT_TEST_COLLECTION_NAME,
+             "commitName", "snapshot1a"),
+         resp);
+      assertNull("Snapshot 1A should have succeeded", resp.getException());
+    }
+
+    // now we add our 2nd doc, and make it searchable, but we do *NOT* hard commit it to the index dir...
+    assertU(adoc("id", "42"));
+    assertU(commit("softCommit", "true", "openSearcher", "true"));
+
+    assertQ(req("q", "id:99"), "//result[@numFound='1']");
+    assertQ(req("q", "id:42"), "//result[@numFound='1']");
+    assertQ(req("q", "*:*"), "//result[@numFound='2']");
+
+    { // we now have an index with two searchable docs, but a new 'backup1b' should still
+      // be identical to the previous backup...
+      final SolrQueryResponse resp = new SolrQueryResponse();
+      admin.handleRequestBody
+        (req(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.BACKUPCORE.toString(),
+             "core", DEFAULT_TEST_COLLECTION_NAME,
+             "name", "backup1b",
+             "location", backupDir.getAbsolutePath()),
+         resp);
+      assertNull("Backup should have succeeded", resp.getException());
+      simpleBackupCheck(new File(backupDir, "snapshot.backup1b"),
+                        1, oneDocSegmentFile);
+    }
+
+    { // and a second 'snapshot1b' should also still be identical
+      SolrQueryResponse resp = new SolrQueryResponse();
+      admin.handleRequestBody
+        (req(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATESNAPSHOT.toString(),
+             "core", DEFAULT_TEST_COLLECTION_NAME,
+             "commitName", "snapshot1b"),
+         resp);
+      assertNull("Snapshot 1B should have succeeded", resp.getException());
+    }
+
+    // Hard committing the 2nd doc now should not affect the existing backups or snapshots...
+    assertU(commit());
+
+    for (String name : Arrays.asList("backup1a", "backup1b")) {
+      simpleBackupCheck(new File(backupDir, "snapshot." + name),
+                        1, oneDocSegmentFile);
+    }
+
+    { // But we should be able to confirm both docs appear in a new backup (not based on a previous snapshot)
+      final SolrQueryResponse resp = new SolrQueryResponse();
+      admin.handleRequestBody
+        (req(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.BACKUPCORE.toString(),
+             "core", DEFAULT_TEST_COLLECTION_NAME,
+             "name", "backup2",
+             "location", backupDir.getAbsolutePath()),
+         resp);
+      assertNull("Backup should have succeeded", resp.getException());
+      simpleBackupCheck(new File(backupDir, "snapshot.backup2"), 2);
+    }
+
+    // if we go back and create backups from our earlier snapshots they should still only
+    // have 1 expected doc...
+    for (String snapName : Arrays.asList("snapshot1a", "snapshot1b")) {
+      String name = "backup_from_" + snapName;
+      SolrQueryResponse resp = new SolrQueryResponse();
+      admin.handleRequestBody
+        (req(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.BACKUPCORE.toString(),
+             "core", DEFAULT_TEST_COLLECTION_NAME,
+             "name", name,
+             "commitName", snapName,
+             "location", backupDir.getAbsolutePath()),
+         resp);
+      assertNull("Backup " + name + " should have succeeded", resp.getException());
+      simpleBackupCheck(new File(backupDir, "snapshot." + name),
+                        1, oneDocSegmentFile);
+    }
+  }
/**
|
||||
* A simple sanity check that asserts the current weird behavior of DirectoryReader.openIfChanged()
|
||||
* and demos how 'softCommit' can cause the IndexReader in use by SolrIndexSearcher to missrepresent what
|
||||
* commit is "current". So Backup code should only ever "trust" the IndexCommit info available from the
|
||||
* IndexDeletionPolicyWrapper
|
||||
*
|
||||
* @see <a href="https://issues.apache.org/jira/browse/LUCENE-9040">LUCENE-9040</a>
|
||||
* @see <a href="https://issues.apache.org/jira/browse/SOLR-13909">SOLR-13909</a>
|
||||
*/
|
||||
  public void testDemoWhyBackupCodeShouldNeverUseIndexCommitFromSearcher() throws Exception {

    final long EXPECTED_GEN_OF_EMPTY_INDEX = 1L;

    // sanity check this is an empty index...
    assertQ(req("q", "*:*"), "//result[@numFound='0']");

    // sanity check what the searcher/reader of this empty index reports about the current commit
    final IndexCommit empty = h.getCore().withSearcher(s -> {
      // sanity check we are empty...
      assertEquals(0L, (long) s.getIndexReader().numDocs());

      // sanity check this is the initial commit...
      final IndexCommit commit = s.getIndexReader().getIndexCommit();
      assertEquals(EXPECTED_GEN_OF_EMPTY_INDEX, (long) commit.getGeneration());
      return commit;
    });

    // now let's add & soft commit 1 doc...
    assertU(adoc("id", "42"));
    assertU(commit("softCommit", "true", "openSearcher", "true"));

    // verify it's "searchable"...
    assertQ(req("q", "id:42"), "//result[@numFound='1']");

    // sanity check what the searcher/reader reports about the current commit now...
    IndexCommit oneDoc = h.getCore().withSearcher(s -> {
      // sanity check this really is the searcher/reader that has the new doc...
      assertEquals(1L, (long) s.getIndexReader().numDocs());

      final IndexCommit commit = s.getIndexReader().getIndexCommit();
      // WTF: how/why does this reader still have the same commit generation as before ? ? ? ? ?
      assertEquals("WTF: This Reader claims the same generation as our previous pre-softCommit (empty) reader",
                   EXPECTED_GEN_OF_EMPTY_INDEX, (long) commit.getGeneration());
      return commit;
    });

    assertEquals("WTF: Our two IndexCommits, which we know have different docs, claim to be equal",
                 empty, oneDoc);

  }

  /**
   * Simple check that the backup exists, is a valid index, and contains the expected number of docs.
   */
  private static void simpleBackupCheck(final File backup, final int numDocs) throws IOException {
    simpleBackupCheck(backup, numDocs, null);
  }

  /**
   * Simple check that the backup exists, is a valid index, and contains the expected number of docs.
   * If expectedSegmentsFileName is non-null then confirms that file exists in the backup dir
   * <em>and</em> that it is reported as the current segment file when opening a reader on that backup.
   */
  private static void simpleBackupCheck(final File backup, final int numDocs,
                                        final String expectedSegmentsFileName) throws IOException {
    assertNotNull(backup);
    assertTrue("Backup doesn't exist: " + backup.toString(), backup.exists());
    if (null != expectedSegmentsFileName) {
      assertTrue(expectedSegmentsFileName + " doesn't exist in " + backup.toString(),
                 new File(backup, expectedSegmentsFileName).exists());
    }
    try (Directory dir = FSDirectory.open(backup.toPath())) {
      TestUtil.checkIndex(dir, true, true, null);
      try (DirectoryReader r = DirectoryReader.open(dir)) {
        assertEquals("numDocs in " + backup.toString(),
                     numDocs, r.numDocs());
        if (null != expectedSegmentsFileName) {
          assertEquals("segmentsFile of IndexCommit for: " + backup.toString(),
                       expectedSegmentsFileName, r.getIndexCommit().getSegmentsFileName());
        }
      }
    }
  }

}
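
The demo test above reduces to a simple rule: backup code must ask the deletion policy for the commit, and must reserve it before copying files. The following sketch (not part of this commit's diff) illustrates that save/release pattern; the IndexDeletionPolicyWrapper method names are shown as an assumed save/release style API, not definitive signatures.

import java.io.IOException;
import org.apache.lucene.index.IndexCommit;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;

final class BackupCommitSketch {
  static void backupLatestCommit(SolrCore core) throws IOException {
    final IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();
    // Reserve the commit point *before* reading its file list, so concurrent
    // commits/merges can't delete its segment files mid-copy.
    final IndexCommit commit = delPolicy.getAndSaveLatestCommit(); // assumed API
    try {
      if (null == commit) {
        throw new IllegalStateException("No commit point exists to back up");
      }
      for (String fileName : commit.getFileNames()) {
        // copy fileName from the index Directory to the backup location...
      }
    } finally {
      if (null != commit) {
        delPolicy.releaseCommitPoint(commit); // assumed API: segments may be deleted again
      }
    }
  }
}
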
@ -49,13 +49,16 @@ import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.SimpleSolrResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrDocument;

@ -103,7 +106,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
      + File.separator;

  JettySolrRunner masterJetty, slaveJetty, repeaterJetty;
  SolrClient masterClient, slaveClient, repeaterClient;
  HttpSolrClient masterClient, slaveClient, repeaterClient;
  SolrInstance master = null, slave = null, repeater = null;

  static String context = "/solr";

@ -178,7 +181,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
    return jetty;
  }

  static SolrClient createNewSolrClient(int port) {
  static HttpSolrClient createNewSolrClient(int port) {
    try {
      // setup the client...
      final String baseUrl = buildUrl(port) + "/" + DEFAULT_TEST_CORENAME;

@ -1572,6 +1575,59 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
    assertEquals(SolrException.ErrorCode.BAD_REQUEST.code, thrown.code());
    assertThat(thrown.getMessage(), containsString("Missing required parameter: name"));
  }

  @Test
  public void testEmptyBackups() throws Exception {
    final File backupDir = createTempDir().toFile();
    final CheckBackupStatus backupStatus = new CheckBackupStatus(masterClient, /* Silly API */ ".");

    { // initial request w/o any committed docs
      final GenericSolrRequest req = new GenericSolrRequest
        (SolrRequest.METHOD.GET, "/replication",
         params("command", "backup",
                "location", backupDir.getAbsolutePath(),
                "name", "empty_backup1"));
      final TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
      final SimpleSolrResponse rsp = req.process(masterClient);

      while (!timeout.hasTimedOut()) {
        backupStatus.fetchStatus();
        if (backupStatus.success) {
          break;
        }
        timeout.sleep(50);
      }
      assertTrue(backupStatus.success);

      assertTrue("snapshot.empty_backup1 doesn't exist in expected location",
                 new File(backupDir, "snapshot.empty_backup1").exists());
    }

    index(masterClient, "id", "1", "name", "foo");

    { // second backup w/ an uncommitted doc
      final GenericSolrRequest req = new GenericSolrRequest
        (SolrRequest.METHOD.GET, "/replication",
         params("command", "backup",
                "location", backupDir.getAbsolutePath(),
                "name", "empty_backup2"));
      final TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
      final SimpleSolrResponse rsp = req.process(masterClient);

      while (!timeout.hasTimedOut()) {
        backupStatus.fetchStatus();
        if (backupStatus.success) {
          break;
        }
        timeout.sleep(50);
      }
      assertTrue(backupStatus.success);

      assertTrue("snapshot.empty_backup2 doesn't exist in expected location",
                 new File(backupDir, "snapshot.empty_backup2").exists());
    }
  }


  private class AddExtraDocs implements Runnable {

@ -0,0 +1,419 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.handler;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.lucene.util.TestUtil;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.SimpleSolrResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.util.TimeOut;
import org.apache.solr.util.LogLevel;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Nightly
@SuppressCodecs({"SimpleText"})
@LogLevel("org.apache.solr.handler.SnapShooter=DEBUG;org.apache.solr.core.IndexDeletionPolicyWrapper=DEBUG")
public class TestStressThreadBackup extends SolrCloudTestCase {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private static final Pattern ENDS_WITH_INT_DIGITS = Pattern.compile("\\d+$");
  private File backupDir;
  private SolrClient adminClient;
  private SolrClient coreClient;
  private String coreName;

  @Before
  public void beforeTest() throws Exception {
    backupDir = createTempDir(getTestClass().getSimpleName() + "_backups").toFile();

    // NOTE: we don't actually care about using SolrCloud, but we want to use SolrClient and I can't
    // bring myself to deal with the nonsense that is SolrJettyTestBase.

    // We do however explicitly want a fresh "cluster" every time a test is run
    configureCluster(1)
      .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
      .configure();

    assertEquals(0, (CollectionAdminRequest.createCollection(DEFAULT_TEST_COLLECTION_NAME, "conf1", 1, 1)
                     .process(cluster.getSolrClient()).getStatus()));
    adminClient = getHttpSolrClient(cluster.getJettySolrRunners().get(0).getBaseUrl().toString());
    initCoreNameAndSolrCoreClient();
  }

  @After
  public void afterTest() throws Exception {
    // we use a clean cluster instance for every test, so we need to clean it up
    shutdownCluster();

    if (null != adminClient) {
      adminClient.close();
    }
    if (null != coreClient) {
      coreClient.close();
    }
  }

  public void testCoreAdminHandler() throws Exception {
    // Use default BackupAPIImpl which hits CoreAdmin API for everything
    testSnapshotsAndBackupsDuringConcurrentCommitsAndOptimizes(new BackupAPIImpl());
  }

  public void testReplicationHandler() throws Exception {
    // Create a custom BackupAPIImpl which uses ReplicationHandler for the backups
    // but still defaults to CoreAdmin for making named snapshots (since that's what's documented)
    testSnapshotsAndBackupsDuringConcurrentCommitsAndOptimizes(new BackupAPIImpl() {
      /** no solrj API for ReplicationHandler */
      private GenericSolrRequest makeReplicationReq(SolrParams p) {
        return new GenericSolrRequest(GenericSolrRequest.METHOD.GET, "/replication", p);
      }

      /**
       * Override default backup impl to hit ReplicationHandler,
       * and then poll that same handler until success.
       */
      public void makeBackup(final String backupName, final String snapName) throws Exception {
        final TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
        ModifiableSolrParams p = params("command", "backup",
                                        "name", backupName,
                                        CoreAdminParams.BACKUP_LOCATION, backupDir.getAbsolutePath());
        if (null != snapName) {
          p.add(CoreAdminParams.COMMIT_NAME, snapName);
        }
        makeReplicationReq(p).process(coreClient);

        // "/replication" handler is all async, need to poll until we see *this*
        // backupName report success
        while (!timeout.hasTimedOut()) {
          if (checkBackupSuccess(backupName)) {
            return;
          }
          timeout.sleep(50);
        }

        // total TimeOut elapsed, so do one last check or fail the whole test.
        assertTrue(backupName + " never succeeded after waiting an excessive amount of time",
                   checkBackupSuccess(backupName));
      }

      /**
       * Returns true if the replication handler's 'details' command indicates that
       * the most recently (successfully) completed backup has the specified name;
       * "fails" the test if 'details' ever indicates there was a backup exception.
       */
      private boolean checkBackupSuccess(final String backupName) throws Exception {
        final SimpleSolrResponse rsp = makeReplicationReq(params("command", "details")).process(coreClient);
        final NamedList data = rsp.getResponse();
        log.info("Checking Status of {}: {}", backupName, data);
        final NamedList<String> backupData = (NamedList<String>) data.findRecursive("details", "backup");
        if (null == backupData) {
          // no backup has finished yet
          return false;
        }

        final Object exception = backupData.get("exception");
        assertNull("Backup failure", exception);

        if (backupName.equals(backupData.get("snapshotName"))
            && "success".equals(backupData.get("status"))) {
          return true;
        }
        return false;
      }
    });

  }

  public void testSnapshotsAndBackupsDuringConcurrentCommitsAndOptimizes(final BackupAPIImpl impl) throws Exception {
    final int numBackupIters = 20; // don't use 'atLeast', we don't want to blow up on nightly

    final AtomicReference<Throwable> heavyCommitFailure = new AtomicReference<>();
    final AtomicBoolean keepGoing = new AtomicBoolean(true);

    // this thread will do nothing but add/commit new 'dummy' docs over and over again as fast as possible
    // to create a lot of index churn w/ segment merging
    final Thread heavyCommitting = new Thread() {
      public void run() {
        try {
          int docIdCounter = 0;
          while (keepGoing.get()) {
            docIdCounter++;

            final UpdateRequest req = new UpdateRequest().add(makeDoc("dummy_" + docIdCounter, "dummy"));
            // always commit to force lots of new segments
            req.setParam(UpdateParams.COMMIT, "true");
            req.setParam(UpdateParams.OPEN_SEARCHER, "false"); // we don't care about searching

            // frequently forceMerge to ensure segments are frequently deleted
            if (0 == (docIdCounter % 13)) { // arbitrary
              req.setParam(UpdateParams.OPTIMIZE, "true");
              req.setParam(UpdateParams.MAX_OPTIMIZE_SEGMENTS, "5"); // arbitrary
            }

            log.info("Heavy Committing #{}: {}", docIdCounter, req);
            final UpdateResponse rsp = req.process(coreClient);
            assertEquals("Dummy Doc#" + docIdCounter + " add status: " + rsp.toString(), 0, rsp.getStatus());

          }
        } catch (Throwable t) {
          heavyCommitFailure.set(t);
        }
      }
    };

    heavyCommitting.start();
    try {
      // now have the "main" test thread try to take a series of backups/snapshots
      // while adding other "real" docs

      final Queue<String> namedSnapshots = new LinkedList<>();

      // NOTE #1: start at i=1 for 'id' & doc counting purposes...
      // NOTE #2: abort quickly if the other thread reports a heavyCommitFailure...
      for (int i = 1; (i <= numBackupIters && null == heavyCommitFailure.get()); i++) {

        // in each iteration '#i', the commit we create should have exactly 'i' documents in
        // it with the term 'type_s:real' (regardless of what the other thread does with dummy docs)

        // add & commit a doc #i
        final UpdateRequest req = new UpdateRequest().add(makeDoc("doc_" + i, "real"));
        req.setParam(UpdateParams.COMMIT, "true"); // make immediately available for backup
        req.setParam(UpdateParams.OPEN_SEARCHER, "false"); // we don't care about searching

        final UpdateResponse rsp = req.process(coreClient);
        assertEquals("Real Doc#" + i + " add status: " + rsp.toString(), 0, rsp.getStatus());

        // create a backup of the 'current' index
        impl.makeBackup("backup_currentAt_" + i);

        // verify backup is valid and has the number of 'real' docs we expect...
        validateBackup("backup_currentAt_" + i);

        // occasionally make a "snapshot_i", add it to 'namedSnapshots'
        // NOTE: we don't want to do this too often, or the SnapShotMetadataManager will protect
        // too many segment files "long term". It's more important to stress the thread contention
        // between backups calling save/release vs the DelPolicy trying to delete segments
        if (0 == random().nextInt(7 + namedSnapshots.size())) {
          final String snapshotName = "snapshot_" + i;
          log.info("Creating snapshot: {}", snapshotName);
          impl.makeSnapshot(snapshotName);
          namedSnapshots.add(snapshotName);
        }

        // occasionally make a backup of a snapshot and remove it
        // the odds of doing this increase based on how many snapshots currently exist,
        // and how few iterations we have left
        if (3 < namedSnapshots.size() &&
            random().nextInt(3 + numBackupIters - i) < random().nextInt(namedSnapshots.size())) {

          assert 0 < namedSnapshots.size() : "Someone broke the conditional";
          final String snapshotName = namedSnapshots.poll();
          final String backupName = "backup_as_of_" + snapshotName;
          log.info("Creating {} from {} in iter={}", backupName, snapshotName, i);
          impl.makeBackup(backupName, snapshotName);
          log.info("Deleting {} in iter={}", snapshotName, i);
          impl.deleteSnapshot(snapshotName);

          validateBackup(backupName);

          // NOTE: we can't directly compare our backups, because the stress thread
          // may have added/committed documents
          // ie: backup_as_of_snapshot_4 and backup_currentAt_4 should have the same 4 "real"
          // documents, but they may have other commits that affect the data files
          // between when the backup was taken and when the snapshot was taken

        }
      }

    } finally {
      keepGoing.set(false);
      heavyCommitting.join();
    }
    assertNull(heavyCommitFailure.get());

    { log.info("Done with (concurrent) updates, Deleting all docs...");
      final UpdateRequest delAll = new UpdateRequest().deleteByQuery("*:*");
      delAll.setParam(UpdateParams.COMMIT, "true");
      delAll.setParam(UpdateParams.OPTIMIZE, "true");
      delAll.setParam(UpdateParams.MAX_OPTIMIZE_SEGMENTS, "1"); // purge as many files as possible
      final UpdateResponse delRsp = delAll.process(coreClient);
      assertEquals("delAll status: " + delRsp.toString(), 0, delRsp.getStatus());
    }

    { // Validate some backups at random...
      final int numBackupsToCheck = atLeast(1);
      log.info("Validating {} random backups to ensure they are unaffected by deleting all docs...",
               numBackupsToCheck);
      final List<File> allBackups = Arrays.asList(backupDir.listFiles());
      // ensure consistent (arbitrary) ordering before shuffling
      Collections.sort(allBackups);
      Collections.shuffle(allBackups, random());
      for (int i = 0; i < numBackupsToCheck; i++) {
        final File backup = allBackups.get(i);
        validateBackup(backup);
      }
    }
  }

  /**
   * Given a backup name, extracts the numeric suffix identifying how many "real" docs should be in it.
   *
   * @see #ENDS_WITH_INT_DIGITS
   */
  private static int getNumRealDocsFromBackupName(final String backupName) {
    final Matcher m = ENDS_WITH_INT_DIGITS.matcher(backupName);
    assertTrue("Backup name does not end with int digits: " + backupName, m.find());
    return Integer.parseInt(m.group());
  }

  /**
   * Validates a backup exists, passes check index, and contains a number of "real" documents
   * that match its name.
   *
   * @see #validateBackup(File)
   */
  private void validateBackup(final String backupName) throws IOException {
    final File backup = new File(backupDir, "snapshot." + backupName);
    validateBackup(backup);
  }

  /**
   * Validates a backup dir exists, passes check index, and contains a number of "real" documents
   * that match its name.
   *
   * @see #getNumRealDocsFromBackupName
   */
  private void validateBackup(final File backup) throws IOException {
    log.info("Checking Validity of {}", backup);
    assertTrue(backup.toString() + ": isDir?", backup.isDirectory());
    final Matcher m = ENDS_WITH_INT_DIGITS.matcher(backup.getName());
    assertTrue("Backup dir name does not end with int digits: " + backup.toString(), m.find());
    final int numRealDocsExpected = Integer.parseInt(m.group());

    try (Directory dir = FSDirectory.open(backup.toPath())) {
      TestUtil.checkIndex(dir, true, true, null);
      try (DirectoryReader r = DirectoryReader.open(dir)) {
        assertEquals("num real docs in " + backup.toString(),
                     numRealDocsExpected, r.docFreq(new Term("type_s", "real")));
      }
    }
  }

  /**
   * Creates a "large" document with lots of fields (to encourage lots of files in each segment).
   * @param id the uniqueKey
   * @param type the type of the doc for use in the 'type_s' field (for term counting later)
   */
  private static SolrInputDocument makeDoc(String id, String type) {
    final SolrInputDocument doc = new SolrInputDocument("id", id, "type_s", type);
    for (int f = 0; f < 100; f++) {
      doc.addField(f + "_s", TestUtil.randomUnicodeString(random(), 20));
    }
    return doc;
  }

  private void initCoreNameAndSolrCoreClient() {
    // Sigh.
    Replica r = cluster.getSolrClient().getZkStateReader().getClusterState()
      .getCollection(DEFAULT_TEST_COLLECTION_NAME).getActiveSlices().iterator().next()
      .getReplicas().iterator().next();
    coreName = r.getCoreName();
    coreClient = getHttpSolrClient(r.getCoreUrl());
  }

  /**
   * API for taking backups and snapshots that can hide the impl quirks of
   * using ReplicationHandler vs CoreAdminHandler (the default).
   */
  private class BackupAPIImpl {
    /** TODO: SOLR-9239, no solrj API for CoreAdmin Backups */
    protected GenericSolrRequest makeCoreAdmin(CoreAdminAction action, SolrParams p) {
      return new GenericSolrRequest(GenericSolrRequest.METHOD.POST, "/admin/cores",
                                    SolrParams.wrapDefaults(params(CoreAdminParams.ACTION, action.toString()), p));
    }

    /** Make a backup of the latest commit, and only return if successful */
    public void makeBackup(final String backupName) throws Exception {
      makeBackup(backupName, null);
    }

    /** Make a backup of the named commit snapshot (or the latest commit if snapName is null), and only return if successful */
    public void makeBackup(final String backupName, final String snapName) throws Exception {
      ModifiableSolrParams p = params(CoreAdminParams.CORE, coreName,
                                      CoreAdminParams.NAME, backupName,
                                      CoreAdminParams.BACKUP_LOCATION, backupDir.getAbsolutePath());
      if (null != snapName) {
        p.add(CoreAdminParams.COMMIT_NAME, snapName);
      }
      makeCoreAdmin(CoreAdminAction.BACKUPCORE, p).process(adminClient);
      // CoreAdmin BACKUPCORE is synchronous by default, no need to wait for anything.
    }

    /** Make a named snapshot, and only return if successful */
    public void makeSnapshot(final String snapName) throws Exception {
      makeCoreAdmin(CoreAdminAction.CREATESNAPSHOT,
                    params(CoreAdminParams.CORE, coreName,
                           CoreAdminParams.COMMIT_NAME, snapName)).process(adminClient);
      // CoreAdmin CREATESNAPSHOT is synchronous by default, no need to wait for anything.
    }

    /** Delete a named snapshot, and only return if successful */
    public void deleteSnapshot(final String snapName) throws Exception {
      makeCoreAdmin(CoreAdminAction.DELETESNAPSHOT,
                    params(CoreAdminParams.CORE, coreName,
                           CoreAdminParams.COMMIT_NAME, snapName)).process(adminClient);
      // CoreAdmin DELETESNAPSHOT is synchronous by default, no need to wait for anything.
    }
  }

}
@ -1232,7 +1232,7 @@ Backs up Solr collections and associated configurations to a shared filesystem -

`/admin/collections?action=BACKUP&name=myBackupName&collection=myCollectionName&location=/path/to/my/shared/drive`

The BACKUP command will backup Solr indexes and configurations for a specified collection. The BACKUP command takes one copy from each shard for the indexes. For configurations, it backs up the configset that was associated with the collection and metadata.
The BACKUP command will backup Solr indexes and configurations for a specified collection. The BACKUP command <<making-and-restoring-backups.adoc#making-and-restoring-backups,takes one copy from each shard for the indexes>>. For configurations, it backs up the configset that was associated with the collection and metadata.

=== BACKUP Parameters

@ -20,6 +20,12 @@ If you are worried about data loss, and of course you _should_ be, you need a wa

Solr provides two approaches to backing up and restoring Solr cores or collections, depending on how you are running Solr. If you run in SolrCloud mode, you will use the Collections API. If you run Solr in standalone mode, you will use the replication handler.

[NOTE]
====
Backups (and Snapshots) capture data that has been <<near-real-time-searching.adoc#commits-and-searching,_hard_ committed>>. Committing changes using `softCommit=true` may result in changes that are visible in search results but not included in subsequent backups. Likewise, committing changes using `openSearcher=false` may result in changes committed to disk and included in subsequent backups, even if they are not currently visible in search results.
====
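
For example, here is a minimal SolrJ sketch (the base URL and core name are illustrative placeholders) of forcing a hard commit so that pending changes become durable and eligible for backup:

[source,java]
----
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;

public class HardCommitBeforeBackup {
  public static void main(String[] args) throws Exception {
    // Assumes a core named "techproducts" on a local standalone Solr.
    try (SolrClient client =
           new HttpSolrClient.Builder("http://localhost:8983/solr/techproducts").build()) {
      // commit(waitFlush, waitSearcher, softCommit): passing softCommit=false
      // issues a *hard* commit, so the changes will appear in subsequent backups.
      client.commit(true, true, false);
      // ... now take the backup via the replication handler or Collections API ...
    }
  }
}
----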

== SolrCloud Backups

Support for backups when running SolrCloud is provided with the <<collections-api.adoc#collections-api,Collections API>>. This allows the backups to be generated across multiple shards, and restored to the same number of shards and replicas as the original collection.
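
As a hedged illustration, the BACKUP command can also be issued from SolrJ; in this sketch the collection name, backup name, ZooKeeper address, and shared location are all placeholders:

[source,java]
----
import java.util.Collections;
import java.util.Optional;

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

public class CollectionBackupSketch {
  public static void main(String[] args) throws Exception {
    // Assumes a SolrCloud cluster reachable via the given ZooKeeper host, and a
    // 'location' on a shared filesystem that every node in the cluster can write to.
    try (CloudSolrClient client =
           new CloudSolrClient.Builder(Collections.singletonList("localhost:9983"),
                                       Optional.empty()).build()) {
      CollectionAdminRequest.backupCollection("myCollectionName", "myBackupName")
          .setLocation("/path/to/my/shared/drive")
          .process(client);
    }
  }
}
----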