Reformat some classes in the index universe
This commit reformats some classes in the index universe with the purpose of breaking some long method definitions and invocations into a line per parameter. This has the advantage that for an upcoming change to these definitions and invocations, the diff for that change will be a single line per definition or invocation. That makes these sorts of changes easier to read.
This commit is contained in:
parent
c0368a2086
commit
e11a32eda8
|
@ -382,10 +382,24 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
||||||
DirectoryService directoryService = indexStore.newDirectoryService(path);
|
DirectoryService directoryService = indexStore.newDirectoryService(path);
|
||||||
store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock,
|
store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock,
|
||||||
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
|
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
|
||||||
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
|
indexShard = new IndexShard(
|
||||||
indexCache, mapperService, similarityService, engineFactory,
|
routing,
|
||||||
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
|
this.indexSettings,
|
||||||
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId),
|
path,
|
||||||
|
store,
|
||||||
|
indexSortSupplier,
|
||||||
|
indexCache,
|
||||||
|
mapperService,
|
||||||
|
similarityService,
|
||||||
|
engineFactory,
|
||||||
|
eventListener,
|
||||||
|
searcherWrapper,
|
||||||
|
threadPool,
|
||||||
|
bigArrays,
|
||||||
|
engineWarmer,
|
||||||
|
searchOperationListeners,
|
||||||
|
indexingOperationListeners,
|
||||||
|
() -> globalCheckpointSyncer.accept(shardId),
|
||||||
circuitBreakerService);
|
circuitBreakerService);
|
||||||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
|
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
|
||||||
eventListener.afterIndexShardCreated(indexShard);
|
eventListener.afterIndexShardCreated(indexShard);
|
||||||
|
|
|
@ -249,24 +249,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||||
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
|
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
|
||||||
|
|
||||||
public IndexShard(
|
public IndexShard(
|
||||||
ShardRouting shardRouting,
|
final ShardRouting shardRouting,
|
||||||
IndexSettings indexSettings,
|
final IndexSettings indexSettings,
|
||||||
ShardPath path,
|
final ShardPath path,
|
||||||
Store store,
|
final Store store,
|
||||||
Supplier<Sort> indexSortSupplier,
|
final Supplier<Sort> indexSortSupplier,
|
||||||
IndexCache indexCache,
|
final IndexCache indexCache,
|
||||||
MapperService mapperService,
|
final MapperService mapperService,
|
||||||
SimilarityService similarityService,
|
final SimilarityService similarityService,
|
||||||
@Nullable EngineFactory engineFactory,
|
final @Nullable EngineFactory engineFactory,
|
||||||
IndexEventListener indexEventListener,
|
final IndexEventListener indexEventListener,
|
||||||
IndexSearcherWrapper indexSearcherWrapper,
|
final IndexSearcherWrapper indexSearcherWrapper,
|
||||||
ThreadPool threadPool,
|
final ThreadPool threadPool,
|
||||||
BigArrays bigArrays,
|
final BigArrays bigArrays,
|
||||||
Engine.Warmer warmer,
|
final Engine.Warmer warmer,
|
||||||
List<SearchOperationListener> searchOperationListener,
|
final List<SearchOperationListener> searchOperationListener,
|
||||||
List<IndexingOperationListener> listeners,
|
final List<IndexingOperationListener> listeners,
|
||||||
Runnable globalCheckpointSyncer,
|
final Runnable globalCheckpointSyncer,
|
||||||
CircuitBreakerService circuitBreakerService) throws IOException {
|
final CircuitBreakerService circuitBreakerService) throws IOException {
|
||||||
super(shardRouting.shardId(), indexSettings);
|
super(shardRouting.shardId(), indexSettings);
|
||||||
assert shardRouting.initializing();
|
assert shardRouting.initializing();
|
||||||
this.shardRouting = shardRouting;
|
this.shardRouting = shardRouting;
|
||||||
|
|
|
@ -586,10 +586,14 @@ public class IndicesService extends AbstractLifecycleComponent
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
|
public IndexShard createShard(
|
||||||
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
|
final ShardRouting shardRouting,
|
||||||
Consumer<IndexShard.ShardFailure> onShardFailure,
|
final RecoveryState recoveryState,
|
||||||
Consumer<ShardId> globalCheckpointSyncer) throws IOException {
|
final PeerRecoveryTargetService recoveryTargetService,
|
||||||
|
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
|
||||||
|
final RepositoriesService repositoriesService,
|
||||||
|
final Consumer<IndexShard.ShardFailure> onShardFailure,
|
||||||
|
final Consumer<ShardId> globalCheckpointSyncer) throws IOException {
|
||||||
ensureChangesAllowed();
|
ensureChangesAllowed();
|
||||||
IndexService indexService = indexService(shardRouting.index());
|
IndexService indexService = indexService(shardRouting.index());
|
||||||
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer);
|
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer);
|
||||||
|
|
|
@ -43,7 +43,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
|
@ -122,42 +121,54 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
||||||
private final PrimaryReplicaSyncer primaryReplicaSyncer;
|
private final PrimaryReplicaSyncer primaryReplicaSyncer;
|
||||||
private final Consumer<ShardId> globalCheckpointSyncer;
|
private final Consumer<ShardId> globalCheckpointSyncer;
|
||||||
|
|
||||||
@Inject
|
public IndicesClusterStateService(
|
||||||
public IndicesClusterStateService(Settings settings,
|
final Settings settings,
|
||||||
IndicesService indicesService,
|
final IndicesService indicesService,
|
||||||
ClusterService clusterService,
|
final ClusterService clusterService,
|
||||||
ThreadPool threadPool,
|
final ThreadPool threadPool,
|
||||||
PeerRecoveryTargetService recoveryTargetService,
|
final PeerRecoveryTargetService recoveryTargetService,
|
||||||
ShardStateAction shardStateAction,
|
final ShardStateAction shardStateAction,
|
||||||
NodeMappingRefreshAction nodeMappingRefreshAction,
|
final NodeMappingRefreshAction nodeMappingRefreshAction,
|
||||||
RepositoriesService repositoriesService,
|
final RepositoriesService repositoriesService,
|
||||||
SearchService searchService,
|
final SearchService searchService,
|
||||||
SyncedFlushService syncedFlushService,
|
final SyncedFlushService syncedFlushService,
|
||||||
PeerRecoverySourceService peerRecoverySourceService,
|
final PeerRecoverySourceService peerRecoverySourceService,
|
||||||
SnapshotShardsService snapshotShardsService,
|
final SnapshotShardsService snapshotShardsService,
|
||||||
PrimaryReplicaSyncer primaryReplicaSyncer,
|
final PrimaryReplicaSyncer primaryReplicaSyncer,
|
||||||
GlobalCheckpointSyncAction globalCheckpointSyncAction) {
|
final GlobalCheckpointSyncAction globalCheckpointSyncAction) {
|
||||||
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
|
this(
|
||||||
clusterService, threadPool, recoveryTargetService, shardStateAction,
|
settings,
|
||||||
nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService,
|
(AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
|
||||||
snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard);
|
clusterService,
|
||||||
|
threadPool,
|
||||||
|
recoveryTargetService,
|
||||||
|
shardStateAction,
|
||||||
|
nodeMappingRefreshAction,
|
||||||
|
repositoriesService,
|
||||||
|
searchService,
|
||||||
|
syncedFlushService,
|
||||||
|
peerRecoverySourceService,
|
||||||
|
snapshotShardsService,
|
||||||
|
primaryReplicaSyncer,
|
||||||
|
globalCheckpointSyncAction::updateGlobalCheckpointForShard);
|
||||||
}
|
}
|
||||||
|
|
||||||
// for tests
|
// for tests
|
||||||
IndicesClusterStateService(Settings settings,
|
IndicesClusterStateService(
|
||||||
AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
|
final Settings settings,
|
||||||
ClusterService clusterService,
|
final AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
|
||||||
ThreadPool threadPool,
|
final ClusterService clusterService,
|
||||||
PeerRecoveryTargetService recoveryTargetService,
|
final ThreadPool threadPool,
|
||||||
ShardStateAction shardStateAction,
|
final PeerRecoveryTargetService recoveryTargetService,
|
||||||
NodeMappingRefreshAction nodeMappingRefreshAction,
|
final ShardStateAction shardStateAction,
|
||||||
RepositoriesService repositoriesService,
|
final NodeMappingRefreshAction nodeMappingRefreshAction,
|
||||||
SearchService searchService,
|
final RepositoriesService repositoriesService,
|
||||||
SyncedFlushService syncedFlushService,
|
final SearchService searchService,
|
||||||
PeerRecoverySourceService peerRecoverySourceService,
|
final SyncedFlushService syncedFlushService,
|
||||||
SnapshotShardsService snapshotShardsService,
|
final PeerRecoverySourceService peerRecoverySourceService,
|
||||||
PrimaryReplicaSyncer primaryReplicaSyncer,
|
final SnapshotShardsService snapshotShardsService,
|
||||||
Consumer<ShardId> globalCheckpointSyncer) {
|
final PrimaryReplicaSyncer primaryReplicaSyncer,
|
||||||
|
final Consumer<ShardId> globalCheckpointSyncer) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
this.buildInIndexListener =
|
this.buildInIndexListener =
|
||||||
|
@ -557,8 +568,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
||||||
try {
|
try {
|
||||||
logger.debug("{} creating shard", shardRouting.shardId());
|
logger.debug("{} creating shard", shardRouting.shardId());
|
||||||
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
|
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
|
||||||
indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting),
|
indicesService.createShard(
|
||||||
repositoriesService, failedShardHandler, globalCheckpointSyncer);
|
shardRouting,
|
||||||
|
recoveryState,
|
||||||
|
recoveryTargetService,
|
||||||
|
new RecoveryListener(shardRouting),
|
||||||
|
repositoriesService,
|
||||||
|
failedShardHandler,
|
||||||
|
globalCheckpointSyncer);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
|
failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
|
||||||
}
|
}
|
||||||
|
@ -843,10 +860,24 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
||||||
@Nullable U indexService(Index index);
|
@Nullable U indexService(Index index);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates shard for the specified shard routing and starts recovery,
|
* Creates a shard for the specified shard routing and starts recovery.
|
||||||
|
*
|
||||||
|
* @param shardRouting the shard routing
|
||||||
|
* @param recoveryState the recovery state
|
||||||
|
* @param recoveryTargetService recovery service for the target
|
||||||
|
* @param recoveryListener a callback when recovery changes state (finishes or fails)
|
||||||
|
* @param repositoriesService service responsible for snapshot/restore
|
||||||
|
* @param onShardFailure a callback when this shard fails
|
||||||
|
* @param globalCheckpointSyncer a callback when this shard syncs the global checkpoint
|
||||||
|
* @return a new shard
|
||||||
|
* @throws IOException if an I/O exception occurs when creating the shard
|
||||||
*/
|
*/
|
||||||
T createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
|
T createShard(
|
||||||
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
|
ShardRouting shardRouting,
|
||||||
|
RecoveryState recoveryState,
|
||||||
|
PeerRecoveryTargetService recoveryTargetService,
|
||||||
|
PeerRecoveryTargetService.RecoveryListener recoveryListener,
|
||||||
|
RepositoriesService repositoriesService,
|
||||||
Consumer<IndexShard.ShardFailure> onShardFailure,
|
Consumer<IndexShard.ShardFailure> onShardFailure,
|
||||||
Consumer<ShardId> globalCheckpointSyncer) throws IOException;
|
Consumer<ShardId> globalCheckpointSyncer) throws IOException;
|
||||||
|
|
||||||
|
|
|
@ -638,15 +638,31 @@ public class IndexShardIT extends ESSingleNodeTestCase {
|
||||||
return newShard;
|
return newShard;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper,
|
public static final IndexShard newIndexShard(
|
||||||
CircuitBreakerService cbs, IndexingOperationListener... listeners) throws IOException {
|
final IndexService indexService,
|
||||||
|
final IndexShard shard,IndexSearcherWrapper wrapper,
|
||||||
|
final CircuitBreakerService cbs,
|
||||||
|
final IndexingOperationListener... listeners) throws IOException {
|
||||||
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
|
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
|
||||||
IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(),
|
return new IndexShard(
|
||||||
shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(),
|
initializingShardRouting,
|
||||||
indexService.similarityService(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
|
indexService.getIndexSettings(),
|
||||||
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners),
|
shard.shardPath(),
|
||||||
() -> {}, cbs);
|
shard.store(),
|
||||||
return newShard;
|
indexService.getIndexSortSupplier(),
|
||||||
|
indexService.cache(),
|
||||||
|
indexService.mapperService(),
|
||||||
|
indexService.similarityService(),
|
||||||
|
shard.getEngineFactory(),
|
||||||
|
indexService.getIndexEventListener(),
|
||||||
|
wrapper,
|
||||||
|
indexService.getThreadPool(),
|
||||||
|
indexService.getBigArrays(),
|
||||||
|
null,
|
||||||
|
Collections.emptyList(),
|
||||||
|
Arrays.asList(listeners),
|
||||||
|
() -> {},
|
||||||
|
cbs);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {
|
private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {
|
||||||
|
|
|
@ -226,12 +226,14 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MockIndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState,
|
public MockIndexShard createShard(
|
||||||
PeerRecoveryTargetService recoveryTargetService,
|
final ShardRouting shardRouting,
|
||||||
PeerRecoveryTargetService.RecoveryListener recoveryListener,
|
final RecoveryState recoveryState,
|
||||||
RepositoriesService repositoriesService,
|
final PeerRecoveryTargetService recoveryTargetService,
|
||||||
Consumer<IndexShard.ShardFailure> onShardFailure,
|
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
|
||||||
Consumer<ShardId> globalCheckpointSyncer) throws IOException {
|
final RepositoriesService repositoriesService,
|
||||||
|
final Consumer<IndexShard.ShardFailure> onShardFailure,
|
||||||
|
final Consumer<ShardId> globalCheckpointSyncer) throws IOException {
|
||||||
failRandomly();
|
failRandomly();
|
||||||
MockIndexService indexService = indexService(recoveryState.getShardId().getIndex());
|
MockIndexService indexService = indexService(recoveryState.getShardId().getIndex());
|
||||||
MockIndexShard indexShard = indexService.createShard(shardRouting);
|
MockIndexShard indexShard = indexService.createShard(shardRouting);
|
||||||
|
|
|
@ -467,11 +467,12 @@ public class SnapshotsServiceTests extends ESTestCase {
|
||||||
deterministicTaskQueue.getThreadPool()
|
deterministicTaskQueue.getThreadPool()
|
||||||
);
|
);
|
||||||
indicesClusterStateService = new IndicesClusterStateService(
|
indicesClusterStateService = new IndicesClusterStateService(
|
||||||
settings, indicesService, clusterService, threadPool,
|
settings,
|
||||||
|
indicesService,
|
||||||
|
clusterService,
|
||||||
|
threadPool,
|
||||||
new PeerRecoveryTargetService(
|
new PeerRecoveryTargetService(
|
||||||
deterministicTaskQueue.getThreadPool(), transportService, recoverySettings,
|
deterministicTaskQueue.getThreadPool(), transportService, recoverySettings, clusterService),
|
||||||
clusterService
|
|
||||||
),
|
|
||||||
shardStateAction,
|
shardStateAction,
|
||||||
new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)),
|
new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)),
|
||||||
repositoriesService,
|
repositoriesService,
|
||||||
|
@ -482,13 +483,23 @@ public class SnapshotsServiceTests extends ESTestCase {
|
||||||
new PrimaryReplicaSyncer(
|
new PrimaryReplicaSyncer(
|
||||||
transportService,
|
transportService,
|
||||||
new TransportResyncReplicationAction(
|
new TransportResyncReplicationAction(
|
||||||
settings, transportService, clusterService, indicesService, threadPool,
|
settings,
|
||||||
shardStateAction, actionFilters, indexNameExpressionResolver)
|
transportService,
|
||||||
),
|
clusterService,
|
||||||
|
indicesService,
|
||||||
|
threadPool,
|
||||||
|
shardStateAction,
|
||||||
|
actionFilters,
|
||||||
|
indexNameExpressionResolver)),
|
||||||
new GlobalCheckpointSyncAction(
|
new GlobalCheckpointSyncAction(
|
||||||
settings, transportService, clusterService, indicesService, threadPool,
|
settings,
|
||||||
shardStateAction, actionFilters, indexNameExpressionResolver)
|
transportService,
|
||||||
);
|
clusterService,
|
||||||
|
indicesService,
|
||||||
|
threadPool,
|
||||||
|
shardStateAction,
|
||||||
|
actionFilters,
|
||||||
|
indexNameExpressionResolver));
|
||||||
Map<Action, TransportAction> actions = new HashMap<>();
|
Map<Action, TransportAction> actions = new HashMap<>();
|
||||||
actions.put(CreateIndexAction.INSTANCE,
|
actions.put(CreateIndexAction.INSTANCE,
|
||||||
new TransportCreateIndexAction(
|
new TransportCreateIndexAction(
|
||||||
|
|
|
@ -367,9 +367,24 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
||||||
};
|
};
|
||||||
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||||
CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings);
|
CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings);
|
||||||
indexShard = new IndexShard(routing, indexSettings, shardPath, store, () -> null, indexCache, mapperService, similarityService,
|
indexShard = new IndexShard(
|
||||||
engineFactory, indexEventListener, indexSearcherWrapper, threadPool,
|
routing,
|
||||||
BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer,
|
indexSettings,
|
||||||
|
shardPath,
|
||||||
|
store,
|
||||||
|
() -> null,
|
||||||
|
indexCache,
|
||||||
|
mapperService,
|
||||||
|
similarityService,
|
||||||
|
engineFactory,
|
||||||
|
indexEventListener,
|
||||||
|
indexSearcherWrapper,
|
||||||
|
threadPool,
|
||||||
|
BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
|
warmer,
|
||||||
|
Collections.emptyList(),
|
||||||
|
Arrays.asList(listeners),
|
||||||
|
globalCheckpointSyncer,
|
||||||
breakerService);
|
breakerService);
|
||||||
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
|
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
|
||||||
success = true;
|
success = true;
|
||||||
|
|
Loading…
Reference in New Issue