Reformat some classes in the index universe

This commit reformats some classes in the index universe with the
purpose of breaking some long method definitions and invocations into a
line per parameter. This has the advantage that for an upcoming change
to these definitions and invocations, the diff for that change will be a
single line per definition or invocation. That makes these sorts of
changes easier to read.
This commit is contained in:
Jason Tedor 2019-01-14 21:43:56 -05:00
parent c0368a2086
commit e11a32eda8
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
8 changed files with 201 additions and 108 deletions

View File

@ -382,11 +382,25 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
DirectoryService directoryService = indexStore.newDirectoryService(path);
store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
indexCache, mapperService, similarityService, engineFactory,
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId),
circuitBreakerService);
indexShard = new IndexShard(
routing,
this.indexSettings,
path,
store,
indexSortSupplier,
indexCache,
mapperService,
similarityService,
engineFactory,
eventListener,
searcherWrapper,
threadPool,
bigArrays,
engineWarmer,
searchOperationListeners,
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
circuitBreakerService);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();

View File

@ -249,24 +249,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
public IndexShard(
ShardRouting shardRouting,
IndexSettings indexSettings,
ShardPath path,
Store store,
Supplier<Sort> indexSortSupplier,
IndexCache indexCache,
MapperService mapperService,
SimilarityService similarityService,
@Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener,
IndexSearcherWrapper indexSearcherWrapper,
ThreadPool threadPool,
BigArrays bigArrays,
Engine.Warmer warmer,
List<SearchOperationListener> searchOperationListener,
List<IndexingOperationListener> listeners,
Runnable globalCheckpointSyncer,
CircuitBreakerService circuitBreakerService) throws IOException {
final ShardRouting shardRouting,
final IndexSettings indexSettings,
final ShardPath path,
final Store store,
final Supplier<Sort> indexSortSupplier,
final IndexCache indexCache,
final MapperService mapperService,
final SimilarityService similarityService,
final @Nullable EngineFactory engineFactory,
final IndexEventListener indexEventListener,
final IndexSearcherWrapper indexSearcherWrapper,
final ThreadPool threadPool,
final BigArrays bigArrays,
final Engine.Warmer warmer,
final List<SearchOperationListener> searchOperationListener,
final List<IndexingOperationListener> listeners,
final Runnable globalCheckpointSyncer,
final CircuitBreakerService circuitBreakerService) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
this.shardRouting = shardRouting;

View File

@ -586,10 +586,14 @@ public class IndicesService extends AbstractLifecycleComponent
}
@Override
public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer) throws IOException {
public IndexShard createShard(
final ShardRouting shardRouting,
final RecoveryState recoveryState,
final PeerRecoveryTargetService recoveryTargetService,
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
final Consumer<IndexShard.ShardFailure> onShardFailure,
final Consumer<ShardId> globalCheckpointSyncer) throws IOException {
ensureChangesAllowed();
IndexService indexService = indexService(shardRouting.index());
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer);

View File

@ -43,7 +43,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -122,42 +121,54 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final PrimaryReplicaSyncer primaryReplicaSyncer;
private final Consumer<ShardId> globalCheckpointSyncer;
@Inject
public IndicesClusterStateService(Settings settings,
IndicesService indicesService,
ClusterService clusterService,
ThreadPool threadPool,
PeerRecoveryTargetService recoveryTargetService,
ShardStateAction shardStateAction,
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService,
SearchService searchService,
SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService,
SnapshotShardsService snapshotShardsService,
PrimaryReplicaSyncer primaryReplicaSyncer,
GlobalCheckpointSyncAction globalCheckpointSyncAction) {
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
clusterService, threadPool, recoveryTargetService, shardStateAction,
nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService,
snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard);
public IndicesClusterStateService(
final Settings settings,
final IndicesService indicesService,
final ClusterService clusterService,
final ThreadPool threadPool,
final PeerRecoveryTargetService recoveryTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
final RepositoriesService repositoriesService,
final SearchService searchService,
final SyncedFlushService syncedFlushService,
final PeerRecoverySourceService peerRecoverySourceService,
final SnapshotShardsService snapshotShardsService,
final PrimaryReplicaSyncer primaryReplicaSyncer,
final GlobalCheckpointSyncAction globalCheckpointSyncAction) {
this(
settings,
(AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
clusterService,
threadPool,
recoveryTargetService,
shardStateAction,
nodeMappingRefreshAction,
repositoriesService,
searchService,
syncedFlushService,
peerRecoverySourceService,
snapshotShardsService,
primaryReplicaSyncer,
globalCheckpointSyncAction::updateGlobalCheckpointForShard);
}
// for tests
IndicesClusterStateService(Settings settings,
AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
ClusterService clusterService,
ThreadPool threadPool,
PeerRecoveryTargetService recoveryTargetService,
ShardStateAction shardStateAction,
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService,
SearchService searchService,
SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService,
SnapshotShardsService snapshotShardsService,
PrimaryReplicaSyncer primaryReplicaSyncer,
Consumer<ShardId> globalCheckpointSyncer) {
IndicesClusterStateService(
final Settings settings,
final AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
final ClusterService clusterService,
final ThreadPool threadPool,
final PeerRecoveryTargetService recoveryTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
final RepositoriesService repositoriesService,
final SearchService searchService,
final SyncedFlushService syncedFlushService,
final PeerRecoverySourceService peerRecoverySourceService,
final SnapshotShardsService snapshotShardsService,
final PrimaryReplicaSyncer primaryReplicaSyncer,
final Consumer<ShardId> globalCheckpointSyncer) {
super(settings);
this.settings = settings;
this.buildInIndexListener =
@ -557,8 +568,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
try {
logger.debug("{} creating shard", shardRouting.shardId());
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting),
repositoriesService, failedShardHandler, globalCheckpointSyncer);
indicesService.createShard(
shardRouting,
recoveryState,
recoveryTargetService,
new RecoveryListener(shardRouting),
repositoriesService,
failedShardHandler,
globalCheckpointSyncer);
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
}
@ -843,12 +860,26 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
@Nullable U indexService(Index index);
/**
* Creates shard for the specified shard routing and starts recovery,
* Creates a shard for the specified shard routing and starts recovery.
*
* @param shardRouting the shard routing
* @param recoveryState the recovery state
* @param recoveryTargetService recovery service for the target
* @param recoveryListener a callback when recovery changes state (finishes or fails)
* @param repositoriesService service responsible for snapshot/restore
* @param onShardFailure a callback when this shard fails
* @param globalCheckpointSyncer a callback when this shard syncs the global checkpoint
* @return a new shard
* @throws IOException if an I/O exception occurs when creating the shard
*/
T createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer) throws IOException;
T createShard(
ShardRouting shardRouting,
RecoveryState recoveryState,
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer) throws IOException;
/**
* Returns shard for the specified id if it exists otherwise returns <code>null</code>.

View File

@ -638,15 +638,31 @@ public class IndexShardIT extends ESSingleNodeTestCase {
return newShard;
}
public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper,
CircuitBreakerService cbs, IndexingOperationListener... listeners) throws IOException {
public static final IndexShard newIndexShard(
final IndexService indexService,
final IndexShard shard,IndexSearcherWrapper wrapper,
final CircuitBreakerService cbs,
final IndexingOperationListener... listeners) throws IOException {
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(),
shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(),
indexService.similarityService(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners),
() -> {}, cbs);
return newShard;
return new IndexShard(
initializingShardRouting,
indexService.getIndexSettings(),
shard.shardPath(),
shard.store(),
indexService.getIndexSortSupplier(),
indexService.cache(),
indexService.mapperService(),
indexService.similarityService(),
shard.getEngineFactory(),
indexService.getIndexEventListener(),
wrapper,
indexService.getThreadPool(),
indexService.getBigArrays(),
null,
Collections.emptyList(),
Arrays.asList(listeners),
() -> {},
cbs);
}
private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {

View File

@ -226,12 +226,14 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
}
@Override
public MockIndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState,
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer) throws IOException {
public MockIndexShard createShard(
final ShardRouting shardRouting,
final RecoveryState recoveryState,
final PeerRecoveryTargetService recoveryTargetService,
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
final Consumer<IndexShard.ShardFailure> onShardFailure,
final Consumer<ShardId> globalCheckpointSyncer) throws IOException {
failRandomly();
MockIndexService indexService = indexService(recoveryState.getShardId().getIndex());
MockIndexShard indexShard = indexService.createShard(shardRouting);

View File

@ -467,28 +467,39 @@ public class SnapshotsServiceTests extends ESTestCase {
deterministicTaskQueue.getThreadPool()
);
indicesClusterStateService = new IndicesClusterStateService(
settings, indicesService, clusterService, threadPool,
new PeerRecoveryTargetService(
deterministicTaskQueue.getThreadPool(), transportService, recoverySettings,
clusterService
),
shardStateAction,
new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)),
repositoriesService,
mock(SearchService.class),
new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver),
new PeerRecoverySourceService(transportService, indicesService, recoverySettings),
snapshotShardsService,
new PrimaryReplicaSyncer(
transportService,
new TransportResyncReplicationAction(
settings, transportService, clusterService, indicesService, threadPool,
shardStateAction, actionFilters, indexNameExpressionResolver)
),
new GlobalCheckpointSyncAction(
settings, transportService, clusterService, indicesService, threadPool,
shardStateAction, actionFilters, indexNameExpressionResolver)
);
settings,
indicesService,
clusterService,
threadPool,
new PeerRecoveryTargetService(
deterministicTaskQueue.getThreadPool(), transportService, recoverySettings, clusterService),
shardStateAction,
new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)),
repositoriesService,
mock(SearchService.class),
new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver),
new PeerRecoverySourceService(transportService, indicesService, recoverySettings),
snapshotShardsService,
new PrimaryReplicaSyncer(
transportService,
new TransportResyncReplicationAction(
settings,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver)),
new GlobalCheckpointSyncAction(
settings,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver));
Map<Action, TransportAction> actions = new HashMap<>();
actions.put(CreateIndexAction.INSTANCE,
new TransportCreateIndexAction(

View File

@ -367,10 +367,25 @@ public abstract class IndexShardTestCase extends ESTestCase {
};
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings);
indexShard = new IndexShard(routing, indexSettings, shardPath, store, () -> null, indexCache, mapperService, similarityService,
engineFactory, indexEventListener, indexSearcherWrapper, threadPool,
BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer,
breakerService);
indexShard = new IndexShard(
routing,
indexSettings,
shardPath,
store,
() -> null,
indexCache,
mapperService,
similarityService,
engineFactory,
indexEventListener,
indexSearcherWrapper,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
warmer,
Collections.emptyList(),
Arrays.asList(listeners),
globalCheckpointSyncer,
breakerService);
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
success = true;
} finally {