Internal: Remove cyclic dependency between HttpServer and NodeService
NodeService has an "service attributes" map, which is only set by HttpServer on start/stop. But the only thing it puts in this map is already available as part of the HttpServer info which is added to node info requests. This change removes the attributes map and removes the dependency in HttpServer on NodeService.
This commit is contained in:
parent
01d7020ee3
commit
dbbfadeefa
|
@ -46,8 +46,6 @@ import static java.util.Collections.unmodifiableMap;
|
|||
* Node information (static, does not change over time).
|
||||
*/
|
||||
public class NodeInfo extends BaseNodeResponse {
|
||||
@Nullable
|
||||
private Map<String, String> serviceAttributes;
|
||||
|
||||
private Version version;
|
||||
private Build build;
|
||||
|
@ -85,14 +83,13 @@ public class NodeInfo extends BaseNodeResponse {
|
|||
public NodeInfo() {
|
||||
}
|
||||
|
||||
public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Map<String, String> serviceAttributes, @Nullable Settings settings,
|
||||
public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Settings settings,
|
||||
@Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable ThreadPoolInfo threadPool,
|
||||
@Nullable TransportInfo transport, @Nullable HttpInfo http, @Nullable PluginsAndModules plugins, @Nullable IngestInfo ingest,
|
||||
@Nullable ByteSizeValue totalIndexingBuffer) {
|
||||
super(node);
|
||||
this.version = version;
|
||||
this.build = build;
|
||||
this.serviceAttributes = serviceAttributes;
|
||||
this.settings = settings;
|
||||
this.os = os;
|
||||
this.process = process;
|
||||
|
@ -127,14 +124,6 @@ public class NodeInfo extends BaseNodeResponse {
|
|||
return this.build;
|
||||
}
|
||||
|
||||
/**
|
||||
* The service attributes of the node.
|
||||
*/
|
||||
@Nullable
|
||||
public Map<String, String> getServiceAttributes() {
|
||||
return this.serviceAttributes;
|
||||
}
|
||||
|
||||
/**
|
||||
* The settings of the node.
|
||||
*/
|
||||
|
@ -213,13 +202,15 @@ public class NodeInfo extends BaseNodeResponse {
|
|||
} else {
|
||||
totalIndexingBuffer = null;
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
Map<String, String> builder = new HashMap<>();
|
||||
int size = in.readVInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
builder.put(in.readString(), in.readString());
|
||||
if (version.onOrBefore(Version.V_5_0_0_alpha4)) {
|
||||
// service attributes were removed
|
||||
if (in.readBoolean()) {
|
||||
int size = in.readVInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
in.readString(); // key
|
||||
in.readString(); // value
|
||||
}
|
||||
}
|
||||
serviceAttributes = unmodifiableMap(builder);
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
settings = Settings.readSettingsFromStream(in);
|
||||
|
@ -262,15 +253,8 @@ public class NodeInfo extends BaseNodeResponse {
|
|||
out.writeBoolean(true);
|
||||
out.writeLong(totalIndexingBuffer.bytes());
|
||||
}
|
||||
if (getServiceAttributes() == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
out.writeVInt(serviceAttributes.size());
|
||||
for (Map.Entry<String, String> entry : serviceAttributes.entrySet()) {
|
||||
out.writeString(entry.getKey());
|
||||
out.writeString(entry.getValue());
|
||||
}
|
||||
if (version.onOrBefore(Version.V_5_0_0_alpha4)) {
|
||||
out.writeBoolean(false); // service attributes removed
|
||||
}
|
||||
if (settings == null) {
|
||||
out.writeBoolean(false);
|
||||
|
|
|
@ -73,12 +73,6 @@ public class NodesInfoResponse extends BaseNodesResponse<NodeInfo> implements To
|
|||
builder.byteSizeField("total_indexing_buffer", "total_indexing_buffer_in_bytes", nodeInfo.getTotalIndexingBuffer());
|
||||
}
|
||||
|
||||
if (nodeInfo.getServiceAttributes() != null) {
|
||||
for (Map.Entry<String, String> nodeAttribute : nodeInfo.getServiceAttributes().entrySet()) {
|
||||
builder.field(nodeAttribute.getKey(), nodeAttribute.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
builder.startArray("roles");
|
||||
for (DiscoveryNode.Role role : nodeInfo.getNode().getRoles()) {
|
||||
builder.value(role.getRoleName());
|
||||
|
|
|
@ -56,22 +56,18 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> implement
|
|||
|
||||
private final RestController restController;
|
||||
|
||||
private final NodeService nodeService;
|
||||
|
||||
private final NodeClient client;
|
||||
|
||||
private final CircuitBreakerService circuitBreakerService;
|
||||
|
||||
@Inject
|
||||
public HttpServer(Settings settings, HttpServerTransport transport, RestController restController, NodeService nodeService,
|
||||
public HttpServer(Settings settings, HttpServerTransport transport, RestController restController,
|
||||
NodeClient client, CircuitBreakerService circuitBreakerService) {
|
||||
super(settings);
|
||||
this.transport = transport;
|
||||
this.restController = restController;
|
||||
this.nodeService = nodeService;
|
||||
this.client = client;
|
||||
this.circuitBreakerService = circuitBreakerService;
|
||||
nodeService.setHttpServer(this);
|
||||
transport.httpServerAdapter(this);
|
||||
}
|
||||
|
||||
|
@ -82,12 +78,10 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> implement
|
|||
if (logger.isInfoEnabled()) {
|
||||
logger.info("{}", transport.boundAddress());
|
||||
}
|
||||
nodeService.putAttribute("http_address", transport.boundAddress().publishAddress().toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
nodeService.removeAttribute("http_address");
|
||||
transport.stop();
|
||||
}
|
||||
|
||||
|
|
|
@ -19,13 +19,15 @@
|
|||
|
||||
package org.elasticsearch.node.service;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.elasticsearch.Build;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -42,14 +44,6 @@ import org.elasticsearch.script.ScriptService;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class NodeService extends AbstractComponent implements Closeable {
|
||||
|
@ -64,18 +58,14 @@ public class NodeService extends AbstractComponent implements Closeable {
|
|||
private final SettingsFilter settingsFilter;
|
||||
private ClusterService clusterService;
|
||||
private ScriptService scriptService;
|
||||
|
||||
@Nullable
|
||||
private HttpServer httpServer;
|
||||
|
||||
private volatile Map<String, String> serviceAttributes = emptyMap();
|
||||
private final HttpServer httpServer;
|
||||
|
||||
private final Discovery discovery;
|
||||
|
||||
@Inject
|
||||
public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService,
|
||||
Discovery discovery, TransportService transportService, IndicesService indicesService,
|
||||
PluginsService pluginService, CircuitBreakerService circuitBreakerService,
|
||||
PluginsService pluginService, CircuitBreakerService circuitBreakerService, HttpServer httpServer,
|
||||
ProcessorsRegistry.Builder processorsRegistryBuilder, ClusterService clusterService, SettingsFilter settingsFilter) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
|
@ -85,6 +75,7 @@ public class NodeService extends AbstractComponent implements Closeable {
|
|||
this.discovery = discovery;
|
||||
this.pluginService = pluginService;
|
||||
this.circuitBreakerService = circuitBreakerService;
|
||||
this.httpServer = httpServer;
|
||||
this.clusterService = clusterService;
|
||||
this.ingestService = new IngestService(settings, threadPool, processorsRegistryBuilder);
|
||||
this.settingsFilter = settingsFilter;
|
||||
|
@ -99,38 +90,15 @@ public class NodeService extends AbstractComponent implements Closeable {
|
|||
this.ingestService.buildProcessorsFactoryRegistry(scriptService, clusterService);
|
||||
}
|
||||
|
||||
public void setHttpServer(@Nullable HttpServer httpServer) {
|
||||
this.httpServer = httpServer;
|
||||
}
|
||||
|
||||
public synchronized void putAttribute(String key, String value) {
|
||||
Map<String, String> newServiceAttributes = new HashMap<>(serviceAttributes);
|
||||
newServiceAttributes.put(key, value);
|
||||
serviceAttributes = unmodifiableMap(newServiceAttributes);
|
||||
}
|
||||
|
||||
public synchronized void removeAttribute(String key) {
|
||||
Map<String, String> newServiceAttributes = new HashMap<>(serviceAttributes);
|
||||
newServiceAttributes.remove(key);
|
||||
serviceAttributes = unmodifiableMap(newServiceAttributes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attributes different services in the node can add to be reported as part of the node info (for example).
|
||||
*/
|
||||
public Map<String, String> attributes() {
|
||||
return this.serviceAttributes;
|
||||
}
|
||||
|
||||
public NodeInfo info() {
|
||||
return new NodeInfo(Version.CURRENT, Build.CURRENT, discovery.localNode(), serviceAttributes,
|
||||
return new NodeInfo(Version.CURRENT, Build.CURRENT, discovery.localNode(),
|
||||
settings,
|
||||
monitorService.osService().info(),
|
||||
monitorService.processService().info(),
|
||||
monitorService.jvmService().info(),
|
||||
threadPool.info(),
|
||||
transportService.info(),
|
||||
httpServer == null ? null : httpServer.info(),
|
||||
httpServer.info(),
|
||||
pluginService == null ? null : pluginService.info(),
|
||||
ingestService == null ? null : ingestService.info(),
|
||||
indicesService.getTotalIndexingBufferBytes()
|
||||
|
@ -139,14 +107,14 @@ public class NodeService extends AbstractComponent implements Closeable {
|
|||
|
||||
public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool,
|
||||
boolean transport, boolean http, boolean plugin, boolean ingest, boolean indices) {
|
||||
return new NodeInfo(Version.CURRENT, Build.CURRENT, discovery.localNode(), serviceAttributes,
|
||||
return new NodeInfo(Version.CURRENT, Build.CURRENT, discovery.localNode(),
|
||||
settings ? settingsFilter.filter(this.settings) : null,
|
||||
os ? monitorService.osService().info() : null,
|
||||
process ? monitorService.processService().info() : null,
|
||||
jvm ? monitorService.jvmService().info() : null,
|
||||
threadPool ? this.threadPool.info() : null,
|
||||
transport ? transportService.info() : null,
|
||||
http ? (httpServer == null ? null : httpServer.info()) : null,
|
||||
http ? httpServer.info() : null,
|
||||
plugin ? (pluginService == null ? null : pluginService.info()) : null,
|
||||
ingest ? (ingestService == null ? null : ingestService.info()) : null,
|
||||
indices ? indicesService.getTotalIndexingBufferBytes() : null
|
||||
|
@ -164,7 +132,7 @@ public class NodeService extends AbstractComponent implements Closeable {
|
|||
threadPool.stats(),
|
||||
monitorService.fsService().stats(),
|
||||
transportService.stats(),
|
||||
httpServer == null ? null : httpServer.stats(),
|
||||
httpServer.stats(),
|
||||
circuitBreakerService.stats(),
|
||||
scriptService.stats(),
|
||||
discovery.stats(),
|
||||
|
@ -185,7 +153,7 @@ public class NodeService extends AbstractComponent implements Closeable {
|
|||
threadPool ? this.threadPool.stats() : null,
|
||||
fs ? monitorService.fsService().stats() : null,
|
||||
transport ? transportService.stats() : null,
|
||||
http ? (httpServer == null ? null : httpServer.stats()) : null,
|
||||
http ? httpServer.stats() : null,
|
||||
circuitBreaker ? circuitBreakerService.stats() : null,
|
||||
script ? scriptService.stats() : null,
|
||||
discoveryStats ? discovery.stats() : null,
|
||||
|
|
|
@ -81,11 +81,6 @@ public class NodeInfoStreamingTests extends ESTestCase {
|
|||
assertThat(nodeInfo.getBuild().toString(), equalTo(readNodeInfo.getBuild().toString()));
|
||||
assertThat(nodeInfo.getHostname(), equalTo(readNodeInfo.getHostname()));
|
||||
assertThat(nodeInfo.getVersion(), equalTo(readNodeInfo.getVersion()));
|
||||
assertThat(nodeInfo.getServiceAttributes().size(), equalTo(readNodeInfo.getServiceAttributes().size()));
|
||||
for (Map.Entry<String, String> entry : nodeInfo.getServiceAttributes().entrySet()) {
|
||||
assertNotNull(readNodeInfo.getServiceAttributes().get(entry.getKey()));
|
||||
assertThat(readNodeInfo.getServiceAttributes().get(entry.getKey()), equalTo(entry.getValue()));
|
||||
}
|
||||
compareJsonOutput(nodeInfo.getHttp(), readNodeInfo.getHttp());
|
||||
compareJsonOutput(nodeInfo.getJvm(), readNodeInfo.getJvm());
|
||||
compareJsonOutput(nodeInfo.getProcess(), readNodeInfo.getProcess());
|
||||
|
@ -149,6 +144,7 @@ public class NodeInfoStreamingTests extends ESTestCase {
|
|||
// pick a random long that sometimes exceeds an int:
|
||||
indexingBuffer = new ByteSizeValue(random().nextLong() & ((1L<<40)-1));
|
||||
}
|
||||
return new NodeInfo(VersionUtils.randomVersion(random()), build, node, serviceAttributes, settings, osInfo, process, jvm, threadPoolInfo, transport, htttpInfo, plugins, ingestInfo, indexingBuffer);
|
||||
return new NodeInfo(VersionUtils.randomVersion(random()), build, node, settings, osInfo, process, jvm,
|
||||
threadPoolInfo, transport, htttpInfo, plugins, ingestInfo, indexingBuffer);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue