Core: Drop nodeName from AbstractComponent (#34487)

`AbstractComponent` is trouble because its name implies that
*everything* should extend from it. It *is* useful, but maybe too
broadly useful. The things it offers access too, the `Settings` instance
for the entire server and a logger are nice to have around, but not
really needed *everywhere*. The `Settings` instance especially adds a
fair bit of ceremony to testing without any value.

This removes the `nodeName` method from `AbstractComponent` so it is
more clear where we actually need the node name.
This commit is contained in:
Nik Everett 2018-10-26 15:26:14 -04:00 committed by GitHub
parent 5e0b524aa5
commit 10295b306d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 38 additions and 22 deletions

View File

@ -95,15 +95,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
private final AtomicReference<ClusterState> state; // last applied state
private final String nodeName;
private NodeConnectionsService nodeConnectionsService;
public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings);
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
this.nodeName = nodeName;
}
public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
@ -130,7 +133,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
Objects.requireNonNull(state.get(), "please set initial state before starting");
addListener(localNodeMasterListeners);
threadPoolExecutor = EsExecutors.newSinglePrioritizing(
nodeName() + "/" + CLUSTER_UPDATE_THREAD_NAME,
nodeName + "/" + CLUSTER_UPDATE_THREAD_NAME,
daemonThreadFactory(settings, CLUSTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler());

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collections;
@ -60,9 +61,12 @@ public class ClusterService extends AbstractLifecycleComponent {
private final ClusterSettings clusterSettings;
private final String nodeName;
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings);
this.masterService = new MasterService(settings, threadPool);
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.masterService = new MasterService(nodeName, settings, threadPool);
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
@ -70,7 +74,7 @@ public class ClusterService extends AbstractLifecycleComponent {
this::setSlowTaskLoggingThreshold);
// Add a no-op update consumer so changes are logged
this.clusterSettings.addAffixUpdateConsumer(USER_DEFINED_META_DATA, (first, second) -> {}, (first, second) -> {});
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool);
this.clusterApplierService = new ClusterApplierService(nodeName, settings, clusterSettings, threadPool);
}
private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
@ -199,6 +203,13 @@ public class ClusterService extends AbstractLifecycleComponent {
return settings;
}
/**
* The name of this node.
*/
public final String getNodeName() {
return nodeName;
}
/**
* Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig,
* ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched.

View File

@ -70,6 +70,8 @@ public class MasterService extends AbstractLifecycleComponent {
public static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";
private final String nodeName;
private BiConsumer<ClusterChangedEvent, Discovery.AckListener> clusterStatePublisher;
private java.util.function.Supplier<ClusterState> clusterStateSupplier;
@ -81,8 +83,9 @@ public class MasterService extends AbstractLifecycleComponent {
private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
private volatile Batcher taskBatcher;
public MasterService(Settings settings, ThreadPool threadPool) {
public MasterService(String nodeName, Settings settings, ThreadPool threadPool) {
super(settings);
this.nodeName = nodeName;
// TODO: introduce a dedicated setting for master service
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
this.threadPool = threadPool;
@ -105,7 +108,7 @@ public class MasterService extends AbstractLifecycleComponent {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting");
threadPoolExecutor = EsExecutors.newSinglePrioritizing(
nodeName() + "/" + MASTER_UPDATE_THREAD_NAME,
nodeName + "/" + MASTER_UPDATE_THREAD_NAME,
daemonThreadFactory(settings, MASTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler());

View File

@ -23,7 +23,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
public abstract class AbstractComponent {
@ -36,11 +35,4 @@ public abstract class AbstractComponent {
this.deprecationLogger = new DeprecationLogger(logger);
this.settings = settings;
}
/**
* Returns the nodes name from the settings or the empty string if not set.
*/
public final String nodeName() {
return Node.NODE_NAME_SETTING.get(settings);
}
}

View File

@ -45,6 +45,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
@ -117,6 +118,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
private final TimeValue resolveTimeout;
private final String nodeName;
private volatile boolean closed = false;
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
@ -131,6 +134,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
final int concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
nodeName = Node.NODE_NAME_SETTING.get(settings);
logger.debug(
"using concurrent_connects [{}], resolve_timeout [{}]",
concurrentConnects,
@ -141,7 +145,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
unicastZenPingExecutorService = EsExecutors.newScaling(
nodeName() + "/" + "unicast_connect",
nodeName + "/" + "unicast_connect",
0,
concurrentConnects,
60,

View File

@ -65,6 +65,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
@ -209,6 +210,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private final TransportLogger transportLogger;
private final BytesReference pingMessage;
private final String nodeName;
public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
@ -223,6 +225,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
this.networkService = networkService;
this.transportName = transportName;
this.transportLogger = new TransportLogger();
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings);
if (defaultFeatures == null) {
@ -947,7 +950,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
stream.setVersion(nodeVersion);
stream.setFeatures(features);
RemoteTransportException tx = new RemoteTransportException(
nodeName(), new TransportAddress(channel.getLocalAddress()), action, error);
nodeName, new TransportAddress(channel.getLocalAddress()), action, error);
threadPool.getThreadContext().writeTo(stream);
stream.writeException(tx);
byte status = 0;

View File

@ -412,7 +412,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
public volatile Long currentTimeOverride = null;
TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool);
super("test_node", settings, clusterSettings, threadPool);
}
@Override

View File

@ -906,7 +906,7 @@ public class MasterServiceTests extends ESTestCase {
public volatile Long currentTimeOverride = null;
TimedMasterService(Settings settings, ThreadPool threadPool) {
super(settings, threadPool);
super("test_node", settings, threadPool);
}
@Override

View File

@ -50,7 +50,7 @@ import static junit.framework.TestCase.fail;
public class ClusterServiceUtils {
public static MasterService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) {
MasterService masterService = new MasterService(Settings.EMPTY, threadPool);
MasterService masterService = new MasterService("test_master_node", Settings.EMPTY, threadPool);
AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
masterService.setClusterStatePublisher((event, ackListener) -> clusterStateRef.set(event.state()));
masterService.setClusterStateSupplier(clusterStateRef::get);

View File

@ -363,7 +363,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
return emptyList();
}
Auditor auditor = new Auditor(client, clusterService.nodeName());
Auditor auditor = new Auditor(client, clusterService.getNodeName());
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool);
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, client, notifier);

View File

@ -55,7 +55,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
}
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
Auditor auditor = new Auditor(client, clusterService.nodeName());
Auditor auditor = new Auditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, clusterService, auditor),
new ExpiredForecastsRemover(client, threadPool),

View File

@ -112,7 +112,7 @@ public class UpdateJobProcessNotifier extends AbstractComponent {
if (update.isJobUpdate() && clusterService.localNode().isMasterNode() == false) {
assert clusterService.localNode().isMasterNode();
LOGGER.error("Job update was submitted to non-master node [" + clusterService.nodeName() + "]; update for job ["
LOGGER.error("Job update was submitted to non-master node [" + clusterService.getNodeName() + "]; update for job ["
+ update.getJobId() + "] will be ignored");
executeProcessUpdates(updatesIterator);
return;