Reformat some classes in the index universe

This commit reformats some classes in the index universe with the
purpose of breaking some long method definitions and invocations into a
line per parameter. This has the advantage that for an upcoming change
to these definitions and invocations, the diff for that change will be a
single line per definition or invocation. That makes these sorts of
changes easier to read.
This commit is contained in:
Jason Tedor 2019-01-14 21:43:56 -05:00
parent c0368a2086
commit e11a32eda8
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
8 changed files with 201 additions and 108 deletions

View File

@ -382,11 +382,25 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
DirectoryService directoryService = indexStore.newDirectoryService(path); DirectoryService directoryService = indexStore.newDirectoryService(path);
store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock, store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))); new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier, indexShard = new IndexShard(
indexCache, mapperService, similarityService, engineFactory, routing,
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer, this.indexSettings,
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId), path,
circuitBreakerService); store,
indexSortSupplier,
indexCache,
mapperService,
similarityService,
engineFactory,
eventListener,
searcherWrapper,
threadPool,
bigArrays,
engineWarmer,
searchOperationListeners,
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
circuitBreakerService);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard); eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();

View File

@ -249,24 +249,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>(); private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
public IndexShard( public IndexShard(
ShardRouting shardRouting, final ShardRouting shardRouting,
IndexSettings indexSettings, final IndexSettings indexSettings,
ShardPath path, final ShardPath path,
Store store, final Store store,
Supplier<Sort> indexSortSupplier, final Supplier<Sort> indexSortSupplier,
IndexCache indexCache, final IndexCache indexCache,
MapperService mapperService, final MapperService mapperService,
SimilarityService similarityService, final SimilarityService similarityService,
@Nullable EngineFactory engineFactory, final @Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener, final IndexEventListener indexEventListener,
IndexSearcherWrapper indexSearcherWrapper, final IndexSearcherWrapper indexSearcherWrapper,
ThreadPool threadPool, final ThreadPool threadPool,
BigArrays bigArrays, final BigArrays bigArrays,
Engine.Warmer warmer, final Engine.Warmer warmer,
List<SearchOperationListener> searchOperationListener, final List<SearchOperationListener> searchOperationListener,
List<IndexingOperationListener> listeners, final List<IndexingOperationListener> listeners,
Runnable globalCheckpointSyncer, final Runnable globalCheckpointSyncer,
CircuitBreakerService circuitBreakerService) throws IOException { final CircuitBreakerService circuitBreakerService) throws IOException {
super(shardRouting.shardId(), indexSettings); super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing(); assert shardRouting.initializing();
this.shardRouting = shardRouting; this.shardRouting = shardRouting;

View File

@ -586,10 +586,14 @@ public class IndicesService extends AbstractLifecycleComponent
} }
@Override @Override
public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, public IndexShard createShard(
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, final ShardRouting shardRouting,
Consumer<IndexShard.ShardFailure> onShardFailure, final RecoveryState recoveryState,
Consumer<ShardId> globalCheckpointSyncer) throws IOException { final PeerRecoveryTargetService recoveryTargetService,
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
final Consumer<IndexShard.ShardFailure> onShardFailure,
final Consumer<ShardId> globalCheckpointSyncer) throws IOException {
ensureChangesAllowed(); ensureChangesAllowed();
IndexService indexService = indexService(shardRouting.index()); IndexService indexService = indexService(shardRouting.index());
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer); IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer);

View File

@ -43,7 +43,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -122,42 +121,54 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final PrimaryReplicaSyncer primaryReplicaSyncer; private final PrimaryReplicaSyncer primaryReplicaSyncer;
private final Consumer<ShardId> globalCheckpointSyncer; private final Consumer<ShardId> globalCheckpointSyncer;
@Inject public IndicesClusterStateService(
public IndicesClusterStateService(Settings settings, final Settings settings,
IndicesService indicesService, final IndicesService indicesService,
ClusterService clusterService, final ClusterService clusterService,
ThreadPool threadPool, final ThreadPool threadPool,
PeerRecoveryTargetService recoveryTargetService, final PeerRecoveryTargetService recoveryTargetService,
ShardStateAction shardStateAction, final ShardStateAction shardStateAction,
NodeMappingRefreshAction nodeMappingRefreshAction, final NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService, final RepositoriesService repositoriesService,
SearchService searchService, final SearchService searchService,
SyncedFlushService syncedFlushService, final SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService, final PeerRecoverySourceService peerRecoverySourceService,
SnapshotShardsService snapshotShardsService, final SnapshotShardsService snapshotShardsService,
PrimaryReplicaSyncer primaryReplicaSyncer, final PrimaryReplicaSyncer primaryReplicaSyncer,
GlobalCheckpointSyncAction globalCheckpointSyncAction) { final GlobalCheckpointSyncAction globalCheckpointSyncAction) {
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService, this(
clusterService, threadPool, recoveryTargetService, shardStateAction, settings,
nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard); clusterService,
threadPool,
recoveryTargetService,
shardStateAction,
nodeMappingRefreshAction,
repositoriesService,
searchService,
syncedFlushService,
peerRecoverySourceService,
snapshotShardsService,
primaryReplicaSyncer,
globalCheckpointSyncAction::updateGlobalCheckpointForShard);
} }
// for tests // for tests
IndicesClusterStateService(Settings settings, IndicesClusterStateService(
AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService, final Settings settings,
ClusterService clusterService, final AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
ThreadPool threadPool, final ClusterService clusterService,
PeerRecoveryTargetService recoveryTargetService, final ThreadPool threadPool,
ShardStateAction shardStateAction, final PeerRecoveryTargetService recoveryTargetService,
NodeMappingRefreshAction nodeMappingRefreshAction, final ShardStateAction shardStateAction,
RepositoriesService repositoriesService, final NodeMappingRefreshAction nodeMappingRefreshAction,
SearchService searchService, final RepositoriesService repositoriesService,
SyncedFlushService syncedFlushService, final SearchService searchService,
PeerRecoverySourceService peerRecoverySourceService, final SyncedFlushService syncedFlushService,
SnapshotShardsService snapshotShardsService, final PeerRecoverySourceService peerRecoverySourceService,
PrimaryReplicaSyncer primaryReplicaSyncer, final SnapshotShardsService snapshotShardsService,
Consumer<ShardId> globalCheckpointSyncer) { final PrimaryReplicaSyncer primaryReplicaSyncer,
final Consumer<ShardId> globalCheckpointSyncer) {
super(settings); super(settings);
this.settings = settings; this.settings = settings;
this.buildInIndexListener = this.buildInIndexListener =
@ -557,8 +568,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
try { try {
logger.debug("{} creating shard", shardRouting.shardId()); logger.debug("{} creating shard", shardRouting.shardId());
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode); RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting), indicesService.createShard(
repositoriesService, failedShardHandler, globalCheckpointSyncer); shardRouting,
recoveryState,
recoveryTargetService,
new RecoveryListener(shardRouting),
repositoriesService,
failedShardHandler,
globalCheckpointSyncer);
} catch (Exception e) { } catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
} }
@ -843,12 +860,26 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
@Nullable U indexService(Index index); @Nullable U indexService(Index index);
/** /**
* Creates shard for the specified shard routing and starts recovery, * Creates a shard for the specified shard routing and starts recovery.
*
* @param shardRouting the shard routing
* @param recoveryState the recovery state
* @param recoveryTargetService recovery service for the target
* @param recoveryListener a callback when recovery changes state (finishes or fails)
* @param repositoriesService service responsible for snapshot/restore
* @param onShardFailure a callback when this shard fails
* @param globalCheckpointSyncer a callback when this shard syncs the global checkpoint
* @return a new shard
* @throws IOException if an I/O exception occurs when creating the shard
*/ */
T createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, T createShard(
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, ShardRouting shardRouting,
Consumer<IndexShard.ShardFailure> onShardFailure, RecoveryState recoveryState,
Consumer<ShardId> globalCheckpointSyncer) throws IOException; PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer) throws IOException;
/** /**
* Returns shard for the specified id if it exists otherwise returns <code>null</code>. * Returns shard for the specified id if it exists otherwise returns <code>null</code>.

View File

@ -638,15 +638,31 @@ public class IndexShardIT extends ESSingleNodeTestCase {
return newShard; return newShard;
} }
public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, public static final IndexShard newIndexShard(
CircuitBreakerService cbs, IndexingOperationListener... listeners) throws IOException { final IndexService indexService,
final IndexShard shard,IndexSearcherWrapper wrapper,
final CircuitBreakerService cbs,
final IndexingOperationListener... listeners) throws IOException {
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry()); ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(), return new IndexShard(
shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(), initializingShardRouting,
indexService.similarityService(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexService.getIndexSettings(),
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners), shard.shardPath(),
() -> {}, cbs); shard.store(),
return newShard; indexService.getIndexSortSupplier(),
indexService.cache(),
indexService.mapperService(),
indexService.similarityService(),
shard.getEngineFactory(),
indexService.getIndexEventListener(),
wrapper,
indexService.getThreadPool(),
indexService.getBigArrays(),
null,
Collections.emptyList(),
Arrays.asList(listeners),
() -> {},
cbs);
} }
private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) { private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {

View File

@ -226,12 +226,14 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
} }
@Override @Override
public MockIndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, public MockIndexShard createShard(
PeerRecoveryTargetService recoveryTargetService, final ShardRouting shardRouting,
PeerRecoveryTargetService.RecoveryListener recoveryListener, final RecoveryState recoveryState,
RepositoriesService repositoriesService, final PeerRecoveryTargetService recoveryTargetService,
Consumer<IndexShard.ShardFailure> onShardFailure, final PeerRecoveryTargetService.RecoveryListener recoveryListener,
Consumer<ShardId> globalCheckpointSyncer) throws IOException { final RepositoriesService repositoriesService,
final Consumer<IndexShard.ShardFailure> onShardFailure,
final Consumer<ShardId> globalCheckpointSyncer) throws IOException {
failRandomly(); failRandomly();
MockIndexService indexService = indexService(recoveryState.getShardId().getIndex()); MockIndexService indexService = indexService(recoveryState.getShardId().getIndex());
MockIndexShard indexShard = indexService.createShard(shardRouting); MockIndexShard indexShard = indexService.createShard(shardRouting);

View File

@ -467,28 +467,39 @@ public class SnapshotsServiceTests extends ESTestCase {
deterministicTaskQueue.getThreadPool() deterministicTaskQueue.getThreadPool()
); );
indicesClusterStateService = new IndicesClusterStateService( indicesClusterStateService = new IndicesClusterStateService(
settings, indicesService, clusterService, threadPool, settings,
new PeerRecoveryTargetService( indicesService,
deterministicTaskQueue.getThreadPool(), transportService, recoverySettings, clusterService,
clusterService threadPool,
), new PeerRecoveryTargetService(
shardStateAction, deterministicTaskQueue.getThreadPool(), transportService, recoverySettings, clusterService),
new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)), shardStateAction,
repositoriesService, new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)),
mock(SearchService.class), repositoriesService,
new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver), mock(SearchService.class),
new PeerRecoverySourceService(transportService, indicesService, recoverySettings), new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver),
snapshotShardsService, new PeerRecoverySourceService(transportService, indicesService, recoverySettings),
new PrimaryReplicaSyncer( snapshotShardsService,
transportService, new PrimaryReplicaSyncer(
new TransportResyncReplicationAction( transportService,
settings, transportService, clusterService, indicesService, threadPool, new TransportResyncReplicationAction(
shardStateAction, actionFilters, indexNameExpressionResolver) settings,
), transportService,
new GlobalCheckpointSyncAction( clusterService,
settings, transportService, clusterService, indicesService, threadPool, indicesService,
shardStateAction, actionFilters, indexNameExpressionResolver) threadPool,
); shardStateAction,
actionFilters,
indexNameExpressionResolver)),
new GlobalCheckpointSyncAction(
settings,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver));
Map<Action, TransportAction> actions = new HashMap<>(); Map<Action, TransportAction> actions = new HashMap<>();
actions.put(CreateIndexAction.INSTANCE, actions.put(CreateIndexAction.INSTANCE,
new TransportCreateIndexAction( new TransportCreateIndexAction(

View File

@ -367,10 +367,25 @@ public abstract class IndexShardTestCase extends ESTestCase {
}; };
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings); CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings);
indexShard = new IndexShard(routing, indexSettings, shardPath, store, () -> null, indexCache, mapperService, similarityService, indexShard = new IndexShard(
engineFactory, indexEventListener, indexSearcherWrapper, threadPool, routing,
BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer, indexSettings,
breakerService); shardPath,
store,
() -> null,
indexCache,
mapperService,
similarityService,
engineFactory,
indexEventListener,
indexSearcherWrapper,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
warmer,
Collections.emptyList(),
Arrays.asList(listeners),
globalCheckpointSyncer,
breakerService);
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
success = true; success = true;
} finally { } finally {