diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index c242d98b4b6..4829148322b 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -175,6 +175,21 @@ public abstract class Engine implements Closeable { */ protected abstract SegmentInfos getLatestSegmentInfos(); + /** + * In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos} + * object directly, this method returns a {@link GatedCloseable} reference to the same object. + * This allows the engine to include a clean-up {@link org.opensearch.common.CheckedRunnable} + * which is run when the reference is closed. The default implementation of the clean-up + * procedure is a no-op. + * + * @return {@link GatedCloseable} - A wrapper around a {@link SegmentInfos} instance that + * must be closed for segment files to be deleted. + */ + public GatedCloseable getSegmentInfosSnapshot() { + // default implementation + return new GatedCloseable<>(getLatestSegmentInfos(), () -> {}); + } + public MergeStats getMergeStats() { return new MergeStats(); } @@ -846,6 +861,12 @@ public abstract class Engine implements Closeable { */ public abstract long getPersistedLocalCheckpoint(); + /** + * @return the latest checkpoint that has been processed but not necessarily persisted. 
+ * Also see {@link #getPersistedLocalCheckpoint()} + */ + public abstract long getProcessedLocalCheckpoint(); + /** * @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint */ diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index e60e650372e..b63a39ebb12 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2305,6 +2305,22 @@ public class InternalEngine extends Engine { } } + /** + * Fetch the latest {@link SegmentInfos} object via {@link #getLatestSegmentInfos()} + * but also increment the ref-count to ensure that these segment files are retained + * until the reference is closed. On close, the ref-count is decremented. + */ + @Override + public GatedCloseable getSegmentInfosSnapshot() { + final SegmentInfos segmentInfos = getLatestSegmentInfos(); + try { + indexWriter.incRefDeleter(segmentInfos); + } catch (IOException e) { + throw new EngineException(shardId, e.getMessage(), e); + } + return new GatedCloseable<>(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos)); + } + @Override protected final void writerSegmentStats(SegmentsStats stats) { stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed()); @@ -2724,6 +2740,7 @@ public class InternalEngine extends Engine { return getTranslog().getLastSyncedGlobalCheckpoint(); } + @Override public long getProcessedLocalCheckpoint() { return localCheckpointTracker.getProcessedCheckpoint(); } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 106643198cc..e4f4bbbba8f 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -248,6 +248,7 
@@ public class NRTReplicationEngine extends Engine { return localCheckpointTracker.getPersistedCheckpoint(); } + @Override public long getProcessedLocalCheckpoint() { return localCheckpointTracker.getProcessedCheckpoint(); } diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 23a86d8da55..6262a9269c0 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -374,6 +374,13 @@ public class ReadOnlyEngine extends Engine { return seqNoStats.getLocalCheckpoint(); } + @Override + public long getProcessedLocalCheckpoint() { + // the read-only engine does not process checkpoints, so its + // processed checkpoint is identical to its persisted one. + return getPersistedLocalCheckpoint(); + } + @Override public SeqNoStats getSeqNoStats(long globalCheckpoint) { return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 995a92e94ae..5d11c34ca20 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2638,6 +2638,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return getEngine().getPersistedLocalCheckpoint(); } + /** + * Fetch the latest checkpoint that has been processed but not necessarily persisted. + * Also see {@link #getLocalCheckpoint()}. + */ + public long getProcessedLocalCheckpoint() { + return getEngine().getProcessedLocalCheckpoint(); + } + /** * Returns the global checkpoint for the shard. 
* @@ -4005,4 +4013,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl RetentionLeaseSyncer getRetentionLeaseSyncer() { return retentionLeaseSyncer; } + + /** + * Fetch the latest SegmentInfos held by the shard's underlying Engine, wrapped + * by a {@link GatedCloseable} to ensure files are not deleted/merged away. + * + * @throws EngineException - When segment infos cannot be safely retrieved + */ + public GatedCloseable getSegmentInfosSnapshot() { + return getEngine().getSegmentInfosSnapshot(); + } } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 65c47f66b76..f818456c3a2 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -274,6 +274,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref return getMetadata(commit, false); } + /** + * Convenience wrapper around the {@link #getMetadata(IndexCommit)} method for null input. + */ + public MetadataSnapshot getMetadata() throws IOException { + return getMetadata(null, false); + } + /** * Returns a new MetadataSnapshot for the given commit. If the given commit is null * the latest commit point is used. @@ -315,6 +322,16 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } } + /** + * Returns a new {@link MetadataSnapshot} for the given {@link SegmentInfos} object. + * In contrast to {@link #getMetadata(IndexCommit)}, this method is useful for scenarios + * where we need to construct a MetadataSnapshot from an in-memory SegmentInfos object that + * may not have an IndexCommit associated with it, such as with segment replication. 
+ */ + public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOException { + return new MetadataSnapshot(segmentInfos, directory, logger); + } + /** * Renames all the given files from the key of the map to the * value of the map. All successfully renamed files are removed from the map in-place. @@ -477,7 +494,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref Directory dir = new NIOFSDirectory(indexLocation) ) { failIfCorrupted(dir); - return new MetadataSnapshot(null, dir, logger); + return new MetadataSnapshot((IndexCommit) null, dir, logger); } catch (IndexNotFoundException ex) { // that's fine - happens all the time no need to log } catch (FileNotFoundException | NoSuchFileException ex) { @@ -682,7 +699,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } } directory.syncMetaData(); - final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null); + final Store.MetadataSnapshot metadataOrEmpty = getMetadata(); verifyAfterCleanup(sourceMetadata, metadataOrEmpty); } finally { metadataLock.writeLock().unlock(); @@ -822,7 +839,14 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } MetadataSnapshot(IndexCommit commit, Directory directory, Logger logger) throws IOException { - LoadedMetadata loadedMetadata = loadMetadata(commit, directory, logger); + this(loadMetadata(commit, directory, logger)); + } + + MetadataSnapshot(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException { + this(loadMetadata(segmentInfos, directory, logger)); + } + + private MetadataSnapshot(LoadedMetadata loadedMetadata) { metadata = loadedMetadata.fileMetadata; commitUserData = loadedMetadata.userData; numDocs = loadedMetadata.numDocs; @@ -890,40 +914,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException { 
- long numDocs; - Map builder = new HashMap<>(); - Map commitUserDataBuilder = new HashMap<>(); try { final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory); - numDocs = Lucene.getNumDocs(segmentCommitInfos); - commitUserDataBuilder.putAll(segmentCommitInfos.getUserData()); - // we don't know which version was used to write so we take the max version. - Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); - for (SegmentCommitInfo info : segmentCommitInfos) { - final Version version = info.info.getVersion(); - if (version == null) { - // version is written since 3.1+: we should have already hit IndexFormatTooOld. - throw new IllegalArgumentException("expected valid version value: " + info.info.toString()); - } - if (version.onOrAfter(maxVersion)) { - maxVersion = version; - } - for (String file : info.files()) { - checksumFromLuceneFile( - directory, - file, - builder, - logger, - version, - SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file)) - ); - } - } - if (maxVersion == null) { - maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion; - } - final String segmentsFile = segmentCommitInfos.getSegmentsFileName(); - checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true); + return loadMetadata(segmentCommitInfos, directory, logger); } catch (CorruptIndexException | IndexNotFoundException | IndexFormatTooOldException | IndexFormatTooNewException ex) { // we either know the index is corrupted or it's just not there throw ex; @@ -949,6 +942,40 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } throw ex; } + } + + static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException { + long numDocs = Lucene.getNumDocs(segmentInfos); + Map commitUserDataBuilder = new HashMap<>(); + commitUserDataBuilder.putAll(segmentInfos.getUserData()); + Map builder = new 
HashMap<>(); + // we don't know which version was used to write so we take the max version. + Version maxVersion = segmentInfos.getMinSegmentLuceneVersion(); + for (SegmentCommitInfo info : segmentInfos) { + final Version version = info.info.getVersion(); + if (version == null) { + // version is written since 3.1+: we should have already hit IndexFormatTooOld. + throw new IllegalArgumentException("expected valid version value: " + info.info.toString()); + } + if (version.onOrAfter(maxVersion)) { + maxVersion = version; + } + for (String file : info.files()) { + checksumFromLuceneFile( + directory, + file, + builder, + logger, + version, + SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file)) + ); + } + } + if (maxVersion == null) { + maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion; + } + final String segmentsFile = segmentInfos.getSegmentsFileName(); + checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true); return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs); } diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java new file mode 100644 index 00000000000..188a4c1e40f --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest; + +import java.io.IOException; + +/** + * Request object for fetching segment metadata for a {@link ReplicationCheckpoint} from + * a {@link SegmentReplicationSource}. This object is created by the target node and sent + * to the source node. + * + * @opensearch.internal + */ +public class CheckpointInfoRequest extends SegmentReplicationTransportRequest { + + private final ReplicationCheckpoint checkpoint; + + public CheckpointInfoRequest(StreamInput in) throws IOException { + super(in); + checkpoint = new ReplicationCheckpoint(in); + } + + public CheckpointInfoRequest( + long replicationId, + String targetAllocationId, + DiscoveryNode targetNode, + ReplicationCheckpoint checkpoint + ) { + super(replicationId, targetAllocationId, targetNode); + this.checkpoint = checkpoint; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + checkpoint.writeTo(out); + } + + public ReplicationCheckpoint getCheckpoint() { + return checkpoint; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java new file mode 100644 index 00000000000..21749d3fe7d --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest; + +import java.io.IOException; +import java.util.List; + +/** + * Request object for fetching a list of segment files metadata from a {@link SegmentReplicationSource}. + * This object is created by the target node and sent to the source node. + * + * @opensearch.internal + */ +public class GetSegmentFilesRequest extends SegmentReplicationTransportRequest { + + private final List filesToFetch; + private final ReplicationCheckpoint checkpoint; + + public GetSegmentFilesRequest(StreamInput in) throws IOException { + super(in); + this.filesToFetch = in.readList(StoreFileMetadata::new); + this.checkpoint = new ReplicationCheckpoint(in); + } + + public GetSegmentFilesRequest( + long replicationId, + String targetAllocationId, + DiscoveryNode targetNode, + List filesToFetch, + ReplicationCheckpoint checkpoint + ) { + super(replicationId, targetAllocationId, targetNode); + this.filesToFetch = filesToFetch; + this.checkpoint = checkpoint; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(filesToFetch); + checkpoint.writeTo(out); + } + + public ReplicationCheckpoint getCheckpoint() { + return checkpoint; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java new file mode 100644 index 00000000000..08dc0b97b31 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -0,0 +1,90 @@ +/* + * 
SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RetryableTransportClient; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.transport.TransportService; + +import java.util.List; + +import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO; +import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES; + +/** + * Implementation of a {@link SegmentReplicationSource} where the source is a primary node. + * This code executes on the target node. 
+ * + * @opensearch.internal + */ +public class PrimaryShardReplicationSource implements SegmentReplicationSource { + + private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class); + + private final RetryableTransportClient transportClient; + private final DiscoveryNode targetNode; + private final String targetAllocationId; + + public PrimaryShardReplicationSource( + DiscoveryNode targetNode, + String targetAllocationId, + TransportService transportService, + RecoverySettings recoverySettings, + DiscoveryNode sourceNode + ) { + this.targetAllocationId = targetAllocationId; + this.transportClient = new RetryableTransportClient( + transportService, + sourceNode, + recoverySettings.internalActionRetryTimeout(), + logger + ); + this.targetNode = targetNode; + } + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + final Writeable.Reader reader = CheckpointInfoResponse::new; + final ActionListener responseListener = ActionListener.map(listener, r -> r); + final CheckpointInfoRequest request = new CheckpointInfoRequest(replicationId, targetAllocationId, targetNode, checkpoint); + transportClient.executeRetryableAction(GET_CHECKPOINT_INFO, request, responseListener, reader); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + final Writeable.Reader reader = GetSegmentFilesResponse::new; + final ActionListener responseListener = ActionListener.map(listener, r -> r); + final GetSegmentFilesRequest request = new GetSegmentFilesRequest( + replicationId, + targetAllocationId, + targetNode, + filesToFetch, + checkpoint + ); + transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, responseListener, reader); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java 
b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java
index 3ca31503f17..afbb80d2638 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java
@@ -8,8 +8,12 @@
 
 package org.opensearch.indices.replication;
 
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.shard.ShardId;
 import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.transport.TransportService;
 
@@ -35,7 +39,27 @@ public class SegmentReplicationSourceFactory {
     }
 
+    /**
+     * Creates the {@link SegmentReplicationSource} used by the given shard. The local node is passed as
+     * the target and the node returned by {@link #getPrimaryNode(ShardId)} as the source.
+     */
     public SegmentReplicationSource get(IndexShard shard) {
-        // TODO: Default to an implementation that uses the primary shard.
-        return null;
+        return new PrimaryShardReplicationSource(
+            clusterService.localNode(),
+            shard.routingEntry().allocationId().getId(),
+            transportService,
+            recoverySettings,
+            getPrimaryNode(shard.shardId())
+        );
+    }
+
+    /**
+     * Looks up the node that the routing table lists as holding the primary copy of {@code shardId}.
+     */
+    private DiscoveryNode getPrimaryNode(ShardId shardId) {
+        // Take a single cluster state snapshot so the routing entry and the node lookup are
+        // resolved against the same state; two separate state() calls could straddle an update.
+        final ClusterState state = clusterService.state();
+        final ShardRouting primaryShard = state.routingTable().shardRoutingTable(shardId).primaryShard();
+        return state.nodes().get(primaryShard.currentNodeId());
     }
 }
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java
new file mode 100644
index 00000000000..9f70120dedd
--- /dev/null
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java
@@ -0,0 +1,160 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequestHandler; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Service class that handles segment replication requests from replica shards. + * Typically, the "source" is a primary shard. This code executes on the source node. + * + * @opensearch.internal + */ +public class SegmentReplicationSourceService { + + private static final Logger logger = LogManager.getLogger(SegmentReplicationSourceService.class); + + /** + * Internal actions used by the segment replication source service on the primary shard + * + * @opensearch.internal + */ + public static class Actions { + public static final String GET_CHECKPOINT_INFO = "internal:index/shard/replication/get_checkpoint_info"; + public static final String GET_SEGMENT_FILES = "internal:index/shard/replication/get_segment_files"; + } + + private final Map copyStateMap; + private final TransportService transportService; + private final IndicesService indicesService; + + // TODO mark this as injected and bind in Node + public SegmentReplicationSourceService(TransportService transportService, IndicesService indicesService) { + copyStateMap = Collections.synchronizedMap(new HashMap<>()); + this.transportService = transportService; + this.indicesService = indicesService; + + 
transportService.registerRequestHandler( + Actions.GET_CHECKPOINT_INFO, + ThreadPool.Names.GENERIC, + CheckpointInfoRequest::new, + new CheckpointInfoRequestHandler() + ); + transportService.registerRequestHandler( + Actions.GET_SEGMENT_FILES, + ThreadPool.Names.GENERIC, + GetSegmentFilesRequest::new, + new GetSegmentFilesRequestHandler() + ); + } + + private class CheckpointInfoRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception { + final ReplicationCheckpoint checkpoint = request.getCheckpoint(); + logger.trace("Received request for checkpoint {}", checkpoint); + final CopyState copyState = getCachedCopyState(checkpoint); + channel.sendResponse( + new CheckpointInfoResponse( + copyState.getCheckpoint(), + copyState.getMetadataSnapshot(), + copyState.getInfosBytes(), + copyState.getPendingDeleteFiles() + ) + ); + } + } + + class GetSegmentFilesRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(GetSegmentFilesRequest request, TransportChannel channel, Task task) throws Exception { + if (isInCopyStateMap(request.getCheckpoint())) { + // TODO send files + } else { + // Return an empty list of files + channel.sendResponse(new GetSegmentFilesResponse(Collections.emptyList())); + } + } + } + + /** + * Operations on the {@link #copyStateMap} member. + */ + + /** + * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key + * and returns the cached value if one is present. If the key is not present, a {@link CopyState} + * object is constructed and stored in the map before being returned. 
+ */ + private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException { + if (isInCopyStateMap(checkpoint)) { + final CopyState copyState = fetchFromCopyStateMap(checkpoint); + copyState.incRef(); + return copyState; + } else { + // From the checkpoint's shard ID, fetch the IndexShard + ShardId shardId = checkpoint.getShardId(); + final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.id()); + // build the CopyState object and cache it before returning + final CopyState copyState = new CopyState(indexShard); + + /** + * Use the checkpoint from the request as the key in the map, rather than + * the checkpoint from the created CopyState. This maximizes cache hits + * if replication targets make a request with an older checkpoint. + * Replication targets are expected to fetch the checkpoint in the response + * CopyState to bring themselves up to date. + */ + addToCopyStateMap(checkpoint, copyState); + return copyState; + } + } + + /** + * Adds the input {@link CopyState} object to {@link #copyStateMap}. + * The key is the supplied {@link ReplicationCheckpoint} (not necessarily the CopyState's own); an existing entry is kept. + */ + private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { + copyStateMap.putIfAbsent(checkpoint, copyState); + } + + /** + * Given a {@link ReplicationCheckpoint}, return the corresponding + * {@link CopyState} object, if any, from {@link #copyStateMap}. + */ + private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { + return copyStateMap.get(replicationCheckpoint); + } + + /** + * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} + * as a key by invoking {@link Map#containsKey(Object)}. 
+     */
+    private boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
+        return copyStateMap.containsKey(replicationCheckpoint);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
new file mode 100644
index 00000000000..250df348143
--- /dev/null
+++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
@@ -0,0 +1,154 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication.common;
+
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.ByteBuffersDataOutput;
+import org.apache.lucene.store.ByteBuffersIndexOutput;
+import org.opensearch.common.concurrent.GatedCloseable;
+import org.opensearch.common.util.concurrent.AbstractRefCounted;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.store.Store;
+import org.opensearch.index.store.StoreFileMetadata;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * An OpenSearch-specific version of Lucene's CopyState class that
+ * holds incRef'd file level details for one point-in-time segment infos.
+ *
+ * @opensearch.internal
+ */
+public class CopyState extends AbstractRefCounted {
+
+    private final GatedCloseable<SegmentInfos> segmentInfosRef;
+    private final ReplicationCheckpoint replicationCheckpoint;
+    private final Store.MetadataSnapshot metadataSnapshot;
+    private final Set<StoreFileMetadata> pendingDeleteFiles;
+    private final byte[] infosBytes;
+    // Non-null only while pendingDeleteFiles is non-empty (see the constructor).
+    private GatedCloseable<IndexCommit> commitRef;
+
+    /**
+     * Snapshots the latest segment infos of {@code shard} and derives from it the store metadata,
+     * the replication checkpoint, the pending-delete file set and the serialized infos bytes.
+     * If a step after acquiring the segment infos reference fails, the references acquired so
+     * far are released before the exception propagates.
+     *
+     * @param shard the shard whose current segment infos are captured
+     * @throws IOException if reading store metadata, closing the commit or serializing the infos fails
+     */
+    public CopyState(IndexShard shard) throws IOException {
+        super("CopyState-" + shard.shardId());
+        this.segmentInfosRef = shard.getSegmentInfosSnapshot();
+        try {
+            final SegmentInfos segmentInfos = this.segmentInfosRef.get();
+            this.metadataSnapshot = shard.store().getMetadata(segmentInfos);
+            this.replicationCheckpoint = new ReplicationCheckpoint(
+                shard.shardId(),
+                shard.getOperationPrimaryTerm(),
+                segmentInfos.getGeneration(),
+                shard.getProcessedLocalCheckpoint(),
+                segmentInfos.getVersion()
+            );
+
+            // Send files that are merged away in the latest SegmentInfos but not in the latest on disk Segments_N.
+            // This ensures that the store on replicas is in sync with the store on primaries.
+            this.commitRef = shard.acquireLastIndexCommit(false);
+            final Store.MetadataSnapshot metadata = shard.store().getMetadata(this.commitRef.get());
+            final Store.RecoveryDiff diff = metadata.recoveryDiff(this.metadataSnapshot);
+            this.pendingDeleteFiles = new HashSet<>(diff.missing);
+            if (this.pendingDeleteFiles.isEmpty()) {
+                // If there are no additional files we can release the last commit immediately.
+                // Clear the field first so the catch block below never closes it a second time.
+                final GatedCloseable<IndexCommit> unneededCommit = this.commitRef;
+                this.commitRef = null;
+                unneededCommit.close();
+            }
+
+            final ByteBuffersDataOutput buffer = new ByteBuffersDataOutput();
+            // resource description and name are not used, but resource description cannot be null
+            try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) {
+                segmentInfos.write(indexOutput);
+            }
+            this.infosBytes = buffer.toArrayCopy();
+        } catch (Exception e) {
+            // A constructor that throws never returns an instance, so no caller could release
+            // these references later; release them here and keep the original failure primary.
+            try {
+                releaseRefs();
+            } catch (Exception releaseFailure) {
+                e.addSuppressed(releaseFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Releases the segment infos reference and, when it is still held, the index commit reference.
+     * An {@link IOException} raised while closing is rethrown as an {@link UncheckedIOException}.
+     */
+    @Override
+    protected void closeInternal() {
+        try {
+            releaseRefs();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Closes {@link #segmentInfosRef} and then {@link #commitRef}. The commit reference is closed in a
+     * {@code finally} block so that a failure closing the first reference does not leak the second.
+     */
+    private void releaseRefs() throws IOException {
+        try {
+            segmentInfosRef.close();
+        } finally {
+            // commitRef may be null if there were no pending delete files
+            if (commitRef != null) {
+                commitRef.close();
+            }
+        }
+    }
+
+    /**
+     * @return the checkpoint built from the captured segment infos
+     */
+    public ReplicationCheckpoint getCheckpoint() {
+        return replicationCheckpoint;
+    }
+
+    /**
+     * @return the store metadata computed from the captured segment infos
+     */
+    public Store.MetadataSnapshot getMetadataSnapshot() {
+        return metadataSnapshot;
+    }
+
+    /**
+     * @return the captured segment infos in serialized form
+     */
+    public byte[] getInfosBytes() {
+        return infosBytes;
+    }
+
+    /**
+     * @return the {@code missing} entries of the recovery diff between the last index commit's
+     *         metadata and the metadata of the captured segment infos
+     */
+    public Set<StoreFileMetadata> getPendingDeleteFiles() {
+        return pendingDeleteFiles;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java b/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java
new file mode 100644
index 00000000000..db8206d131c
--- /dev/null
+++ b/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java
@@ -0,0 +1,49 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication.common;
+
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.common.io.stream.StreamOutput;
+import org.opensearch.transport.TransportRequest;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for transport-layer requests related to segment replication.
+ *
+ * @opensearch.internal
+ */
+public abstract class SegmentReplicationTransportRequest extends TransportRequest {
+
+    // Serialized in this order: replicationId (long), targetAllocationId (string), targetNode.
+    private final long replicationId;
+    private final String targetAllocationId;
+    private final DiscoveryNode targetNode;
+
+    /**
+     * Creates a request from its three components.
+     *
+     * @param replicationId      value written to the wire as a long
+     * @param targetAllocationId value written to the wire as a string
+     * @param targetNode         node written to the wire after the two scalar fields
+     */
+    protected SegmentReplicationTransportRequest(long replicationId, String targetAllocationId, DiscoveryNode targetNode) {
+        this.replicationId = replicationId;
+        this.targetAllocationId = targetAllocationId;
+        this.targetNode = targetNode;
+    }
+
+    /**
+     * Reads a request from the stream; the read order mirrors {@link #writeTo(StreamOutput)}.
+     *
+     * @param in the stream to read from
+     * @throws IOException if the stream cannot be read
+     */
+    protected SegmentReplicationTransportRequest(StreamInput in) throws IOException {
+        super(in);
+        replicationId = in.readLong();
+        targetAllocationId = in.readString();
+        targetNode = new DiscoveryNode(in);
+    }
+
+    /**
+     * Writes the parent state followed by this class's fields, in the order the stream constructor reads them.
+     *
+     * @param out the stream to write to
+     * @throws IOException if the stream cannot be written
+     */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeLong(replicationId);
+        out.writeString(targetAllocationId);
+        targetNode.writeTo(out);
+    }
+}
diff --git a/server/src/test/java/org/opensearch/index/engine/EngineConfigTests.java b/server/src/test/java/org/opensearch/index/engine/EngineConfigTests.java
new file mode 100644
index 00000000000..1c6d06e9bcc
--- /dev/null
+++ b/server/src/test/java/org/opensearch/index/engine/EngineConfigTests.java
@@ -0,0 +1,108 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */ + +package org.opensearch.index.engine; + +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; + +public class EngineConfigTests extends OpenSearchTestCase { + + private IndexSettings defaultIndexSettings; + + @Override + public void setUp() throws Exception { + super.setUp(); + final IndexMetadata defaultIndexMetadata = IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + defaultIndexSettings = IndexSettingsModule.newIndexSettings("test", defaultIndexMetadata.getSettings()); + } + + public void testEngineConfig_DefaultValueForReadOnlyEngine() { + EngineConfig config = new EngineConfig( + null, + null, + defaultIndexSettings, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + () -> RetentionLeases.EMPTY, + null, + null + ); + assertFalse(config.isReadOnlyReplica()); + } + + public void testEngineConfig_ReadOnlyEngineWithSegRepDisabled() { + expectThrows(IllegalArgumentException.class, () -> createReadOnlyEngine(defaultIndexSettings)); + } + + public void testEngineConfig_ReadOnlyEngineWithSegRepEnabled() { + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "test", + Settings.builder() + .put(defaultIndexSettings.getSettings()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); + EngineConfig engineConfig = createReadOnlyEngine(indexSettings); + assertTrue(engineConfig.isReadOnlyReplica()); + } + + private EngineConfig createReadOnlyEngine(IndexSettings indexSettings) { + return new EngineConfig( + null, + 
null, + indexSettings, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + () -> RetentionLeases.EMPTY, + null, + null, + true + ); + } +} diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index cbae55a047a..b14ad150701 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -211,7 +211,9 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_RESET; @@ -7384,4 +7386,33 @@ public class InternalEngineTests extends EngineTestCase { restoreIndexWriterMaxDocs(); } } + + public void testGetSegmentInfosSnapshot() throws IOException { + IOUtils.close(store, engine); + Store store = createStore(); + InternalEngine engine = spy(createEngine(store, createTempDir())); + GatedCloseable segmentInfosSnapshot = engine.getSegmentInfosSnapshot(); + assertNotNull(segmentInfosSnapshot); + assertNotNull(segmentInfosSnapshot.get()); + verify(engine, times(1)).getLatestSegmentInfos(); + store.close(); + engine.close(); + } + + public void testGetProcessedLocalCheckpoint() throws IOException { + final long expectedLocalCheckpoint = 1L; + IOUtils.close(store, engine); + // set up mock + final LocalCheckpointTracker mockCheckpointTracker = mock(LocalCheckpointTracker.class); + when(mockCheckpointTracker.getProcessedCheckpoint()).thenReturn(expectedLocalCheckpoint); + + Store 
store = createStore(); + InternalEngine engine = createEngine(store, createTempDir(), (a, b) -> mockCheckpointTracker); + + long actualLocalCheckpoint = engine.getProcessedLocalCheckpoint(); + assertEquals(expectedLocalCheckpoint, actualLocalCheckpoint); + verify(mockCheckpointTracker, atLeastOnce()).getProcessedCheckpoint(); + store.close(); + engine.close(); + } } diff --git a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java index 2106c5e1067..da0db02ac40 100644 --- a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java @@ -107,6 +107,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { lastSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); lastDocIds = getDocIds(engine, true); assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); for (int i = 0; i < numDocs; i++) { @@ -131,6 +132,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { IOUtils.close(external, internal); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); try (Engine.GetResult getResult = readOnlyEngine.get(get, 
readOnlyEngine::acquireSearcher)) { @@ -142,6 +144,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); } diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index fdec86e7912..d99bde4764a 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -364,14 +364,14 @@ public class StoreTests extends OpenSearchTestCase { Store.MetadataSnapshot metadata; // check before we committed try { - store.getMetadata(null); + store.getMetadata(); fail("no index present - expected exception"); } catch (IndexNotFoundException ex) { // expected } writer.commit(); writer.close(); - metadata = store.getMetadata(null); + metadata = store.getMetadata(); assertThat(metadata.asMap().isEmpty(), is(false)); for (StoreFileMetadata meta : metadata) { try (IndexInput input = store.directory().openInput(meta.name(), IOContext.DEFAULT)) { @@ -552,7 +552,7 @@ public class StoreTests extends OpenSearchTestCase { } writer.commit(); writer.close(); - first = store.getMetadata(null); + first = store.getMetadata(); assertDeleteContent(store, store.directory()); store.close(); } @@ -581,7 +581,7 @@ public class StoreTests extends OpenSearchTestCase { } writer.commit(); writer.close(); - second = store.getMetadata(null); + second = store.getMetadata(); } Store.RecoveryDiff diff = 
first.recoveryDiff(second);
         assertThat(first.size(), equalTo(second.size()));
@@ -610,7 +610,7 @@ public class StoreTests extends OpenSearchTestCase {
         writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(numDocs))));
         writer.commit();
         writer.close();
-        Store.MetadataSnapshot metadata = store.getMetadata(null);
+        Store.MetadataSnapshot metadata = store.getMetadata();
         StoreFileMetadata delFile = null;
         for (StoreFileMetadata md : metadata) {
             if (md.name().endsWith(".liv")) {
@@ -645,7 +645,7 @@ public class StoreTests extends OpenSearchTestCase {
         writer.addDocument(docs.get(0));
         writer.close();
-        Store.MetadataSnapshot newCommitMetadata = store.getMetadata(null);
+        Store.MetadataSnapshot newCommitMetadata = store.getMetadata();
         Store.RecoveryDiff newCommitDiff = newCommitMetadata.recoveryDiff(metadata);
         if (delFile != null) {
             assertThat(newCommitDiff.identical.size(), equalTo(newCommitMetadata.size() - 5)); // segments_N, del file, cfs, cfe, si for the
@@ -710,7 +710,7 @@ public class StoreTests extends OpenSearchTestCase {
             writer.addDocument(doc);
         }
-        Store.MetadataSnapshot firstMeta = store.getMetadata(null);
+        Store.MetadataSnapshot firstMeta = store.getMetadata();
         if (random().nextBoolean()) {
             for (int i = 0; i < docs; i++) {
@@ -731,7 +731,7 @@ public class StoreTests extends OpenSearchTestCase {
         writer.commit();
         writer.close();
-        Store.MetadataSnapshot secondMeta = store.getMetadata(null);
+        Store.MetadataSnapshot secondMeta = store.getMetadata();
         if (randomBoolean()) {
             store.cleanupAndVerify("test", firstMeta);
@@ -1000,7 +1000,7 @@ public class StoreTests extends OpenSearchTestCase {
         try {
             if (randomBoolean()) {
-                store.getMetadata(null);
+                store.getMetadata();
             } else {
                 store.readLastCommittedSegmentsInfo();
             }
@@ -1138,4 +1138,19 @@ public class StoreTests extends OpenSearchTestCase {
             }
         }
     }
+
+    /**
+     * Verifies that metadata built from an explicit {@code SegmentInfos} reports the same segments file name.
+     */
+    public void testGetMetadataWithSegmentInfos() throws IOException {
+        final ShardId shardId = new ShardId("index", "_na_", 1);
+        // try-with-resources releases the store even when a call or the assertion below fails
+        try (Store store = new Store(shardId, INDEX_SETTINGS, new NIOFSDirectory(createTempDir()), new DummyShardLock(shardId))) {
+            store.createEmpty(Version.LATEST);
+            SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
+            Store.MetadataSnapshot metadataSnapshot = store.getMetadata(segmentInfos);
+            // loose check for equality
+            assertEquals(segmentInfos.getSegmentsFileName(), metadataSnapshot.getSegmentsFile().name());
+        }
+    }
 }
diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index bda2a910d92..d85b2f1e229 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -85,7 +85,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
             indexDoc(sourceShard, "_doc", Integer.toString(i));
         }
         sourceShard.flush(new FlushRequest());
-        Store.MetadataSnapshot sourceSnapshot = sourceShard.store().getMetadata(null);
+        Store.MetadataSnapshot sourceSnapshot = sourceShard.store().getMetadata();
         List mdFiles = new ArrayList<>();
         for (StoreFileMetadata md : sourceSnapshot) {
             mdFiles.add(md);
diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java
index 1739f546150..fc5c429d74b 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -189,7 +189,7 @@ public class RecoverySourceHandlerTests extends OpenSearchTestCase {
         writer.commit();
         writer.close();
-        Store.MetadataSnapshot metadata = store.getMetadata(null);
+        Store.MetadataSnapshot metadata = store.getMetadata();
         ReplicationLuceneIndex luceneIndex = new
ReplicationLuceneIndex(); List metas = new ArrayList<>(); for (StoreFileMetadata md : metadata) { @@ -226,7 +226,7 @@ public class RecoverySourceHandlerTests extends OpenSearchTestCase { PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture); sendFilesFuture.actionGet(); - Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(null); + Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(); Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata); assertEquals(metas.size(), recoveryDiff.identical.size()); assertEquals(0, recoveryDiff.different.size()); @@ -512,7 +512,7 @@ public class RecoverySourceHandlerTests extends OpenSearchTestCase { writer.close(); ReplicationLuceneIndex luceneIndex = new ReplicationLuceneIndex(); - Store.MetadataSnapshot metadata = store.getMetadata(null); + Store.MetadataSnapshot metadata = store.getMetadata(); List metas = new ArrayList<>(); for (StoreFileMetadata md : metadata) { metas.add(md); @@ -594,7 +594,7 @@ public class RecoverySourceHandlerTests extends OpenSearchTestCase { writer.commit(); writer.close(); - Store.MetadataSnapshot metadata = store.getMetadata(null); + Store.MetadataSnapshot metadata = store.getMetadata(); List metas = new ArrayList<>(); for (StoreFileMetadata md : metadata) { metas.add(md); diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java new file mode 100644 index 00000000000..6bce74be569 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open 
source license.
+ */
+
+package org.opensearch.indices.replication;
+
+import org.apache.lucene.util.Version;
+import org.opensearch.action.ActionListener;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.node.DiscoveryNodeRole;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.internal.io.IOUtils;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.shard.IndexShardTestCase;
+import org.opensearch.index.store.Store;
+import org.opensearch.index.store.StoreFileMetadata;
+import org.opensearch.indices.recovery.RecoverySettings;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
+import org.opensearch.test.ClusterServiceUtils;
+import org.opensearch.test.transport.CapturingTransport;
+import org.opensearch.transport.TransportService;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Checks that {@link PrimaryShardReplicationSource} sends each request to the source node under the
+ * expected transport action. Requests are captured by a {@link CapturingTransport} and inspected.
+ */
+public class PrimaryShardReplicationSourceTests extends IndexShardTestCase {
+
+    private static final long PRIMARY_TERM = 1L;
+    private static final long SEGMENTS_GEN = 2L;
+    private static final long SEQ_NO = 3L;
+    private static final long VERSION = 4L;
+    private static final long REPLICATION_ID = 123L;
+
+    private CapturingTransport transport;
+    private ClusterService clusterService;
+    private TransportService transportService;
+    private PrimaryShardReplicationSource replicationSource;
+    private IndexShard indexShard;
+    private DiscoveryNode sourceNode;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        // name the node after this test class (the original used another test's class name)
+        final Settings settings = Settings.builder().put("node.name", PrimaryShardReplicationSourceTests.class.getSimpleName()).build();
+        final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
+        transport = new CapturingTransport();
+        sourceNode = newDiscoveryNode("sourceNode");
+        final DiscoveryNode localNode = newDiscoveryNode("localNode");
+        clusterService = ClusterServiceUtils.createClusterService(threadPool, localNode);
+        transportService = transport.createTransportService(
+            clusterService.getSettings(),
+            threadPool,
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR,
+            boundAddress -> clusterService.localNode(),
+            null,
+            Collections.emptySet()
+        );
+        transportService.start();
+        transportService.acceptIncomingRequests();
+
+        indexShard = newStartedShard(true);
+
+        replicationSource = new PrimaryShardReplicationSource(
+            localNode,
+            // pass the allocation id itself rather than the AllocationId's toString() rendering
+            indexShard.routingEntry().allocationId().getId(),
+            transportService,
+            recoverySettings,
+            sourceNode
+        );
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        // nested finally blocks: a failure while closing the transport must not skip shard and base-class clean-up
+        try {
+            IOUtils.close(transportService, clusterService, transport);
+        } finally {
+            try {
+                closeShards(indexShard);
+            } finally {
+                super.tearDown();
+            }
+        }
+    }
+
+    /** A checkpoint-metadata call must produce exactly one GET_CHECKPOINT_INFO request addressed to the source node. */
+    public void testGetCheckpointMetadata() {
+        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
+            indexShard.shardId(),
+            PRIMARY_TERM,
+            SEGMENTS_GEN,
+            SEQ_NO,
+            VERSION
+        );
+        replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, mock(ActionListener.class));
+        CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear();
+        assertEquals(1, requestList.length);
+        CapturingTransport.CapturedRequest capturedRequest = requestList[0];
+        assertEquals(SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO, capturedRequest.action);
+        assertEquals(sourceNode, capturedRequest.node);
+        assertTrue(capturedRequest.request instanceof CheckpointInfoRequest);
+    }
+
+    /** A segment-files call must produce exactly one GET_SEGMENT_FILES request addressed to the source node. */
+    public void testGetSegmentFiles() {
+        final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
+            indexShard.shardId(),
+            PRIMARY_TERM,
+            SEGMENTS_GEN,
+            SEQ_NO,
+            VERSION
+        );
+        StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST);
+        replicationSource.getSegmentFiles(
+            REPLICATION_ID,
+            checkpoint,
+            Arrays.asList(testMetadata),
+            mock(Store.class),
+            mock(ActionListener.class)
+        );
+        CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear();
+        assertEquals(1, requestList.length);
+        CapturingTransport.CapturedRequest capturedRequest = requestList[0];
+        assertEquals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, capturedRequest.action);
+        assertEquals(sourceNode, capturedRequest.node);
+        assertTrue(capturedRequest.request instanceof GetSegmentFilesRequest);
+    }
+
+    /** Creates a node with the given name, a random id, a fake transport address and the cluster-manager role. */
+    private DiscoveryNode newDiscoveryNode(String nodeName) {
+        return new DiscoveryNode(
+            nodeName,
+            randomAlphaOfLength(10),
+            buildNewFakeTransportAddress(),
+            Collections.emptyMap(),
+            Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE),
+            org.opensearch.Version.CURRENT
+        );
+    }
+}
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java
new file mode 100644
index 00000000000..67c867d360e
--- /dev/null
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java
@@ -0,0 +1,161 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication;
+
+import org.opensearch.Version;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.index.IndexService;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.shard.ShardId;
+import org.opensearch.indices.IndicesService;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
+import org.opensearch.indices.replication.common.CopyStateTests;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.test.transport.CapturingTransport;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.TransportException;
+import org.opensearch.transport.TransportResponseHandler;
+import org.opensearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Sends segment replication requests to the local node through a {@link TransportService} that has a
+ * {@link SegmentReplicationSourceService} registered, using a mocked shard from {@link CopyStateTests}.
+ */
+public class SegmentReplicationSourceServiceTests extends OpenSearchTestCase {
+
+    private ShardId testShardId;
+    private ReplicationCheckpoint testCheckpoint;
+    private IndicesService mockIndicesService;
+    private IndexService mockIndexService;
+    private IndexShard mockIndexShard;
+    private TestThreadPool testThreadPool;
+    private CapturingTransport transport;
+    private TransportService transportService;
+    private DiscoveryNode localNode;
+    private SegmentReplicationSourceService segmentReplicationSourceService;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        // setup mocks
+        mockIndexShard = CopyStateTests.createMockIndexShard();
+        testShardId = mockIndexShard.shardId();
+        mockIndexService = mock(IndexService.class);
+        mockIndicesService = mock(IndicesService.class);
+        when(mockIndexService.getShard(testShardId.id())).thenReturn(mockIndexShard);
+        when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService);
+
+        // This mirrors the creation of the ReplicationCheckpoint inside CopyState
+        final long primaryTerm = mockIndexShard.getOperationPrimaryTerm();
+        final long processedCheckpoint = mockIndexShard.getProcessedLocalCheckpoint();
+        testCheckpoint = new ReplicationCheckpoint(testShardId, primaryTerm, 0L, processedCheckpoint, 0L);
+
+        testThreadPool = new TestThreadPool("test", Settings.EMPTY);
+        transport = new CapturingTransport();
+        localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
+        transportService = transport.createTransportService(
+            Settings.EMPTY,
+            testThreadPool,
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR,
+            boundAddress -> localNode,
+            null,
+            Collections.emptySet()
+        );
+        transportService.start();
+        transportService.acceptIncomingRequests();
+        segmentReplicationSourceService = new SegmentReplicationSourceService(transportService, mockIndicesService);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS);
+        testThreadPool = null;
+        super.tearDown();
+    }
+
+    /**
+     * Requests an empty file list and expects an empty response.
+     * NOTE(review): nothing in this method waits for the handler callbacks, so the assertions inside
+     * them may run after the test returns or not at all; confirm they are actually exercised.
+     */
+    public void testGetSegmentFiles_EmptyResponse() {
+        final GetSegmentFilesRequest filesRequest = new GetSegmentFilesRequest(
+            1,
+            "allocationId",
+            localNode,
+            Collections.emptyList(),
+            testCheckpoint
+        );
+        final String action = SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES;
+        transportService.sendRequest(localNode, action, filesRequest, new TransportResponseHandler() {
+            @Override
+            public void handleResponse(GetSegmentFilesResponse response) {
+                assertEquals(0, response.files.size());
+            }
+
+            @Override
+            public void handleException(TransportException e) {
+                fail("unexpected exception: " + e);
+            }
+
+            @Override
+            public String executor() {
+                return ThreadPool.Names.SAME;
+            }
+
+            @Override
+            public GetSegmentFilesResponse read(StreamInput in) throws IOException {
+                return new GetSegmentFilesResponse(in);
+            }
+        });
+    }
+
+    /**
+     * Requests checkpoint info and expects the response to describe the state mocked by CopyStateTests.
+     * NOTE(review): as above, the callbacks are not awaited; confirm the assertions are exercised.
+     */
+    public void testCheckpointInfo() {
+        final CheckpointInfoRequest infoRequest = new CheckpointInfoRequest(1L, "testAllocationId", localNode, testCheckpoint);
+        final String action = SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO;
+        transportService.sendRequest(localNode, action, infoRequest, new TransportResponseHandler() {
+            @Override
+            public void handleResponse(CheckpointInfoResponse response) {
+                assertEquals(testCheckpoint, response.getCheckpoint());
+                assertNotNull(response.getInfosBytes());
+                // CopyStateTests sets up one pending delete file and one committed segments file
+                assertEquals(1, response.getPendingDeleteFiles().size());
+                assertEquals(1, response.getSnapshot().size());
+            }
+
+            @Override
+            public void handleException(TransportException e) {
+                fail("unexpected exception: " + e);
+            }
+
+            @Override
+            public String executor() {
+                return ThreadPool.Names.SAME;
+            }
+
+            @Override
+            public CheckpointInfoResponse read(StreamInput in) throws IOException {
+                return new CheckpointInfoResponse(in);
+            }
+        });
+    }
+
+}
diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java
new file mode 100644
index 00000000000..afa38afb0cf
--- /dev/null
+++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java
@@ -0,0 +1,80 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication.common;
+
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.util.Version;
+import org.opensearch.common.collect.Map;
+import org.opensearch.common.concurrent.GatedCloseable;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.shard.IndexShardTestCase;
+import org.opensearch.index.shard.ShardId;
+import org.opensearch.index.store.Store;
+import org.opensearch.index.store.StoreFileMetadata;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link CopyState} against a fully mocked {@link IndexShard}; the mock factory is public so
+ * other tests can reuse it.
+ */
+public class CopyStateTests extends IndexShardTestCase {
+
+    private static final long EXPECTED_LONG_VALUE = 1L;
+    private static final ShardId TEST_SHARD_ID = new ShardId("testIndex", "testUUID", 0);
+    private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST);
+    private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST);
+
+    // Metadata returned for the last index commit: the segments file plus one extra file.
+    private static final Store.MetadataSnapshot COMMIT_SNAPSHOT = new Store.MetadataSnapshot(
+        Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE, PENDING_DELETE_FILE.name(), PENDING_DELETE_FILE),
+        null,
+        0
+    );
+
+    // Metadata returned for the segment infos snapshot: the segments file only.
+    private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot(
+        Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE),
+        null,
+        0
+    );
+
+    /** The checkpoint must reflect the mocked shard, and the extra commit file must be the only pending delete. */
+    public void testCopyStateCreation() throws IOException {
+        final CopyState state = new CopyState(createMockIndexShard());
+
+        final ReplicationCheckpoint checkpoint = state.getCheckpoint();
+        assertEquals(TEST_SHARD_ID, checkpoint.getShardId());
+        // version was never set so this should be zero
+        assertEquals(0, checkpoint.getSegmentInfosVersion());
+        assertEquals(EXPECTED_LONG_VALUE, checkpoint.getPrimaryTerm());
+
+        final Set pendingDeleteFiles = state.getPendingDeleteFiles();
+        assertEquals(1, pendingDeleteFiles.size());
+        assertTrue(pendingDeleteFiles.contains(PENDING_DELETE_FILE));
+    }
+
+    /**
+     * Creates a mocked shard whose segment infos snapshot maps to {@code SI_SNAPSHOT} and whose last
+     * index commit maps to {@code COMMIT_SNAPSHOT}; both closeable references use no-op close actions.
+     *
+     * @return the stubbed shard
+     * @throws IOException declared by the stubbed store and shard methods
+     */
+    public static IndexShard createMockIndexShard() throws IOException {
+        final IndexShard shard = mock(IndexShard.class);
+        final Store shardStore = mock(Store.class);
+        final SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major);
+        final IndexCommit mockIndexCommit = mock(IndexCommit.class);
+
+        // store: metadata lookups for the snapshot and for the commit
+        when(shardStore.getMetadata(testSegmentInfos)).thenReturn(SI_SNAPSHOT);
+        when(shardStore.getMetadata(mockIndexCommit)).thenReturn(COMMIT_SNAPSHOT);
+
+        // shard: identity, checkpoint inputs, store and the two closeable references
+        when(shard.shardId()).thenReturn(TEST_SHARD_ID);
+        when(shard.getOperationPrimaryTerm()).thenReturn(EXPECTED_LONG_VALUE);
+        when(shard.getProcessedLocalCheckpoint()).thenReturn(EXPECTED_LONG_VALUE);
+        when(shard.store()).thenReturn(shardStore);
+        when(shard.getSegmentInfosSnapshot()).thenReturn(new GatedCloseable<>(testSegmentInfos, () -> {}));
+        when(shard.acquireLastIndexCommit(false)).thenReturn(new GatedCloseable<>(mockIndexCommit, () -> {}));
+        return shard;
+    }
+}