Adding CheckpointRefreshListener to trigger when Segment replication is turned on and Primary shard refreshes (#3108)

* Intial PR adding classes and tests related to checkpoint publishing

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Putting a Draft PR with all changes in classes. Testing is still not included in this commit.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Wiring up index shard to new engine, spotless apply and removing unnecessary tests and logs

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Adding Unit test for checkpointRefreshListener

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Applying spotless check

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fixing import statements *

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* removing unused constructor in index shard

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Addressing comments from last commit

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Adding package-info.java files for two new packages

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Adding test for null checkpoint publisher and addreesing PR comments

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Add docs for indexshardtests and remove shard.refresh

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
This commit is contained in:
Rishikesh Pasham 2022-05-24 14:08:14 +00:00 committed by GitHub
parent eb847aeeef
commit fd5a38de12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 815 additions and 11 deletions

View File

@ -84,6 +84,7 @@ import org.opensearch.index.translog.TranslogStats;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.DummyShardLock;
@ -673,7 +674,8 @@ public class IndexShardIT extends OpenSearchSingleNodeTestCase {
Arrays.asList(listeners),
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs
cbs,
SegmentReplicationCheckpointPublisher.EMPTY
);
}

View File

@ -94,6 +94,7 @@ import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
@ -428,7 +429,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
@ -530,7 +532,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService
circuitBreakerService,
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);

View File

@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.shard;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.ReferenceManager;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import java.io.IOException;
/**
* A {@link ReferenceManager.RefreshListener} that publishes a checkpoint to be consumed by replicas.
* This class is only used with Segment Replication enabled.
*
* @opensearch.internal
*/
public class CheckpointRefreshListener implements ReferenceManager.RefreshListener {
protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class);
private final IndexShard shard;
private final SegmentReplicationCheckpointPublisher publisher;
public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointPublisher publisher) {
this.shard = shard;
this.publisher = publisher;
}
@Override
public void beforeRefresh() throws IOException {
// Do nothing
}
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
publisher.publish(shard);
}
}
}

View File

@ -160,6 +160,9 @@ import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
@ -299,6 +302,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final ReferenceManager.RefreshListener checkpointRefreshListener;
public IndexShard(
final ShardRouting shardRouting,
@ -320,7 +324,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final List<IndexingOperationListener> listeners,
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService
final CircuitBreakerService circuitBreakerService,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
@ -403,6 +408,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
if (checkpointPublisher != null) {
this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher);
} else {
this.checkpointRefreshListener = null;
}
}
public ThreadPool getThreadPool() {
@ -1363,6 +1373,21 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
/**
* Returns the lastest Replication Checkpoint that shard received
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return new ReplicationCheckpoint(shardId, 0, 0, 0, 0);
}
/**
* Invoked when a new checkpoint is received from a primary shard. Starts the copy process.
*/
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) {
assert shardRouting.primary() == false;
// TODO
}
/**
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
* without having to worry about the current state of the engine and concurrent flushes.
@ -3106,6 +3131,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
};
final List<ReferenceManager.RefreshListener> internalRefreshListener;
if (this.checkpointRefreshListener != null) {
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener);
} else {
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
}
return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
@ -3122,7 +3154,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,

View File

@ -41,6 +41,7 @@ import org.opensearch.common.ParseField;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.index.mapper.BinaryFieldMapper;
import org.opensearch.index.mapper.BooleanFieldMapper;
@ -73,6 +74,7 @@ import org.opensearch.index.seqno.GlobalCheckpointSyncAction;
import org.opensearch.index.shard.PrimaryReplicaSyncer;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.plugins.MapperPlugin;
@ -278,6 +280,9 @@ public class IndicesModule extends AbstractModule {
bind(RetentionLeaseSyncAction.class).asEagerSingleton();
bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton();
bind(RetentionLeaseSyncer.class).asEagerSingleton();
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton();
}
}
/**

View File

@ -138,6 +138,7 @@ import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.node.Node;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.plugins.PluginsService;
@ -839,6 +840,7 @@ public class IndicesService extends AbstractLifecycleComponent
@Override
public IndexShard createShard(
final ShardRouting shardRouting,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final PeerRecoveryTargetService recoveryTargetService,
final RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
@ -853,7 +855,7 @@ public class IndicesService extends AbstractLifecycleComponent
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS

View File

@ -80,6 +80,7 @@ import org.opensearch.indices.recovery.PeerRecoverySourceService;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
@ -138,6 +139,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final Consumer<ShardId> globalCheckpointSyncer;
private final RetentionLeaseSyncer retentionLeaseSyncer;
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
@Inject
public IndicesClusterStateService(
final Settings settings,
@ -153,13 +156,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
final SnapshotShardsService snapshotShardsService,
final PrimaryReplicaSyncer primaryReplicaSyncer,
final GlobalCheckpointSyncAction globalCheckpointSyncAction,
final RetentionLeaseSyncer retentionLeaseSyncer
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
) {
this(
settings,
indicesService,
clusterService,
threadPool,
checkpointPublisher,
recoveryTargetService,
shardStateAction,
nodeMappingRefreshAction,
@ -179,6 +184,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
final AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
final ClusterService clusterService,
final ThreadPool threadPool,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final PeerRecoveryTargetService recoveryTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
@ -191,6 +197,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
final RetentionLeaseSyncer retentionLeaseSyncer
) {
this.settings = settings;
this.checkpointPublisher = checkpointPublisher;
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService);
this.indicesService = indicesService;
this.clusterService = clusterService;
@ -624,6 +631,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
indicesService.createShard(
shardRouting,
checkpointPublisher,
recoveryTargetService,
new RecoveryListener(shardRouting, primaryTerm, this),
repositoriesService,
@ -981,6 +989,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
*/
T createShard(
ShardRouting shardRouting,
SegmentReplicationCheckpointPublisher checkpointPublisher,
PeerRecoveryTargetService recoveryTargetService,
RecoveryListener recoveryListener,
RepositoriesService repositoriesService,

View File

@ -0,0 +1,173 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.checkpoint;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.ReplicationTask;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
/**
* Replication action responsible for publishing checkpoint to a replica shard.
*
* @opensearch.internal
*/
public class PublishCheckpointAction extends TransportReplicationAction<
PublishCheckpointRequest,
PublishCheckpointRequest,
ReplicationResponse> {
public static final String ACTION_NAME = "indices:admin/publishCheckpoint";
protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class);
@Inject
public PublishCheckpointAction(
Settings settings,
TransportService transportService,
ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters
) {
super(
settings,
ACTION_NAME,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
PublishCheckpointRequest::new,
PublishCheckpointRequest::new,
ThreadPool.Names.REFRESH
);
}
@Override
protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ReplicationResponse(in);
}
@Override
protected void doExecute(Task task, PublishCheckpointRequest request, ActionListener<ReplicationResponse> listener) {
assert false : "use PublishCheckpointAction#publish";
}
/**
* Publish checkpoint request to shard
*/
final void publish(IndexShard indexShard) {
String primaryAllocationId = indexShard.routingEntry().allocationId().getId();
long primaryTerm = indexShard.getPendingPrimaryTerm();
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint());
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
transportService.sendChildRequest(
clusterService.localNode(),
transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
task,
transportOptions,
new TransportResponseHandler<ReplicationResponse>() {
@Override
public ReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(ReplicationResponse response) {
task.setPhase("finished");
taskManager.unregister(task);
}
@Override
public void handleException(TransportException e) {
task.setPhase("finished");
taskManager.unregister(task);
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
// node shutting down
return;
}
if (ExceptionsHelper.unwrap(
e,
IndexNotFoundException.class,
AlreadyClosedException.class,
IndexShardClosedException.class
) != null) {
// the index was deleted or the shard is closed
return;
}
logger.warn(
new ParameterizedMessage("{} segment replication checkpoint publishing failed", indexShard.shardId()),
e
);
}
}
);
}
}
@Override
protected void shardOperationOnPrimary(
PublishCheckpointRequest request,
IndexShard primary,
ActionListener<PrimaryResult<PublishCheckpointRequest, ReplicationResponse>> listener
) {
ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse()));
}
@Override
protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
ActionListener.completeWith(listener, () -> {
logger.trace("Checkpoint received on replica {}", request);
if (request.getCheckpoint().getShardId().equals(replica.shardId())) {
replica.onNewCheckpoint(request);
}
return new ReplicaResult();
});
}
}

View File

@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.checkpoint;
import org.opensearch.action.support.replication.ReplicationRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* Replication request responsible for publishing checkpoint request to a replica shard.
*
* @opensearch.internal
*/
public class PublishCheckpointRequest extends ReplicationRequest<PublishCheckpointRequest> {
private final ReplicationCheckpoint checkpoint;
public PublishCheckpointRequest(ReplicationCheckpoint checkpoint) {
super(checkpoint.getShardId());
this.checkpoint = checkpoint;
}
public PublishCheckpointRequest(StreamInput in) throws IOException {
super(in);
this.checkpoint = new ReplicationCheckpoint(in);
}
/**
* Returns Replication Checkpoint
*/
public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
checkpoint.writeTo(out);
}
@Override
public String toString() {
return "PublishCheckpointRequest{" + "checkpoint=" + checkpoint + '}';
}
}

View File

@ -0,0 +1,136 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.checkpoint;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Objects;
/**
* Represents a Replication Checkpoint which is sent to a replica shard.
*
* @opensearch.internal
*/
public class ReplicationCheckpoint implements Writeable {
private final ShardId shardId;
private final long primaryTerm;
private final long segmentsGen;
private final long seqNo;
private final long segmentInfosVersion;
public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long segmentInfosVersion) {
this.shardId = shardId;
this.primaryTerm = primaryTerm;
this.segmentsGen = segmentsGen;
this.seqNo = seqNo;
this.segmentInfosVersion = segmentInfosVersion;
}
public ReplicationCheckpoint(StreamInput in) throws IOException {
shardId = new ShardId(in);
primaryTerm = in.readLong();
segmentsGen = in.readLong();
seqNo = in.readLong();
segmentInfosVersion = in.readLong();
}
/**
* The primary term of this Replication Checkpoint.
*
* @return the primary term
*/
public long getPrimaryTerm() {
return primaryTerm;
}
/**
* @return the Segments Gen number
*/
public long getSegmentsGen() {
return segmentsGen;
}
/**
* @return the Segment Info version
*/
public long getSegmentInfosVersion() {
return segmentInfosVersion;
}
/**
* @return the Seq number
*/
public long getSeqNo() {
return seqNo;
}
/**
* Shard Id of primary shard.
*
* @return the Shard Id
*/
public ShardId getShardId() {
return shardId;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeLong(primaryTerm);
out.writeLong(segmentsGen);
out.writeLong(seqNo);
out.writeLong(segmentInfosVersion);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReplicationCheckpoint that = (ReplicationCheckpoint) o;
return primaryTerm == that.primaryTerm
&& segmentsGen == that.segmentsGen
&& seqNo == that.seqNo
&& segmentInfosVersion == that.segmentInfosVersion
&& Objects.equals(shardId, that.shardId);
}
@Override
public int hashCode() {
return Objects.hash(shardId, primaryTerm, segmentsGen, seqNo);
}
/**
* Checks if other is aheadof current replication point by comparing segmentInfosVersion. Returns true for null
*/
public boolean isAheadOf(@Nullable ReplicationCheckpoint other) {
return other == null || segmentInfosVersion > other.getSegmentInfosVersion();
}
@Override
public String toString() {
return "ReplicationCheckpoint{"
+ "shardId="
+ shardId
+ ", primaryTerm="
+ primaryTerm
+ ", segmentsGen="
+ segmentsGen
+ ", seqNo="
+ seqNo
+ ", version="
+ segmentInfosVersion
+ '}';
}
}

View File

@ -0,0 +1,49 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.checkpoint;
import org.opensearch.common.inject.Inject;
import org.opensearch.index.shard.IndexShard;
import java.util.Objects;
/**
* Publish Segment Replication Checkpoint.
*
* @opensearch.internal
*/
public class SegmentReplicationCheckpointPublisher {
private final PublishAction publishAction;
@Inject
public SegmentReplicationCheckpointPublisher(PublishCheckpointAction publishAction) {
this(publishAction::publish);
}
public SegmentReplicationCheckpointPublisher(PublishAction publishAction) {
this.publishAction = Objects.requireNonNull(publishAction);
}
public void publish(IndexShard indexShard) {
publishAction.publish(indexShard);
}
/**
* Represents an action that is invoked to publish segment replication checkpoint to replica shard
*/
public interface PublishAction {
void publish(IndexShard indexShard);
}
/**
* NoOp Checkpoint publisher
*/
public static final SegmentReplicationCheckpointPublisher EMPTY = new SegmentReplicationCheckpointPublisher(indexShard -> {});
}

View File

@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/** Package containing classes to implement a replication checkpoint */
package org.opensearch.indices.replication.checkpoint;

View File

@ -37,6 +37,7 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.AlreadyClosedException;
@ -133,6 +134,7 @@ import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
import org.opensearch.snapshots.Snapshot;
@ -198,6 +200,7 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS;
@ -3425,6 +3428,72 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(newShard);
}
/**
* here we are mocking a SegmentReplicationcheckpointPublisher and testing on index shard if CheckpointRefreshListener is added to the InternalrefreshListerners List
*/
public void testCheckpointRefreshListener() throws IOException {
final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
IndexShard shard = newStartedShard(p -> newShard(mock), true);
List<ReferenceManager.RefreshListener> refreshListeners = shard.getEngine().config().getInternalRefreshListener();
assertTrue(refreshListeners.stream().anyMatch(e -> e instanceof CheckpointRefreshListener));
closeShards(shard);
}
/**
* here we are passing null in place of SegmentReplicationCheckpointPublisher and testing on index shard if CheckpointRefreshListener is not added to the InternalrefreshListerners List
*/
public void testCheckpointRefreshListenerWithNull() throws IOException {
IndexShard shard = newStartedShard(p -> newShard(null), true);
List<ReferenceManager.RefreshListener> refreshListeners = shard.getEngine().config().getInternalRefreshListener();
assertFalse(refreshListeners.stream().anyMatch(e -> e instanceof CheckpointRefreshListener));
closeShards(shard);
}
/**
* creates a new initializing shard. The shard will will be put in its proper path under the
* current node id the shard is assigned to.
* @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint
*/
private IndexShard newShard(SegmentReplicationCheckpointPublisher checkpointPublisher) throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(10),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT")
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000))
.put(Settings.EMPTY)
.build();
IndexMetadata metadata = IndexMetadata.builder(shardRouting.getIndexName())
.settings(indexSettings)
.primaryTerm(0, primaryTerm)
.putMapping("{ \"properties\": {} }")
.build();
return newShard(
shardRouting,
shardPath,
metadata,
null,
null,
new InternalEngineFactory(),
new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())),
() -> {},
RetentionLeaseSyncer.EMPTY,
EMPTY_EVENT_LISTENER,
checkpointPublisher
);
}
public void testIndexCheckOnStartup() throws Exception {
final IndexShard indexShard = newStartedShard(true);

View File

@ -49,6 +49,7 @@ import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import java.util.Arrays;
@ -148,7 +149,12 @@ public class IndicesLifecycleListenerSingleNodeTests extends OpenSearchSingleNod
newRouting = newRouting.moveToUnassigned(unassignedInfo)
.updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY);
IndexShard shard = index.createShard(
newRouting,
s -> {},
RetentionLeaseSyncer.EMPTY,
SegmentReplicationCheckpointPublisher.EMPTY
);
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
assertEquals(5, counter.get());
final DiscoveryNode localNode = new DiscoveryNode(

View File

@ -59,6 +59,7 @@ import org.opensearch.indices.cluster.IndicesClusterStateService.Shard;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.test.OpenSearchTestCase;
@ -253,6 +254,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends OpenSea
@Override
public MockIndexShard createShard(
final ShardRouting shardRouting,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final PeerRecoveryTargetService recoveryTargetService,
final RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,

View File

@ -66,6 +66,7 @@ import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.shard.PrimaryReplicaSyncer;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
@ -562,6 +563,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
indicesService,
clusterService,
threadPool,
SegmentReplicationCheckpointPublisher.EMPTY,
recoveryTargetService,
shardStateAction,
null,

View File

@ -0,0 +1,157 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.checkpoint;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.ActionTestUtils;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.*;
import static org.opensearch.test.ClusterServiceUtils.createClusterService;
public class PublishCheckpointActionTests extends OpenSearchTestCase {
private ThreadPool threadPool;
private CapturingTransport transport;
private ClusterService clusterService;
private TransportService transportService;
private ShardStateAction shardStateAction;
@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = transport.createTransportService(
clusterService.getSettings(),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(),
null,
Collections.emptySet()
);
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool);
}
@Override
public void tearDown() throws Exception {
try {
IOUtils.close(transportService, clusterService, transport);
} finally {
terminate(threadPool);
}
super.tearDown();
}
public void testPublishCheckpointActionOnPrimary() throws InterruptedException {
final IndicesService indicesService = mock(IndicesService.class);
final Index index = new Index("index", "uuid");
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);
final int id = randomIntBetween(0, 4);
final IndexShard indexShard = mock(IndexShard.class);
when(indexService.getShard(id)).thenReturn(indexShard);
final ShardId shardId = new ShardId(index, id);
when(indexShard.shardId()).thenReturn(shardId);
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings());
final PublishCheckpointAction action = new PublishCheckpointAction(
Settings.EMPTY,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet())
);
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1);
final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint);
action.shardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> {
// we should forward the request containing the current publish checkpoint to the replica
assertThat(result.replicaRequest(), sameInstance(request));
}));
}
public void testPublishCheckpointActionOnReplica() {
final IndicesService indicesService = mock(IndicesService.class);
final Index index = new Index("index", "uuid");
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);
final int id = randomIntBetween(0, 4);
final IndexShard indexShard = mock(IndexShard.class);
when(indexService.getShard(id)).thenReturn(indexShard);
final ShardId shardId = new ShardId(index, id);
when(indexShard.shardId()).thenReturn(shardId);
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings());
final PublishCheckpointAction action = new PublishCheckpointAction(
Settings.EMPTY,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet())
);
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1);
final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint);
final PlainActionFuture<TransportReplicationAction.ReplicaResult> listener = PlainActionFuture.newFuture();
action.shardOperationOnReplica(request, indexShard, listener);
final TransportReplicationAction.ReplicaResult result = listener.actionGet();
// onNewCheckpoint should be called on shard with checkpoint request
verify(indexShard).onNewCheckpoint(request);
// the result should indicate success
final AtomicBoolean success = new AtomicBoolean();
result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
assertTrue(success.get());
}
}

View File

@ -182,6 +182,7 @@ import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.PeerRecoverySourceService;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.ResponseCollectorService;
@ -1860,7 +1861,8 @@ public class SnapshotResiliencyTests extends OpenSearchTestCase {
shardStateAction,
actionFilters
),
RetentionLeaseSyncer.EMPTY
RetentionLeaseSyncer.EMPTY,
SegmentReplicationCheckpointPublisher.EMPTY
);
Map<ActionType, TransportAction> actions = new HashMap<>();
final SystemIndices systemIndices = new SystemIndices(emptyMap());

View File

@ -94,6 +94,7 @@ import org.opensearch.indices.recovery.RecoverySourceHandler;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.recovery.StartRecoveryRequest;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.IndexId;
@ -412,7 +413,8 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase {
}
/**
* creates a new initializing shard.
* creates a new initializing shard. The shard will will be put in its proper path under the
* current node id the shard is assigned to.
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetadata indexMetadata for the shard, including any mapping
@ -434,6 +436,48 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase {
RetentionLeaseSyncer retentionLeaseSyncer,
IndexEventListener indexEventListener,
IndexingOperationListener... listeners
) throws IOException {
return newShard(
routing,
shardPath,
indexMetadata,
storeProvider,
indexReaderWrapper,
engineFactory,
engineConfigFactory,
globalCheckpointSyncer,
retentionLeaseSyncer,
indexEventListener,
SegmentReplicationCheckpointPublisher.EMPTY,
listeners
);
}
/**
* creates a new initializing shard.
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetadata indexMetadata for the shard, including any mapping
* @param storeProvider an optional custom store provider to use. If null a default file based store will be created
* @param indexReaderWrapper an optional wrapper to be used during search
* @param globalCheckpointSyncer callback for syncing global checkpoints
* @param indexEventListener index event listener
* @param checkpointPublisher segment Replication Checkpoint Publisher to publish checkpoint
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(
ShardRouting routing,
ShardPath shardPath,
IndexMetadata indexMetadata,
@Nullable CheckedFunction<IndexSettings, Store, IOException> storeProvider,
@Nullable CheckedFunction<DirectoryReader, DirectoryReader, IOException> indexReaderWrapper,
@Nullable EngineFactory engineFactory,
@Nullable EngineConfigFactory engineConfigFactory,
Runnable globalCheckpointSyncer,
RetentionLeaseSyncer retentionLeaseSyncer,
IndexEventListener indexEventListener,
SegmentReplicationCheckpointPublisher checkpointPublisher,
IndexingOperationListener... listeners
) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
final IndexSettings indexSettings = new IndexSettings(indexMetadata, nodeSettings);
@ -480,7 +524,8 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase {
Arrays.asList(listeners),
globalCheckpointSyncer,
retentionLeaseSyncer,
breakerService
breakerService,
checkpointPublisher
);
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
success = true;