Some Cleanup in o.e.gateway Package (#42108) (#42568)

* Removing obvious dead code
* Removing redundant listener interface
This commit is contained in:
Armin Braun 2019-05-27 11:28:12 +02:00 committed by GitHub
parent a5ca20a250
commit 49767fc1e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 46 additions and 120 deletions

View File

@ -93,7 +93,7 @@ public abstract class BaseGatewayShardAllocator {
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
*/
protected List<NodeAllocationResult> buildDecisionsForAllNodes(ShardRouting shard, RoutingAllocation allocation) {
protected static List<NodeAllocationResult> buildDecisionsForAllNodes(ShardRouting shard, RoutingAllocation allocation) {
List<NodeAllocationResult> results = new ArrayList<>();
for (RoutingNode node : allocation.routingNodes()) {
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.gateway;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
@ -163,14 +164,14 @@ public class DanglingIndicesState implements ClusterStateListener {
}
try {
allocateDangledIndices.allocateDangled(Collections.unmodifiableCollection(new ArrayList<>(danglingIndices.values())),
new LocalAllocateDangledIndices.Listener() {
new ActionListener<LocalAllocateDangledIndices.AllocateDangledResponse>() {
@Override
public void onResponse(LocalAllocateDangledIndices.AllocateDangledResponse response) {
logger.trace("allocated dangled");
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
logger.info("failed to send allocated dangled", e);
}
}

View File

@ -31,7 +31,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import java.util.Arrays;
import java.util.function.Function;
@ -45,12 +44,9 @@ public class Gateway {
private final TransportNodesListGatewayMetaState listGatewayMetaState;
private final int minimumMasterNodes;
private final IndicesService indicesService;
public Gateway(final Settings settings, final ClusterService clusterService,
final TransportNodesListGatewayMetaState listGatewayMetaState,
final IndicesService indicesService) {
this.indicesService = indicesService;
final TransportNodesListGatewayMetaState listGatewayMetaState) {
this.clusterService = clusterService;
this.listGatewayMetaState = listGatewayMetaState;
this.minimumMasterNodes = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);

View File

@ -26,14 +26,6 @@ import java.io.IOException;
public class GatewayException extends ElasticsearchException {
public GatewayException(String msg) {
super(msg);
}
public GatewayException(String msg, Throwable cause) {
super(msg, cause);
}
public GatewayException(StreamInput in) throws IOException {
super(in);
}

View File

@ -44,9 +44,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.transport.TransportService;
@ -76,11 +74,9 @@ import java.util.function.UnaryOperator;
public class GatewayMetaState implements ClusterStateApplier, CoordinationState.PersistedState {
protected static final Logger logger = LogManager.getLogger(GatewayMetaState.class);
private final NodeEnvironment nodeEnv;
private final MetaStateService metaStateService;
private final Settings settings;
private final ClusterService clusterService;
private final IndicesService indicesService;
private final TransportService transportService;
//there is a single thread executing updateClusterState calls, hence no volatile modifier
@ -88,16 +84,13 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
protected ClusterState previousClusterState;
protected boolean incrementalWrite;
public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
public GatewayMetaState(Settings settings, MetaStateService metaStateService,
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader,
TransportService transportService, ClusterService clusterService,
IndicesService indicesService) throws IOException {
TransportService transportService, ClusterService clusterService) throws IOException {
this.settings = settings;
this.nodeEnv = nodeEnv;
this.metaStateService = metaStateService;
this.transportService = transportService;
this.clusterService = clusterService;
this.indicesService = indicesService;
upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader);
initializeClusterState(ClusterName.CLUSTER_NAME_SETTING.get(settings));
@ -184,7 +177,7 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
}
}
protected boolean isMasterOrDataNode() {
private boolean isMasterOrDataNode() {
return DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings);
}
@ -312,13 +305,12 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
}
}
long writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
assert finished == false : FINISHED_MSG;
try {
long generation = metaStateService.writeManifestAndCleanup(reason, manifest);
metaStateService.writeManifestAndCleanup(reason, manifest);
commitCleanupActions.forEach(Runnable::run);
finished = true;
return generation;
} catch (WriteStateException e) {
// if Manifest write results in dirty WriteStateException it's not safe to remove
// new metadata files, because if Manifest was actually written to disk and its deletion
@ -346,7 +338,7 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
*
* @throws WriteStateException if exception occurs. See also {@link WriteStateException#isDirty()}.
*/
protected void updateClusterState(ClusterState newState, ClusterState previousState)
private void updateClusterState(ClusterState newState, ClusterState previousState)
throws WriteStateException {
MetaData newMetaData = newState.metaData();
@ -406,7 +398,7 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
}
private static boolean isDataOnlyNode(ClusterState state) {
return ((state.nodes().getLocalNode().isMasterNode() == false) && state.nodes().getLocalNode().isDataNode());
return state.nodes().getLocalNode().isMasterNode() == false && state.nodes().getLocalNode().isDataNode();
}
/**
@ -535,8 +527,7 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
}
private static Set<Index> getRelevantIndicesForMasterEligibleNode(ClusterState state) {
Set<Index> relevantIndices;
relevantIndices = new HashSet<>();
Set<Index> relevantIndices = new HashSet<>();
// we have to iterate over the metadata to make sure we also capture closed indices
for (IndexMetaData indexMetaData : state.metaData()) {
relevantIndices.add(indexMetaData.getIndex());

View File

@ -41,7 +41,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
@ -94,7 +93,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
public GatewayService(final Settings settings, final AllocationService allocationService, final ClusterService clusterService,
final ThreadPool threadPool,
final TransportNodesListGatewayMetaState listGatewayMetaState,
final IndicesService indicesService, final Discovery discovery) {
final Discovery discovery) {
this.allocationService = allocationService;
this.clusterService = clusterService;
this.threadPool = threadPool;
@ -125,7 +124,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
recoveryRunnable = () ->
clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask());
} else {
final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState, indicesService);
final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
recoveryRunnable = () ->
gateway.performStateRecovery(new GatewayRecoveryListener());
}

View File

@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -76,7 +77,7 @@ public class LocalAllocateDangledIndices {
new AllocateDangledRequestHandler());
}
public void allocateDangled(Collection<IndexMetaData> indices, final Listener listener) {
public void allocateDangled(Collection<IndexMetaData> indices, ActionListener<AllocateDangledResponse> listener) {
ClusterState clusterState = clusterService.state();
DiscoveryNode masterNode = clusterState.nodes().getMasterNode();
if (masterNode == null) {
@ -110,12 +111,6 @@ public class LocalAllocateDangledIndices {
});
}
public interface Listener {
void onResponse(AllocateDangledResponse response);
void onFailure(Throwable e);
}
class AllocateDangledRequestHandler implements TransportRequestHandler<AllocateDangledRequest> {
@Override
public void messageReceived(final AllocateDangledRequest request, final TransportChannel channel, Task task) throws Exception {
@ -257,10 +252,6 @@ public class LocalAllocateDangledIndices {
this.ack = ack;
}
public boolean ack() {
return ack;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -198,12 +198,11 @@ public class MetaStateService {
*
* @throws WriteStateException if exception when writing state occurs. See also {@link WriteStateException#isDirty()}
*/
public long writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
public void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
logger.trace("[_meta] writing state, reason [{}]", reason);
try {
long generation = MANIFEST_FORMAT.writeAndCleanup(manifest, nodeEnv.nodeDataPaths());
logger.trace("[_meta] state written (generation: {})", generation);
return generation;
} catch (WriteStateException ex) {
throw new WriteStateException(ex.isDirty(), "[_meta]: failed to write meta state", ex);
}

View File

@ -297,10 +297,10 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
/**
* Split the list of node shard states into groups yes/no/throttle based on allocation deciders
*/
private NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation,
List<NodeGatewayStartedShards> nodeShardStates,
ShardRouting shardRouting,
boolean forceAllocate) {
private static NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation,
List<NodeGatewayStartedShards> nodeShardStates,
ShardRouting shardRouting,
boolean forceAllocate) {
List<DecidedNode> yesNodeShards = new ArrayList<>();
List<DecidedNode> throttledNodeShards = new ArrayList<>();
List<DecidedNode> noNodeShards = new ArrayList<>();

View File

@ -56,11 +56,11 @@ public abstract class PriorityComparator implements Comparator<ShardRouting> {
return cmp;
}
private int priority(Settings settings) {
private static int priority(Settings settings) {
return IndexMetaData.INDEX_PRIORITY_SETTING.get(settings);
}
private long timeCreated(Settings settings) {
private static long timeCreated(Settings settings) {
return settings.getAsLong(IndexMetaData.SETTING_CREATION_DATE, -1L);
}

View File

@ -243,8 +243,8 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
* YES or THROTTLE). If in explain mode, also returns the node-level explanations as the second element
* in the returned tuple.
*/
private Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocatedToAtLeastOneNode(ShardRouting shard,
RoutingAllocation allocation) {
private static Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocatedToAtLeastOneNode(ShardRouting shard,
RoutingAllocation allocation) {
Decision madeDecision = Decision.NO;
final boolean explain = allocation.debugDecision();
Map<String, NodeAllocationResult> nodeDecisions = explain ? new HashMap<>() : null;
@ -260,7 +260,7 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
if (explain) {
madeDecision = decision;
} else {
return Tuple.tuple(decision, nodeDecisions);
return Tuple.tuple(decision, null);
}
} else if (madeDecision.type() == Decision.Type.NO && decision.type() == Decision.Type.THROTTLE) {
madeDecision = decision;
@ -276,8 +276,8 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
* Takes the store info for nodes that have a shard store and adds them to the node decisions,
* leaving the node explanations untouched for those nodes that do not have any store information.
*/
private List<NodeAllocationResult> augmentExplanationsWithStoreInfo(Map<String, NodeAllocationResult> nodeDecisions,
Map<String, NodeAllocationResult> withShardStores) {
private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(Map<String, NodeAllocationResult> nodeDecisions,
Map<String, NodeAllocationResult> withShardStores) {
if (nodeDecisions == null || withShardStores == null) {
return null;
}
@ -295,8 +295,8 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
/**
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
*/
private TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data) {
private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data) {
assert shard.currentNodeId() != null;
DiscoveryNode primaryNode = allocation.nodes().get(shard.currentNodeId());
if (primaryNode == null) {

View File

@ -94,23 +94,10 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
public Request(String... nodesIds) {
super(nodesIds);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
public static class NodesGatewayMetaState extends BaseNodesResponse<NodeGatewayMetaState> {
NodesGatewayMetaState() {
}
public NodesGatewayMetaState(ClusterName clusterName, List<NodeGatewayMetaState> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@ -135,15 +122,6 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
super(nodeId);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
public static class NodeGatewayMetaState extends BaseNodeResponse {

View File

@ -51,6 +51,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
/**
* This transport action is used to fetch the shard version from each node during primary allocation in {@link GatewayAllocator}.
@ -318,14 +319,8 @@ public class TransportNodesListGatewayStartedShards extends
NodeGatewayStartedShards that = (NodeGatewayStartedShards) o;
if (primary != that.primary) {
return false;
}
if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) {
return false;
}
return storeException != null ? storeException.equals(that.storeException) : that.storeException == null;
return primary == that.primary && Objects.equals(allocationId, that.allocationId)
&& Objects.equals(storeException, that.storeException);
}
@Override

View File

@ -472,8 +472,8 @@ public class Node implements Closeable {
).collect(Collectors.toSet());
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, nodeEnvironment, metaStateService,
metaDataIndexUpgradeService, metaDataUpgrader, transportService, clusterService, indicesService);
final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, metaStateService,
metaDataIndexUpgradeService, metaDataUpgrader, transportService, clusterService);
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));

View File

@ -32,8 +32,7 @@ public class GatewayServiceTests extends ESTestCase {
final ClusterService clusterService = new ClusterService(Settings.builder().put("cluster.name", "GatewayServiceTests").build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null);
return new GatewayService(settings.build(),
null, clusterService, null, null, null, null);
return new GatewayService(settings.build(), null, clusterService, null, null, null);
}
public void testDefaultRecoverAfterTime() {

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.transport.TransportService;
@ -46,10 +45,9 @@ public class MockGatewayMetaState extends GatewayMetaState {
public MockGatewayMetaState(Settings settings, NodeEnvironment nodeEnvironment,
NamedXContentRegistry xContentRegistry, DiscoveryNode localNode) throws IOException {
super(settings, nodeEnvironment, new MetaStateService(nodeEnvironment, xContentRegistry),
super(settings, new MetaStateService(nodeEnvironment, xContentRegistry),
mock(MetaDataIndexUpgradeService.class), mock(MetaDataUpgrader.class),
mock(TransportService.class), mock(ClusterService.class),
mock(IndicesService.class));
mock(TransportService.class), mock(ClusterService.class));
this.localNode = localNode;
}

View File

@ -22,6 +22,7 @@ import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.cluster.ClusterName;
@ -385,18 +386,18 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
.numberOfShards(1)
.numberOfReplicas(0)
.build();
DanglingListener listener = new DanglingListener();
dangling.allocateDangled(Arrays.asList(indexMetaData), listener);
listener.latch.await();
CountDownLatch latch = new CountDownLatch(1);
dangling.allocateDangled(Arrays.asList(indexMetaData), ActionListener.wrap(latch::countDown));
latch.await();
assertThat(clusterService.state(), equalTo(originalState));
// remove the alias
client().admin().indices().prepareAliases().removeAlias(indexName, alias).get();
// now try importing a dangling index with the same name as the alias, it should succeed.
listener = new DanglingListener();
dangling.allocateDangled(Arrays.asList(indexMetaData), listener);
listener.latch.await();
latch = new CountDownLatch(1);
dangling.allocateDangled(Arrays.asList(indexMetaData), ActionListener.wrap(latch::countDown));
latch.await();
assertThat(clusterService.state(), not(originalState));
assertNotNull(clusterService.state().getMetaData().index(alias));
}
@ -431,20 +432,6 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
indicesService.verifyIndexIsDeleted(tombstonedIndex, clusterState);
}
private static class DanglingListener implements LocalAllocateDangledIndices.Listener {
final CountDownLatch latch = new CountDownLatch(1);
@Override
public void onResponse(LocalAllocateDangledIndices.AllocateDangledResponse response) {
latch.countDown();
}
@Override
public void onFailure(Throwable e) {
latch.countDown();
}
}
/**
* Tests that teh {@link MapperService} created by {@link IndicesService#createIndexMapperService(IndexMetaData)} contains
* custom types and similarities registered by plugins