diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cb887e9d19f..5c7bb900119 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -49,7 +49,6 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Booleans; -import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -360,7 +359,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl @Override public void updateShardState(final ShardRouting newRouting, final long newPrimaryTerm, - final CheckedBiConsumer, IOException> primaryReplicaSyncer, + final BiConsumer> primaryReplicaSyncer, final long applyingClusterStateVersion, final Set inSyncAllocationIds, final IndexShardRoutingTable routingTable, diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index bec263f5470..b340dbe82ce 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -78,8 +78,30 @@ public class PrimaryReplicaSyncer extends AbstractComponent { this.chunkSize = chunkSize; } - public void resync(IndexShard indexShard, ActionListener listener) throws IOException { - try (Translog.View view = indexShard.acquireTranslogView()) { + public void resync(IndexShard indexShard, ActionListener listener) { + final Translog.View view = indexShard.acquireTranslogView(); + ActionListener wrappedListener = new ActionListener() { + @Override + public void onResponse(ResyncTask resyncTask) { + try { + view.close(); + } catch (IOException e) { + onFailure(e); + } + listener.onResponse(resyncTask); + } + + @Override + public void onFailure(Exception e) { + try { + view.close(); + } catch (IOException inner) { + e.addSuppressed(inner); + } + listener.onFailure(e); + } + }; + try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; Translog.Snapshot snapshot = view.snapshot(startingSeqNo); ShardId shardId = indexShard.shardId(); @@ -96,16 +118,20 @@ public class PrimaryReplicaSyncer extends AbstractComponent { @Override public synchronized Translog.Operation next() throws IOException { - if (indexShard.state() != IndexShardState.STARTED) { - assert indexShard.state() != IndexShardState.RELOCATED : "resync should never happen on a relocated shard"; - throw new IndexShardNotStartedException(shardId, indexShard.state()); + IndexShardState state = indexShard.state(); + if (state == IndexShardState.CLOSED) { + throw new IndexShardClosedException(shardId); + } else { + assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state; } return snapshot.next(); } }; resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot, - startingSeqNo, listener); + startingSeqNo, wrappedListener); + } catch (Exception e) { + wrappedListener.onFailure(e); } } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 84fa54e4613..3c1ee5b8412 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -41,7 +41,6 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -87,6 +86,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -746,7 +746,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple */ void updateShardState(ShardRouting shardRouting, long primaryTerm, - CheckedBiConsumer, IOException> primaryReplicaSyncer, + BiConsumer> primaryReplicaSyncer, long applyingClusterStateVersion, Set inSyncAllocationIds, IndexShardRoutingTable routingTable, diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index d7652d10981..725d39279d2 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.index.shard; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.resync.ResyncReplicationResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -33,6 +34,7 @@ import org.elasticsearch.tasks.TaskManager; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.Matchers.containsString; @@ -90,6 +92,49 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { closeShards(shard); } + public void testSyncerOnClosingShard() throws Exception { + IndexShard shard = newStartedShard(true); + AtomicBoolean syncActionCalled = new AtomicBoolean(); + CountDownLatch syncCalledLatch = new CountDownLatch(1); + PrimaryReplicaSyncer.SyncAction syncAction = + (request, parentTask, allocationId, primaryTerm, listener) -> { + logger.info("Sending off {} operations", request.getOperations().size()); + syncActionCalled.set(true); + syncCalledLatch.countDown(); + threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); + }; + PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY), syncAction); + syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately + + int numDocs = 10; + for (int i = 0; i < numDocs; i++) { + indexDoc(shard, "test", Integer.toString(i)); + } + + String allocationId = shard.routingEntry().allocationId().getId(); + shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId), + new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet()); + + PlainActionFuture fut = new PlainActionFuture<>(); + threadPool.generic().execute(() -> { + try { + syncer.resync(shard, fut); + } catch (AlreadyClosedException ace) { + fut.onFailure(ace); + } + }); + if (randomBoolean()) { + syncCalledLatch.await(); + } + closeShards(shard); + try { + fut.actionGet(); + assertTrue("Sync action was not called", syncActionCalled.get()); + } catch (AlreadyClosedException | IndexShardClosedException ignored) { + // ignore + } + } + public void testStatusSerialization() throws IOException { PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10), randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000)); diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 3c97f26950e..208e7443c7d 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -54,8 +53,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; @@ -345,7 +344,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC @Override public void updateShardState(ShardRouting shardRouting, long newPrimaryTerm, - CheckedBiConsumer, IOException> primaryReplicaSyncer, + BiConsumer> primaryReplicaSyncer, long applyingClusterStateVersion, Set inSyncAllocationIds, IndexShardRoutingTable routingTable,