diff --git a/src/main/java/org/elasticsearch/marvel/monitor/StatsExportersService.java b/src/main/java/org/elasticsearch/marvel/monitor/StatsExportersService.java index 639fa20e838..0c3fdaeb030 100644 --- a/src/main/java/org/elasticsearch/marvel/monitor/StatsExportersService.java +++ b/src/main/java/org/elasticsearch/marvel/monitor/StatsExportersService.java @@ -11,20 +11,30 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.indices.IndicesLifecycle; +import org.elasticsearch.marvel.monitor.annotation.Annotation; +import org.elasticsearch.marvel.monitor.annotation.ShardEventAnnotation; import org.elasticsearch.marvel.monitor.exporter.ESExporter; import org.elasticsearch.marvel.monitor.exporter.StatsExporter; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InternalIndicesService; import org.elasticsearch.node.service.NodeService; +import java.util.ArrayList; import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; public class StatsExportersService extends AbstractLifecycleComponent { @@ -33,12 +43,16 @@ public class StatsExportersService extends AbstractLifecycleComponent exporters; + private final BlockingQueue pendingAnnotationsQueue; + @Inject public StatsExportersService(Settings settings, IndicesService indicesService, NodeService nodeService, ClusterService clusterService, @@ -53,6 +67,9 @@ public class StatsExportersService extends AbstractLifecycleComponent annotationsList = new ArrayList(pendingAnnotationsQueue.size()); + pendingAnnotationsQueue.drainTo(annotationsList); + Annotation[] annotations = new Annotation[annotationsList.size()]; + annotationsList.toArray(annotations); + + for (StatsExporter e : exporters) { + try { + e.exportAnnotations(annotations); + } catch (Throwable t) { + logger.error("StatsExporter [{}] has thrown an exception:", t, e.name()); + } + } + } + + private void exportShardStats() { + logger.debug("Collecting shard stats"); + ShardStats[] shardStatsArray = indicesService.shardStats(CommonStatsFlags.ALL); + + logger.debug("Exporting shards stats"); + for (StatsExporter e : exporters) { + try { + e.exportShardStats(shardStatsArray); + } catch (Throwable t) { + logger.error("StatsExporter [{}] has thrown an exception:", t, e.name()); + } + } + } + + private void exportNodeStats() { + logger.debug("Collecting node stats"); + NodeStats nodeStats = nodeService.stats(); + + logger.debug("Exporting node stats"); + for (StatsExporter e : exporters) { + try { + e.exportNodeStats(nodeStats); + } catch (Throwable t) { + logger.error("StatsExporter [{}] has thrown an exception:", t, e.name()); + } + } + } + } + + + class IndicesLifeCycleListener extends IndicesLifecycle.Listener { + @Override + public void afterIndexShardStarted(IndexShard indexShard) { + pendingAnnotationsQueue.add(new ShardEventAnnotation(System.currentTimeMillis(), ShardEventAnnotation.EventType.STARTED, + indexShard.shardId(), indexShard.routingEntry())); + + } + + @Override + public void beforeIndexShardCreated(ShardId shardId) { + pendingAnnotationsQueue.add(new ShardEventAnnotation(System.currentTimeMillis(), ShardEventAnnotation.EventType.CREATED, + shardId, null)); + } + + @Override + public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) { + pendingAnnotationsQueue.add(new ShardEventAnnotation(System.currentTimeMillis(), ShardEventAnnotation.EventType.CLOSED, + indexShard.shardId(), indexShard.routingEntry())); + } } } diff --git a/src/main/java/org/elasticsearch/marvel/monitor/annotation/Annotation.java b/src/main/java/org/elasticsearch/marvel/monitor/annotation/Annotation.java new file mode 100644 index 00000000000..9d883db4ccc --- /dev/null +++ b/src/main/java/org/elasticsearch/marvel/monitor/annotation/Annotation.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.monitor.annotation; +import org.elasticsearch.common.joda.Joda; +import org.elasticsearch.common.joda.time.format.DateTimeFormatter; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +public abstract class Annotation { + + public final static DateTimeFormatter datePrinter = Joda.forPattern("date_time").printer(); + + protected long timestamp; + + public Annotation(long timestamp) { + this.timestamp = timestamp; + } + + public long timestamp() { + return timestamp; + } + + /** + * @return annotation's type as a short string without spaces + */ + public abstract String type(); + + /** + * should return a short string based description of the annotation + */ + abstract String conciseDescription(); + + @Override + public String toString() { + return "[" + type() + "] annotation: [" + conciseDescription() + "]"; + } + + public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.field("@timestamp", datePrinter.print(timestamp)); + builder.field("message", conciseDescription()); + return builder; + } +} diff --git a/src/main/java/org/elasticsearch/marvel/monitor/annotation/ShardEventAnnotation.java b/src/main/java/org/elasticsearch/marvel/monitor/annotation/ShardEventAnnotation.java new file mode 100644 index 00000000000..a99ae5bfdbf --- /dev/null +++ b/src/main/java/org/elasticsearch/marvel/monitor/annotation/ShardEventAnnotation.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.marvel.monitor.annotation; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; + +public class ShardEventAnnotation extends Annotation { + + private final ShardRouting shardRouting; + private final ShardId shardId; + private EventType event; + + public enum EventType { + CREATED, + STARTED, + CLOSED + } + + + public ShardEventAnnotation(long timestamp, EventType event, ShardId shardId, ShardRouting shardRouting) { + super(timestamp); + this.event = event; + this.shardId = shardId; + this.shardRouting = shardRouting; + } + + @Override + public String type() { + return "shard_event"; + } + + @Override + String conciseDescription() { + return "[" + event + "]" + (shardRouting != null ? shardRouting : shardId); + } + + @Override + public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException { + super.addXContentBody(builder, params); + builder.field("event", event); + builder.field("index", shardId.index()); + builder.field("shard_id", shardId.id()); + if (shardRouting != null) { + builder.field("routing"); + shardRouting.toXContent(builder, params); + } + + return builder; + } +} diff --git a/src/main/java/org/elasticsearch/marvel/monitor/exporter/ESExporter.java b/src/main/java/org/elasticsearch/marvel/monitor/exporter/ESExporter.java index 62abc42fb82..65b9041527a 100644 --- a/src/main/java/org/elasticsearch/marvel/monitor/exporter/ESExporter.java +++ b/src/main/java/org/elasticsearch/marvel/monitor/exporter/ESExporter.java @@ -14,10 +14,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.joda.time.DateTimeZone; +import org.elasticsearch.common.joda.Joda; import org.elasticsearch.common.joda.time.format.DateTimeFormat; import org.elasticsearch.common.joda.time.format.DateTimeFormatter; -import org.elasticsearch.common.joda.time.format.ISODateTimeFormat; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.network.NetworkUtils; @@ -30,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.smile.SmileXContent; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.marvel.monitor.annotation.Annotation; import java.io.IOException; import java.io.InputStream; @@ -49,9 +49,10 @@ public class ESExporter extends AbstractLifecycleComponent implement final Discovery discovery; final String hostname; + // TODO: logger name is not good now. Figure out why. final ESLogger logger = ESLoggerFactory.getLogger(ESExporter.class.getName()); - public final static DateTimeFormatter defaultDatePrinter = ISODateTimeFormat.dateTime().withZone(DateTimeZone.UTC); + public final static DateTimeFormatter defaultDatePrinter = Joda.forPattern("date_time").printer(); boolean checkedForIndexTemplate = false; @@ -59,6 +60,7 @@ public class ESExporter extends AbstractLifecycleComponent implement final ShardStatsRenderer shardStatsRenderer; final IndexStatsRenderer indexStatsRenderer; final IndicesStatsRenderer indicesStatsRenderer; + final AnnotationsRenderer annotationsRenderer; public ESExporter(Settings settings, Discovery discovery) { super(settings); @@ -79,6 +81,7 @@ public class ESExporter extends AbstractLifecycleComponent implement shardStatsRenderer = new ShardStatsRenderer(); indexStatsRenderer = new IndexStatsRenderer(); indicesStatsRenderer = new IndicesStatsRenderer(); + annotationsRenderer = new AnnotationsRenderer(); logger.info("ESExporter initialized. Targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat); } @@ -92,13 +95,13 @@ public class ESExporter extends AbstractLifecycleComponent implement @Override public void exportNodeStats(NodeStats nodeStats) { nodeStatsRenderer.reset(nodeStats); - exportXContent("node_stats", nodeStatsRenderer); + exportXContent(nodeStatsRenderer); } @Override public void exportShardStats(ShardStats[] shardStatsArray) { shardStatsRenderer.reset(shardStatsArray); - exportXContent("shard_stats", shardStatsRenderer); + exportXContent(shardStatsRenderer); } @Override @@ -112,8 +115,8 @@ public class ESExporter extends AbstractLifecycleComponent implement return; } try { - addXContentRendererToConnection(conn, "index_stats", indexStatsRenderer); - addXContentRendererToConnection(conn, "indices_stats", indicesStatsRenderer); + addXContentRendererToConnection(conn, indexStatsRenderer); + addXContentRendererToConnection(conn, indicesStatsRenderer); sendCloseExportingConnection(conn); } catch (IOException e) { logger.error("error sending data", e); @@ -121,6 +124,13 @@ public class ESExporter extends AbstractLifecycleComponent implement } } + @Override + public void exportAnnotations(Annotation[] annotations) { + annotationsRenderer.reset(annotations); + exportXContent(annotationsRenderer); + } + + private HttpURLConnection openExportingConnection() { if (!checkedForIndexTemplate) { if (!checkForIndexTemplate()) { @@ -137,14 +147,14 @@ public class ESExporter extends AbstractLifecycleComponent implement return conn; } - private void addXContentRendererToConnection(HttpURLConnection conn, String type, + private void addXContentRendererToConnection(HttpURLConnection conn, MultiXContentRenderer renderer) throws IOException { OutputStream os = conn.getOutputStream(); // TODO: find a way to disable builder's substream flushing or something neat solution for (int i = 0; i < renderer.length(); i++) { XContentBuilder builder = XContentFactory.smileBuilder(os); builder.startObject().startObject("index") - .field("_index", getIndexName()).field("_type", type).endObject().endObject(); + .field("_index", getIndexName()).field("_type", renderer.type(i)).endObject().endObject(); builder.flush(); os.write(SmileXContent.smileXContent.streamSeparator()); @@ -168,18 +178,17 @@ public class ESExporter extends AbstractLifecycleComponent implement } } - private void exportXContent(String type, MultiXContentRenderer xContentRenderer) { + private void exportXContent(MultiXContentRenderer xContentRenderer) { if (xContentRenderer.length() == 0) { return; } - logger.debug("exporting {}", type); HttpURLConnection conn = openExportingConnection(); if (conn == null) { return; } try { - addXContentRendererToConnection(conn, type, xContentRenderer); + addXContentRendererToConnection(conn, xContentRenderer); sendCloseExportingConnection(conn); } catch (IOException e) { logger.error("error sending data", e); @@ -300,6 +309,8 @@ public class ESExporter extends AbstractLifecycleComponent implement int length(); + String type(int i); + void render(int index, XContentBuilder builder) throws IOException; } @@ -353,6 +364,11 @@ public class ESExporter extends AbstractLifecycleComponent implement return 1; } + @Override + public String type(int i) { + return "node_stats"; + } + @Override public void render(int index, XContentBuilder builder) throws IOException { builder.startObject(); @@ -379,6 +395,11 @@ public class ESExporter extends AbstractLifecycleComponent implement return stats == null ? 0 : stats.length; } + @Override + public String type(int i) { + return "shard_stats"; + } + @Override public void render(int index, XContentBuilder builder) throws IOException { builder.startObject(); @@ -410,6 +431,11 @@ public class ESExporter extends AbstractLifecycleComponent implement return stats == null ? 0 : stats.length; } + @Override + public String type(int i) { + return "index_stats"; + } + @Override public void render(int index, XContentBuilder builder) throws IOException { builder.startObject(); @@ -445,6 +471,11 @@ public class ESExporter extends AbstractLifecycleComponent implement return totalStats == null ? 0 : 1; } + @Override + public String type(int i) { + return "indices_stats"; + } + @Override public void render(int index, XContentBuilder builder) throws IOException { assert index == 0; @@ -461,5 +492,35 @@ public class ESExporter extends AbstractLifecycleComponent implement } } + + class AnnotationsRenderer implements MultiXContentRenderer { + + Annotation[] annotations; + ToXContent.Params xContentParams = ToXContent.EMPTY_PARAMS; + + public void reset(Annotation[] annotations) { + this.annotations = annotations; + } + + @Override + public int length() { + return annotations == null ? 0 : annotations.length; + } + + @Override + public String type(int i) { + return annotations[i].type(); + } + + + @Override + public void render(int index, XContentBuilder builder) throws IOException { + builder.startObject(); + addNodeInfo(builder, "node"); + annotations[index].addXContentBody(builder, xContentParams); + builder.endObject(); + } + } + } diff --git a/src/main/java/org/elasticsearch/marvel/monitor/exporter/StatsExporter.java b/src/main/java/org/elasticsearch/marvel/monitor/exporter/StatsExporter.java index 860d88dead2..ffec9d567f9 100644 --- a/src/main/java/org/elasticsearch/marvel/monitor/exporter/StatsExporter.java +++ b/src/main/java/org/elasticsearch/marvel/monitor/exporter/StatsExporter.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.common.component.LifecycleComponent; +import org.elasticsearch.marvel.monitor.annotation.Annotation; import java.util.Map; @@ -23,4 +24,5 @@ public interface StatsExporter extends LifecycleComponent { void exportIndicesStats(IndicesStatsResponse indicesStats); + void exportAnnotations(Annotation[] annotations); }