Decouple ClusterStateTaskListener & ClusterApplier (#30809)

Today, the `ClusterApplier` and `MasterService` both use the
`ClusterStateTaskListener` interface to notify their callers when asynchronous
activities have completed. However, this is not wholly appropriate: none of the
callers into the `ClusterApplier` care about the `ClusterState` arguments that
they receive.  This change introduces a dedicated ClusterApplyListener
interface for callers into the `ClusterApplier`, to distinguish these listeners
from the real `ClusterStateTaskListener`s that are waiting for responses from
the `MasterService`.
This commit is contained in:
David Turner 2018-05-24 09:05:09 +01:00 committed by GitHub
parent 0bdfb5c5b5
commit ff0b6c795a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 71 additions and 61 deletions

View File

@ -20,7 +20,6 @@
package org.elasticsearch.cluster.service;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import java.util.function.Supplier;
@ -38,11 +37,29 @@ public interface ClusterApplier {
* @param clusterStateSupplier the cluster state supplier which provides the latest cluster state to apply
* @param listener callback that is invoked after cluster state is applied
*/
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener);
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener);
/**
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
*/
ClusterState.Builder newClusterStateBuilder();
/**
* Listener for results of cluster state application
*/
interface ClusterApplyListener {
/**
* Called on successful cluster state application
* @param source information where the cluster state came from
*/
default void onSuccess(String source) {
}
/**
* Called on failure during cluster state application
* @param source information where the cluster state came from
* @param e exception that occurred
*/
void onFailure(String source, Exception e);
}
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
@ -141,10 +140,10 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
class UpdateTask extends SourcePrioritizedRunnable implements Function<ClusterState, ClusterState> {
final ClusterStateTaskListener listener;
final ClusterApplyListener listener;
final Function<ClusterState, ClusterState> updateFunction;
UpdateTask(Priority priority, String source, ClusterStateTaskListener listener,
UpdateTask(Priority priority, String source, ClusterApplyListener listener,
Function<ClusterState, ClusterState> updateFunction) {
super(priority, source);
this.listener = listener;
@ -301,7 +300,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
public void runOnApplierThread(final String source, Consumer<ClusterState> clusterStateConsumer,
final ClusterStateTaskListener listener, Priority priority) {
final ClusterApplyListener listener, Priority priority) {
submitStateUpdateTask(source, ClusterStateTaskConfig.build(priority),
(clusterState) -> {
clusterStateConsumer.accept(clusterState);
@ -311,13 +310,13 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
public void runOnApplierThread(final String source, Consumer<ClusterState> clusterStateConsumer,
final ClusterStateTaskListener listener) {
final ClusterApplyListener listener) {
runOnApplierThread(source, clusterStateConsumer, listener, Priority.HIGH);
}
@Override
public void onNewClusterState(final String source, final Supplier<ClusterState> clusterStateSupplier,
final ClusterStateTaskListener listener) {
final ClusterApplyListener listener) {
Function<ClusterState, ClusterState> applyFunction = currentState -> {
ClusterState nextState = clusterStateSupplier.get();
if (nextState != null) {
@ -331,12 +330,12 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
private void submitStateUpdateTask(final String source, final ClusterStateTaskConfig config,
final Function<ClusterState, ClusterState> executor,
final ClusterStateTaskListener listener) {
final ClusterApplyListener listener) {
if (!lifecycle.started()) {
return;
}
try {
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterStateTaskListener(listener, logger), executor);
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor);
if (config.timeout() != null) {
threadPoolExecutor.execute(updateTask, config.timeout(),
() -> threadPool.generic().execute(
@ -417,7 +416,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
if (previousClusterState == newClusterState) {
task.listener.clusterStateProcessed(task.source, newClusterState, newClusterState);
task.listener.onSuccess(task.source);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, task.source);
@ -486,7 +485,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
callClusterStateListeners(clusterChangedEvent);
task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
task.listener.onSuccess(task.source);
}
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
@ -511,11 +510,11 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
});
}
private static class SafeClusterStateTaskListener implements ClusterStateTaskListener {
private final ClusterStateTaskListener listener;
private static class SafeClusterApplyListener implements ClusterApplyListener {
private final ClusterApplyListener listener;
private final Logger logger;
SafeClusterStateTaskListener(ClusterStateTaskListener listener, Logger logger) {
SafeClusterApplyListener(ClusterApplyListener listener, Logger logger) {
this.listener = listener;
this.logger = logger;
}
@ -532,14 +531,12 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
try {
listener.clusterStateProcessed(source, oldState, newState);
listener.onSuccess(source);
} catch (Exception e) {
logger.error(new ParameterizedMessage(
"exception thrown by listener while notifying of cluster state processed from [{}], old cluster state:\n" +
"{}\nnew cluster state:\n{}",
source, oldState, newState), e);
"exception thrown by listener while notifying of cluster state processed from [{}]", source), e);
}
}
}

View File

@ -22,18 +22,16 @@ package org.elasticsearch.discovery.single;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -65,9 +63,9 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D
clusterState = event.state();
CountDownLatch latch = new CountDownLatch(1);
ClusterStateTaskListener listener = new ClusterStateTaskListener() {
ClusterApplyListener listener = new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
ackListener.onNodeAck(transportService.getLocalNode(), null);
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.discovery.zen;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
@ -34,12 +33,11 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -789,9 +787,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])",
this::clusterState,
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
try {
pendingStatesQueue.markAsProcessed(newClusterState);
} catch (Exception e) {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -135,9 +136,9 @@ public class ClusterApplierServiceTests extends ESTestCase {
clusterApplierService.currentTimeOverride = System.nanoTime();
clusterApplierService.runOnApplierThread("test1",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}
@ -151,9 +152,9 @@ public class ClusterApplierServiceTests extends ESTestCase {
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).nanos();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
},
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
fail();
}
@ -166,9 +167,9 @@ public class ClusterApplierServiceTests extends ESTestCase {
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterApplierService.runOnApplierThread("test3",
currentState -> {},
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}
@ -216,9 +217,9 @@ public class ClusterApplierServiceTests extends ESTestCase {
clusterApplierService.currentTimeOverride = System.nanoTime();
clusterApplierService.runOnApplierThread("test1",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
processedFirstTask.countDown();
}
@ -234,9 +235,9 @@ public class ClusterApplierServiceTests extends ESTestCase {
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).nanos();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
},
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
fail();
}
@ -247,9 +248,9 @@ public class ClusterApplierServiceTests extends ESTestCase {
});
clusterApplierService.runOnApplierThread("test3",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).nanos(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}
@ -262,9 +263,9 @@ public class ClusterApplierServiceTests extends ESTestCase {
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterApplierService.runOnApplierThread("test4",
currentState -> {},
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}
@ -340,10 +341,10 @@ public class ClusterApplierServiceTests extends ESTestCase {
CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}
@ -390,9 +391,9 @@ public class ClusterApplierServiceTests extends ESTestCase {
CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

View File

@ -23,7 +23,6 @@ import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
@ -72,9 +71,9 @@ public class SingleNodeDiscoveryTests extends ESTestCase {
@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier,
ClusterStateTaskListener listener) {
ClusterApplyListener listener) {
clusterState.set(clusterStateSupplier.get());
listener.clusterStateProcessed(source, clusterState.get(), clusterState.get());
listener.onSuccess(source);
}
});
discovery.start();

View File

@ -26,7 +26,6 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -314,8 +313,8 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
}
@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener) {
listener.clusterStateProcessed(source, clusterStateSupplier.get(), clusterStateSupplier.get());
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
listener.onSuccess(source);
}
};
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@ -446,9 +447,9 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder).build())
.build();
CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> newState, new ClusterStateTaskListener() {
clusterApplierService.onNewClusterState("test", () -> newState, new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

View File

@ -24,13 +24,13 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
@ -72,9 +72,9 @@ public class ClusterServiceUtils {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exception = new AtomicReference<>();
executor.onNewClusterState("test setting state",
() -> ClusterState.builder(clusterState).version(clusterState.version() + 1).build(), new ClusterStateTaskListener() {
() -> ClusterState.builder(clusterState).version(clusterState.version() + 1).build(), new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}
@ -163,9 +163,9 @@ public class ClusterServiceUtils {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> ex = new AtomicReference<>();
clusterApplier.onNewClusterState("mock_publish_to_self[" + event.source() + "]", () -> event.state(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}