[Segment Replication] Added source-side classes for orchestrating replication events (#3470)

This change expands on the existing SegmentReplicationSource interface and its corresponding Factory class by introducing an implementation where the replication source is a primary shard (PrimaryShardReplicationSource). These code paths execute on the target. The primary shard implementation creates the requests to be send to the source/primary shard.

Correspondingly, this change also defines two request classes for the GET_CHECKPOINT_INFO and GET_SEGMENT_FILES requests as well as an abstract superclass.

A CopyState class has been introduced that captures point-in-time, file-level details from an IndexShard. This implementation mirrors Lucene's NRT CopyState implementation.

Finally, a service class has been introduce for segment replication that runs on the source side (SegmentReplicationSourceService) which handles these two types of incoming requests. This includes private handler classes that house the logic to respond to these requests, with some functionality stubbed for now. The service class also uses a simple map to cache CopyState objects that would be needed by replication targets.

Unit tests have been added/updated for all new functionality.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
This commit is contained in:
Kartik Ganesh 2022-06-03 10:53:58 -07:00 committed by GitHub
parent 0add9d2e2e
commit b902add3fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1204 additions and 51 deletions

View File

@ -175,6 +175,21 @@ public abstract class Engine implements Closeable {
*/
protected abstract SegmentInfos getLatestSegmentInfos();
/**
* In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos}
* object directly, this method returns a {@link GatedCloseable} reference to the same object.
* This allows the engine to include a clean-up {@link org.opensearch.common.CheckedRunnable}
* which is run when the reference is closed. The default implementation of the clean-up
* procedure is a no-op.
*
* @return {@link GatedCloseable} - A wrapper around a {@link SegmentInfos} instance that
* must be closed for segment files to be deleted.
*/
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
// default implementation
return new GatedCloseable<>(getLatestSegmentInfos(), () -> {});
}
public MergeStats getMergeStats() {
return new MergeStats();
}
@ -846,6 +861,12 @@ public abstract class Engine implements Closeable {
*/
public abstract long getPersistedLocalCheckpoint();
/**
* @return the latest checkpoint that has been processed but not necessarily persisted.
* Also see {@link #getPersistedLocalCheckpoint()}
*/
public abstract long getProcessedLocalCheckpoint();
/**
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
*/

View File

@ -2305,6 +2305,22 @@ public class InternalEngine extends Engine {
}
}
/**
* Fetch the latest {@link SegmentInfos} object via {@link #getLatestSegmentInfos()}
* but also increment the ref-count to ensure that these segment files are retained
* until the reference is closed. On close, the ref-count is decremented.
*/
@Override
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
final SegmentInfos segmentInfos = getLatestSegmentInfos();
try {
indexWriter.incRefDeleter(segmentInfos);
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
return new GatedCloseable<>(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos));
}
@Override
protected final void writerSegmentStats(SegmentsStats stats) {
stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());
@ -2724,6 +2740,7 @@ public class InternalEngine extends Engine {
return getTranslog().getLastSyncedGlobalCheckpoint();
}
@Override
public long getProcessedLocalCheckpoint() {
return localCheckpointTracker.getProcessedCheckpoint();
}

View File

@ -248,6 +248,7 @@ public class NRTReplicationEngine extends Engine {
return localCheckpointTracker.getPersistedCheckpoint();
}
@Override
public long getProcessedLocalCheckpoint() {
return localCheckpointTracker.getProcessedCheckpoint();
}

View File

@ -374,6 +374,13 @@ public class ReadOnlyEngine extends Engine {
return seqNoStats.getLocalCheckpoint();
}
@Override
public long getProcessedLocalCheckpoint() {
// the read-only engine does not process checkpoints, so its
// processed checkpoint is identical to its persisted one.
return getPersistedLocalCheckpoint();
}
@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint);

View File

@ -2638,6 +2638,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return getEngine().getPersistedLocalCheckpoint();
}
/**
* Fetch the latest checkpoint that has been processed but not necessarily persisted.
* Also see {@link #getLocalCheckpoint()}.
*/
public long getProcessedLocalCheckpoint() {
return getEngine().getProcessedLocalCheckpoint();
}
/**
* Returns the global checkpoint for the shard.
*
@ -4005,4 +4013,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
RetentionLeaseSyncer getRetentionLeaseSyncer() {
return retentionLeaseSyncer;
}
/**
* Fetch the latest SegmentInfos held by the shard's underlying Engine, wrapped
* by a a {@link GatedCloseable} to ensure files are not deleted/merged away.
*
* @throws EngineException - When segment infos cannot be safely retrieved
*/
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}
}

View File

@ -274,6 +274,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
return getMetadata(commit, false);
}
/**
* Convenience wrapper around the {@link #getMetadata(IndexCommit)} method for null input.
*/
public MetadataSnapshot getMetadata() throws IOException {
return getMetadata(null, false);
}
/**
* Returns a new MetadataSnapshot for the given commit. If the given commit is <code>null</code>
* the latest commit point is used.
@ -315,6 +322,16 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
}
/**
* Returns a new {@link MetadataSnapshot} for the given {@link SegmentInfos} object.
* In contrast to {@link #getMetadata(IndexCommit)}, this method is useful for scenarios
* where we need to construct a MetadataSnapshot from an in-memory SegmentInfos object that
* may not have a IndexCommit associated with it, such as with segment replication.
*/
public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOException {
return new MetadataSnapshot(segmentInfos, directory, logger);
}
/**
* Renames all the given files from the key of the map to the
* value of the map. All successfully renamed files are removed from the map in-place.
@ -477,7 +494,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
Directory dir = new NIOFSDirectory(indexLocation)
) {
failIfCorrupted(dir);
return new MetadataSnapshot(null, dir, logger);
return new MetadataSnapshot((IndexCommit) null, dir, logger);
} catch (IndexNotFoundException ex) {
// that's fine - happens all the time no need to log
} catch (FileNotFoundException | NoSuchFileException ex) {
@ -682,7 +699,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
}
directory.syncMetaData();
final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null);
final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
verifyAfterCleanup(sourceMetadata, metadataOrEmpty);
} finally {
metadataLock.writeLock().unlock();
@ -822,7 +839,14 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
MetadataSnapshot(IndexCommit commit, Directory directory, Logger logger) throws IOException {
LoadedMetadata loadedMetadata = loadMetadata(commit, directory, logger);
this(loadMetadata(commit, directory, logger));
}
MetadataSnapshot(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
this(loadMetadata(segmentInfos, directory, logger));
}
private MetadataSnapshot(LoadedMetadata loadedMetadata) {
metadata = loadedMetadata.fileMetadata;
commitUserData = loadedMetadata.userData;
numDocs = loadedMetadata.numDocs;
@ -890,40 +914,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException {
long numDocs;
Map<String, StoreFileMetadata> builder = new HashMap<>();
Map<String, String> commitUserDataBuilder = new HashMap<>();
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
numDocs = Lucene.getNumDocs(segmentCommitInfos);
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentCommitInfos) {
final Version version = info.info.getVersion();
if (version == null) {
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
checksumFromLuceneFile(
directory,
file,
builder,
logger,
version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))
);
}
}
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentCommitInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
return loadMetadata(segmentCommitInfos, directory, logger);
} catch (CorruptIndexException | IndexNotFoundException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we either know the index is corrupted or it's just not there
throw ex;
@ -949,6 +942,40 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
throw ex;
}
}
static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
long numDocs = Lucene.getNumDocs(segmentInfos);
Map<String, String> commitUserDataBuilder = new HashMap<>();
commitUserDataBuilder.putAll(segmentInfos.getUserData());
Map<String, StoreFileMetadata> builder = new HashMap<>();
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentInfos) {
final Version version = info.info.getVersion();
if (version == null) {
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
checksumFromLuceneFile(
directory,
file,
builder,
logger,
version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))
);
}
}
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs);
}

View File

@ -0,0 +1,54 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest;
import java.io.IOException;
/**
* Request object for fetching segment metadata for a {@link ReplicationCheckpoint} from
* a {@link SegmentReplicationSource}. This object is created by the target node and sent
* to the source node.
*
* @opensearch.internal
*/
public class CheckpointInfoRequest extends SegmentReplicationTransportRequest {
private final ReplicationCheckpoint checkpoint;
public CheckpointInfoRequest(StreamInput in) throws IOException {
super(in);
checkpoint = new ReplicationCheckpoint(in);
}
public CheckpointInfoRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
ReplicationCheckpoint checkpoint
) {
super(replicationId, targetAllocationId, targetNode);
this.checkpoint = checkpoint;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
checkpoint.writeTo(out);
}
public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
}

View File

@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest;
import java.io.IOException;
import java.util.List;
/**
* Request object for fetching a list of segment files metadata from a {@link SegmentReplicationSource}.
* This object is created by the target node and sent to the source node.
*
* @opensearch.internal
*/
public class GetSegmentFilesRequest extends SegmentReplicationTransportRequest {
private final List<StoreFileMetadata> filesToFetch;
private final ReplicationCheckpoint checkpoint;
public GetSegmentFilesRequest(StreamInput in) throws IOException {
super(in);
this.filesToFetch = in.readList(StoreFileMetadata::new);
this.checkpoint = new ReplicationCheckpoint(in);
}
public GetSegmentFilesRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
List<StoreFileMetadata> filesToFetch,
ReplicationCheckpoint checkpoint
) {
super(replicationId, targetAllocationId, targetNode);
this.filesToFetch = filesToFetch;
this.checkpoint = checkpoint;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(filesToFetch);
checkpoint.writeTo(out);
}
public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
}

View File

@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RetryableTransportClient;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.transport.TransportService;
import java.util.List;
import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO;
import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES;
/**
* Implementation of a {@link SegmentReplicationSource} where the source is a primary node.
* This code executes on the target node.
*
* @opensearch.internal
*/
public class PrimaryShardReplicationSource implements SegmentReplicationSource {
private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class);
private final RetryableTransportClient transportClient;
private final DiscoveryNode targetNode;
private final String targetAllocationId;
public PrimaryShardReplicationSource(
DiscoveryNode targetNode,
String targetAllocationId,
TransportService transportService,
RecoverySettings recoverySettings,
DiscoveryNode sourceNode
) {
this.targetAllocationId = targetAllocationId;
this.transportClient = new RetryableTransportClient(
transportService,
sourceNode,
recoverySettings.internalActionRetryTimeout(),
logger
);
this.targetNode = targetNode;
}
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
final Writeable.Reader<CheckpointInfoResponse> reader = CheckpointInfoResponse::new;
final ActionListener<CheckpointInfoResponse> responseListener = ActionListener.map(listener, r -> r);
final CheckpointInfoRequest request = new CheckpointInfoRequest(replicationId, targetAllocationId, targetNode, checkpoint);
transportClient.executeRetryableAction(GET_CHECKPOINT_INFO, request, responseListener, reader);
}
@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
Store store,
ActionListener<GetSegmentFilesResponse> listener
) {
final Writeable.Reader<GetSegmentFilesResponse> reader = GetSegmentFilesResponse::new;
final ActionListener<GetSegmentFilesResponse> responseListener = ActionListener.map(listener, r -> r);
final GetSegmentFilesRequest request = new GetSegmentFilesRequest(
replicationId,
targetAllocationId,
targetNode,
filesToFetch,
checkpoint
);
transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, responseListener, reader);
}
}

View File

@ -8,8 +8,11 @@
package org.opensearch.indices.replication;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.transport.TransportService;
@ -35,7 +38,17 @@ public class SegmentReplicationSourceFactory {
}
public SegmentReplicationSource get(IndexShard shard) {
// TODO: Default to an implementation that uses the primary shard.
return null;
return new PrimaryShardReplicationSource(
clusterService.localNode(),
shard.routingEntry().allocationId().getId(),
transportService,
recoverySettings,
getPrimaryNode(shard.shardId())
);
}
private DiscoveryNode getPrimaryNode(ShardId shardId) {
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(shardId).primaryShard();
return clusterService.state().nodes().get(primaryShard.currentNodeId());
}
}

View File

@ -0,0 +1,160 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Service class that handles segment replication requests from replica shards.
* Typically, the "source" is a primary shard. This code executes on the source node.
*
* @opensearch.internal
*/
public class SegmentReplicationSourceService {
private static final Logger logger = LogManager.getLogger(SegmentReplicationSourceService.class);
/**
* Internal actions used by the segment replication source service on the primary shard
*
* @opensearch.internal
*/
public static class Actions {
public static final String GET_CHECKPOINT_INFO = "internal:index/shard/replication/get_checkpoint_info";
public static final String GET_SEGMENT_FILES = "internal:index/shard/replication/get_segment_files";
}
private final Map<ReplicationCheckpoint, CopyState> copyStateMap;
private final TransportService transportService;
private final IndicesService indicesService;
// TODO mark this as injected and bind in Node
public SegmentReplicationSourceService(TransportService transportService, IndicesService indicesService) {
copyStateMap = Collections.synchronizedMap(new HashMap<>());
this.transportService = transportService;
this.indicesService = indicesService;
transportService.registerRequestHandler(
Actions.GET_CHECKPOINT_INFO,
ThreadPool.Names.GENERIC,
CheckpointInfoRequest::new,
new CheckpointInfoRequestHandler()
);
transportService.registerRequestHandler(
Actions.GET_SEGMENT_FILES,
ThreadPool.Names.GENERIC,
GetSegmentFilesRequest::new,
new GetSegmentFilesRequestHandler()
);
}
private class CheckpointInfoRequestHandler implements TransportRequestHandler<CheckpointInfoRequest> {
@Override
public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception {
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
logger.trace("Received request for checkpoint {}", checkpoint);
final CopyState copyState = getCachedCopyState(checkpoint);
channel.sendResponse(
new CheckpointInfoResponse(
copyState.getCheckpoint(),
copyState.getMetadataSnapshot(),
copyState.getInfosBytes(),
copyState.getPendingDeleteFiles()
)
);
}
}
class GetSegmentFilesRequestHandler implements TransportRequestHandler<GetSegmentFilesRequest> {
@Override
public void messageReceived(GetSegmentFilesRequest request, TransportChannel channel, Task task) throws Exception {
if (isInCopyStateMap(request.getCheckpoint())) {
// TODO send files
} else {
// Return an empty list of files
channel.sendResponse(new GetSegmentFilesResponse(Collections.emptyList()));
}
}
}
/**
* Operations on the {@link #copyStateMap} member.
*/
/**
* A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key
* and returns the cached value if one is present. If the key is not present, a {@link CopyState}
* object is constructed and stored in the map before being returned.
*/
private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException {
if (isInCopyStateMap(checkpoint)) {
final CopyState copyState = fetchFromCopyStateMap(checkpoint);
copyState.incRef();
return copyState;
} else {
// From the checkpoint's shard ID, fetch the IndexShard
ShardId shardId = checkpoint.getShardId();
final IndexService indexService = indicesService.indexService(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());
// build the CopyState object and cache it before returning
final CopyState copyState = new CopyState(indexShard);
/**
* Use the checkpoint from the request as the key in the map, rather than
* the checkpoint from the created CopyState. This maximizes cache hits
* if replication targets make a request with an older checkpoint.
* Replication targets are expected to fetch the checkpoint in the response
* CopyState to bring themselves up to date.
*/
addToCopyStateMap(checkpoint, copyState);
return copyState;
}
}
/**
* Adds the input {@link CopyState} object to {@link #copyStateMap}.
* The key is the CopyState's {@link ReplicationCheckpoint} object.
*/
private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) {
copyStateMap.putIfAbsent(checkpoint, copyState);
}
/**
* Given a {@link ReplicationCheckpoint}, return the corresponding
* {@link CopyState} object, if any, from {@link #copyStateMap}.
*/
private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
return copyStateMap.get(replicationCheckpoint);
}
/**
* Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint}
* as a key by invoking {@link Map#containsKey(Object)}.
*/
private boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
return copyStateMap.containsKey(replicationCheckpoint);
}
}

View File

@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.common;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;
/**
* An Opensearch-specific version of Lucene's CopyState class that
* holds incRef'd file level details for one point-in-time segment infos.
*
* @opensearch.internal
*/
public class CopyState extends AbstractRefCounted {
private final GatedCloseable<SegmentInfos> segmentInfosRef;
private final ReplicationCheckpoint replicationCheckpoint;
private final Store.MetadataSnapshot metadataSnapshot;
private final HashSet<StoreFileMetadata> pendingDeleteFiles;
private final byte[] infosBytes;
private GatedCloseable<IndexCommit> commitRef;
public CopyState(IndexShard shard) throws IOException {
super("CopyState-" + shard.shardId());
this.segmentInfosRef = shard.getSegmentInfosSnapshot();
SegmentInfos segmentInfos = this.segmentInfosRef.get();
this.metadataSnapshot = shard.store().getMetadata(segmentInfos);
this.replicationCheckpoint = new ReplicationCheckpoint(
shard.shardId(),
shard.getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
shard.getProcessedLocalCheckpoint(),
segmentInfos.getVersion()
);
// Send files that are merged away in the latest SegmentInfos but not in the latest on disk Segments_N.
// This ensures that the store on replicas is in sync with the store on primaries.
this.commitRef = shard.acquireLastIndexCommit(false);
Store.MetadataSnapshot metadata = shard.store().getMetadata(this.commitRef.get());
final Store.RecoveryDiff diff = metadata.recoveryDiff(this.metadataSnapshot);
this.pendingDeleteFiles = new HashSet<>(diff.missing);
if (this.pendingDeleteFiles.isEmpty()) {
// If there are no additional files we can release the last commit immediately.
this.commitRef.close();
this.commitRef = null;
}
ByteBuffersDataOutput buffer = new ByteBuffersDataOutput();
// resource description and name are not used, but resource description cannot be null
try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) {
segmentInfos.write(indexOutput);
}
this.infosBytes = buffer.toArrayCopy();
}
@Override
protected void closeInternal() {
try {
segmentInfosRef.close();
// commitRef may be null if there were no pending delete files
if (commitRef != null) {
commitRef.close();
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public ReplicationCheckpoint getCheckpoint() {
return replicationCheckpoint;
}
public Store.MetadataSnapshot getMetadataSnapshot() {
return metadataSnapshot;
}
public byte[] getInfosBytes() {
return infosBytes;
}
public Set<StoreFileMetadata> getPendingDeleteFiles() {
return pendingDeleteFiles;
}
}

View File

@ -0,0 +1,49 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.common;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportRequest;
import java.io.IOException;
/**
* Abstract base class for transport-layer requests related to segment replication.
*
* @opensearch.internal
*/
public abstract class SegmentReplicationTransportRequest extends TransportRequest {
private final long replicationId;
private final String targetAllocationId;
private final DiscoveryNode targetNode;
protected SegmentReplicationTransportRequest(long replicationId, String targetAllocationId, DiscoveryNode targetNode) {
this.replicationId = replicationId;
this.targetAllocationId = targetAllocationId;
this.targetNode = targetNode;
}
protected SegmentReplicationTransportRequest(StreamInput in) throws IOException {
super(in);
this.replicationId = in.readLong();
this.targetAllocationId = in.readString();
this.targetNode = new DiscoveryNode(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(this.replicationId);
out.writeString(this.targetAllocationId);
targetNode.writeTo(out);
}
}

View File

@ -0,0 +1,108 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.engine;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.OpenSearchTestCase;
public class EngineConfigTests extends OpenSearchTestCase {
private IndexSettings defaultIndexSettings;
@Override
public void setUp() throws Exception {
super.setUp();
final IndexMetadata defaultIndexMetadata = IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
defaultIndexSettings = IndexSettingsModule.newIndexSettings("test", defaultIndexMetadata.getSettings());
}
public void testEngineConfig_DefaultValueForReadOnlyEngine() {
EngineConfig config = new EngineConfig(
null,
null,
defaultIndexSettings,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
() -> RetentionLeases.EMPTY,
null,
null
);
assertFalse(config.isReadOnlyReplica());
}
public void testEngineConfig_ReadOnlyEngineWithSegRepDisabled() {
expectThrows(IllegalArgumentException.class, () -> createReadOnlyEngine(defaultIndexSettings));
}
public void testEngineConfig_ReadOnlyEngineWithSegRepEnabled() {
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
"test",
Settings.builder()
.put(defaultIndexSettings.getSettings())
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
EngineConfig engineConfig = createReadOnlyEngine(indexSettings);
assertTrue(engineConfig.isReadOnlyReplica());
}
private EngineConfig createReadOnlyEngine(IndexSettings indexSettings) {
return new EngineConfig(
null,
null,
indexSettings,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
() -> RetentionLeases.EMPTY,
null,
null,
true
);
}
}

View File

@ -211,7 +211,9 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_RESET;
@ -7384,4 +7386,33 @@ public class InternalEngineTests extends EngineTestCase {
restoreIndexWriterMaxDocs();
}
}
public void testGetSegmentInfosSnapshot() throws IOException {
IOUtils.close(store, engine);
Store store = createStore();
InternalEngine engine = spy(createEngine(store, createTempDir()));
GatedCloseable<SegmentInfos> segmentInfosSnapshot = engine.getSegmentInfosSnapshot();
assertNotNull(segmentInfosSnapshot);
assertNotNull(segmentInfosSnapshot.get());
verify(engine, times(1)).getLatestSegmentInfos();
store.close();
engine.close();
}
public void testGetProcessedLocalCheckpoint() throws IOException {
final long expectedLocalCheckpoint = 1L;
IOUtils.close(store, engine);
// set up mock
final LocalCheckpointTracker mockCheckpointTracker = mock(LocalCheckpointTracker.class);
when(mockCheckpointTracker.getProcessedCheckpoint()).thenReturn(expectedLocalCheckpoint);
Store store = createStore();
InternalEngine engine = createEngine(store, createTempDir(), (a, b) -> mockCheckpointTracker);
long actualLocalCheckpoint = engine.getProcessedLocalCheckpoint();
assertEquals(expectedLocalCheckpoint, actualLocalCheckpoint);
verify(mockCheckpointTracker, atLeastOnce()).getProcessedCheckpoint();
store.close();
engine.close();
}
}

View File

@ -107,6 +107,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
lastSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
lastDocIds = getDocIds(engine, true);
assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint()));
assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo()));
assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds));
for (int i = 0; i < numDocs; i++) {
@ -131,6 +132,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
IOUtils.close(external, internal);
// the locked down engine should still point to the previous commit
assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint()));
assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo()));
assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds));
try (Engine.GetResult getResult = readOnlyEngine.get(get, readOnlyEngine::acquireSearcher)) {
@ -142,6 +144,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
// the locked down engine should still point to the previous commit
assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint()));
assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo()));
assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds));
}

View File

@ -364,14 +364,14 @@ public class StoreTests extends OpenSearchTestCase {
Store.MetadataSnapshot metadata;
// check before we committed
try {
store.getMetadata(null);
store.getMetadata();
fail("no index present - expected exception");
} catch (IndexNotFoundException ex) {
// expected
}
writer.commit();
writer.close();
metadata = store.getMetadata(null);
metadata = store.getMetadata();
assertThat(metadata.asMap().isEmpty(), is(false));
for (StoreFileMetadata meta : metadata) {
try (IndexInput input = store.directory().openInput(meta.name(), IOContext.DEFAULT)) {
@ -552,7 +552,7 @@ public class StoreTests extends OpenSearchTestCase {
}
writer.commit();
writer.close();
first = store.getMetadata(null);
first = store.getMetadata();
assertDeleteContent(store, store.directory());
store.close();
}
@ -581,7 +581,7 @@ public class StoreTests extends OpenSearchTestCase {
}
writer.commit();
writer.close();
second = store.getMetadata(null);
second = store.getMetadata();
}
Store.RecoveryDiff diff = first.recoveryDiff(second);
assertThat(first.size(), equalTo(second.size()));
@ -610,7 +610,7 @@ public class StoreTests extends OpenSearchTestCase {
writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(numDocs))));
writer.commit();
writer.close();
Store.MetadataSnapshot metadata = store.getMetadata(null);
Store.MetadataSnapshot metadata = store.getMetadata();
StoreFileMetadata delFile = null;
for (StoreFileMetadata md : metadata) {
if (md.name().endsWith(".liv")) {
@ -645,7 +645,7 @@ public class StoreTests extends OpenSearchTestCase {
writer.addDocument(docs.get(0));
writer.close();
Store.MetadataSnapshot newCommitMetadata = store.getMetadata(null);
Store.MetadataSnapshot newCommitMetadata = store.getMetadata();
Store.RecoveryDiff newCommitDiff = newCommitMetadata.recoveryDiff(metadata);
if (delFile != null) {
assertThat(newCommitDiff.identical.size(), equalTo(newCommitMetadata.size() - 5)); // segments_N, del file, cfs, cfe, si for the
@ -710,7 +710,7 @@ public class StoreTests extends OpenSearchTestCase {
writer.addDocument(doc);
}
Store.MetadataSnapshot firstMeta = store.getMetadata(null);
Store.MetadataSnapshot firstMeta = store.getMetadata();
if (random().nextBoolean()) {
for (int i = 0; i < docs; i++) {
@ -731,7 +731,7 @@ public class StoreTests extends OpenSearchTestCase {
writer.commit();
writer.close();
Store.MetadataSnapshot secondMeta = store.getMetadata(null);
Store.MetadataSnapshot secondMeta = store.getMetadata();
if (randomBoolean()) {
store.cleanupAndVerify("test", firstMeta);
@ -1000,7 +1000,7 @@ public class StoreTests extends OpenSearchTestCase {
try {
if (randomBoolean()) {
store.getMetadata(null);
store.getMetadata();
} else {
store.readLastCommittedSegmentsInfo();
}
@ -1138,4 +1138,15 @@ public class StoreTests extends OpenSearchTestCase {
}
}
}
public void testGetMetadataWithSegmentInfos() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
Store store = new Store(shardId, INDEX_SETTINGS, new NIOFSDirectory(createTempDir()), new DummyShardLock(shardId));
store.createEmpty(Version.LATEST);
SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
Store.MetadataSnapshot metadataSnapshot = store.getMetadata(segmentInfos);
// loose check for equality
assertEquals(segmentInfos.getSegmentsFileName(), metadataSnapshot.getSegmentsFile().name());
store.close();
}
}

View File

@ -85,7 +85,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
indexDoc(sourceShard, "_doc", Integer.toString(i));
}
sourceShard.flush(new FlushRequest());
Store.MetadataSnapshot sourceSnapshot = sourceShard.store().getMetadata(null);
Store.MetadataSnapshot sourceSnapshot = sourceShard.store().getMetadata();
List<StoreFileMetadata> mdFiles = new ArrayList<>();
for (StoreFileMetadata md : sourceSnapshot) {
mdFiles.add(md);

View File

@ -189,7 +189,7 @@ public class RecoverySourceHandlerTests extends OpenSearchTestCase {
writer.commit();
writer.close();
Store.MetadataSnapshot metadata = store.getMetadata(null);
Store.MetadataSnapshot metadata = store.getMetadata();
ReplicationLuceneIndex luceneIndex = new ReplicationLuceneIndex();
List<StoreFileMetadata> metas = new ArrayList<>();
for (StoreFileMetadata md : metadata) {
@ -226,7 +226,7 @@ public class RecoverySourceHandlerTests extends OpenSearchTestCase {
PlainActionFuture<Void> sendFilesFuture = new PlainActionFuture<>();
handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture);
sendFilesFuture.actionGet();
Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(null);
Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata();
Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata);
assertEquals(metas.size(), recoveryDiff.identical.size());
assertEquals(0, recoveryDiff.different.size());
@ -512,7 +512,7 @@ public class RecoverySourceHandlerTests extends OpenSearchTestCase {
writer.close();
ReplicationLuceneIndex luceneIndex = new ReplicationLuceneIndex();
Store.MetadataSnapshot metadata = store.getMetadata(null);
Store.MetadataSnapshot metadata = store.getMetadata();
List<StoreFileMetadata> metas = new ArrayList<>();
for (StoreFileMetadata md : metadata) {
metas.add(md);
@ -594,7 +594,7 @@ public class RecoverySourceHandlerTests extends OpenSearchTestCase {
writer.commit();
writer.close();
Store.MetadataSnapshot metadata = store.getMetadata(null);
Store.MetadataSnapshot metadata = store.getMetadata();
List<StoreFileMetadata> metas = new ArrayList<>();
for (StoreFileMetadata md : metadata) {
metas.add(md);

View File

@ -0,0 +1,139 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.apache.lucene.util.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.transport.TransportService;
import java.util.Arrays;
import java.util.Collections;
import static org.mockito.Mockito.mock;
public class PrimaryShardReplicationSourceTests extends IndexShardTestCase {
private static final long PRIMARY_TERM = 1L;
private static final long SEGMENTS_GEN = 2L;
private static final long SEQ_NO = 3L;
private static final long VERSION = 4L;
private static final long REPLICATION_ID = 123L;
private CapturingTransport transport;
private ClusterService clusterService;
private TransportService transportService;
private PrimaryShardReplicationSource replicationSource;
private IndexShard indexShard;
private DiscoveryNode sourceNode;
@Override
public void setUp() throws Exception {
super.setUp();
final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
transport = new CapturingTransport();
sourceNode = newDiscoveryNode("sourceNode");
final DiscoveryNode localNode = newDiscoveryNode("localNode");
clusterService = ClusterServiceUtils.createClusterService(threadPool, localNode);
transportService = transport.createTransportService(
clusterService.getSettings(),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(),
null,
Collections.emptySet()
);
transportService.start();
transportService.acceptIncomingRequests();
indexShard = newStartedShard(true);
replicationSource = new PrimaryShardReplicationSource(
localNode,
indexShard.routingEntry().allocationId().toString(),
transportService,
recoverySettings,
sourceNode
);
}
@Override
public void tearDown() throws Exception {
IOUtils.close(transportService, clusterService, transport);
closeShards(indexShard);
super.tearDown();
}
public void testGetCheckpointMetadata() {
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
PRIMARY_TERM,
SEGMENTS_GEN,
SEQ_NO,
VERSION
);
replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, mock(ActionListener.class));
CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear();
assertEquals(1, requestList.length);
CapturingTransport.CapturedRequest capturedRequest = requestList[0];
assertEquals(SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO, capturedRequest.action);
assertEquals(sourceNode, capturedRequest.node);
assertTrue(capturedRequest.request instanceof CheckpointInfoRequest);
}
public void testGetSegmentFiles() {
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
PRIMARY_TERM,
SEGMENTS_GEN,
SEQ_NO,
VERSION
);
StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST);
replicationSource.getSegmentFiles(
REPLICATION_ID,
checkpoint,
Arrays.asList(testMetadata),
mock(Store.class),
mock(ActionListener.class)
);
CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear();
assertEquals(1, requestList.length);
CapturingTransport.CapturedRequest capturedRequest = requestList[0];
assertEquals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, capturedRequest.action);
assertEquals(sourceNode, capturedRequest.node);
assertTrue(capturedRequest.request instanceof GetSegmentFilesRequest);
}
private DiscoveryNode newDiscoveryNode(String nodeName) {
return new DiscoveryNode(
nodeName,
randomAlphaOfLength(10),
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE),
org.opensearch.Version.CURRENT
);
}
}

View File

@ -0,0 +1,161 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.opensearch.Version;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyStateTests;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class SegmentReplicationSourceServiceTests extends OpenSearchTestCase {
private ShardId testShardId;
private ReplicationCheckpoint testCheckpoint;
private IndicesService mockIndicesService;
private IndexService mockIndexService;
private IndexShard mockIndexShard;
private TestThreadPool testThreadPool;
private CapturingTransport transport;
private TransportService transportService;
private DiscoveryNode localNode;
private SegmentReplicationSourceService segmentReplicationSourceService;
@Override
public void setUp() throws Exception {
super.setUp();
// setup mocks
mockIndexShard = CopyStateTests.createMockIndexShard();
testShardId = mockIndexShard.shardId();
mockIndicesService = mock(IndicesService.class);
mockIndexService = mock(IndexService.class);
when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService);
when(mockIndexService.getShard(testShardId.id())).thenReturn(mockIndexShard);
// This mirrors the creation of the ReplicationCheckpoint inside CopyState
testCheckpoint = new ReplicationCheckpoint(
testShardId,
mockIndexShard.getOperationPrimaryTerm(),
0L,
mockIndexShard.getProcessedLocalCheckpoint(),
0L
);
testThreadPool = new TestThreadPool("test", Settings.EMPTY);
transport = new CapturingTransport();
localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
transportService = transport.createTransportService(
Settings.EMPTY,
testThreadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> localNode,
null,
Collections.emptySet()
);
transportService.start();
transportService.acceptIncomingRequests();
segmentReplicationSourceService = new SegmentReplicationSourceService(transportService, mockIndicesService);
}
@Override
public void tearDown() throws Exception {
ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS);
testThreadPool = null;
super.tearDown();
}
public void testGetSegmentFiles_EmptyResponse() {
final GetSegmentFilesRequest request = new GetSegmentFilesRequest(
1,
"allocationId",
localNode,
Collections.emptyList(),
testCheckpoint
);
transportService.sendRequest(
localNode,
SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES,
request,
new TransportResponseHandler<GetSegmentFilesResponse>() {
@Override
public void handleResponse(GetSegmentFilesResponse response) {
assertEquals(0, response.files.size());
}
@Override
public void handleException(TransportException e) {
fail("unexpected exception: " + e);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public GetSegmentFilesResponse read(StreamInput in) throws IOException {
return new GetSegmentFilesResponse(in);
}
}
);
}
public void testCheckpointInfo() {
final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, "testAllocationId", localNode, testCheckpoint);
transportService.sendRequest(
localNode,
SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO,
request,
new TransportResponseHandler<CheckpointInfoResponse>() {
@Override
public void handleResponse(CheckpointInfoResponse response) {
assertEquals(testCheckpoint, response.getCheckpoint());
assertNotNull(response.getInfosBytes());
// CopyStateTests sets up one pending delete file and one committed segments file
assertEquals(1, response.getPendingDeleteFiles().size());
assertEquals(1, response.getSnapshot().size());
}
@Override
public void handleException(TransportException e) {
fail("unexpected exception: " + e);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public CheckpointInfoResponse read(StreamInput in) throws IOException {
return new CheckpointInfoResponse(in);
}
}
);
}
}

View File

@ -0,0 +1,80 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.common;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.util.Version;
import org.opensearch.common.collect.Map;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import java.io.IOException;
import java.util.Set;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class CopyStateTests extends IndexShardTestCase {
private static final long EXPECTED_LONG_VALUE = 1L;
private static final ShardId TEST_SHARD_ID = new ShardId("testIndex", "testUUID", 0);
private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST);
private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST);
private static final Store.MetadataSnapshot COMMIT_SNAPSHOT = new Store.MetadataSnapshot(
Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE, PENDING_DELETE_FILE.name(), PENDING_DELETE_FILE),
null,
0
);
private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot(
Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE),
null,
0
);
public void testCopyStateCreation() throws IOException {
CopyState copyState = new CopyState(createMockIndexShard());
ReplicationCheckpoint checkpoint = copyState.getCheckpoint();
assertEquals(TEST_SHARD_ID, checkpoint.getShardId());
// version was never set so this should be zero
assertEquals(0, checkpoint.getSegmentInfosVersion());
assertEquals(EXPECTED_LONG_VALUE, checkpoint.getPrimaryTerm());
Set<StoreFileMetadata> pendingDeleteFiles = copyState.getPendingDeleteFiles();
assertEquals(1, pendingDeleteFiles.size());
assertTrue(pendingDeleteFiles.contains(PENDING_DELETE_FILE));
}
public static IndexShard createMockIndexShard() throws IOException {
IndexShard mockShard = mock(IndexShard.class);
when(mockShard.shardId()).thenReturn(TEST_SHARD_ID);
when(mockShard.getOperationPrimaryTerm()).thenReturn(EXPECTED_LONG_VALUE);
when(mockShard.getProcessedLocalCheckpoint()).thenReturn(EXPECTED_LONG_VALUE);
Store mockStore = mock(Store.class);
when(mockShard.store()).thenReturn(mockStore);
SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major);
when(mockShard.getSegmentInfosSnapshot()).thenReturn(new GatedCloseable<>(testSegmentInfos, () -> {}));
when(mockStore.getMetadata(testSegmentInfos)).thenReturn(SI_SNAPSHOT);
IndexCommit mockIndexCommit = mock(IndexCommit.class);
when(mockShard.acquireLastIndexCommit(false)).thenReturn(new GatedCloseable<>(mockIndexCommit, () -> {}));
when(mockStore.getMetadata(mockIndexCommit)).thenReturn(COMMIT_SNAPSHOT);
return mockShard;
}
}