Reformat some classes in the index universe
This commit reformats some classes in the index universe with the purpose of breaking some long method definitions and invocations into a line per parameter. This has the advantage that for an upcoming change to these definitions and invocations, the diff for that change will be a single line per definition or invocation. That makes these sorts of changes easier to read.
This commit is contained in:
parent
c0368a2086
commit
e11a32eda8
|
@ -382,11 +382,25 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
|||
DirectoryService directoryService = indexStore.newDirectoryService(path);
|
||||
store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock,
|
||||
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
|
||||
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
|
||||
indexCache, mapperService, similarityService, engineFactory,
|
||||
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
|
||||
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId),
|
||||
circuitBreakerService);
|
||||
indexShard = new IndexShard(
|
||||
routing,
|
||||
this.indexSettings,
|
||||
path,
|
||||
store,
|
||||
indexSortSupplier,
|
||||
indexCache,
|
||||
mapperService,
|
||||
similarityService,
|
||||
engineFactory,
|
||||
eventListener,
|
||||
searcherWrapper,
|
||||
threadPool,
|
||||
bigArrays,
|
||||
engineWarmer,
|
||||
searchOperationListeners,
|
||||
indexingOperationListeners,
|
||||
() -> globalCheckpointSyncer.accept(shardId),
|
||||
circuitBreakerService);
|
||||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
|
||||
eventListener.afterIndexShardCreated(indexShard);
|
||||
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
|
||||
|
|
|
@ -249,24 +249,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
|
||||
|
||||
public IndexShard(
|
||||
ShardRouting shardRouting,
|
||||
IndexSettings indexSettings,
|
||||
ShardPath path,
|
||||
Store store,
|
||||
Supplier<Sort> indexSortSupplier,
|
||||
IndexCache indexCache,
|
||||
MapperService mapperService,
|
||||
SimilarityService similarityService,
|
||||
@Nullable EngineFactory engineFactory,
|
||||
IndexEventListener indexEventListener,
|
||||
IndexSearcherWrapper indexSearcherWrapper,
|
||||
ThreadPool threadPool,
|
||||
BigArrays bigArrays,
|
||||
Engine.Warmer warmer,
|
||||
List<SearchOperationListener> searchOperationListener,
|
||||
List<IndexingOperationListener> listeners,
|
||||
Runnable globalCheckpointSyncer,
|
||||
CircuitBreakerService circuitBreakerService) throws IOException {
|
||||
final ShardRouting shardRouting,
|
||||
final IndexSettings indexSettings,
|
||||
final ShardPath path,
|
||||
final Store store,
|
||||
final Supplier<Sort> indexSortSupplier,
|
||||
final IndexCache indexCache,
|
||||
final MapperService mapperService,
|
||||
final SimilarityService similarityService,
|
||||
final @Nullable EngineFactory engineFactory,
|
||||
final IndexEventListener indexEventListener,
|
||||
final IndexSearcherWrapper indexSearcherWrapper,
|
||||
final ThreadPool threadPool,
|
||||
final BigArrays bigArrays,
|
||||
final Engine.Warmer warmer,
|
||||
final List<SearchOperationListener> searchOperationListener,
|
||||
final List<IndexingOperationListener> listeners,
|
||||
final Runnable globalCheckpointSyncer,
|
||||
final CircuitBreakerService circuitBreakerService) throws IOException {
|
||||
super(shardRouting.shardId(), indexSettings);
|
||||
assert shardRouting.initializing();
|
||||
this.shardRouting = shardRouting;
|
||||
|
|
|
@ -586,10 +586,14 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
}
|
||||
|
||||
@Override
|
||||
public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
|
||||
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
|
||||
Consumer<IndexShard.ShardFailure> onShardFailure,
|
||||
Consumer<ShardId> globalCheckpointSyncer) throws IOException {
|
||||
public IndexShard createShard(
|
||||
final ShardRouting shardRouting,
|
||||
final RecoveryState recoveryState,
|
||||
final PeerRecoveryTargetService recoveryTargetService,
|
||||
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
|
||||
final RepositoriesService repositoriesService,
|
||||
final Consumer<IndexShard.ShardFailure> onShardFailure,
|
||||
final Consumer<ShardId> globalCheckpointSyncer) throws IOException {
|
||||
ensureChangesAllowed();
|
||||
IndexService indexService = indexService(shardRouting.index());
|
||||
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer);
|
||||
|
|
|
@ -43,7 +43,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
|
@ -122,42 +121,54 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|||
private final PrimaryReplicaSyncer primaryReplicaSyncer;
|
||||
private final Consumer<ShardId> globalCheckpointSyncer;
|
||||
|
||||
@Inject
|
||||
public IndicesClusterStateService(Settings settings,
|
||||
IndicesService indicesService,
|
||||
ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
PeerRecoveryTargetService recoveryTargetService,
|
||||
ShardStateAction shardStateAction,
|
||||
NodeMappingRefreshAction nodeMappingRefreshAction,
|
||||
RepositoriesService repositoriesService,
|
||||
SearchService searchService,
|
||||
SyncedFlushService syncedFlushService,
|
||||
PeerRecoverySourceService peerRecoverySourceService,
|
||||
SnapshotShardsService snapshotShardsService,
|
||||
PrimaryReplicaSyncer primaryReplicaSyncer,
|
||||
GlobalCheckpointSyncAction globalCheckpointSyncAction) {
|
||||
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
|
||||
clusterService, threadPool, recoveryTargetService, shardStateAction,
|
||||
nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService,
|
||||
snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard);
|
||||
public IndicesClusterStateService(
|
||||
final Settings settings,
|
||||
final IndicesService indicesService,
|
||||
final ClusterService clusterService,
|
||||
final ThreadPool threadPool,
|
||||
final PeerRecoveryTargetService recoveryTargetService,
|
||||
final ShardStateAction shardStateAction,
|
||||
final NodeMappingRefreshAction nodeMappingRefreshAction,
|
||||
final RepositoriesService repositoriesService,
|
||||
final SearchService searchService,
|
||||
final SyncedFlushService syncedFlushService,
|
||||
final PeerRecoverySourceService peerRecoverySourceService,
|
||||
final SnapshotShardsService snapshotShardsService,
|
||||
final PrimaryReplicaSyncer primaryReplicaSyncer,
|
||||
final GlobalCheckpointSyncAction globalCheckpointSyncAction) {
|
||||
this(
|
||||
settings,
|
||||
(AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
|
||||
clusterService,
|
||||
threadPool,
|
||||
recoveryTargetService,
|
||||
shardStateAction,
|
||||
nodeMappingRefreshAction,
|
||||
repositoriesService,
|
||||
searchService,
|
||||
syncedFlushService,
|
||||
peerRecoverySourceService,
|
||||
snapshotShardsService,
|
||||
primaryReplicaSyncer,
|
||||
globalCheckpointSyncAction::updateGlobalCheckpointForShard);
|
||||
}
|
||||
|
||||
// for tests
|
||||
IndicesClusterStateService(Settings settings,
|
||||
AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
|
||||
ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
PeerRecoveryTargetService recoveryTargetService,
|
||||
ShardStateAction shardStateAction,
|
||||
NodeMappingRefreshAction nodeMappingRefreshAction,
|
||||
RepositoriesService repositoriesService,
|
||||
SearchService searchService,
|
||||
SyncedFlushService syncedFlushService,
|
||||
PeerRecoverySourceService peerRecoverySourceService,
|
||||
SnapshotShardsService snapshotShardsService,
|
||||
PrimaryReplicaSyncer primaryReplicaSyncer,
|
||||
Consumer<ShardId> globalCheckpointSyncer) {
|
||||
IndicesClusterStateService(
|
||||
final Settings settings,
|
||||
final AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
|
||||
final ClusterService clusterService,
|
||||
final ThreadPool threadPool,
|
||||
final PeerRecoveryTargetService recoveryTargetService,
|
||||
final ShardStateAction shardStateAction,
|
||||
final NodeMappingRefreshAction nodeMappingRefreshAction,
|
||||
final RepositoriesService repositoriesService,
|
||||
final SearchService searchService,
|
||||
final SyncedFlushService syncedFlushService,
|
||||
final PeerRecoverySourceService peerRecoverySourceService,
|
||||
final SnapshotShardsService snapshotShardsService,
|
||||
final PrimaryReplicaSyncer primaryReplicaSyncer,
|
||||
final Consumer<ShardId> globalCheckpointSyncer) {
|
||||
super(settings);
|
||||
this.settings = settings;
|
||||
this.buildInIndexListener =
|
||||
|
@ -557,8 +568,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|||
try {
|
||||
logger.debug("{} creating shard", shardRouting.shardId());
|
||||
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
|
||||
indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting),
|
||||
repositoriesService, failedShardHandler, globalCheckpointSyncer);
|
||||
indicesService.createShard(
|
||||
shardRouting,
|
||||
recoveryState,
|
||||
recoveryTargetService,
|
||||
new RecoveryListener(shardRouting),
|
||||
repositoriesService,
|
||||
failedShardHandler,
|
||||
globalCheckpointSyncer);
|
||||
} catch (Exception e) {
|
||||
failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
|
||||
}
|
||||
|
@ -843,12 +860,26 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|||
@Nullable U indexService(Index index);
|
||||
|
||||
/**
|
||||
* Creates shard for the specified shard routing and starts recovery,
|
||||
* Creates a shard for the specified shard routing and starts recovery.
|
||||
*
|
||||
* @param shardRouting the shard routing
|
||||
* @param recoveryState the recovery state
|
||||
* @param recoveryTargetService recovery service for the target
|
||||
* @param recoveryListener a callback when recovery changes state (finishes or fails)
|
||||
* @param repositoriesService service responsible for snapshot/restore
|
||||
* @param onShardFailure a callback when this shard fails
|
||||
* @param globalCheckpointSyncer a callback when this shard syncs the global checkpoint
|
||||
* @return a new shard
|
||||
* @throws IOException if an I/O exception occurs when creating the shard
|
||||
*/
|
||||
T createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
|
||||
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
|
||||
Consumer<IndexShard.ShardFailure> onShardFailure,
|
||||
Consumer<ShardId> globalCheckpointSyncer) throws IOException;
|
||||
T createShard(
|
||||
ShardRouting shardRouting,
|
||||
RecoveryState recoveryState,
|
||||
PeerRecoveryTargetService recoveryTargetService,
|
||||
PeerRecoveryTargetService.RecoveryListener recoveryListener,
|
||||
RepositoriesService repositoriesService,
|
||||
Consumer<IndexShard.ShardFailure> onShardFailure,
|
||||
Consumer<ShardId> globalCheckpointSyncer) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns shard for the specified id if it exists otherwise returns <code>null</code>.
|
||||
|
|
|
@ -638,15 +638,31 @@ public class IndexShardIT extends ESSingleNodeTestCase {
|
|||
return newShard;
|
||||
}
|
||||
|
||||
public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper,
|
||||
CircuitBreakerService cbs, IndexingOperationListener... listeners) throws IOException {
|
||||
public static final IndexShard newIndexShard(
|
||||
final IndexService indexService,
|
||||
final IndexShard shard,IndexSearcherWrapper wrapper,
|
||||
final CircuitBreakerService cbs,
|
||||
final IndexingOperationListener... listeners) throws IOException {
|
||||
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
|
||||
IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(),
|
||||
shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(),
|
||||
indexService.similarityService(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
|
||||
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners),
|
||||
() -> {}, cbs);
|
||||
return newShard;
|
||||
return new IndexShard(
|
||||
initializingShardRouting,
|
||||
indexService.getIndexSettings(),
|
||||
shard.shardPath(),
|
||||
shard.store(),
|
||||
indexService.getIndexSortSupplier(),
|
||||
indexService.cache(),
|
||||
indexService.mapperService(),
|
||||
indexService.similarityService(),
|
||||
shard.getEngineFactory(),
|
||||
indexService.getIndexEventListener(),
|
||||
wrapper,
|
||||
indexService.getThreadPool(),
|
||||
indexService.getBigArrays(),
|
||||
null,
|
||||
Collections.emptyList(),
|
||||
Arrays.asList(listeners),
|
||||
() -> {},
|
||||
cbs);
|
||||
}
|
||||
|
||||
private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {
|
||||
|
|
|
@ -226,12 +226,14 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
|
|||
}
|
||||
|
||||
@Override
|
||||
public MockIndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState,
|
||||
PeerRecoveryTargetService recoveryTargetService,
|
||||
PeerRecoveryTargetService.RecoveryListener recoveryListener,
|
||||
RepositoriesService repositoriesService,
|
||||
Consumer<IndexShard.ShardFailure> onShardFailure,
|
||||
Consumer<ShardId> globalCheckpointSyncer) throws IOException {
|
||||
public MockIndexShard createShard(
|
||||
final ShardRouting shardRouting,
|
||||
final RecoveryState recoveryState,
|
||||
final PeerRecoveryTargetService recoveryTargetService,
|
||||
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
|
||||
final RepositoriesService repositoriesService,
|
||||
final Consumer<IndexShard.ShardFailure> onShardFailure,
|
||||
final Consumer<ShardId> globalCheckpointSyncer) throws IOException {
|
||||
failRandomly();
|
||||
MockIndexService indexService = indexService(recoveryState.getShardId().getIndex());
|
||||
MockIndexShard indexShard = indexService.createShard(shardRouting);
|
||||
|
|
|
@ -467,28 +467,39 @@ public class SnapshotsServiceTests extends ESTestCase {
|
|||
deterministicTaskQueue.getThreadPool()
|
||||
);
|
||||
indicesClusterStateService = new IndicesClusterStateService(
|
||||
settings, indicesService, clusterService, threadPool,
|
||||
new PeerRecoveryTargetService(
|
||||
deterministicTaskQueue.getThreadPool(), transportService, recoverySettings,
|
||||
clusterService
|
||||
),
|
||||
shardStateAction,
|
||||
new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)),
|
||||
repositoriesService,
|
||||
mock(SearchService.class),
|
||||
new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver),
|
||||
new PeerRecoverySourceService(transportService, indicesService, recoverySettings),
|
||||
snapshotShardsService,
|
||||
new PrimaryReplicaSyncer(
|
||||
transportService,
|
||||
new TransportResyncReplicationAction(
|
||||
settings, transportService, clusterService, indicesService, threadPool,
|
||||
shardStateAction, actionFilters, indexNameExpressionResolver)
|
||||
),
|
||||
new GlobalCheckpointSyncAction(
|
||||
settings, transportService, clusterService, indicesService, threadPool,
|
||||
shardStateAction, actionFilters, indexNameExpressionResolver)
|
||||
);
|
||||
settings,
|
||||
indicesService,
|
||||
clusterService,
|
||||
threadPool,
|
||||
new PeerRecoveryTargetService(
|
||||
deterministicTaskQueue.getThreadPool(), transportService, recoverySettings, clusterService),
|
||||
shardStateAction,
|
||||
new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)),
|
||||
repositoriesService,
|
||||
mock(SearchService.class),
|
||||
new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver),
|
||||
new PeerRecoverySourceService(transportService, indicesService, recoverySettings),
|
||||
snapshotShardsService,
|
||||
new PrimaryReplicaSyncer(
|
||||
transportService,
|
||||
new TransportResyncReplicationAction(
|
||||
settings,
|
||||
transportService,
|
||||
clusterService,
|
||||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
indexNameExpressionResolver)),
|
||||
new GlobalCheckpointSyncAction(
|
||||
settings,
|
||||
transportService,
|
||||
clusterService,
|
||||
indicesService,
|
||||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
indexNameExpressionResolver));
|
||||
Map<Action, TransportAction> actions = new HashMap<>();
|
||||
actions.put(CreateIndexAction.INSTANCE,
|
||||
new TransportCreateIndexAction(
|
||||
|
|
|
@ -367,10 +367,25 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
};
|
||||
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings);
|
||||
indexShard = new IndexShard(routing, indexSettings, shardPath, store, () -> null, indexCache, mapperService, similarityService,
|
||||
engineFactory, indexEventListener, indexSearcherWrapper, threadPool,
|
||||
BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer,
|
||||
breakerService);
|
||||
indexShard = new IndexShard(
|
||||
routing,
|
||||
indexSettings,
|
||||
shardPath,
|
||||
store,
|
||||
() -> null,
|
||||
indexCache,
|
||||
mapperService,
|
||||
similarityService,
|
||||
engineFactory,
|
||||
indexEventListener,
|
||||
indexSearcherWrapper,
|
||||
threadPool,
|
||||
BigArrays.NON_RECYCLING_INSTANCE,
|
||||
warmer,
|
||||
Collections.emptyList(),
|
||||
Arrays.asList(listeners),
|
||||
globalCheckpointSyncer,
|
||||
breakerService);
|
||||
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
|
||||
success = true;
|
||||
} finally {
|
||||
|
|
Loading…
Reference in New Issue