Disallow multiple concurrent recovery attempts for same target shard (#25428)
The primary shard uses the GlobalCheckPointTracker to track local checkpoint information of recovering and started replicas in order to calculate the global checkpoint. As the tracker is updated through recoveries as well, it is easier to reason about the tracker if we can ensure that there are no concurrent recovery attempts for the same target shard (which can happen in case of network disconnects).
This commit is contained in:
parent
8ae61c0fc4
commit
5d1e67c882
|
@ -63,7 +63,7 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
|
||||||
|
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
|
|
||||||
private final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries();
|
final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries();
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public PeerRecoverySourceService(Settings settings, TransportService transportService, IndicesService indicesService,
|
public PeerRecoverySourceService(Settings settings, TransportService transportService, IndicesService indicesService,
|
||||||
|
@ -137,7 +137,7 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class OngoingRecoveries {
|
final class OngoingRecoveries {
|
||||||
private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
|
private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
|
||||||
|
|
||||||
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
|
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
|
||||||
|
@ -192,6 +192,12 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
|
||||||
if (onNewRecoveryException != null) {
|
if (onNewRecoveryException != null) {
|
||||||
throw onNewRecoveryException;
|
throw onNewRecoveryException;
|
||||||
}
|
}
|
||||||
|
for (RecoverySourceHandler existingHandler : recoveryHandlers) {
|
||||||
|
if (existingHandler.getRequest().targetAllocationId().equals(request.targetAllocationId())) {
|
||||||
|
throw new DelayRecoveryException("recovery with same target already registered, waiting for " +
|
||||||
|
"previous recovery attempt to be cancelled or completed");
|
||||||
|
}
|
||||||
|
}
|
||||||
RecoverySourceHandler handler = createRecoverySourceHandler(request, shard);
|
RecoverySourceHandler handler = createRecoverySourceHandler(request, shard);
|
||||||
recoveryHandlers.add(handler);
|
recoveryHandlers.add(handler);
|
||||||
return handler;
|
return handler;
|
||||||
|
|
|
@ -129,6 +129,10 @@ public class RecoverySourceHandler {
|
||||||
this.response = new RecoveryResponse();
|
this.response = new RecoveryResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public StartRecoveryRequest getRequest() {
|
||||||
|
return request;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* performs the recovery from the local engine to the target
|
* performs the recovery from the local engine to the target
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.indices.recovery;
|
||||||
|
|
||||||
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.index.shard.IndexShard;
|
||||||
|
import org.elasticsearch.index.shard.IndexShardTestCase;
|
||||||
|
import org.elasticsearch.indices.IndicesService;
|
||||||
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
|
||||||
|
|
||||||
|
public void testDuplicateRecoveries() throws IOException {
|
||||||
|
IndexShard primary = newStartedShard(true);
|
||||||
|
PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(Settings.EMPTY,
|
||||||
|
mock(TransportService.class), mock(IndicesService.class),
|
||||||
|
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
|
||||||
|
mock(ClusterService.class));
|
||||||
|
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
|
||||||
|
getFakeDiscoNode("source"), getFakeDiscoNode("target"), null, randomBoolean(), randomLong(), randomLong());
|
||||||
|
RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary);
|
||||||
|
DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class,
|
||||||
|
() -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary));
|
||||||
|
assertThat(delayRecoveryException.getMessage(), containsString("recovery with same target already registered"));
|
||||||
|
peerRecoverySourceService.ongoingRecoveries.remove(primary, handler);
|
||||||
|
// re-adding after removing previous attempt works
|
||||||
|
handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary);
|
||||||
|
peerRecoverySourceService.ongoingRecoveries.remove(primary, handler);
|
||||||
|
closeShards(primary);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue