Tighten up concurrent store metadata listing and engine writes (#19684)
In several places in our code we need to get a consistent list of files + metadata of the current index. We currently have a couple of ways to do in the `Store` class, which also does the right things and tries to verify the integrity of the smaller files. Sadly, those methods can run into trouble if anyone writes into the folder while they are busy. Most notably, the index shard's engine decides to commit half way and remove a `segment_N` file before the store got to checksum (but did already list it). This race condition typically doesn't happen as almost all of the places where we list files also happen to be places where the relevant shard doesn't yet have an engine. There is however an exception (of course :)) which is the API to list shard stores, used by the master when it is looking for shard copies to assign to. I already took one shot at fixing this in #19416 , but it turns out not to be enough - see for example https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+multijob-os-compatibility/os=sles/822. The first inclination to fix this was to add more locking to the different Store methods and acquire the `IndexWriter` lock, thus preventing any engine for accessing if if the a shard is offline and use the current index commit snapshotting logic already existing in `IndexShard` for when the engine is started. That turned out to be a bad idea as we create more subtleties where, for example, a store listing can prevent a shard from starting up (the writer lock doesn't wait if it can't get access, but fails immediately, which is good). Another example is running on a shared directory where some other engine may actually hold the lock. Instead I decided to take another approach: 1) Remove all the various methods on store and keep one, which accepts an index commit (which can be null) and also clearly communicates that the *caller* is responsible for concurrent access. This also tightens up the API which is a plus. 2) Add a `snapshotStore` method to IndexShard that takes care of all the concurrency aspects with the engine, which is now possible because it's all in the same place. It's still a bit ugly but at least it's all in one place and we can evaluate how to improve on this later on. I also renamed the `snapshotIndex` method to `acquireIndexCommit` to avoid confusion and I think it communicates better what it does.
This commit is contained in:
parent
046abdc281
commit
f6aeb35ce8
|
@ -654,7 +654,7 @@ public abstract class Engine implements Closeable {
|
|||
*
|
||||
* @param flushFirst indicates whether the engine should flush before returning the snapshot
|
||||
*/
|
||||
public abstract IndexCommit snapshotIndex(boolean flushFirst) throws EngineException;
|
||||
public abstract IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException;
|
||||
|
||||
/**
|
||||
* fail engine due to some error. the engine will also be closed.
|
||||
|
|
|
@ -852,7 +852,7 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
@Override
|
||||
public IndexCommit snapshotIndex(final boolean flushFirst) throws EngineException {
|
||||
public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineException {
|
||||
// we have to flush outside of the readlock otherwise we might have a problem upgrading
|
||||
// the to a write lock when we fail the engine in this operation
|
||||
if (flushFirst) {
|
||||
|
|
|
@ -205,7 +205,7 @@ public class ShadowEngine extends Engine {
|
|||
}
|
||||
|
||||
@Override
|
||||
public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
|
||||
public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException {
|
||||
throw new UnsupportedOperationException("Can not take snapshot from a shadow engine");
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,11 @@ package org.elasticsearch.index.shard;
|
|||
|
||||
import org.apache.lucene.codecs.PostingsFormat;
|
||||
import org.apache.lucene.index.CheckIndex;
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexFormatTooNewException;
|
||||
import org.apache.lucene.index.IndexFormatTooOldException;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
|
||||
import org.apache.lucene.index.SnapshotDeletionPolicy;
|
||||
import org.apache.lucene.index.Term;
|
||||
|
@ -29,6 +33,7 @@ import org.apache.lucene.search.Query;
|
|||
import org.apache.lucene.search.QueryCachingPolicy;
|
||||
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.Lock;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.ThreadInterruptedException;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
|
@ -116,10 +121,12 @@ import org.elasticsearch.search.suggest.completion.CompletionStats;
|
|||
import org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.nio.channels.ClosedByInterruptException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
@ -789,15 +796,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
|
||||
/**
|
||||
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
|
||||
* commit won't be freed until the commit / snapshot is released via {@link #releaseSnapshot(IndexCommit)}.
|
||||
* commit won't be freed until the commit / snapshot is released via {@link #releaseIndexCommit(IndexCommit)}.
|
||||
*
|
||||
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
|
||||
*/
|
||||
public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
|
||||
public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException {
|
||||
IndexShardState state = this.state; // one time volatile read
|
||||
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
|
||||
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
|
||||
return getEngine().snapshotIndex(flushFirst);
|
||||
return getEngine().acquireIndexCommit(flushFirst);
|
||||
} else {
|
||||
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
|
||||
}
|
||||
|
@ -805,13 +812,50 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
|
||||
|
||||
/**
|
||||
* Releases a snapshot taken from {@link #snapshotIndex(boolean)} this must be called to release the resources
|
||||
* Releases a snapshot taken from {@link #acquireIndexCommit(boolean)} this must be called to release the resources
|
||||
* referenced by the given snapshot {@link IndexCommit}.
|
||||
*/
|
||||
public void releaseSnapshot(IndexCommit snapshot) throws IOException {
|
||||
public void releaseIndexCommit(IndexCommit snapshot) throws IOException {
|
||||
deletionPolicy.release(snapshot);
|
||||
}
|
||||
|
||||
/**
|
||||
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
|
||||
* without having to worry about the current state of the engine and concurrent flushes.
|
||||
*
|
||||
* @throws org.apache.lucene.index.IndexNotFoundException if no index is found in the current directory
|
||||
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
|
||||
* unexpected exception when opening the index reading the segments file.
|
||||
* @throws IndexFormatTooOldException if the lucene index is too old to be opened.
|
||||
* @throws IndexFormatTooNewException if the lucene index is too new to be opened.
|
||||
* @throws FileNotFoundException if one or more files referenced by a commit are not present.
|
||||
* @throws NoSuchFileException if one or more files referenced by a commit are not present.
|
||||
*/
|
||||
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
|
||||
IndexCommit indexCommit = null;
|
||||
store.incRef();
|
||||
try {
|
||||
synchronized (mutex) {
|
||||
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
|
||||
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.
|
||||
// That can be done out of mutex, since the engine can be closed half way.
|
||||
Engine engine = getEngineOrNull();
|
||||
if (engine == null) {
|
||||
try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
|
||||
return store.getMetadata(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
indexCommit = deletionPolicy.snapshot();
|
||||
return store.getMetadata(indexCommit);
|
||||
} finally {
|
||||
store.decRef();
|
||||
if (indexCommit != null) {
|
||||
deletionPolicy.release(indexCommit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fails the shard and marks the shard store as corrupted if
|
||||
* <code>e</code> is caused by index corruption
|
||||
|
@ -1310,7 +1354,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
if ("checksum".equals(checkIndexOnStartup)) {
|
||||
// physical verification only: verify all checksums for the latest commit
|
||||
IOException corrupt = null;
|
||||
MetadataSnapshot metadata = store.getMetadata();
|
||||
MetadataSnapshot metadata = snapshotStoreMetadata();
|
||||
for (Map.Entry<String, StoreFileMetaData> entry : metadata.asMap().entrySet()) {
|
||||
try {
|
||||
Store.checkIntegrity(entry.getValue(), store.directory());
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.lucene.index.IndexCommit;
|
|||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FilterDirectory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.apache.lucene.store.Lock;
|
||||
import org.apache.lucene.store.NoLockFactory;
|
||||
|
@ -31,7 +30,6 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
|
|||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -52,7 +50,7 @@ final class LocalShardSnapshot implements Closeable {
|
|||
store.incRef();
|
||||
boolean success = false;
|
||||
try {
|
||||
indexCommit = shard.snapshotIndex(true);
|
||||
indexCommit = shard.acquireIndexCommit(true);
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
|
@ -120,7 +118,7 @@ final class LocalShardSnapshot implements Closeable {
|
|||
public void close() throws IOException {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
try {
|
||||
shard.releaseSnapshot(indexCommit);
|
||||
shard.releaseIndexCommit(indexCommit);
|
||||
} finally {
|
||||
store.decRef();
|
||||
}
|
||||
|
|
|
@ -109,4 +109,9 @@ public final class ShadowIndexShard extends IndexShard {
|
|||
public void addRefreshListener(Translog.Location location, Consumer<Boolean> listener) {
|
||||
throw new UnsupportedOperationException("Can't listen for a refresh on a shadow engine because it doesn't have a translog");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
|
||||
throw new UnsupportedOperationException("can't snapshot the directory as the primary may change it underneath us");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,6 +73,7 @@ import org.elasticsearch.env.ShardLock;
|
|||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
@ -208,45 +209,17 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new MetadataSnapshot for the latest commit in this store or
|
||||
* an empty snapshot if no index exists or can not be opened.
|
||||
*
|
||||
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
|
||||
* unexpected exception when opening the index reading the segments file.
|
||||
* @throws IndexFormatTooOldException if the lucene index is too old to be opened.
|
||||
* @throws IndexFormatTooNewException if the lucene index is too new to be opened.
|
||||
*/
|
||||
public MetadataSnapshot getMetadataOrEmpty() throws IOException {
|
||||
try {
|
||||
return getMetadata(null);
|
||||
} catch (IndexNotFoundException ex) {
|
||||
// that's fine - happens all the time no need to log
|
||||
} catch (FileNotFoundException | NoSuchFileException ex) {
|
||||
logger.info("Failed to open / find files while reading metadata snapshot");
|
||||
}
|
||||
return MetadataSnapshot.EMPTY;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new MetadataSnapshot for the latest commit in this store.
|
||||
*
|
||||
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
|
||||
* unexpected exception when opening the index reading the segments file.
|
||||
* @throws IndexFormatTooOldException if the lucene index is too old to be opened.
|
||||
* @throws IndexFormatTooNewException if the lucene index is too new to be opened.
|
||||
* @throws FileNotFoundException if one or more files referenced by a commit are not present.
|
||||
* @throws NoSuchFileException if one or more files referenced by a commit are not present.
|
||||
* @throws IndexNotFoundException if no index / valid commit-point can be found in this store
|
||||
*/
|
||||
public MetadataSnapshot getMetadata() throws IOException {
|
||||
return getMetadata(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new MetadataSnapshot for the given commit. If the given commit is <code>null</code>
|
||||
* the latest commit point is used.
|
||||
*
|
||||
* Note that this method requires the caller verify it has the right to access the store and
|
||||
* no concurrent file changes are happening. If in doubt, you probably want to use one of the following:
|
||||
*
|
||||
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, ESLogger)} to read a meta data while locking
|
||||
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
|
||||
* {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
|
||||
*
|
||||
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
|
||||
* unexpected exception when opening the index reading the segments file.
|
||||
* @throws IndexFormatTooOldException if the lucene index is too old to be opened.
|
||||
|
@ -634,7 +607,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
// ignore, we don't really care, will get deleted later on
|
||||
}
|
||||
}
|
||||
final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
|
||||
final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null);
|
||||
verifyAfterCleanup(sourceMetaData, metadataOrEmpty);
|
||||
} finally {
|
||||
metadataLock.writeLock().unlock();
|
||||
|
|
|
@ -127,7 +127,7 @@ public class RecoverySourceHandler {
|
|||
logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
|
||||
final IndexCommit phase1Snapshot;
|
||||
try {
|
||||
phase1Snapshot = shard.snapshotIndex(false);
|
||||
phase1Snapshot = shard.acquireIndexCommit(false);
|
||||
} catch (Exception e) {
|
||||
IOUtils.closeWhileHandlingException(translogView);
|
||||
throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
|
||||
|
@ -139,7 +139,7 @@ public class RecoverySourceHandler {
|
|||
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
|
||||
} finally {
|
||||
try {
|
||||
shard.releaseSnapshot(phase1Snapshot);
|
||||
shard.releaseIndexCommit(phase1Snapshot);
|
||||
} catch (IOException ex) {
|
||||
logger.warn("releasing snapshot caused exception", ex);
|
||||
}
|
||||
|
|
|
@ -167,7 +167,13 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
|
|||
logger.trace("collecting local files for {}", recoveryTarget);
|
||||
Store.MetadataSnapshot metadataSnapshot = null;
|
||||
try {
|
||||
metadataSnapshot = recoveryTarget.store().getMetadataOrEmpty();
|
||||
if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) {
|
||||
// we are not going to copy any files, so don't bother listing files, potentially running
|
||||
// into concurrency issues with the primary changing files underneath us.
|
||||
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
|
||||
} else {
|
||||
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.warn("error while listing local files, recover as if there are none", e);
|
||||
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
|
||||
|
@ -178,6 +184,7 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
|
|||
new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true);
|
||||
return;
|
||||
}
|
||||
logger.trace("{} local file count: [{}]", recoveryTarget, metadataSnapshot.size());
|
||||
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.sourceNode(),
|
||||
clusterService.localNode(),
|
||||
metadataSnapshot, recoveryTarget.state().getType(), recoveryTarget.recoveryId());
|
||||
|
|
|
@ -123,14 +123,8 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
|
|||
if (indexService != null) {
|
||||
IndexShard indexShard = indexService.getShardOrNull(shardId.id());
|
||||
if (indexShard != null) {
|
||||
final Store store = indexShard.store();
|
||||
store.incRef();
|
||||
try {
|
||||
exists = true;
|
||||
return new StoreFilesMetaData(shardId, store.getMetadataOrEmpty());
|
||||
} finally {
|
||||
store.decRef();
|
||||
}
|
||||
exists = true;
|
||||
return new StoreFilesMetaData(shardId, indexShard.snapshotStoreMetadata());
|
||||
}
|
||||
}
|
||||
// try and see if we an list unallocated
|
||||
|
|
|
@ -23,14 +23,12 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
|
||||
|
@ -174,7 +172,7 @@ public interface Repository extends LifecycleComponent {
|
|||
/**
|
||||
* Creates a snapshot of the shard based on the index commit point.
|
||||
* <p>
|
||||
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method.
|
||||
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireIndexCommit} method.
|
||||
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
|
||||
* <p>
|
||||
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.lucene.index.CorruptIndexException;
|
|||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexFormatTooNewException;
|
||||
import org.apache.lucene.index.IndexFormatTooOldException;
|
||||
import org.apache.lucene.index.IndexNotFoundException;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.IndexWriterConfig;
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
|
@ -40,14 +41,38 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
|||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Numbers;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.compress.NotXContentException;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.iterable.Iterables;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
|
||||
|
@ -61,37 +86,13 @@ import org.elasticsearch.index.store.Store;
|
|||
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.compress.NotXContentException;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.RepositoryException;
|
||||
import org.elasticsearch.repositories.RepositoryVerificationException;
|
||||
import org.elasticsearch.snapshots.SnapshotCreationException;
|
||||
import org.elasticsearch.snapshots.SnapshotException;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotMissingException;
|
||||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
|
@ -1444,7 +1445,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
*/
|
||||
private class RestoreContext extends Context {
|
||||
|
||||
private final Store store;
|
||||
private final IndexShard targetShard;
|
||||
|
||||
private final RecoveryState recoveryState;
|
||||
|
||||
|
@ -1460,13 +1461,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
|
||||
super(snapshotId, version, indexId, shard.shardId(), snapshotShardId);
|
||||
this.recoveryState = recoveryState;
|
||||
store = shard.store();
|
||||
this.targetShard = shard;
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs restore operation
|
||||
*/
|
||||
public void restore() throws IOException {
|
||||
final Store store = targetShard.store();
|
||||
store.incRef();
|
||||
try {
|
||||
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, metadata.name(), shardId);
|
||||
|
@ -1491,12 +1493,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
|
||||
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
|
||||
final Store.MetadataSnapshot recoveryTargetMetadata;
|
||||
Store.MetadataSnapshot recoveryTargetMetadata;
|
||||
try {
|
||||
recoveryTargetMetadata = store.getMetadataOrEmpty();
|
||||
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
|
||||
logger.warn("{} Can't read metadata from store", e, shardId);
|
||||
throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e);
|
||||
recoveryTargetMetadata = targetShard.snapshotStoreMetadata();
|
||||
} catch (IndexNotFoundException e) {
|
||||
// happens when restore to an empty shard, not a big deal
|
||||
logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId);
|
||||
recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
|
||||
} catch (IOException e) {
|
||||
logger.warn("{} Can't read metadata from store, will not reuse any local file while restoring", e, shardId);
|
||||
recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
|
||||
}
|
||||
|
||||
final List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover = new ArrayList<>();
|
||||
|
@ -1550,7 +1556,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
try {
|
||||
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
|
||||
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
|
||||
restoreFile(fileToRecover);
|
||||
restoreFile(fileToRecover, store);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
|
||||
|
@ -1597,7 +1603,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
*
|
||||
* @param fileInfo file to be restored
|
||||
*/
|
||||
private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException {
|
||||
private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException {
|
||||
boolean success = false;
|
||||
|
||||
try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) {
|
||||
|
|
|
@ -348,7 +348,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
|||
|
||||
try {
|
||||
// we flush first to make sure we get the latest writes snapshotted
|
||||
IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true);
|
||||
IndexCommit snapshotIndexCommit = indexShard.acquireIndexCommit(true);
|
||||
try {
|
||||
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotIndexCommit, snapshotStatus);
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
@ -358,7 +358,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
|||
TimeValue.timeValueMillis(snapshotStatus.time()), sb);
|
||||
}
|
||||
} finally {
|
||||
indexShard.releaseSnapshot(snapshotIndexCommit);
|
||||
indexShard.releaseIndexCommit(snapshotIndexCommit);
|
||||
}
|
||||
} catch (SnapshotFailedEngineException e) {
|
||||
throw e;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.elasticsearch.index.replication;
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.index.IndexNotFoundException;
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
|
@ -299,7 +300,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
|
|||
replica.prepareForIndexRecovery();
|
||||
RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
|
||||
StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode,
|
||||
replica.store().getMetadataOrEmpty(), RecoveryState.Type.REPLICA, 0);
|
||||
getMetadataSnapshotOrEmpty(replica), RecoveryState.Type.REPLICA, 0);
|
||||
RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, () -> 0L, e -> () -> {},
|
||||
(int) ByteSizeUnit.MB.toKB(1), logger);
|
||||
recovery.recoverToTarget();
|
||||
|
@ -307,6 +308,20 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
|
|||
replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry()));
|
||||
}
|
||||
|
||||
private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) throws IOException {
|
||||
Store.MetadataSnapshot result;
|
||||
try {
|
||||
result = replica.snapshotStoreMetadata();
|
||||
} catch (IndexNotFoundException e) {
|
||||
// OK!
|
||||
result = Store.MetadataSnapshot.EMPTY;
|
||||
} catch (IOException e) {
|
||||
logger.warn("{} failed read store, treating as empty", e);
|
||||
result = Store.MetadataSnapshot.EMPTY;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public synchronized DiscoveryNode getPrimaryNode() {
|
||||
return getDiscoveryNode(primary.routingEntry().currentNodeId());
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.lucene.document.NumericDocValuesField;
|
|||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.TermQuery;
|
||||
|
@ -156,6 +157,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSear
|
|||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
/**
|
||||
* Simple unit-test IndexShard related operations.
|
||||
|
@ -476,6 +478,76 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths);
|
||||
}
|
||||
|
||||
public void testAcquireIndexCommit() throws IOException {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService test = indicesService.indexService(resolveIndex("test"));
|
||||
final IndexShard shard = test.getShardOrNull(0);
|
||||
int numDocs = randomInt(20);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
client().prepareIndex("test", "type", "id_" + i).setSource("{}").get();
|
||||
}
|
||||
final boolean flushFirst = randomBoolean();
|
||||
IndexCommit commit = shard.acquireIndexCommit(flushFirst);
|
||||
int moreDocs = randomInt(20);
|
||||
for (int i = 0; i < moreDocs; i++) {
|
||||
client().prepareIndex("test", "type", "id_" + numDocs + i).setSource("{}").get();
|
||||
}
|
||||
shard.flush(new FlushRequest("index"));
|
||||
// check that we can still read the commit that we captured
|
||||
try (IndexReader reader = DirectoryReader.open(commit)) {
|
||||
assertThat(reader.numDocs(), equalTo(flushFirst ? numDocs : 0));
|
||||
}
|
||||
shard.releaseIndexCommit(commit);
|
||||
shard.flush(new FlushRequest("index").force(true));
|
||||
// check it's clean up
|
||||
assertThat(DirectoryReader.listCommits(shard.store().directory()), hasSize(1));
|
||||
}
|
||||
|
||||
/***
|
||||
* test one can snapshot the store at various lifecycle stages
|
||||
*/
|
||||
public void testSnapshotStore() throws IOException {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService test = indicesService.indexService(resolveIndex("test"));
|
||||
final IndexShard shard = test.getShardOrNull(0);
|
||||
client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
|
||||
client().admin().indices().prepareFlush().get();
|
||||
ShardRouting routing = shard.routingEntry();
|
||||
test.removeShard(0, "b/c simon says so");
|
||||
routing = ShardRoutingHelper.reinit(routing);
|
||||
IndexShard newShard = test.createShard(routing);
|
||||
newShard.updateRoutingEntry(routing);
|
||||
DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
|
||||
Store.MetadataSnapshot snapshot = newShard.snapshotStoreMetadata();
|
||||
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
|
||||
|
||||
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode,
|
||||
localNode));
|
||||
|
||||
snapshot = newShard.snapshotStoreMetadata();
|
||||
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
|
||||
|
||||
assertTrue(newShard.recoverFromStore());
|
||||
|
||||
snapshot = newShard.snapshotStoreMetadata();
|
||||
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
|
||||
|
||||
newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted());
|
||||
|
||||
snapshot = newShard.snapshotStoreMetadata();
|
||||
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
|
||||
|
||||
newShard.close("test", false);
|
||||
|
||||
snapshot = newShard.snapshotStoreMetadata();
|
||||
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
|
||||
}
|
||||
|
||||
public void testDurableFlagHasEffect() {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
|
|
|
@ -328,15 +328,14 @@ public class StoreTests extends ESTestCase {
|
|||
Store.MetadataSnapshot metadata;
|
||||
// check before we committed
|
||||
try {
|
||||
store.getMetadata();
|
||||
store.getMetadata(null);
|
||||
fail("no index present - expected exception");
|
||||
} catch (IndexNotFoundException ex) {
|
||||
// expected
|
||||
}
|
||||
assertThat(store.getMetadataOrEmpty(), is(Store.MetadataSnapshot.EMPTY)); // nothing committed
|
||||
writer.commit();
|
||||
writer.close();
|
||||
metadata = store.getMetadata();
|
||||
metadata = store.getMetadata(null);
|
||||
assertThat(metadata.asMap().isEmpty(), is(false));
|
||||
for (StoreFileMetaData meta : metadata) {
|
||||
try (IndexInput input = store.directory().openInput(meta.name(), IOContext.DEFAULT)) {
|
||||
|
@ -579,7 +578,7 @@ public class StoreTests extends ESTestCase {
|
|||
}
|
||||
writer.commit();
|
||||
writer.close();
|
||||
first = store.getMetadata();
|
||||
first = store.getMetadata(null);
|
||||
assertDeleteContent(store, directoryService);
|
||||
store.close();
|
||||
}
|
||||
|
@ -609,7 +608,7 @@ public class StoreTests extends ESTestCase {
|
|||
}
|
||||
writer.commit();
|
||||
writer.close();
|
||||
second = store.getMetadata();
|
||||
second = store.getMetadata(null);
|
||||
}
|
||||
Store.RecoveryDiff diff = first.recoveryDiff(second);
|
||||
assertThat(first.size(), equalTo(second.size()));
|
||||
|
@ -639,7 +638,7 @@ public class StoreTests extends ESTestCase {
|
|||
writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(numDocs))));
|
||||
writer.commit();
|
||||
writer.close();
|
||||
Store.MetadataSnapshot metadata = store.getMetadata();
|
||||
Store.MetadataSnapshot metadata = store.getMetadata(null);
|
||||
StoreFileMetaData delFile = null;
|
||||
for (StoreFileMetaData md : metadata) {
|
||||
if (md.name().endsWith(".liv")) {
|
||||
|
@ -674,7 +673,7 @@ public class StoreTests extends ESTestCase {
|
|||
writer.addDocument(docs.get(0));
|
||||
writer.close();
|
||||
|
||||
Store.MetadataSnapshot newCommitMetaData = store.getMetadata();
|
||||
Store.MetadataSnapshot newCommitMetaData = store.getMetadata(null);
|
||||
Store.RecoveryDiff newCommitDiff = newCommitMetaData.recoveryDiff(metadata);
|
||||
if (delFile != null) {
|
||||
assertThat(newCommitDiff.identical.size(), equalTo(newCommitMetaData.size() - 5)); // segments_N, del file, cfs, cfe, si for the new segment
|
||||
|
@ -723,7 +722,7 @@ public class StoreTests extends ESTestCase {
|
|||
writer.addDocument(doc);
|
||||
}
|
||||
|
||||
Store.MetadataSnapshot firstMeta = store.getMetadata();
|
||||
Store.MetadataSnapshot firstMeta = store.getMetadata(null);
|
||||
|
||||
if (random().nextBoolean()) {
|
||||
for (int i = 0; i < docs; i++) {
|
||||
|
@ -738,7 +737,7 @@ public class StoreTests extends ESTestCase {
|
|||
writer.commit();
|
||||
writer.close();
|
||||
|
||||
Store.MetadataSnapshot secondMeta = store.getMetadata();
|
||||
Store.MetadataSnapshot secondMeta = store.getMetadata(null);
|
||||
|
||||
|
||||
if (randomBoolean()) {
|
||||
|
@ -785,13 +784,10 @@ public class StoreTests extends ESTestCase {
|
|||
final AtomicInteger count = new AtomicInteger(0);
|
||||
final ShardLock lock = new DummyShardLock(shardId);
|
||||
|
||||
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, lock, new Store.OnClose() {
|
||||
@Override
|
||||
public void handle(ShardLock theLock) {
|
||||
assertEquals(shardId, theLock.getShardId());
|
||||
assertEquals(lock, theLock);
|
||||
count.incrementAndGet();
|
||||
}
|
||||
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, lock, theLock -> {
|
||||
assertEquals(shardId, theLock.getShardId());
|
||||
assertEquals(lock, theLock);
|
||||
count.incrementAndGet();
|
||||
});
|
||||
assertEquals(count.get(), 0);
|
||||
|
||||
|
@ -917,11 +913,7 @@ public class StoreTests extends ESTestCase {
|
|||
writer.commit();
|
||||
writer.close();
|
||||
Store.MetadataSnapshot metadata;
|
||||
if (randomBoolean()) {
|
||||
metadata = store.getMetadata();
|
||||
} else {
|
||||
metadata = store.getMetadata(deletionPolicy.snapshot());
|
||||
}
|
||||
metadata = store.getMetadata(randomBoolean() ? null : deletionPolicy.snapshot());
|
||||
assertFalse(metadata.asMap().isEmpty());
|
||||
// do not check for correct files, we have enough tests for that above
|
||||
assertThat(metadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId));
|
||||
|
@ -982,7 +974,7 @@ public class StoreTests extends ESTestCase {
|
|||
|
||||
try {
|
||||
if (randomBoolean()) {
|
||||
store.getMetadata();
|
||||
store.getMetadata(null);
|
||||
} else {
|
||||
store.readLastCommittedSegmentsInfo();
|
||||
}
|
||||
|
|
|
@ -97,7 +97,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
writer.addDocument(document);
|
||||
}
|
||||
writer.commit();
|
||||
Store.MetadataSnapshot metadata = store.getMetadata();
|
||||
writer.close();
|
||||
|
||||
Store.MetadataSnapshot metadata = store.getMetadata(null);
|
||||
List<StoreFileMetaData> metas = new ArrayList<>();
|
||||
for (StoreFileMetaData md : metadata) {
|
||||
metas.add(md);
|
||||
|
@ -116,14 +118,14 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata();
|
||||
Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(null);
|
||||
Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata);
|
||||
assertEquals(metas.size(), recoveryDiff.identical.size());
|
||||
assertEquals(0, recoveryDiff.different.size());
|
||||
assertEquals(0, recoveryDiff.missing.size());
|
||||
IndexReader reader = DirectoryReader.open(targetStore.directory());
|
||||
assertEquals(numDocs, reader.maxDoc());
|
||||
IOUtils.close(reader, writer, store, targetStore);
|
||||
IOUtils.close(reader, store, targetStore);
|
||||
}
|
||||
|
||||
public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
|
||||
|
@ -157,7 +159,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
writer.commit();
|
||||
writer.close();
|
||||
|
||||
Store.MetadataSnapshot metadata = store.getMetadata();
|
||||
Store.MetadataSnapshot metadata = store.getMetadata(null);
|
||||
List<StoreFileMetaData> metas = new ArrayList<>();
|
||||
for (StoreFileMetaData md : metadata) {
|
||||
metas.add(md);
|
||||
|
@ -221,7 +223,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
writer.commit();
|
||||
writer.close();
|
||||
|
||||
Store.MetadataSnapshot metadata = store.getMetadata();
|
||||
Store.MetadataSnapshot metadata = store.getMetadata(null);
|
||||
List<StoreFileMetaData> metas = new ArrayList<>();
|
||||
for (StoreFileMetaData md : metadata) {
|
||||
metas.add(md);
|
||||
|
|
Loading…
Reference in New Issue