Simplify GlobalCheckpointService and properly hook it for cluster state updates (#20720)

During a recent merge from master, we lost the bridge from IndicesClusterStateService to the GlobalCheckpointService of primary shards, notifying them of changes to the current set of active/initializing shards. This commits add the bridge back (with unit tests). It also simplifies the GlobalCheckpoint tracking to use a simpler model (which makes use the fact that the global check point sync is done periodically).

The old integration CheckpointIT test is moved to IndexLevelReplicationTests. I also added similar assertions to RelocationsIT, which surfaced a bug in the primary relocation logic and how it plays with global checkpoint updates. The test is currently await-fixed and will be fixed in a follow up issue.
This commit is contained in:
Boaz Leskes 2016-10-17 16:33:03 +02:00 committed by GitHub
parent 7c2e761c87
commit eaa105951f
24 changed files with 579 additions and 406 deletions

View File

@ -25,15 +25,18 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
/**
* A shard component that is responsible of tracking the global checkpoint. The global checkpoint
* is the highest seq_no for which all lower (or equal) seq_no have been processed on all shards that
* are currently active. Since shards count as "active" when the master starts them, and before this primary shard
* has been notified of this fact, we also include shards in that are in the
* {@link org.elasticsearch.index.shard.IndexShardState#POST_RECOVERY} state when checking for global checkpoint advancement.
* We call these shards "in sync" with all operations on the primary (see {@link #inSyncLocalCheckpoints}.
* has been notified of this fact, we also include shards that have completed recovery. These shards have received
* all old operations via the recovery mechanism and are kept up to date by the various replications actions. The set
* of shards that are taken into account for the global checkpoint calculation are called the "in sync" shards.
*
* <p>
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas
@ -41,15 +44,9 @@ import java.util.Set;
*/
public class GlobalCheckpointService extends AbstractIndexShardComponent {
/**
* This map holds the last known local checkpoint for every shard copy that's active.
* All shard copies in this map participate in determining the global checkpoint
* keyed by allocation ids
*/
private final ObjectLongMap<String> activeLocalCheckpoints;
/**
* This map holds the last known local checkpoint for every initializing shard copy that's has been brought up
* This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up
* to speed through recovery. These shards are treated as valid copies and participate in determining the global
* checkpoint.
* <p>
@ -57,22 +54,19 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
*/
private final ObjectLongMap<String> inSyncLocalCheckpoints; // keyed by allocation ids
/**
* This map holds the last known local checkpoint for every initializing shard copy that is still undergoing recovery.
* These shards <strong>do not</strong> participate in determining the global checkpoint. This map is needed to make sure that when
* shards are promoted to {@link #inSyncLocalCheckpoints} we use the highest known checkpoint, even if we index concurrently
* while recovering the shard.
* Keyed by allocation ids
* This set holds the last set of known valid allocation ids as received by the master. This is important to make sure
* shard that are failed or relocated are cleaned up from {@link #inSyncLocalCheckpoints} and do not hold the global
* checkpoint back
*/
private final ObjectLongMap<String> trackingLocalCheckpoint;
private final Set<String> assignedAllocationIds;
private long globalCheckpoint;
/**
* Initialize the global checkpoint service. The {@code globalCheckpoint}
* should be set to the last known global checkpoint for this shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED}.
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard this service is providing tracking
* local checkpoints for
@ -83,51 +77,35 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
*/
GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
super(shardId, indexSettings);
activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
inSyncLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
trackingLocalCheckpoint = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint:" + globalCheckpoint;
inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
assignedAllocationIds = new HashSet<>(1 + indexSettings.getNumberOfReplicas());
this.globalCheckpoint = globalCheckpoint;
}
/**
* notifies the service of a local checkpoint. if the checkpoint is lower than the currently known one,
* this is a noop. Last, if the allocation id is not yet known, it is ignored. This to prevent late
* Notifies the service of a local checkpoint. If the checkpoint is lower than the currently known one,
* this is a noop. Last, if the allocation id is not in sync, it is ignored. This to prevent late
* arrivals from shards that are removed to be re-added.
*/
public synchronized void updateLocalCheckpoint(String allocationId, long localCheckpoint) {
if (updateLocalCheckpointInMap(allocationId, localCheckpoint, activeLocalCheckpoints, "active")) {
return;
}
if (updateLocalCheckpointInMap(allocationId, localCheckpoint, inSyncLocalCheckpoints, "inSync")) {
return;
}
if (updateLocalCheckpointInMap(allocationId, localCheckpoint, trackingLocalCheckpoint, "tracking")) {
return;
}
logger.trace("local checkpoint of [{}] ([{}]) wasn't found in any map. ignoring.", allocationId, localCheckpoint);
}
final int indexOfKey = inSyncLocalCheckpoints.indexOf(allocationId);
if (indexOfKey >= 0) {
final long current = inSyncLocalCheckpoints.indexGet(indexOfKey);
private boolean updateLocalCheckpointInMap(String allocationId, long localCheckpoint,
ObjectLongMap<String> checkpointsMap, String name) {
assert Thread.holdsLock(this);
int indexOfKey = checkpointsMap.indexOf(allocationId);
if (indexOfKey < 0) {
return false;
}
long current = checkpointsMap.indexGet(indexOfKey);
// nocommit: this can change when we introduces rollback/resync
if (current < localCheckpoint) {
checkpointsMap.indexReplace(indexOfKey, localCheckpoint);
inSyncLocalCheckpoints.indexReplace(indexOfKey, localCheckpoint);
if (logger.isTraceEnabled()) {
logger.trace("updated local checkpoint of [{}] to [{}] (type [{}])", allocationId, localCheckpoint,
name);
logger.trace("updated local checkpoint of [{}] to [{}] (was [{}])", allocationId, localCheckpoint, current);
}
} else {
logger.trace("skipping update local checkpoint [{}], current check point is higher " +
logger.trace("skipping update of local checkpoint [{}], current checkpoint is higher " +
"(current [{}], incoming [{}], type [{}])",
allocationId, current, localCheckpoint, allocationId);
}
return true;
} else {
logger.trace("[{}] isn't marked as in sync. ignoring local checkpoint of [{}].", allocationId, localCheckpoint);
}
}
/**
@ -138,21 +116,16 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
*/
synchronized boolean updateCheckpointOnPrimary() {
long minCheckpoint = Long.MAX_VALUE;
if (activeLocalCheckpoints.isEmpty() && inSyncLocalCheckpoints.isEmpty()) {
if (inSyncLocalCheckpoints.isEmpty()) {
return false;
}
for (ObjectLongCursor<String> cp : activeLocalCheckpoints) {
if (cp.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
for (ObjectLongCursor<String> cp : inSyncLocalCheckpoints) {
if (cp.value == UNASSIGNED_SEQ_NO) {
logger.trace("unknown local checkpoint for active allocationId [{}], requesting a sync", cp.key);
return true;
}
minCheckpoint = Math.min(cp.value, minCheckpoint);
}
for (ObjectLongCursor<String> cp : inSyncLocalCheckpoints) {
assert cp.value != SequenceNumbersService.UNASSIGNED_SEQ_NO :
"in sync allocation ids can not have an unknown checkpoint (aId [" + cp.key + "])";
minCheckpoint = Math.min(cp.value, minCheckpoint);
}
if (minCheckpoint < globalCheckpoint) {
// nocommit: if this happens - do you we fail the shard?
throw new IllegalStateException(shardId + " new global checkpoint [" + minCheckpoint
@ -181,77 +154,53 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
this.globalCheckpoint = globalCheckpoint;
logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint);
} else {
// nocommit: fail the shard?
throw new IllegalArgumentException("global checkpoint from primary should never decrease. current [" +
this.globalCheckpoint + "], got [" + globalCheckpoint + "]");
}
}
/**
* Notifies the service of the current allocation ids in the cluster state. This method trims any shards that
* have been removed and adds/promotes any active allocations to the {@link #activeLocalCheckpoints}.
* have been removed.
*
* @param activeAllocationIds the allocation ids of the currently active shard copies
* @param initializingAllocationIds the allocation ids of the currently initializing shard copies
*/
public synchronized void updateAllocationIdsFromMaster(Set<String> activeAllocationIds,
Set<String> initializingAllocationIds) {
activeLocalCheckpoints.removeAll(key -> activeAllocationIds.contains(key) == false);
assignedAllocationIds.removeIf(
aId -> activeAllocationIds.contains(aId) == false && initializingAllocationIds.contains(aId) == false);
assignedAllocationIds.addAll(activeAllocationIds);
assignedAllocationIds.addAll(initializingAllocationIds);
for (String activeId : activeAllocationIds) {
if (activeLocalCheckpoints.containsKey(activeId) == false) {
long knownCheckpoint = trackingLocalCheckpoint.getOrDefault(activeId, SequenceNumbersService.UNASSIGNED_SEQ_NO);
knownCheckpoint = inSyncLocalCheckpoints.getOrDefault(activeId, knownCheckpoint);
activeLocalCheckpoints.put(activeId, knownCheckpoint);
logger.trace("marking [{}] as active. known checkpoint [{}]", activeId, knownCheckpoint);
if (inSyncLocalCheckpoints.containsKey(activeId) == false) {
inSyncLocalCheckpoints.put(activeId, UNASSIGNED_SEQ_NO);
}
}
inSyncLocalCheckpoints.removeAll(key -> initializingAllocationIds.contains(key) == false);
trackingLocalCheckpoint.removeAll(key -> initializingAllocationIds.contains(key) == false);
// add initializing shards to tracking
for (String initID : initializingAllocationIds) {
if (inSyncLocalCheckpoints.containsKey(initID)) {
continue;
}
if (trackingLocalCheckpoint.containsKey(initID)) {
continue;
}
trackingLocalCheckpoint.put(initID, SequenceNumbersService.UNASSIGNED_SEQ_NO);
logger.trace("added [{}] to the tracking map due to a CS update", initID);
}
inSyncLocalCheckpoints.removeAll(key -> assignedAllocationIds.contains(key) == false);
}
/**
* marks the allocationId as "in sync" with the primary shard. This should be called at the end of recovery
* where the primary knows all operation bellow the global checkpoint have been completed on this shard.
* where the primary knows all operation below the global checkpoint have been completed on this shard.
*
* @param allocationId allocationId of the recovering shard
* @param localCheckpoint the local checkpoint of the shard in question
*/
public synchronized void markAllocationIdAsInSync(String allocationId, long localCheckpoint) {
if (trackingLocalCheckpoint.containsKey(allocationId) == false) {
// master have change its mind and removed this allocation, ignore.
public synchronized void markAllocationIdAsInSync(String allocationId) {
if (assignedAllocationIds.contains(allocationId) == false) {
// master have change it's mind and removed this allocation, ignore.
return;
}
long current = trackingLocalCheckpoint.remove(allocationId);
localCheckpoint = Math.max(current, localCheckpoint);
logger.trace("marked [{}] as in sync with a local checkpoint of [{}]", allocationId, localCheckpoint);
inSyncLocalCheckpoints.put(allocationId, localCheckpoint);
logger.trace("marked [{}] as in sync", allocationId);
inSyncLocalCheckpoints.put(allocationId, UNASSIGNED_SEQ_NO);
}
// for testing
synchronized long getLocalCheckpointForAllocation(String allocationId) {
if (activeLocalCheckpoints.containsKey(allocationId)) {
return activeLocalCheckpoints.get(allocationId);
}
if (inSyncLocalCheckpoints.containsKey(allocationId)) {
return inSyncLocalCheckpoints.get(allocationId);
}
if (trackingLocalCheckpoint.containsKey(allocationId)) {
return trackingLocalCheckpoint.get(allocationId);
}
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
return UNASSIGNED_SEQ_NO;
}
}

View File

@ -96,25 +96,25 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<Globa
});
}
static final class PrimaryRequest extends ReplicationRequest<PrimaryRequest> {
public static final class PrimaryRequest extends ReplicationRequest<PrimaryRequest> {
private PrimaryRequest() {
super();
}
PrimaryRequest(ShardId shardId) {
public PrimaryRequest(ShardId shardId) {
super(shardId);
}
}
static final class ReplicaRequest extends ReplicationRequest<GlobalCheckpointSyncAction.ReplicaRequest> {
public static final class ReplicaRequest extends ReplicationRequest<GlobalCheckpointSyncAction.ReplicaRequest> {
public long checkpoint;
private long checkpoint;
private ReplicaRequest() {
}
ReplicaRequest(PrimaryRequest primaryRequest, long checkpoint) {
public ReplicaRequest(PrimaryRequest primaryRequest, long checkpoint) {
super(primaryRequest.shardId());
this.checkpoint = checkpoint;
}
@ -130,6 +130,9 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<Globa
super.writeTo(out);
out.writeZLong(checkpoint);
}
}
public long getCheckpoint() {
return checkpoint;
}
}
}

View File

@ -79,4 +79,13 @@ public class SeqNoStats implements ToXContent, Writeable {
builder.endObject();
return builder;
}
@Override
public String toString() {
return "SeqNoStats{" +
"maxSeqNo=" + maxSeqNo +
", localCheckpoint=" + localCheckpoint +
", globalCheckpoint=" + globalCheckpoint +
'}';
}
}

View File

@ -114,13 +114,12 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
/**
* marks the allocationId as "in sync" with the primary shard.
* see {@link GlobalCheckpointService#markAllocationIdAsInSync(String, long)} for details.
* see {@link GlobalCheckpointService#markAllocationIdAsInSync(String)} for details.
*
* @param allocationId allocationId of the recovering shard
* @param localCheckpoint the local checkpoint of the shard in question
*/
public void markAllocationIdAsInSync(String allocationId, long localCheckpoint) {
globalCheckpointService.markAllocationIdAsInSync(allocationId, localCheckpoint);
public void markAllocationIdAsInSync(String allocationId) {
globalCheckpointService.markAllocationIdAsInSync(allocationId);
}
public long getLocalCheckpoint() {

View File

@ -349,8 +349,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
primaryTerm = newTerm;
}
}
}
/**
@ -1366,14 +1364,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
* marks the allocationId as "in sync" with the primary shard. see {@link GlobalCheckpointService#markAllocationIdAsInSync(String, long)} for details.
* marks the allocationId as "in sync" with the primary shard. see {@link GlobalCheckpointService#markAllocationIdAsInSync(String)}
* for details.
*
* @param allocationId allocationId of the recovering shard
* @param localCheckpoint the local checkpoint of the shard in question
*/
public void markAllocationIdAsInSync(String allocationId, long localCheckpoint) {
public void markAllocationIdAsInSync(String allocationId) {
verifyPrimary();
getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint);
getEngine().seqNoService().markAllocationIdAsInSync(allocationId);
}
public long getLocalCheckpoint() {

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.Type;
import org.elasticsearch.cluster.routing.RoutingNode;
@ -54,6 +55,7 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexShardAlreadyExistsException;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.seqno.GlobalCheckpointService;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@ -85,6 +87,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class IndicesClusterStateService extends AbstractLifecycleComponent implements ClusterStateListener {
@ -120,7 +123,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService, NodeServicesProvider nodeServicesProvider,
GlobalCheckpointSyncAction globalCheckpointSyncAction) {
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
this(settings, indicesService,
clusterService, threadPool, recoveryTargetService, shardStateAction,
nodeMappingRefreshAction, repositoriesService, restoreService, searchService, syncedFlushService, peerRecoverySourceService,
nodeServicesProvider, globalCheckpointSyncAction::updateCheckpointForShard);
@ -502,7 +505,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards";
createShard(nodes, routingTable, shardRouting);
} else {
updateShard(nodes, shardRouting, shard);
updateShard(nodes, shardRouting, shard, routingTable);
}
}
}
@ -534,7 +537,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
}
}
private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard shard) {
private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard shard, RoutingTable routingTable) {
final ShardRouting currentRoutingEntry = shard.routingEntry();
assert currentRoutingEntry.isSameAllocation(shardRouting) :
"local shard has a different allocation id but wasn't cleaning by removeShards. "
@ -542,6 +545,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
try {
shard.updateRoutingEntry(shardRouting);
if (shardRouting.primary()) {
IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
Set<String> activeIds = indexShardRoutingTable.activeShards().stream().map(r -> r.allocationId().getId())
.collect(Collectors.toSet());
Set<String> initializingIds = indexShardRoutingTable.getAllInitializingShards().stream().map(r -> r.allocationId().getId())
.collect(Collectors.toSet());
shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
}
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e);
return;
@ -720,6 +731,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
* @throws IOException if shard state could not be persisted
*/
void updateRoutingEntry(ShardRouting shardRouting) throws IOException;
/**
* Notifies the service of the current allocation ids in the cluster state.
* See {@link GlobalCheckpointService#updateAllocationIdsFromMaster(Set, Set)} for details.
*
* @param activeAllocationIds the allocation ids of the currently active shard copies
* @param initializingAllocationIds the allocation ids of the currently initializing shard copies
*/
void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds);
}
public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexComponent {

View File

@ -113,7 +113,7 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
throw new DelayRecoveryException("source node has the state of the target shard to be [" + targetShardRouting.state() + "], expecting to be [initializing]");
}
RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, targetShardRouting.allocationId().getId(), shard);
logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode());
try {
return handler.recoverToTarget();
@ -133,9 +133,9 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
private final class OngoingRecoveries {
private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, String targetAllocationId, IndexShard shard) {
final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
RecoverySourceHandler handler = shardContext.addNewRecovery(request, targetAllocationId, shard);
shard.recoveryStats().incCurrentAsSource();
return handler;
}
@ -181,20 +181,21 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
* Adds recovery source handler if recoveries are not delayed from starting (see also {@link #delayNewRecoveries}.
* Throws {@link DelayRecoveryException} if new recoveries are delayed from starting.
*/
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, String targetAllocationId, IndexShard shard) {
if (onNewRecoveryException != null) {
throw onNewRecoveryException;
}
RecoverySourceHandler handler = createRecoverySourceHandler(request, shard);
RecoverySourceHandler handler = createRecoverySourceHandler(request, targetAllocationId, shard);
recoveryHandlers.add(handler);
return handler;
}
private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, String targetAllocationId,
IndexShard shard) {
RecoverySourceHandler handler;
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, request.targetNode(),
recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), targetAllocationId, transportService,
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
Supplier<Long> currentClusterStateVersionSupplier = () -> clusterService.state().getVersion();
if (shard.indexSettings().isOnSharedFilesystem()) {
handler = new SharedFSRecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier,

View File

@ -314,12 +314,11 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
final RecoveryTargetHandler.FinalizeResponse response;
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
response = recoveryRef.status().finalizeRecovery();
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()))
{
recoveryRef.status().finalizeRecovery();
}
channel.sendResponse(response);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

View File

@ -391,8 +391,8 @@ public class RecoverySourceHandler {
StopWatch stopWatch = new StopWatch().start();
logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode());
cancellableThreads.execute(() -> {
RecoveryTarget.FinalizeResponse response = recoveryTarget.finalizeRecovery();
shard.markAllocationIdAsInSync(response.getAllocationId(), response.getLocalCheckpoint());
recoveryTarget.finalizeRecovery();
shard.markAllocationIdAsInSync(recoveryTarget.getTargetAllocationId());
});
if (request.isPrimaryRelocation()) {

View File

@ -333,10 +333,14 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
@Override
public FinalizeResponse finalizeRecovery() {
public void finalizeRecovery() {
final IndexShard indexShard = indexShard();
indexShard.finalizeRecovery();
return new FinalizeResponse(indexShard.routingEntry().allocationId().getId(), indexShard.getLocalCheckpoint());
}
@Override
public String getTargetAllocationId() {
return indexShard().routingEntry().allocationId().getId();
}
@Override

View File

@ -19,12 +19,9 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.List;
@ -44,9 +41,9 @@ public interface RecoveryTargetHandler {
/**
* The finalize request clears unreferenced translog files, refreshes the engine now that
* new segments are available, and enables garbage collection of
* tombstone files. The shard is also moved to the POST_RECOVERY phase during this time
* tombstone files.
**/
FinalizeResponse finalizeRecovery();
void finalizeRecovery();
/**
* Blockingly waits for cluster state with at least clusterStateVersion to be available
@ -81,41 +78,8 @@ public interface RecoveryTargetHandler {
void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps) throws IOException;
class FinalizeResponse extends TransportResponse {
private long localCheckpoint;
private String allocationId;
public FinalizeResponse(String allocationId, long localCheckpoint) {
this.localCheckpoint = localCheckpoint;
this.allocationId = allocationId;
}
FinalizeResponse() {
}
public long getLocalCheckpoint() {
return localCheckpoint;
}
public String getAllocationId() {
return allocationId;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(localCheckpoint);
out.writeString(allocationId);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
localCheckpoint = in.readZLong();
allocationId = in.readString();
}
}
/***
* @return the allocation id of the target shard.
*/
String getTargetAllocationId();
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
@ -51,9 +50,11 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
private final AtomicLong bytesSinceLastPause = new AtomicLong();
private final Consumer<Long> onSourceThrottle;
private String targetAllocationId;
public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService, DiscoveryNode targetNode,
RecoverySettings recoverySettings, Consumer<Long> onSourceThrottle) {
public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, String targetAllocationId, TransportService transportService,
DiscoveryNode targetNode, RecoverySettings recoverySettings, Consumer<Long> onSourceThrottle) {
this.targetAllocationId = targetAllocationId;
this.transportService = transportService;
@ -85,16 +86,11 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
}
@Override
public FinalizeResponse finalizeRecovery() {
return transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
public void finalizeRecovery() {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
new FutureTransportResponseHandler<FinalizeResponse>() {
@Override
public FinalizeResponse newInstance() {
return new FinalizeResponse();
}
}).txGet();
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
@ -167,4 +163,9 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
*/
throttleTimeInNanos), fileChunkRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public String getTargetAllocationId() {
return targetAllocationId;
}
}

View File

@ -45,6 +45,7 @@ import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
@ -104,9 +105,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
boolean closed = false;
ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
primary = newShard(shardId, true, "s0", indexMetaData, null);
primary = newShard(shardId, true, "s0", indexMetaData, this::syncGlobalCheckpoint, null);
replicas = new ArrayList<>();
this.indexMetaData = indexMetaData;
updateAllocationIDsOnPrimary();
for (int i = 0; i < indexMetaData.getNumberOfReplicas(); i++) {
addReplica();
}
@ -119,6 +121,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final IndexResponse response = index(indexRequest);
assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
}
primary.updateGlobalCheckpointOnPrimary();
return numOfDoc;
}
@ -128,6 +131,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final IndexResponse response = index(indexRequest);
assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
}
primary.updateGlobalCheckpointOnPrimary();
return numOfDoc;
}
@ -138,18 +142,39 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
public synchronized void startAll() throws IOException {
startReplicas(replicas.size());
}
public synchronized int startReplicas(int numOfReplicasToStart) throws IOException {
if (primary.routingEntry().initializing()) {
startPrimary();
}
int started = 0;
for (IndexShard replicaShard : replicas) {
if (replicaShard.routingEntry().initializing()) {
recoverReplica(replicaShard);
started++;
if (started > numOfReplicasToStart) {
break;
}
}
}
return started;
}
public void startPrimary() throws IOException {
final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId());
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
primary.recoverFromStore();
primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry()));
for (IndexShard replicaShard : replicas) {
recoverReplica(replicaShard);
}
updateAllocationIDsOnPrimary();
}
public synchronized IndexShard addReplica() throws IOException {
final IndexShard replica = newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData, null);
final IndexShard replica = newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData,
() -> { throw new AssertionError("replicas can't sync global checkpoint"); }, null);
replicas.add(replica);
updateAllocationIDsOnPrimary();
return replica;
}
@ -165,6 +190,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
boolean markAsRecovering) throws IOException {
ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering);
updateAllocationIDsOnPrimary();
}
public synchronized DiscoveryNode getPrimaryNode() {
@ -230,9 +256,33 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
public IndexShard getPrimary() {
return primary;
}
private void syncGlobalCheckpoint() {
PlainActionFuture<ReplicationResponse> listener = new PlainActionFuture<>();
try {
new GlobalCheckpointSync(listener, this).execute();
listener.get();
} catch (Exception e) {
throw new AssertionError(e);
}
}
abstract class ReplicationAction<Request extends ReplicationRequest<Request>, ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
private void updateAllocationIDsOnPrimary() {
Set<String> active = new HashSet<>();
Set<String> initializing = new HashSet<>();
for (ShardRouting shard: shardRoutings()) {
if (shard.active()) {
active.add(shard.allocationId().getId());
} else {
initializing.add(shard.allocationId().getId());
}
}
primary.updateAllocationIdsFromMaster(active, initializing);
}
}
abstract class ReplicationAction<Request extends ReplicationRequest<Request>,
ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Response extends ReplicationResponse> {
private final Request request;
private ActionListener<Response> listener;
@ -390,4 +440,24 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
TransportWriteActionTestHelper.performPostWriteActions(replica, request, index.getTranslogLocation(), logger);
}
}
class GlobalCheckpointSync extends
ReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest, GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> {
public GlobalCheckpointSync(ActionListener<ReplicationResponse> listener, ReplicationGroup replicationGroup) {
super(new GlobalCheckpointSyncAction.PrimaryRequest(replicationGroup.getPrimary().shardId()), listener,
replicationGroup, "global_ckp");
}
@Override
protected PrimaryResult performOnPrimary(IndexShard primary, GlobalCheckpointSyncAction.PrimaryRequest request) throws Exception {
return new PrimaryResult(new GlobalCheckpointSyncAction.ReplicaRequest(request, primary.getGlobalCheckpoint()),
new ReplicationResponse());
}
@Override
protected void performOnReplica(GlobalCheckpointSyncAction.ReplicaRequest request, IndexShard replica) {
replica.updateGlobalCheckpointOnReplica(request.getCheckpoint());
}
}
}

View File

@ -21,19 +21,29 @@ package org.elasticsearch.index.replication;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTests;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.hamcrest.Matcher;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase {
public void testSimpleReplication() throws Exception {
@ -121,4 +131,38 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
}
}
public void testCheckpointsAdvance() throws Exception {
try (ReplicationGroup shards = createGroup(randomInt(3))) {
shards.startPrimary();
int numDocs = 0;
int startedShards;
do {
numDocs += shards.indexDocs(randomInt(20));
startedShards = shards.startReplicas(randomIntBetween(1, 2));
} while (startedShards > 0);
if (numDocs == 0 || randomBoolean()) {
// in the case we have no indexing, we simulate the background global checkpoint sync
shards.getPrimary().updateGlobalCheckpointOnPrimary();
}
for (IndexShard shard : shards) {
final SeqNoStats shardStats = shard.seqNoStats();
final ShardRouting shardRouting = shard.routingEntry();
logger.debug("seq_no stats for {}: {}", shardRouting, XContentHelper.toString(shardStats,
new ToXContent.MapParams(Collections.singletonMap("pretty", "false"))));
assertThat(shardRouting + " local checkpoint mismatch", shardStats.getLocalCheckpoint(), equalTo(numDocs - 1L));
final Matcher<Long> globalCheckpointMatcher;
if (shardRouting.primary()) {
globalCheckpointMatcher = equalTo(numDocs - 1L);
} else {
// nocommit: removed once fixed
globalCheckpointMatcher = anyOf(equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO), equalTo(numDocs - 1L));
}
assertThat(shardRouting + " global checkpoint mismatch", shardStats.getGlobalCheckpoint(), globalCheckpointMatcher);
assertThat(shardRouting + " max seq no mismatch", shardStats.getMaxSeqNo(), equalTo(numDocs - 1L));
}
}
}
}

View File

@ -109,14 +109,13 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
@Override
public FinalizeResponse finalizeRecovery() {
public void finalizeRecovery() {
if (hasBlocked() == false) {
// it maybe that not ops have been transferred, block now
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
blockIfNeeded(RecoveryState.Stage.FINALIZE);
return super.finalizeRecovery();
super.finalizeRecovery();
}
}
}

View File

@ -1,78 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.hamcrest.Matcher;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
@TestLogging("index.shard:TRACE,index.seqno:TRACE")
public class CheckpointsIT extends ESIntegTestCase {
@AwaitsFix(bugUrl = "boaz working om this.")
public void testCheckpointsAdvance() throws Exception {
prepareCreate("test").setSettings(
"index.seq_no.checkpoint_sync_interval", "100ms", // update global point frequently
"index.number_of_shards", "1" // simplify things so we know how many ops goes to the shards
).get();
final List<IndexRequestBuilder> builders = new ArrayList<>();
final long numDocs = scaledRandomIntBetween(0, 100);
logger.info("--> will index [{}] docs", numDocs);
for (int i = 0; i < numDocs; i++) {
builders.add(client().prepareIndex("test", "type", "id_" + i).setSource("{}"));
}
indexRandom(randomBoolean(), false, builders);
assertBusy(() -> {
IndicesStatsResponse stats = client().admin().indices().prepareStats("test").clear().get();
for (ShardStats shardStats : stats.getShards()) {
logger.debug("seq_no stats for {}: {}", shardStats.getShardRouting(),
XContentHelper.toString(shardStats.getSeqNoStats(),
new ToXContent.MapParams(Collections.singletonMap("pretty", "false"))));
assertThat(shardStats.getShardRouting() + " local checkpoint mismatch",
shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(numDocs - 1));
final Matcher<Long> globalCheckpointMatcher;
if (shardStats.getShardRouting().primary()) {
globalCheckpointMatcher = equalTo(numDocs - 1);
} else {
// nocommit: removed once fixed
globalCheckpointMatcher = anyOf(equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO), equalTo(numDocs - 1));
}
assertThat(shardStats.getShardRouting() + " global checkpoint mismatch",
shardStats.getSeqNoStats().getGlobalCheckpoint(), globalCheckpointMatcher);
assertThat(shardStats.getShardRouting() + " max seq no mismatch",
shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(numDocs - 1));
}
});
}
}

View File

@ -19,18 +19,26 @@
package org.elasticsearch.index.seqno;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
public class GlobalCheckpointTests extends ESTestCase {
@ -41,89 +49,74 @@ public class GlobalCheckpointTests extends ESTestCase {
public void setUp() throws Exception {
super.setUp();
checkpointService = new GlobalCheckpointService(new ShardId("test", "_na_", 0),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), SequenceNumbersService.UNASSIGNED_SEQ_NO);
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), UNASSIGNED_SEQ_NO);
}
public void testEmptyShards() {
assertFalse("checkpoint shouldn't be updated when the are no active shards", checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
}
private final AtomicInteger aIdGenerator = new AtomicInteger();
private Map<String, Long> randomAllocationsWithLocalCheckpoints(int min, int max) {
Map<String, Long> allocations = new HashMap<>();
for (int i = randomIntBetween(min, max); i > 0; i--) {
allocations.put("id_" + aIdGenerator.incrementAndGet(), (long) randomInt(1000));
}
return allocations;
}
public void testGlobalCheckpointUpdate() {
Map<String, Long> allocations = new HashMap<>();
Set<String> active = new HashSet<>();
Set<String> insync = new HashSet<>();
Set<String> tracking = new HashSet<>();
long maxLocalCheckpoint = Long.MAX_VALUE;
for (int i = randomIntBetween(3, 10); i > 0; i--) {
String id = "id_" + i + "_" + randomAsciiOfLength(5);
long localCheckpoint = randomInt(200);
switch (randomInt(2)) {
case 0:
active.add(id);
maxLocalCheckpoint = Math.min(maxLocalCheckpoint, localCheckpoint);
break;
case 1:
insync.add(id);
maxLocalCheckpoint = Math.min(maxLocalCheckpoint, localCheckpoint);
break;
case 2:
tracking.add(id);
break;
default:
throw new IllegalStateException("you messed up your numbers, didn't you?");
}
allocations.put(id, localCheckpoint);
}
Map<String, Long> activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5);
Set<String> active = new HashSet<>(activeWithCheckpoints.keySet());
allocations.putAll(activeWithCheckpoints);
Map<String, Long> initializingWithCheckpoints = randomAllocationsWithLocalCheckpoints(0, 5);
Set<String> initializing = new HashSet<>(initializingWithCheckpoints.keySet());
allocations.putAll(initializingWithCheckpoints);
assertThat(allocations.size(), equalTo(active.size() + initializing.size()));
if (maxLocalCheckpoint == Long.MAX_VALUE) {
// note: this state can not happen in practice as we always have at least one primary shard active/in sync
// note: allocations can never be empty in practice as we always have at least one primary shard active/in sync
// it is however nice not to assume this on this level and check we do the right thing.
maxLocalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
final long maxLocalCheckpoint = allocations.values().stream().min(Long::compare).orElse(UNASSIGNED_SEQ_NO);
assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
logger.info("--> using allocations");
allocations.keySet().stream().forEach(aId -> {
allocations.keySet().forEach(aId -> {
final String type;
if (active.contains(aId)) {
type = "active";
} else if (insync.contains(aId)) {
type = "insync";
} else if (tracking.contains(aId)) {
type = "tracked";
} else if (initializing.contains(aId)) {
type = "init";
} else {
throw new IllegalStateException(aId + " not found in any map");
}
logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type);
});
Set<String> initializing = new HashSet<>(insync);
initializing.addAll(tracking);
checkpointService.updateAllocationIdsFromMaster(active, initializing);
allocations.keySet().stream().forEach(aId -> checkpointService.updateLocalCheckpoint(aId, allocations.get(aId)));
initializing.forEach(aId -> checkpointService.markAllocationIdAsInSync(aId));
allocations.keySet().forEach(aId -> checkpointService.updateLocalCheckpoint(aId, allocations.get(aId)));
// make sure insync allocation count
insync.stream().forEach(aId -> checkpointService.markAllocationIdAsInSync(aId, randomBoolean() ? 0 : allocations.get(aId)));
assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(checkpointService.updateCheckpointOnPrimary(), equalTo(maxLocalCheckpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(checkpointService.updateCheckpointOnPrimary(), equalTo(maxLocalCheckpoint != UNASSIGNED_SEQ_NO));
assertThat(checkpointService.getCheckpoint(), equalTo(maxLocalCheckpoint));
// increment checkpoints
active.stream().forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
insync.stream().forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
allocations.keySet().stream().forEach(aId -> checkpointService.updateLocalCheckpoint(aId, allocations.get(aId)));
active.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
initializing.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
allocations.keySet().forEach(aId -> checkpointService.updateLocalCheckpoint(aId, allocations.get(aId)));
// now insert an unknown active/insync id , the checkpoint shouldn't change but a refresh should be requested.
final String extraId = "extra_" + randomAsciiOfLength(5);
// first check that adding it without the master blessing doesn't change anything.
checkpointService.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4));
assertThat(checkpointService.getLocalCheckpointForAllocation(extraId), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(checkpointService.getLocalCheckpointForAllocation(extraId), equalTo(UNASSIGNED_SEQ_NO));
Set<String> newActive = new HashSet<>(active);
newActive.add(extraId);
@ -140,4 +133,112 @@ public class GlobalCheckpointTests extends ESTestCase {
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), greaterThan(maxLocalCheckpoint));
}
public void testMissingActiveIdsPreventAdvance() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(0, 5);
final Map<String, Long> assigned = new HashMap<>();
assigned.putAll(active);
assigned.putAll(initializing);
checkpointService.updateAllocationIdsFromMaster(
new HashSet<>(randomSubsetOf(randomInt(active.size() - 1), active.keySet())),
initializing.keySet());
randomSubsetOf(initializing.keySet()).forEach(checkpointService::markAllocationIdAsInSync);
assigned.forEach(checkpointService::updateLocalCheckpoint);
// now mark all active shards
checkpointService.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
// global checkpoint can't be advanced, but we need a sync
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
// update again
assigned.forEach(checkpointService::updateLocalCheckpoint);
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testMissingInSyncIdsPreventAdvance() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(0, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
checkpointService.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
initializing.keySet().forEach(checkpointService::markAllocationIdAsInSync);
randomSubsetOf(randomInt(initializing.size() - 1),
initializing.keySet()).forEach(aId -> checkpointService.updateLocalCheckpoint(aId, initializing.get(aId)));
active.forEach(checkpointService::updateLocalCheckpoint);
// global checkpoint can't be advanced, but we need a sync
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
// update again
initializing.forEach(checkpointService::updateLocalCheckpoint);
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() {
final Map<String, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> nonApproved = randomAllocationsWithLocalCheckpoints(1, 5);
checkpointService.updateAllocationIdsFromMaster(active.keySet(), initializing.keySet());
initializing.keySet().forEach(checkpointService::markAllocationIdAsInSync);
nonApproved.keySet().forEach(checkpointService::markAllocationIdAsInSync);
List<Map<String, Long>> allocations = Arrays.asList(active, initializing, nonApproved);
Collections.shuffle(allocations, random());
allocations.forEach(a -> a.forEach(checkpointService::updateLocalCheckpoint));
// global checkpoint can be advanced, but we need a sync
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testInSyncIdsAreRemovedIfNotValidatedByMaster() {
final Map<String, Long> activeToStay = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializingToStay = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> activeToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<String, Long> initializingToBeRemoved = randomAllocationsWithLocalCheckpoints(1, 5);
final Set<String> active = Sets.union(activeToStay.keySet(), activeToBeRemoved.keySet());
final Set<String> initializing = Sets.union(initializingToStay.keySet(), initializingToBeRemoved.keySet());
final Map<String, Long> allocations = new HashMap<>();
allocations.putAll(activeToStay);
if (randomBoolean()) {
allocations.putAll(activeToBeRemoved);
}
allocations.putAll(initializingToStay);
if (randomBoolean()) {
allocations.putAll(initializingToBeRemoved);
}
checkpointService.updateAllocationIdsFromMaster(active, initializing);
if (randomBoolean()) {
initializingToStay.keySet().forEach(checkpointService::markAllocationIdAsInSync);
} else {
initializing.forEach(checkpointService::markAllocationIdAsInSync);
}
if (randomBoolean()) {
allocations.forEach(checkpointService::updateLocalCheckpoint);
}
// global checkpoint may be advanced, but we need a sync in any case
assertTrue(checkpointService.updateCheckpointOnPrimary());
// now remove shards
if (randomBoolean()) {
checkpointService.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
allocations.forEach((aid, ckp) -> checkpointService.updateLocalCheckpoint(aid, ckp + 10L));
} else {
allocations.forEach((aid, ckp) -> checkpointService.updateLocalCheckpoint(aid, ckp + 10L));
checkpointService.updateAllocationIdsFromMaster(activeToStay.keySet(), initializingToStay.keySet());
}
final long checkpoint = Stream.concat(activeToStay.values().stream(), initializingToStay.values().stream())
.min(Long::compare).get() + 10; // we added 10 to make sure it's advanced in the second time
// global checkpoint is advanced and we need a sync
assertTrue(checkpointService.updateCheckpointOnPrimary());
assertThat(checkpointService.getCheckpoint(), equalTo(checkpoint));
}
}

View File

@ -1043,7 +1043,7 @@ public class IndexShardTests extends IndexShardTestCase {
};
closeShards(shard);
IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()),
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper);
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, () -> {});
recoveryShardFromStore(newShard);
@ -1183,7 +1183,7 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(shard);
IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()),
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper);
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, () -> {});
recoveryShardFromStore(newShard);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.indices.cluster;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
@ -50,8 +51,10 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
@ -82,8 +85,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
* @param state cluster state used for matching
*/
public void assertClusterStateMatchesNodeState(ClusterState state, IndicesClusterStateService indicesClusterStateService) {
AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService =
indicesClusterStateService.indicesService;
MockIndicesService indicesService = (MockIndicesService) indicesClusterStateService.indicesService;
ConcurrentMap<ShardId, ShardRouting> failedShardsCache = indicesClusterStateService.failedShardsCache;
RoutingNode localRoutingNode = state.getRoutingNodes().node(state.getNodes().getLocalNodeId());
if (localRoutingNode != null) {
@ -95,7 +97,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
Index index = shardRouting.index();
IndexMetaData indexMetaData = state.metaData().getIndexSafe(index);
Shard shard = indicesService.getShardOrNull(shardRouting.shardId());
MockIndexShard shard = indicesService.getShardOrNull(shardRouting.shardId());
ShardRouting failedShard = failedShardsCache.get(shardRouting.shardId());
if (enableRandomFailures) {
if (shard == null && failedShard == null) {
@ -122,6 +124,17 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
// shard has latest shard routing
assertThat(shard.routingEntry(), equalTo(shardRouting));
}
if (shard.routingEntry().primary() && shard.routingEntry().active()) {
IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shard.shardId());
Set<String> activeIds = shardRoutingTable.activeShards().stream()
.map(r -> r.allocationId().getId()).collect(Collectors.toSet());
Set<String> initializingIds = shardRoutingTable.getAllInitializingShards().stream()
.map(r -> r.allocationId().getId()).collect(Collectors.toSet());
assertThat(shard.routingEntry() + " isn't updated with active aIDs", shard.activeAllocationIds, equalTo(activeIds));
assertThat(shard.routingEntry() + " isn't updated with init aIDs", shard.initializingAllocationIds,
equalTo(initializingIds));
}
}
}
}
@ -305,6 +318,8 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
protected class MockIndexShard implements IndicesClusterStateService.Shard {
private volatile ShardRouting shardRouting;
private volatile RecoveryState recoveryState;
private volatile Set<String> activeAllocationIds;
private volatile Set<String> initializingAllocationIds;
public MockIndexShard(ShardRouting shardRouting) {
this.shardRouting = shardRouting;
@ -337,5 +352,11 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
assert this.shardRouting.isSameAllocation(shardRouting);
this.shardRouting = shardRouting;
}
@Override
public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds) {
this.activeAllocationIds = activeAllocationIds;
this.initializingAllocationIds = initializingAllocationIds;
}
}
}

View File

@ -306,7 +306,6 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}).when(shard).relocated(any(String.class));
RecoveryTargetHandler targetHandler = mock(RecoveryTargetHandler.class);
when(targetHandler.finalizeRecovery()).thenReturn(new RecoveryTargetHandler.FinalizeResponse("_mock_", 1));
final Supplier<Long> currentClusterStateVersionSupplier = () -> {
assertFalse(ensureClusterStateVersionCalled.get());

View File

@ -23,7 +23,12 @@ import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.procedures.IntProcedure;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.English;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
@ -40,12 +45,14 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@ -73,11 +80,14 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.elasticsearch.index.IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@ -91,6 +101,8 @@ import static org.hamcrest.Matchers.startsWith;
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@TestLogging("_root:DEBUG,org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.index.shard.service:TRACE")
@LuceneTestCase.AwaitsFix(bugUrl = "primary relocation needs to transfer the global check point. otherwise the new primary sends a " +
"an unknown global checkpoint during sync, causing assertions to trigger")
public class RelocationIT extends ESIntegTestCase {
private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);
@ -99,16 +111,53 @@ public class RelocationIT extends ESIntegTestCase {
return Arrays.asList(MockTransportService.TestPlugin.class, MockIndexEventListener.TestPlugin.class);
}
@Override
public Settings indexSettings() {
return Settings.builder().put(super.indexSettings())
.put(INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL.getKey(), "200ms").build();
}
@Override
protected void beforeIndexDeletion() throws Exception {
super.beforeIndexDeletion();
assertBusy(() -> {
IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
for (IndexStats indexStats : stats.getIndices().values()) {
for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
Optional<ShardStats> maybePrimary = Stream.of(indexShardStats.getShards())
.filter(s -> s.getShardRouting().active() && s.getShardRouting().primary())
.findFirst();
if (maybePrimary.isPresent() == false) {
continue;
}
ShardStats primary = maybePrimary.get();
final SeqNoStats primarySeqNoStats = primary.getSeqNoStats();
assertThat(primary.getShardRouting() + " should have set the global checkpoint",
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)));
for (ShardStats shardStats : indexShardStats) {
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
assertThat(shardStats.getShardRouting() + " local checkpoint mismatch",
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
assertThat(shardStats.getShardRouting() + " global checkpoint mismatch",
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
assertThat(shardStats.getShardRouting() + " max seq no mismatch",
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
}
}
}
});
}
public void testSimpleRelocationNoIndexing() {
logger.info("--> starting [node1] ...");
final String node_1 = internalCluster().startNode();
logger.info("--> creating test index ...");
client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder()
prepareCreate("test", Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0))
.execute().actionGet();
.put("index.number_of_replicas", 0)
).get();
logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
@ -158,10 +207,10 @@ public class RelocationIT extends ESIntegTestCase {
nodes[0] = internalCluster().startNode();
logger.info("--> creating test index ...");
client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder()
prepareCreate("test", Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)).execute().actionGet();
.put("index.number_of_replicas", numberOfReplicas)
).get();
for (int i = 1; i < numberOfNodes; i++) {
@ -260,12 +309,11 @@ public class RelocationIT extends ESIntegTestCase {
nodes[0] = internalCluster().startNode();
logger.info("--> creating test index ...");
client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder()
prepareCreate("test", Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.put("index.refresh_interval", -1) // we want to control refreshes c
).execute().actionGet();
).get();
for (int i = 1; i < numberOfNodes; i++) {
logger.info("--> starting [node_{}] ...", i);
@ -349,8 +397,9 @@ public class RelocationIT extends ESIntegTestCase {
final String p_node = internalCluster().startNode();
client().admin().indices().prepareCreate(indexName)
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).get();
prepareCreate(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
).get();
internalCluster().startNodesAsync(2).get();
@ -383,9 +432,7 @@ public class RelocationIT extends ESIntegTestCase {
.setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")));
logger.info("--> wait for all replica shards to be removed, on all nodes");
assertBusy(new Runnable() {
@Override
public void run() {
assertBusy(() -> {
for (String node : internalCluster().getNodeNames()) {
if (node.equals(p_node)) {
continue;
@ -394,7 +441,6 @@ public class RelocationIT extends ESIntegTestCase {
assertThat(node + " indicates assigned replicas",
state.getRoutingTable().index(indexName).shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1));
}
}
});
logger.info("--> verifying no temporary recoveries are left");
@ -402,9 +448,7 @@ public class RelocationIT extends ESIntegTestCase {
NodeEnvironment nodeEnvironment = internalCluster().getInstance(NodeEnvironment.class, node);
for (final Path shardLoc : nodeEnvironment.availableShardPaths(new ShardId(indexName, "_na_", 0))) {
if (Files.exists(shardLoc)) {
assertBusy(new Runnable() {
@Override
public void run() {
assertBusy(() -> {
try {
Files.walkFileTree(shardLoc, new SimpleFileVisitor<Path>() {
@Override
@ -416,7 +460,6 @@ public class RelocationIT extends ESIntegTestCase {
} catch (IOException e) {
throw new AssertionError("failed to walk file tree starting at [" + shardLoc + "]", e);
}
}
});
}
}
@ -435,7 +478,7 @@ public class RelocationIT extends ESIntegTestCase {
logger.info("red nodes: {}", redFuture.get());
ensureStableCluster(halfNodes * 2);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
assertAcked(prepareCreate("test", Settings.builder()
.put("index.routing.allocation.exclude.color", "blue")
.put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1))

View File

@ -194,9 +194,26 @@ public abstract class IndexShardTestCase extends ESTestCase {
@Nullable IndexSearcherWrapper searcherWrapper) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting, indexMetaData, searcherWrapper);
return newShard(shardRouting, indexMetaData, searcherWrapper, () -> {});
}
/**
* creates a new initializing shard. The shard will will be put in its proper path under the
* supplied node id.
*
* @param shardId the shard id to use
* @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica
* (ready to recover from another shard)
*/
protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, IndexMetaData indexMetaData,
Runnable globalCheckpointSyncer,
@Nullable IndexSearcherWrapper searcherWrapper) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting, indexMetaData, searcherWrapper, globalCheckpointSyncer);
}
/**
* creates a new initializing shard. The shard will will be put in its proper path under the
* current node id the shard is assigned to.
@ -207,7 +224,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
*/
protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners)
throws IOException {
return newShard(routing, indexMetaData, null, listeners);
return newShard(routing, indexMetaData, null, () -> {}, listeners);
}
/**
@ -217,16 +234,18 @@ public abstract class IndexShardTestCase extends ESTestCase {
* @param routing shard routing to use
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param indexSearcherWrapper an optional wrapper to be used during searchers
* @param globalCheckpointSyncer an runnable to run when the global check point needs syncing
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper indexSearcherWrapper, IndexingOperationListener... listeners)
@Nullable IndexSearcherWrapper indexSearcherWrapper, Runnable globalCheckpointSyncer,
IndexingOperationListener... listeners)
throws IOException {
// add node id as name to settings for popper logging
final ShardId shardId = routing.shardId();
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, listeners);
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, globalCheckpointSyncer, listeners);
}
/**
@ -240,6 +259,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
*/
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper indexSearcherWrapper,
Runnable globalCheckpointSyncer,
IndexingOperationListener... listeners) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
@ -263,7 +283,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
new NoneCircuitBreakerService(), mapperService);
indexShard = new IndexShard(routing, indexSettings, shardPath, store, indexCache, mapperService, similarityService,
indexFieldDataService, null, indexEventListener, indexSearcherWrapper, threadPool, BigArrays.NON_RECYCLING_INSTANCE, warmer,
() -> {}, Collections.emptyList(), Arrays.asList(listeners));
globalCheckpointSyncer, Collections.emptyList(), Arrays.asList(listeners));
success = true;
} finally {
if (success == false) {
@ -293,7 +313,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
*/
protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException {
closeShards(current);
return newShard(routing, current.shardPath(), current.indexSettings().getIndexMetaData(), null, listeners);
return newShard(routing, current.shardPath(), current.indexSettings().getIndexMetaData(), null,
current.getGlobalCheckpointSyncer(), listeners);
}
/**

View File

@ -28,14 +28,8 @@ import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@ -64,6 +58,7 @@ import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -73,6 +68,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -99,9 +95,11 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexService;
@ -119,6 +117,7 @@ import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.node.NodeMocksPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.MockSearchService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.client.RandomizingClient;
@ -126,6 +125,7 @@ import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
@ -575,7 +575,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
return Collections.emptySet();
}
protected void beforeIndexDeletion() {
protected void beforeIndexDeletion() throws Exception {
cluster().beforeIndexDeletion();
}
@ -714,7 +714,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
* Creates a new {@link CreateIndexRequestBuilder} with the settings obtained from {@link #indexSettings()}.
*/
public final CreateIndexRequestBuilder prepareCreate(String index) {
return client().admin().indices().prepareCreate(index).setSettings(indexSettings());
return prepareCreate(index, -1);
}
/**
@ -730,6 +730,13 @@ public abstract class ESIntegTestCase extends ESTestCase {
return prepareCreate(index, numNodes, Settings.builder());
}
/**
* Creates a new {@link CreateIndexRequestBuilder} with the settings obtained from {@link #indexSettings()}, augmented
* by the given builder
*/
public CreateIndexRequestBuilder prepareCreate(String index, Settings.Builder settingsBuilder) {
return prepareCreate(index, -1, settingsBuilder);
}
/**
* Creates a new {@link CreateIndexRequestBuilder} with the settings obtained from {@link #indexSettings()}.
* The index that is created with this builder will only be allowed to allocate on the number of nodes passed to this
@ -740,11 +747,10 @@ public abstract class ESIntegTestCase extends ESTestCase {
* </p>
*/
public CreateIndexRequestBuilder prepareCreate(String index, int numNodes, Settings.Builder settingsBuilder) {
internalCluster().ensureAtLeastNumDataNodes(numNodes);
Settings.Builder builder = Settings.builder().put(indexSettings()).put(settingsBuilder.build());
if (numNodes > 0) {
internalCluster().ensureAtLeastNumDataNodes(numNodes);
getExcludeSettings(index, numNodes, builder);
}
return client().admin().indices().prepareCreate(index).setSettings(builder.build());

View File

@ -649,7 +649,7 @@ public abstract class ESTestCase extends LuceneTestCase {
* Returns a random subset of values (including a potential empty list)
*/
public static <T> List<T> randomSubsetOf(Collection<T> collection) {
return randomSubsetOf(randomInt(collection.size() - 1), collection);
return randomSubsetOf(randomInt(Math.max(collection.size() - 1, 0)), collection);
}
/**