From 2c6e78e16ceb71861dc23f173ecb702b0d36a096 Mon Sep 17 00:00:00 2001 From: javanna Date: Thu, 24 Mar 2016 15:28:10 +0100 Subject: [PATCH] more Writeable in ClusterStatsNodes --- .../cluster/stats/ClusterStatsNodes.java | 386 +++++++++--------- .../cluster/stats/ClusterStatsResponse.java | 2 +- 2 files changed, 190 insertions(+), 198 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java index 5f9f824b81e..d19d46ccfb4 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; @@ -41,42 +42,57 @@ import org.elasticsearch.plugins.PluginInfo; import java.io.IOException; import java.net.InetAddress; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; -public class ClusterStatsNodes implements ToXContent, Streamable { +public class ClusterStatsNodes implements ToXContent, Writeable { - private Counts counts; - private Set versions; - private OsStats os; - private ProcessStats process; - private JvmStats jvm; - private FsInfo.Path fs; - private Set plugins; + private final Counts counts; + private final Set versions; + private final OsStats os; + private final ProcessStats process; + private final JvmStats jvm; + private final FsInfo.Path fs; + private final Set plugins; - private ClusterStatsNodes() { + ClusterStatsNodes(StreamInput in) throws IOException { + this.counts = new Counts(in); + + int size = in.readVInt(); + this.versions = new HashSet<>(size); + for (; size > 0; size--) { + this.versions.add(Version.readVersion(in)); + } + + this.os = new OsStats(in); + this.process = new ProcessStats(in); + this.jvm = new JvmStats(in); + this.fs = FsInfo.Path.readInfoFrom(in); + + size = in.readVInt(); + this.plugins = new HashSet<>(size); + for (; size > 0; size--) { + this.plugins.add(PluginInfo.readFromStream(in)); + } } - public ClusterStatsNodes(ClusterStatsNodeResponse[] nodeResponses) { - this.counts = new Counts(); + ClusterStatsNodes(ClusterStatsNodeResponse[] nodeResponses) { this.versions = new HashSet<>(); - this.os = new OsStats(); - this.jvm = new JvmStats(); this.fs = new FsInfo.Path(); this.plugins = new HashSet<>(); - this.process = new ProcessStats(); Set seenAddresses = new HashSet<>(nodeResponses.length); - + List nodeInfos = new ArrayList<>(); + List nodeStats = new ArrayList<>(); for (ClusterStatsNodeResponse nodeResponse : nodeResponses) { - - counts.addNodeInfo(nodeResponse.nodeInfo()); - versions.add(nodeResponse.nodeInfo().getVersion()); - process.addNodeStats(nodeResponse.nodeStats()); - jvm.addNodeInfoStats(nodeResponse.nodeInfo(), nodeResponse.nodeStats()); - plugins.addAll(nodeResponse.nodeInfo().getPlugins().getPluginInfos()); + nodeInfos.add(nodeResponse.nodeInfo()); + nodeStats.add(nodeResponse.nodeStats()); + this.versions.add(nodeResponse.nodeInfo().getVersion()); + this.plugins.addAll(nodeResponse.nodeInfo().getPlugins().getPluginInfos()); // now do the stats that should be deduped by hardware (implemented by ip deduping) TransportAddress publishAddress = nodeResponse.nodeInfo().getTransport().address().publishAddress(); @@ -84,19 +100,19 @@ public class ClusterStatsNodes implements ToXContent, Streamable { if (publishAddress.uniqueAddressTypeId() == 1) { inetAddress = ((InetSocketTransportAddress) publishAddress).address().getAddress(); } - if (!seenAddresses.add(inetAddress)) { continue; } - - os.addNodeInfo(nodeResponse.nodeInfo()); if (nodeResponse.nodeStats().getFs() != null) { - fs.add(nodeResponse.nodeStats().getFs().total()); + this.fs.add(nodeResponse.nodeStats().getFs().total()); } } + this.counts = new Counts(nodeInfos); + this.os = new OsStats(nodeInfos); + this.process = new ProcessStats(nodeStats); + this.jvm = new JvmStats(nodeInfos, nodeStats); } - public Counts getCounts() { return this.counts; } @@ -127,25 +143,8 @@ public class ClusterStatsNodes implements ToXContent, Streamable { @Override - public void readFrom(StreamInput in) throws IOException { - counts = Counts.readCounts(in); - - int size = in.readVInt(); - versions = new HashSet<>(size); - for (; size > 0; size--) { - versions.add(Version.readVersion(in)); - } - - os = OsStats.readOsStats(in); - process = ProcessStats.readStats(in); - jvm = JvmStats.readJvmStats(in); - fs = FsInfo.Path.readInfoFrom(in); - - size = in.readVInt(); - plugins = new HashSet<>(size); - for (; size > 0; size--) { - plugins.add(PluginInfo.readFromStream(in)); - } + public ClusterStatsNodes readFrom(StreamInput in) throws IOException { + return new ClusterStatsNodes(in); } @Override @@ -163,12 +162,6 @@ public class ClusterStatsNodes implements ToXContent, Streamable { } } - public static ClusterStatsNodes readNodeStats(StreamInput in) throws IOException { - ClusterStatsNodes nodeStats = new ClusterStatsNodes(); - nodeStats.readFrom(in); - return nodeStats; - } - static final class Fields { static final XContentBuilderString COUNT = new XContentBuilderString("count"); static final XContentBuilderString VERSIONS = new XContentBuilderString("versions"); @@ -214,31 +207,39 @@ public class ClusterStatsNodes implements ToXContent, Streamable { return builder; } - public static class Counts implements Streamable, ToXContent { + public static class Counts implements Writeable, ToXContent { static final String COORDINATING_ONLY = "coordinating_only"; - private int total; - private Map roles; + private final int total; + private final Map roles; - Counts() { - roles = new HashMap<>(); - for (DiscoveryNode.Role role : DiscoveryNode.Role.values()) { - roles.put(role.getRoleName(), 0); - } - roles.put(COORDINATING_ONLY, 0); + @SuppressWarnings("unchecked") + private Counts(StreamInput in) throws IOException { + this.total = in.readVInt(); + this.roles = (Map)in.readGenericValue(); } - public void addNodeInfo(NodeInfo nodeInfo) { - total++; - if (nodeInfo.getNode().getRoles().size() == 0) { - Integer count = roles.get(COORDINATING_ONLY); - roles.put(COORDINATING_ONLY, ++count); - } else { - for (DiscoveryNode.Role role : nodeInfo.getNode().getRoles()) { - Integer count = roles.get(role.getRoleName()); - roles.put(role.getRoleName(), ++count); + private Counts(List nodeInfos) { + this.roles = new HashMap<>(); + for (DiscoveryNode.Role role : DiscoveryNode.Role.values()) { + this.roles.put(role.getRoleName(), 0); + } + this.roles.put(COORDINATING_ONLY, 0); + + int total = 0; + for (NodeInfo nodeInfo : nodeInfos) { + total++; + if (nodeInfo.getNode().getRoles().isEmpty()) { + Integer count = roles.get(COORDINATING_ONLY); + roles.put(COORDINATING_ONLY, ++count); + } else { + for (DiscoveryNode.Role role : nodeInfo.getNode().getRoles()) { + Integer count = roles.get(role.getRoleName()); + roles.put(role.getRoleName(), ++count); + } } } + this.total = total; } public int getTotal() { @@ -249,17 +250,9 @@ public class ClusterStatsNodes implements ToXContent, Streamable { return roles; } - public static Counts readCounts(StreamInput in) throws IOException { - Counts c = new Counts(); - c.readFrom(in); - return c; - } - @Override - @SuppressWarnings("unchecked") - public void readFrom(StreamInput in) throws IOException { - total = in.readVInt(); - roles = (Map)in.readGenericValue(); + public Counts readFrom(StreamInput in) throws IOException { + return new Counts(in); } @Override @@ -282,24 +275,36 @@ public class ClusterStatsNodes implements ToXContent, Streamable { } } - public static class OsStats implements ToXContent, Streamable { - - int availableProcessors; - int allocatedProcessors; - long availableMemory; + public static class OsStats implements ToXContent, Writeable { + final int availableProcessors; + final int allocatedProcessors; final ObjectIntHashMap names; - public OsStats() { - names = new ObjectIntHashMap<>(); + @SuppressWarnings("unchecked") + private OsStats(StreamInput in) throws IOException { + this.availableProcessors = in.readVInt(); + this.allocatedProcessors = in.readVInt(); + int size = in.readVInt(); + this.names = new ObjectIntHashMap<>(); + for (int i = 0; i < size; i++) { + names.addTo(in.readString(), in.readVInt()); + } } - public void addNodeInfo(NodeInfo nodeInfo) { - availableProcessors += nodeInfo.getOs().getAvailableProcessors(); - allocatedProcessors += nodeInfo.getOs().getAllocatedProcessors(); + private OsStats(List nodeInfos) { + this.names = new ObjectIntHashMap<>(); + int availableProcessors = 0; + int allocatedProcessors = 0; + for (NodeInfo nodeInfo : nodeInfos) { + availableProcessors += nodeInfo.getOs().getAvailableProcessors(); + allocatedProcessors += nodeInfo.getOs().getAllocatedProcessors(); - if (nodeInfo.getOs().getName() != null) { - names.addTo(nodeInfo.getOs().getName(), 1); + if (nodeInfo.getOs().getName() != null) { + names.addTo(nodeInfo.getOs().getName(), 1); + } } + this.availableProcessors = availableProcessors; + this.allocatedProcessors = allocatedProcessors; } public int getAvailableProcessors() { @@ -310,27 +315,15 @@ public class ClusterStatsNodes implements ToXContent, Streamable { return allocatedProcessors; } - public ByteSizeValue getAvailableMemory() { - return new ByteSizeValue(availableMemory); - } - @Override - public void readFrom(StreamInput in) throws IOException { - availableProcessors = in.readVInt(); - allocatedProcessors = in.readVInt(); - availableMemory = in.readLong(); - int size = in.readVInt(); - names.clear(); - for (int i = 0; i < size; i++) { - names.addTo(in.readString(), in.readVInt()); - } + public OsStats readFrom(StreamInput in) throws IOException { + return new OsStats(in); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(availableProcessors); out.writeVInt(allocatedProcessors); - out.writeLong(availableMemory); out.writeVInt(names.size()); for (ObjectIntCursor name : names) { out.writeString(name.key); @@ -338,20 +331,11 @@ public class ClusterStatsNodes implements ToXContent, Streamable { } } - public static OsStats readOsStats(StreamInput in) throws IOException { - OsStats os = new OsStats(); - os.readFrom(in); - return os; - } - static final class Fields { static final XContentBuilderString AVAILABLE_PROCESSORS = new XContentBuilderString("available_processors"); static final XContentBuilderString ALLOCATED_PROCESSORS = new XContentBuilderString("allocated_processors"); static final XContentBuilderString NAME = new XContentBuilderString("name"); static final XContentBuilderString NAMES = new XContentBuilderString("names"); - static final XContentBuilderString MEM = new XContentBuilderString("mem"); - static final XContentBuilderString TOTAL = new XContentBuilderString("total"); - static final XContentBuilderString TOTAL_IN_BYTES = new XContentBuilderString("total_in_bytes"); static final XContentBuilderString COUNT = new XContentBuilderString("count"); } @@ -359,10 +343,6 @@ public class ClusterStatsNodes implements ToXContent, Streamable { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(Fields.AVAILABLE_PROCESSORS, availableProcessors); builder.field(Fields.ALLOCATED_PROCESSORS, allocatedProcessors); - builder.startObject(Fields.MEM); - builder.byteSizeField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, availableMemory); - builder.endObject(); - builder.startArray(Fields.NAMES); for (ObjectIntCursor name : names) { builder.startObject(); @@ -371,35 +351,54 @@ public class ClusterStatsNodes implements ToXContent, Streamable { builder.endObject(); } builder.endArray(); - return builder; } } - public static class ProcessStats implements ToXContent, Streamable { + public static class ProcessStats implements ToXContent, Writeable { - int count; - int cpuPercent; - long totalOpenFileDescriptors; - long minOpenFileDescriptors = Long.MAX_VALUE; - long maxOpenFileDescriptors = Long.MIN_VALUE; + final int count; + final int cpuPercent; + final long totalOpenFileDescriptors; + final long minOpenFileDescriptors; + final long maxOpenFileDescriptors; - public void addNodeStats(NodeStats nodeStats) { - if (nodeStats.getProcess() == null) { - return; + private ProcessStats(StreamInput in) throws IOException { + this.count = in.readVInt(); + this.cpuPercent = in.readVInt(); + this.totalOpenFileDescriptors = in.readVLong(); + this.minOpenFileDescriptors = in.readLong(); + this.maxOpenFileDescriptors = in.readLong(); + } + + private ProcessStats(List nodeStatsList) { + int count = 0; + int cpuPercent = 0; + long totalOpenFileDescriptors = 0; + long minOpenFileDescriptors = Long.MAX_VALUE; + long maxOpenFileDescriptors = Long.MIN_VALUE; + for (NodeStats nodeStats : nodeStatsList) { + if (nodeStats.getProcess() == null) { + continue; + } + count++; + if (nodeStats.getProcess().getCpu() != null) { + cpuPercent += nodeStats.getProcess().getCpu().getPercent(); + } + long fd = nodeStats.getProcess().getOpenFileDescriptors(); + if (fd > 0) { + // fd can be -1 if not supported on platform + totalOpenFileDescriptors += fd; + } + // we still do min max calc on -1, so we'll have an indication of it not being supported on one of the nodes. + minOpenFileDescriptors = Math.min(minOpenFileDescriptors, fd); + maxOpenFileDescriptors = Math.max(maxOpenFileDescriptors, fd); } - count++; - if (nodeStats.getProcess().getCpu() != null) { - cpuPercent += nodeStats.getProcess().getCpu().getPercent(); - } - long fd = nodeStats.getProcess().getOpenFileDescriptors(); - if (fd > 0) { - // fd can be -1 if not supported on platform - totalOpenFileDescriptors += fd; - } - // we still do min max calc on -1, so we'll have an indication of it not being supported on one of the nodes. - minOpenFileDescriptors = Math.min(minOpenFileDescriptors, fd); - maxOpenFileDescriptors = Math.max(maxOpenFileDescriptors, fd); + this.count = count; + this.cpuPercent = cpuPercent; + this.totalOpenFileDescriptors = totalOpenFileDescriptors; + this.minOpenFileDescriptors = minOpenFileDescriptors; + this.maxOpenFileDescriptors = maxOpenFileDescriptors; } /** @@ -431,12 +430,8 @@ public class ClusterStatsNodes implements ToXContent, Streamable { } @Override - public void readFrom(StreamInput in) throws IOException { - count = in.readVInt(); - cpuPercent = in.readVInt(); - totalOpenFileDescriptors = in.readVLong(); - minOpenFileDescriptors = in.readLong(); - maxOpenFileDescriptors = in.readLong(); + public ProcessStats readFrom(StreamInput in) throws IOException { + return new ProcessStats(in); } @Override @@ -448,12 +443,6 @@ public class ClusterStatsNodes implements ToXContent, Streamable { out.writeLong(maxOpenFileDescriptors); } - public static ProcessStats readStats(StreamInput in) throws IOException { - ProcessStats cpu = new ProcessStats(); - cpu.readFrom(in); - return cpu; - } - static final class Fields { static final XContentBuilderString CPU = new XContentBuilderString("cpu"); static final XContentBuilderString PERCENT = new XContentBuilderString("percent"); @@ -477,20 +466,54 @@ public class ClusterStatsNodes implements ToXContent, Streamable { } } - public static class JvmStats implements Streamable, ToXContent { + public static class JvmStats implements Writeable, ToXContent { - ObjectIntHashMap versions; - long threads; - long maxUptime; - long heapUsed; - long heapMax; + private final ObjectIntHashMap versions; + private final long threads; + private final long maxUptime; + private final long heapUsed; + private final long heapMax; - JvmStats() { - versions = new ObjectIntHashMap<>(); - threads = 0; - maxUptime = 0; - heapMax = 0; - heapUsed = 0; + private JvmStats(StreamInput in) throws IOException { + int size = in.readVInt(); + this.versions = new ObjectIntHashMap<>(size); + for (; size > 0; size--) { + this.versions.addTo(JvmVersion.readJvmVersion(in), in.readVInt()); + } + this.threads = in.readVLong(); + this.maxUptime = in.readVLong(); + this.heapUsed = in.readVLong(); + this.heapMax = in.readVLong(); + } + + private JvmStats(List nodeInfos, List nodeStatsList) { + this.versions = new ObjectIntHashMap<>(); + long threads = 0; + long maxUptime = 0; + long heapMax = 0; + long heapUsed = 0; + for (NodeInfo nodeInfo : nodeInfos) { + versions.addTo(new JvmVersion(nodeInfo.getJvm()), 1); + } + + for (NodeStats nodeStats : nodeStatsList) { + org.elasticsearch.monitor.jvm.JvmStats js = nodeStats.getJvm(); + if (js == null) { + continue; + } + if (js.getThreads() != null) { + threads += js.getThreads().getCount(); + } + maxUptime = Math.max(maxUptime, js.getUptime().millis()); + if (js.getMem() != null) { + heapUsed += js.getMem().getHeapUsed().bytes(); + heapMax += js.getMem().getHeapMax().bytes(); + } + } + this.threads = threads; + this.maxUptime = maxUptime; + this.heapUsed = heapUsed; + this.heapMax = heapMax; } public ObjectIntHashMap getVersions() { @@ -525,33 +548,9 @@ public class ClusterStatsNodes implements ToXContent, Streamable { return new ByteSizeValue(heapMax); } - public void addNodeInfoStats(NodeInfo nodeInfo, NodeStats nodeStats) { - versions.addTo(new JvmVersion(nodeInfo.getJvm()), 1); - org.elasticsearch.monitor.jvm.JvmStats js = nodeStats.getJvm(); - if (js == null) { - return; - } - if (js.getThreads() != null) { - threads += js.getThreads().getCount(); - } - maxUptime = Math.max(maxUptime, js.getUptime().millis()); - if (js.getMem() != null) { - heapUsed += js.getMem().getHeapUsed().bytes(); - heapMax += js.getMem().getHeapMax().bytes(); - } - } - @Override - public void readFrom(StreamInput in) throws IOException { - int size = in.readVInt(); - versions = new ObjectIntHashMap<>(size); - for (; size > 0; size--) { - versions.addTo(JvmVersion.readJvmVersion(in), in.readVInt()); - } - threads = in.readVLong(); - maxUptime = in.readVLong(); - heapUsed = in.readVLong(); - heapMax = in.readVLong(); + public JvmStats readFrom(StreamInput in) throws IOException { + return new JvmStats(in); } @Override @@ -561,19 +560,12 @@ public class ClusterStatsNodes implements ToXContent, Streamable { v.key.writeTo(out); out.writeVInt(v.value); } - out.writeVLong(threads); out.writeVLong(maxUptime); out.writeVLong(heapUsed); out.writeVLong(heapMax); } - public static JvmStats readJvmStats(StreamInput in) throws IOException { - JvmStats jvmStats = new JvmStats(); - jvmStats.readFrom(in); - return jvmStats; - } - static final class Fields { static final XContentBuilderString VERSIONS = new XContentBuilderString("versions"); static final XContentBuilderString VERSION = new XContentBuilderString("version"); diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java index 4eac6650a5e..b801c43b025 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java @@ -110,7 +110,7 @@ public class ClusterStatsResponse extends BaseNodesResponse