diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 54bf5fa1aa1..6c01e61586d 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -382,11 +382,25 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust DirectoryService directoryService = indexStore.newDirectoryService(path); store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock, new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))); - indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier, - indexCache, mapperService, similarityService, engineFactory, - eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer, - searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId), - circuitBreakerService); + indexShard = new IndexShard( + routing, + this.indexSettings, + path, + store, + indexSortSupplier, + indexCache, + mapperService, + similarityService, + engineFactory, + eventListener, + searcherWrapper, + threadPool, + bigArrays, + engineWarmer, + searchOperationListeners, + indexingOperationListeners, + () -> globalCheckpointSyncer.accept(shardId), + circuitBreakerService); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 270ce208191..c76c845328b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -249,24 +249,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); public IndexShard( - ShardRouting shardRouting, - IndexSettings indexSettings, - ShardPath path, - Store store, - Supplier indexSortSupplier, - IndexCache indexCache, - MapperService mapperService, - SimilarityService similarityService, - @Nullable EngineFactory engineFactory, - IndexEventListener indexEventListener, - IndexSearcherWrapper indexSearcherWrapper, - ThreadPool threadPool, - BigArrays bigArrays, - Engine.Warmer warmer, - List searchOperationListener, - List listeners, - Runnable globalCheckpointSyncer, - CircuitBreakerService circuitBreakerService) throws IOException { + final ShardRouting shardRouting, + final IndexSettings indexSettings, + final ShardPath path, + final Store store, + final Supplier indexSortSupplier, + final IndexCache indexCache, + final MapperService mapperService, + final SimilarityService similarityService, + final @Nullable EngineFactory engineFactory, + final IndexEventListener indexEventListener, + final IndexSearcherWrapper indexSearcherWrapper, + final ThreadPool threadPool, + final BigArrays bigArrays, + final Engine.Warmer warmer, + final List searchOperationListener, + final List listeners, + final Runnable globalCheckpointSyncer, + final CircuitBreakerService circuitBreakerService) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); this.shardRouting = shardRouting; diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index f0294535fa7..e3c1a77b843 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -586,10 +586,14 @@ public class IndicesService extends AbstractLifecycleComponent } @Override - public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, - PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, - Consumer onShardFailure, - Consumer globalCheckpointSyncer) throws IOException { + public IndexShard createShard( + final ShardRouting shardRouting, + final RecoveryState recoveryState, + final PeerRecoveryTargetService recoveryTargetService, + final PeerRecoveryTargetService.RecoveryListener recoveryListener, + final RepositoriesService repositoriesService, + final Consumer onShardFailure, + final Consumer globalCheckpointSyncer) throws IOException { ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer); diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 701690ed1d4..337c66812da 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -43,7 +43,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -122,42 +121,54 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final PrimaryReplicaSyncer primaryReplicaSyncer; private final Consumer globalCheckpointSyncer; - @Inject - public IndicesClusterStateService(Settings settings, - IndicesService indicesService, - ClusterService clusterService, - ThreadPool threadPool, - PeerRecoveryTargetService recoveryTargetService, - ShardStateAction shardStateAction, - NodeMappingRefreshAction nodeMappingRefreshAction, - RepositoriesService repositoriesService, - SearchService searchService, - SyncedFlushService syncedFlushService, - PeerRecoverySourceService peerRecoverySourceService, - SnapshotShardsService snapshotShardsService, - PrimaryReplicaSyncer primaryReplicaSyncer, - GlobalCheckpointSyncAction globalCheckpointSyncAction) { - this(settings, (AllocatedIndices>) indicesService, - clusterService, threadPool, recoveryTargetService, shardStateAction, - nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService, - snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard); + public IndicesClusterStateService( + final Settings settings, + final IndicesService indicesService, + final ClusterService clusterService, + final ThreadPool threadPool, + final PeerRecoveryTargetService recoveryTargetService, + final ShardStateAction shardStateAction, + final NodeMappingRefreshAction nodeMappingRefreshAction, + final RepositoriesService repositoriesService, + final SearchService searchService, + final SyncedFlushService syncedFlushService, + final PeerRecoverySourceService peerRecoverySourceService, + final SnapshotShardsService snapshotShardsService, + final PrimaryReplicaSyncer primaryReplicaSyncer, + final GlobalCheckpointSyncAction globalCheckpointSyncAction) { + this( + settings, + (AllocatedIndices>) indicesService, + clusterService, + threadPool, + recoveryTargetService, + shardStateAction, + nodeMappingRefreshAction, + repositoriesService, + searchService, + syncedFlushService, + peerRecoverySourceService, + snapshotShardsService, + primaryReplicaSyncer, + globalCheckpointSyncAction::updateGlobalCheckpointForShard); } // for tests - IndicesClusterStateService(Settings settings, - AllocatedIndices> indicesService, - ClusterService clusterService, - ThreadPool threadPool, - PeerRecoveryTargetService recoveryTargetService, - ShardStateAction shardStateAction, - NodeMappingRefreshAction nodeMappingRefreshAction, - RepositoriesService repositoriesService, - SearchService searchService, - SyncedFlushService syncedFlushService, - PeerRecoverySourceService peerRecoverySourceService, - SnapshotShardsService snapshotShardsService, - PrimaryReplicaSyncer primaryReplicaSyncer, - Consumer globalCheckpointSyncer) { + IndicesClusterStateService( + final Settings settings, + final AllocatedIndices> indicesService, + final ClusterService clusterService, + final ThreadPool threadPool, + final PeerRecoveryTargetService recoveryTargetService, + final ShardStateAction shardStateAction, + final NodeMappingRefreshAction nodeMappingRefreshAction, + final RepositoriesService repositoriesService, + final SearchService searchService, + final SyncedFlushService syncedFlushService, + final PeerRecoverySourceService peerRecoverySourceService, + final SnapshotShardsService snapshotShardsService, + final PrimaryReplicaSyncer primaryReplicaSyncer, + final Consumer globalCheckpointSyncer) { super(settings); this.settings = settings; this.buildInIndexListener = @@ -557,8 +568,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple try { logger.debug("{} creating shard", shardRouting.shardId()); RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode); - indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting), - repositoriesService, failedShardHandler, globalCheckpointSyncer); + indicesService.createShard( + shardRouting, + recoveryState, + recoveryTargetService, + new RecoveryListener(shardRouting), + repositoriesService, + failedShardHandler, + globalCheckpointSyncer); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); } @@ -843,12 +860,26 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple @Nullable U indexService(Index index); /** - * Creates shard for the specified shard routing and starts recovery, + * Creates a shard for the specified shard routing and starts recovery. + * + * @param shardRouting the shard routing + * @param recoveryState the recovery state + * @param recoveryTargetService recovery service for the target + * @param recoveryListener a callback when recovery changes state (finishes or fails) + * @param repositoriesService service responsible for snapshot/restore + * @param onShardFailure a callback when this shard fails + * @param globalCheckpointSyncer a callback when this shard syncs the global checkpoint + * @return a new shard + * @throws IOException if an I/O exception occurs when creating the shard */ - T createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, - PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, - Consumer onShardFailure, - Consumer globalCheckpointSyncer) throws IOException; + T createShard( + ShardRouting shardRouting, + RecoveryState recoveryState, + PeerRecoveryTargetService recoveryTargetService, + PeerRecoveryTargetService.RecoveryListener recoveryListener, + RepositoriesService repositoriesService, + Consumer onShardFailure, + Consumer globalCheckpointSyncer) throws IOException; /** * Returns shard for the specified id if it exists otherwise returns null. diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 475caf06e30..1bcb8cd2910 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -638,15 +638,31 @@ public class IndexShardIT extends ESSingleNodeTestCase { return newShard; } - public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, - CircuitBreakerService cbs, IndexingOperationListener... listeners) throws IOException { + public static final IndexShard newIndexShard( + final IndexService indexService, + final IndexShard shard,IndexSearcherWrapper wrapper, + final CircuitBreakerService cbs, + final IndexingOperationListener... listeners) throws IOException { ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry()); - IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(), - shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(), - indexService.similarityService(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, - indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners), - () -> {}, cbs); - return newShard; + return new IndexShard( + initializingShardRouting, + indexService.getIndexSettings(), + shard.shardPath(), + shard.store(), + indexService.getIndexSortSupplier(), + indexService.cache(), + indexService.mapperService(), + indexService.similarityService(), + shard.getEngineFactory(), + indexService.getIndexEventListener(), + wrapper, + indexService.getThreadPool(), + indexService.getBigArrays(), + null, + Collections.emptyList(), + Arrays.asList(listeners), + () -> {}, + cbs); } private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) { diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index c480c571903..b4c3d651151 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -226,12 +226,14 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC } @Override - public MockIndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, - PeerRecoveryTargetService recoveryTargetService, - PeerRecoveryTargetService.RecoveryListener recoveryListener, - RepositoriesService repositoriesService, - Consumer onShardFailure, - Consumer globalCheckpointSyncer) throws IOException { + public MockIndexShard createShard( + final ShardRouting shardRouting, + final RecoveryState recoveryState, + final PeerRecoveryTargetService recoveryTargetService, + final PeerRecoveryTargetService.RecoveryListener recoveryListener, + final RepositoriesService repositoriesService, + final Consumer onShardFailure, + final Consumer globalCheckpointSyncer) throws IOException { failRandomly(); MockIndexService indexService = indexService(recoveryState.getShardId().getIndex()); MockIndexShard indexShard = indexService.createShard(shardRouting); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index b4dbbef65cc..5cd75df22a4 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -467,28 +467,39 @@ public class SnapshotsServiceTests extends ESTestCase { deterministicTaskQueue.getThreadPool() ); indicesClusterStateService = new IndicesClusterStateService( - settings, indicesService, clusterService, threadPool, - new PeerRecoveryTargetService( - deterministicTaskQueue.getThreadPool(), transportService, recoverySettings, - clusterService - ), - shardStateAction, - new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)), - repositoriesService, - mock(SearchService.class), - new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver), - new PeerRecoverySourceService(transportService, indicesService, recoverySettings), - snapshotShardsService, - new PrimaryReplicaSyncer( - transportService, - new TransportResyncReplicationAction( - settings, transportService, clusterService, indicesService, threadPool, - shardStateAction, actionFilters, indexNameExpressionResolver) - ), - new GlobalCheckpointSyncAction( - settings, transportService, clusterService, indicesService, threadPool, - shardStateAction, actionFilters, indexNameExpressionResolver) - ); + settings, + indicesService, + clusterService, + threadPool, + new PeerRecoveryTargetService( + deterministicTaskQueue.getThreadPool(), transportService, recoverySettings, clusterService), + shardStateAction, + new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)), + repositoriesService, + mock(SearchService.class), + new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver), + new PeerRecoverySourceService(transportService, indicesService, recoverySettings), + snapshotShardsService, + new PrimaryReplicaSyncer( + transportService, + new TransportResyncReplicationAction( + settings, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + indexNameExpressionResolver)), + new GlobalCheckpointSyncAction( + settings, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + indexNameExpressionResolver)); Map actions = new HashMap<>(); actions.put(CreateIndexAction.INSTANCE, new TransportCreateIndexAction( diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 273967509f4..4938b307e75 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -367,10 +367,25 @@ public abstract class IndexShardTestCase extends ESTestCase { }; ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings); - indexShard = new IndexShard(routing, indexSettings, shardPath, store, () -> null, indexCache, mapperService, similarityService, - engineFactory, indexEventListener, indexSearcherWrapper, threadPool, - BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer, - breakerService); + indexShard = new IndexShard( + routing, + indexSettings, + shardPath, + store, + () -> null, + indexCache, + mapperService, + similarityService, + engineFactory, + indexEventListener, + indexSearcherWrapper, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + warmer, + Collections.emptyList(), + Arrays.asList(listeners), + globalCheckpointSyncer, + breakerService); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; } finally {