diff --git a/src/main/java/org/elasticsearch/enterprise/monitor/StatsExportersService.java b/src/main/java/org/elasticsearch/enterprise/monitor/StatsExportersService.java index 4349c7083e1..b3e64634d22 100644 --- a/src/main/java/org/elasticsearch/enterprise/monitor/StatsExportersService.java +++ b/src/main/java/org/elasticsearch/enterprise/monitor/StatsExportersService.java @@ -7,8 +7,10 @@ package org.elasticsearch.enterprise.monitor; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; -import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -28,6 +30,8 @@ public class StatsExportersService extends AbstractLifecycleComponent implement final NodeStatsRenderer nodeStatsRenderer; final ShardStatsRenderer shardStatsRenderer; + final IndexStatsRenderer indexStatsRenderer; + final IndicesStatsRenderer indicesStatsRenderer; public ESExporter(Settings settings, Discovery discovery) { super(settings); @@ -75,6 +77,8 @@ public class ESExporter extends AbstractLifecycleComponent implement nodeStatsRenderer = new NodeStatsRenderer(); shardStatsRenderer = new ShardStatsRenderer(); + indexStatsRenderer = new IndexStatsRenderer(); + indicesStatsRenderer = new IndicesStatsRenderer(); logger.info("ESExporter initialized. Targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat); } @@ -97,52 +101,86 @@ public class ESExporter extends AbstractLifecycleComponent implement exportXContent("shard_stats", shardStatsRenderer); } + @Override + public void exportIndicesStats(IndicesStatsResponse indicesStats) { + Map perIndexStats = indicesStats.getIndices(); + indexStatsRenderer.reset(perIndexStats.values().toArray(new IndexStats[perIndexStats.size()])); + indicesStatsRenderer.reset(indicesStats.getTotal(), indicesStats.getPrimaries()); + logger.debug("exporting index_stats + indices_stats"); + HttpURLConnection conn = openExportingConnection(); + if (conn == null) { + return; + } + try { + addXContentRendererToConnection(conn, "index_stats", indexStatsRenderer); + addXContentRendererToConnection(conn, "indices_stats", indicesStatsRenderer); + sendCloseExportingConnection(conn); + } catch (IOException e) { + logger.error("error sending data", e); + return; + } + } + + private HttpURLConnection openExportingConnection() { + if (!checkedForIndexTemplate) { + if (!checkForIndexTemplate()) { + logger.debug("no template defined yet. skipping"); + return null; + } + } + + logger.trace("setting up an export connection"); + HttpURLConnection conn = openConnection("POST", "/_bulk", XContentType.SMILE.restContentType()); + if (conn == null) { + logger.error("could not connect to any configured elasticsearch instances: [{}]", hosts); + } + return conn; + } + + private void addXContentRendererToConnection(HttpURLConnection conn, String type, + MultiXContentRenderer renderer) throws IOException { + OutputStream os = conn.getOutputStream(); + // TODO: find a way to disable builder's substream flushing or something neat solution + for (int i = 0; i < renderer.length(); i++) { + XContentBuilder builder = XContentFactory.smileBuilder(os); + builder.startObject().startObject("index") + .field("_index", getIndexName()).field("_type", type).endObject().endObject(); + builder.flush(); + os.write(SmileXContent.smileXContent.streamSeparator()); + + builder = XContentFactory.smileBuilder(os); + builder.humanReadable(false); + renderer.render(i, builder); + builder.flush(); + os.write(SmileXContent.smileXContent.streamSeparator()); + } + } + + private void sendCloseExportingConnection(HttpURLConnection conn) throws IOException { + logger.trace("sending exporting content"); + OutputStream os = conn.getOutputStream(); + os.close(); + + if (conn.getResponseCode() != 200) { + logConnectionError("remote target didn't respond with 200 OK", conn); + } else { + conn.getInputStream().close(); // close and release to connection pool. + } + } private void exportXContent(String type, MultiXContentRenderer xContentRenderer) { if (xContentRenderer.length() == 0) { return; } - if (!checkedForIndexTemplate) { - if (!checkForIndexTemplate()) { - logger.debug("no template defined yet. skipping"); - return; - } - ; - } - logger.debug("exporting {}", type); - HttpURLConnection conn = openConnection("POST", "/_bulk", XContentType.SMILE.restContentType()); + HttpURLConnection conn = openExportingConnection(); if (conn == null) { - logger.error("could not connect to any configured elasticsearch instances: [{}]", hosts); return; } try { - OutputStream os = conn.getOutputStream(); - // TODO: find a way to disable builder's substream flushing or something neat solution - for (int i = 0; i < xContentRenderer.length(); i++) { - XContentBuilder builder = XContentFactory.smileBuilder(os); - builder.startObject().startObject("index") - .field("_index", getIndexName()).field("_type", type).endObject().endObject(); - builder.flush(); - os.write(SmileXContent.smileXContent.streamSeparator()); - - builder = XContentFactory.smileBuilder(os); - builder.humanReadable(false); - xContentRenderer.render(i, builder); - builder.flush(); - os.write(SmileXContent.smileXContent.streamSeparator()); - - } - os.close(); - - if (conn.getResponseCode() != 200) { - logConnectionError("remote target didn't respond with 200 OK", conn); - } else { - conn.getInputStream().close(); // close and release to connection pool. - } - - + addXContentRendererToConnection(conn, type, xContentRenderer); + sendCloseExportingConnection(conn); } catch (IOException e) { logger.error("error sending data", e); return; @@ -267,7 +305,11 @@ public class ESExporter extends AbstractLifecycleComponent implement private void addNodeInfo(XContentBuilder builder) throws IOException { - builder.startObject("node"); + addNodeInfo(builder, "node"); + } + + private void addNodeInfo(XContentBuilder builder, String fieldname) throws IOException { + builder.startObject(fieldname); DiscoveryNode node = discovery.localNode(); builder.field("id", node.id()); builder.field("name", node.name()); @@ -352,5 +394,72 @@ public class ESExporter extends AbstractLifecycleComponent implement } } + class IndexStatsRenderer implements MultiXContentRenderer { + + IndexStats[] stats; + long collectionTime; + ToXContent.Params xContentParams = ToXContent.EMPTY_PARAMS; + + public void reset(IndexStats[] stats) { + this.stats = stats; + collectionTime = System.currentTimeMillis(); + } + + @Override + public int length() { + return stats == null ? 0 : stats.length; + } + + @Override + public void render(int index, XContentBuilder builder) throws IOException { + builder.startObject(); + builder.field("@timestamp", defaultDatePrinter.print(collectionTime)); + IndexStats indexStats = stats[index]; + builder.field("index", indexStats.getIndex()); + addNodeInfo(builder, "_source_node"); + builder.startObject("primaries"); + indexStats.getPrimaries().toXContent(builder, xContentParams); + builder.endObject(); + builder.startObject("total"); + indexStats.getTotal().toXContent(builder, xContentParams); + builder.endObject(); + builder.endObject(); + } + } + + class IndicesStatsRenderer implements MultiXContentRenderer { + + CommonStats totalStats; + CommonStats primariesStats; + long collectionTime; + ToXContent.Params xContentParams = ToXContent.EMPTY_PARAMS; + + public void reset(CommonStats totalStats, CommonStats primariesStats) { + this.totalStats = totalStats; + this.primariesStats = primariesStats; + collectionTime = System.currentTimeMillis(); + } + + @Override + public int length() { + return totalStats == null ? 0 : 1; + } + + @Override + public void render(int index, XContentBuilder builder) throws IOException { + assert index == 0; + builder.startObject(); + builder.field("@timestamp", defaultDatePrinter.print(collectionTime)); + addNodeInfo(builder, "_source_node"); + builder.startObject("primaries"); + primariesStats.toXContent(builder, xContentParams); + builder.endObject(); + builder.startObject("total"); + totalStats.toXContent(builder, xContentParams); + builder.endObject(); + builder.endObject(); + } + } + } diff --git a/src/main/java/org/elasticsearch/enterprise/monitor/exporter/StatsExporter.java b/src/main/java/org/elasticsearch/enterprise/monitor/exporter/StatsExporter.java index 93875ef978b..ba4a796bc01 100644 --- a/src/main/java/org/elasticsearch/enterprise/monitor/exporter/StatsExporter.java +++ b/src/main/java/org/elasticsearch/enterprise/monitor/exporter/StatsExporter.java @@ -5,9 +5,14 @@ */ package org.elasticsearch.enterprise.monitor.exporter; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.common.component.LifecycleComponent; +import java.util.Map; + public interface StatsExporter extends LifecycleComponent { String name(); @@ -15,4 +20,7 @@ public interface StatsExporter extends LifecycleComponent { void exportNodeStats(NodeStats nodeStats); void exportShardStats(ShardStats[] shardStatsArray); + + void exportIndicesStats(IndicesStatsResponse indicesStats); + }