diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index 7191e4517ab..d661713829a 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -63,7 +63,7 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde private final ClusterService clusterService; - private final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries(); + final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries(); @Inject public PeerRecoverySourceService(Settings settings, TransportService transportService, IndicesService indicesService, @@ -137,7 +137,7 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde } } - private final class OngoingRecoveries { + final class OngoingRecoveries { private final Map ongoingRecoveries = new HashMap<>(); synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) { @@ -192,6 +192,12 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde if (onNewRecoveryException != null) { throw onNewRecoveryException; } + for (RecoverySourceHandler existingHandler : recoveryHandlers) { + if (existingHandler.getRequest().targetAllocationId().equals(request.targetAllocationId())) { + throw new DelayRecoveryException("recovery with same target already registered, waiting for " + + "previous recovery attempt to be cancelled or completed"); + } + } RecoverySourceHandler handler = createRecoverySourceHandler(request, shard); recoveryHandlers.add(handler); return handler; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 3097c8e668f..6a39700545b 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -129,6 +129,10 @@ public class RecoverySourceHandler { this.response = new RecoveryResponse(); } + public StartRecoveryRequest getRequest() { + return request; + } + /** * performs the recovery from the local engine to the target */ diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java new file mode 100644 index 00000000000..0e1f37c2878 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +import static org.hamcrest.Matchers.containsString; +import static org.mockito.Mockito.mock; + +public class PeerRecoverySourceServiceTests extends IndexShardTestCase { + + public void testDuplicateRecoveries() throws IOException { + IndexShard primary = newStartedShard(true); + PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(Settings.EMPTY, + mock(TransportService.class), mock(IndicesService.class), + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + mock(ClusterService.class)); + StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10), + getFakeDiscoNode("source"), getFakeDiscoNode("target"), null, randomBoolean(), randomLong(), randomLong()); + RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary); + DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class, + () -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary)); + assertThat(delayRecoveryException.getMessage(), containsString("recovery with same target already registered")); + peerRecoverySourceService.ongoingRecoveries.remove(primary, handler); + // re-adding after removing previous attempt works + handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary); + peerRecoverySourceService.ongoingRecoveries.remove(primary, handler); + closeShards(primary); + } +}