Added annotation infrastructure + a implementation for shard events
Original commit: elastic/x-pack-elasticsearch@bfdf001a6f
This commit is contained in:
parent
06b392d581
commit
80b85f79ce
|
@ -11,20 +11,30 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.cluster.ClusterService;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.collect.ImmutableSet;
|
import org.elasticsearch.common.collect.ImmutableSet;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.index.shard.service.IndexShard;
|
||||||
|
import org.elasticsearch.indices.IndicesLifecycle;
|
||||||
|
import org.elasticsearch.marvel.monitor.annotation.Annotation;
|
||||||
|
import org.elasticsearch.marvel.monitor.annotation.ShardEventAnnotation;
|
||||||
import org.elasticsearch.marvel.monitor.exporter.ESExporter;
|
import org.elasticsearch.marvel.monitor.exporter.ESExporter;
|
||||||
import org.elasticsearch.marvel.monitor.exporter.StatsExporter;
|
import org.elasticsearch.marvel.monitor.exporter.StatsExporter;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.indices.InternalIndicesService;
|
import org.elasticsearch.indices.InternalIndicesService;
|
||||||
import org.elasticsearch.node.service.NodeService;
|
import org.elasticsearch.node.service.NodeService;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
|
|
||||||
public class StatsExportersService extends AbstractLifecycleComponent<StatsExportersService> {
|
public class StatsExportersService extends AbstractLifecycleComponent<StatsExportersService> {
|
||||||
|
|
||||||
|
@ -33,12 +43,16 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
private final Client client;
|
private final Client client;
|
||||||
|
|
||||||
|
private final IndicesLifecycle.Listener indicesLifeCycleListener;
|
||||||
|
|
||||||
private volatile ExportingWorker exp;
|
private volatile ExportingWorker exp;
|
||||||
private volatile Thread thread;
|
private volatile Thread thread;
|
||||||
private final TimeValue interval;
|
private final TimeValue interval;
|
||||||
|
|
||||||
private Collection<StatsExporter> exporters;
|
private Collection<StatsExporter> exporters;
|
||||||
|
|
||||||
|
private final BlockingQueue<Annotation> pendingAnnotationsQueue;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public StatsExportersService(Settings settings, IndicesService indicesService,
|
public StatsExportersService(Settings settings, IndicesService indicesService,
|
||||||
NodeService nodeService, ClusterService clusterService,
|
NodeService nodeService, ClusterService clusterService,
|
||||||
|
@ -53,6 +67,9 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
|
||||||
|
|
||||||
StatsExporter esExporter = new ESExporter(settings.getComponentSettings(ESExporter.class), discovery);
|
StatsExporter esExporter = new ESExporter(settings.getComponentSettings(ESExporter.class), discovery);
|
||||||
this.exporters = ImmutableSet.of(esExporter);
|
this.exporters = ImmutableSet.of(esExporter);
|
||||||
|
|
||||||
|
indicesLifeCycleListener = new IndicesLifeCycleListener();
|
||||||
|
pendingAnnotationsQueue = ConcurrentCollections.newBlockingQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -64,14 +81,24 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
|
||||||
this.thread = new Thread(exp, EsExecutors.threadName(settings, "monitor"));
|
this.thread = new Thread(exp, EsExecutors.threadName(settings, "monitor"));
|
||||||
this.thread.setDaemon(true);
|
this.thread.setDaemon(true);
|
||||||
this.thread.start();
|
this.thread.start();
|
||||||
|
|
||||||
|
indicesService.indicesLifecycle().addListener(indicesLifeCycleListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStop() throws ElasticSearchException {
|
protected void doStop() throws ElasticSearchException {
|
||||||
this.exp.closed = true;
|
this.exp.closed = true;
|
||||||
this.thread.interrupt();
|
this.thread.interrupt();
|
||||||
|
try {
|
||||||
|
this.thread.join(60000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// we don't care...
|
||||||
|
}
|
||||||
for (StatsExporter e : exporters)
|
for (StatsExporter e : exporters)
|
||||||
e.stop();
|
e.stop();
|
||||||
|
|
||||||
|
indicesService.indicesLifecycle().removeListener(indicesLifeCycleListener);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -96,52 +123,106 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
|
||||||
|
|
||||||
// do the actual export..., go over the actual exporters list and...
|
// do the actual export..., go over the actual exporters list and...
|
||||||
try {
|
try {
|
||||||
logger.debug("Collecting node stats");
|
exportNodeStats();
|
||||||
NodeStats nodeStats = nodeService.stats();
|
|
||||||
|
|
||||||
logger.debug("Exporting node stats");
|
exportShardStats();
|
||||||
for (StatsExporter e : exporters) {
|
|
||||||
try {
|
|
||||||
e.exportNodeStats(nodeStats);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
logger.error("StatsExporter [{}] has thrown an exception:", t, e.name());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.debug("Collecting shard stats");
|
|
||||||
ShardStats[] shardStatsArray = indicesService.shardStats(CommonStatsFlags.ALL);
|
|
||||||
|
|
||||||
logger.debug("Exporting shards stats");
|
|
||||||
for (StatsExporter e : exporters) {
|
|
||||||
try {
|
|
||||||
e.exportShardStats(shardStatsArray);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
logger.error("StatsExporter [{}] has thrown an exception:", t, e.name());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
exportAnnotations();
|
||||||
|
|
||||||
if (clusterService.state().nodes().localNodeMaster()) {
|
if (clusterService.state().nodes().localNodeMaster()) {
|
||||||
logger.debug("local node is master, exporting aggregated stats");
|
exportIndicesStats();
|
||||||
IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().get();
|
|
||||||
for (StatsExporter e : exporters) {
|
|
||||||
try {
|
|
||||||
e.exportIndicesStats(indicesStatsResponse);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
logger.error("StatsExporter [{}] has thrown an exception:", t, e.name());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
logger.error("Background thread had an uncaught exception:", t);
|
logger.error("Background thread had an uncaught exception:", t);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.debug("shutting down worker, exporting pending annotation");
|
||||||
|
exportAnnotations();
|
||||||
|
|
||||||
|
logger.debug("worker shutdown");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void exportIndicesStats() {
|
||||||
|
logger.debug("local node is master, exporting aggregated stats");
|
||||||
|
IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().get();
|
||||||
|
for (StatsExporter e : exporters) {
|
||||||
|
try {
|
||||||
|
e.exportIndicesStats(indicesStatsResponse);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.error("StatsExporter [{}] has thrown an exception:", t, e.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void exportAnnotations() {
|
||||||
|
logger.debug("Exporting annotations");
|
||||||
|
ArrayList<Annotation> annotationsList = new ArrayList<Annotation>(pendingAnnotationsQueue.size());
|
||||||
|
pendingAnnotationsQueue.drainTo(annotationsList);
|
||||||
|
Annotation[] annotations = new Annotation[annotationsList.size()];
|
||||||
|
annotationsList.toArray(annotations);
|
||||||
|
|
||||||
|
for (StatsExporter e : exporters) {
|
||||||
|
try {
|
||||||
|
e.exportAnnotations(annotations);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.error("StatsExporter [{}] has thrown an exception:", t, e.name());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void exportShardStats() {
|
||||||
|
logger.debug("Collecting shard stats");
|
||||||
|
ShardStats[] shardStatsArray = indicesService.shardStats(CommonStatsFlags.ALL);
|
||||||
|
|
||||||
|
logger.debug("Exporting shards stats");
|
||||||
|
for (StatsExporter e : exporters) {
|
||||||
|
try {
|
||||||
|
e.exportShardStats(shardStatsArray);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.error("StatsExporter [{}] has thrown an exception:", t, e.name());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void exportNodeStats() {
|
||||||
|
logger.debug("Collecting node stats");
|
||||||
|
NodeStats nodeStats = nodeService.stats();
|
||||||
|
|
||||||
|
logger.debug("Exporting node stats");
|
||||||
|
for (StatsExporter e : exporters) {
|
||||||
|
try {
|
||||||
|
e.exportNodeStats(nodeStats);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.error("StatsExporter [{}] has thrown an exception:", t, e.name());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class IndicesLifeCycleListener extends IndicesLifecycle.Listener {
|
||||||
|
@Override
|
||||||
|
public void afterIndexShardStarted(IndexShard indexShard) {
|
||||||
|
pendingAnnotationsQueue.add(new ShardEventAnnotation(System.currentTimeMillis(), ShardEventAnnotation.EventType.STARTED,
|
||||||
|
indexShard.shardId(), indexShard.routingEntry()));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeIndexShardCreated(ShardId shardId) {
|
||||||
|
pendingAnnotationsQueue.add(new ShardEventAnnotation(System.currentTimeMillis(), ShardEventAnnotation.EventType.CREATED,
|
||||||
|
shardId, null));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) {
|
||||||
|
pendingAnnotationsQueue.add(new ShardEventAnnotation(System.currentTimeMillis(), ShardEventAnnotation.EventType.CLOSED,
|
||||||
|
indexShard.shardId(), indexShard.routingEntry()));
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.marvel.monitor.annotation;
|
||||||
|
import org.elasticsearch.common.joda.Joda;
|
||||||
|
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
|
||||||
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public abstract class Annotation {
|
||||||
|
|
||||||
|
public final static DateTimeFormatter datePrinter = Joda.forPattern("date_time").printer();
|
||||||
|
|
||||||
|
protected long timestamp;
|
||||||
|
|
||||||
|
public Annotation(long timestamp) {
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long timestamp() {
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return annotation's type as a short string without spaces
|
||||||
|
*/
|
||||||
|
public abstract String type();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* should return a short string based description of the annotation
|
||||||
|
*/
|
||||||
|
abstract String conciseDescription();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "[" + type() + "] annotation: [" + conciseDescription() + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||||
|
builder.field("@timestamp", datePrinter.print(timestamp));
|
||||||
|
builder.field("message", conciseDescription());
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,57 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.marvel.monitor.annotation;
|
||||||
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class ShardEventAnnotation extends Annotation {
|
||||||
|
|
||||||
|
private final ShardRouting shardRouting;
|
||||||
|
private final ShardId shardId;
|
||||||
|
private EventType event;
|
||||||
|
|
||||||
|
public enum EventType {
|
||||||
|
CREATED,
|
||||||
|
STARTED,
|
||||||
|
CLOSED
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public ShardEventAnnotation(long timestamp, EventType event, ShardId shardId, ShardRouting shardRouting) {
|
||||||
|
super(timestamp);
|
||||||
|
this.event = event;
|
||||||
|
this.shardId = shardId;
|
||||||
|
this.shardRouting = shardRouting;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String type() {
|
||||||
|
return "shard_event";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
String conciseDescription() {
|
||||||
|
return "[" + event + "]" + (shardRouting != null ? shardRouting : shardId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||||
|
super.addXContentBody(builder, params);
|
||||||
|
builder.field("event", event);
|
||||||
|
builder.field("index", shardId.index());
|
||||||
|
builder.field("shard_id", shardId.id());
|
||||||
|
if (shardRouting != null) {
|
||||||
|
builder.field("routing");
|
||||||
|
shardRouting.toXContent(builder, params);
|
||||||
|
}
|
||||||
|
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
}
|
|
@ -14,10 +14,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.common.collect.ImmutableMap;
|
import org.elasticsearch.common.collect.ImmutableMap;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.joda.time.DateTimeZone;
|
import org.elasticsearch.common.joda.Joda;
|
||||||
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
|
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
|
||||||
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
|
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
|
||||||
import org.elasticsearch.common.joda.time.format.ISODateTimeFormat;
|
|
||||||
import org.elasticsearch.common.logging.ESLogger;
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||||
import org.elasticsearch.common.network.NetworkUtils;
|
import org.elasticsearch.common.network.NetworkUtils;
|
||||||
|
@ -30,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.common.xcontent.smile.SmileXContent;
|
import org.elasticsearch.common.xcontent.smile.SmileXContent;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
|
import org.elasticsearch.marvel.monitor.annotation.Annotation;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
@ -49,9 +49,10 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
final Discovery discovery;
|
final Discovery discovery;
|
||||||
final String hostname;
|
final String hostname;
|
||||||
|
|
||||||
|
// TODO: logger name is not good now. Figure out why.
|
||||||
final ESLogger logger = ESLoggerFactory.getLogger(ESExporter.class.getName());
|
final ESLogger logger = ESLoggerFactory.getLogger(ESExporter.class.getName());
|
||||||
|
|
||||||
public final static DateTimeFormatter defaultDatePrinter = ISODateTimeFormat.dateTime().withZone(DateTimeZone.UTC);
|
public final static DateTimeFormatter defaultDatePrinter = Joda.forPattern("date_time").printer();
|
||||||
|
|
||||||
boolean checkedForIndexTemplate = false;
|
boolean checkedForIndexTemplate = false;
|
||||||
|
|
||||||
|
@ -59,6 +60,7 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
final ShardStatsRenderer shardStatsRenderer;
|
final ShardStatsRenderer shardStatsRenderer;
|
||||||
final IndexStatsRenderer indexStatsRenderer;
|
final IndexStatsRenderer indexStatsRenderer;
|
||||||
final IndicesStatsRenderer indicesStatsRenderer;
|
final IndicesStatsRenderer indicesStatsRenderer;
|
||||||
|
final AnnotationsRenderer annotationsRenderer;
|
||||||
|
|
||||||
public ESExporter(Settings settings, Discovery discovery) {
|
public ESExporter(Settings settings, Discovery discovery) {
|
||||||
super(settings);
|
super(settings);
|
||||||
|
@ -79,6 +81,7 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
shardStatsRenderer = new ShardStatsRenderer();
|
shardStatsRenderer = new ShardStatsRenderer();
|
||||||
indexStatsRenderer = new IndexStatsRenderer();
|
indexStatsRenderer = new IndexStatsRenderer();
|
||||||
indicesStatsRenderer = new IndicesStatsRenderer();
|
indicesStatsRenderer = new IndicesStatsRenderer();
|
||||||
|
annotationsRenderer = new AnnotationsRenderer();
|
||||||
|
|
||||||
logger.info("ESExporter initialized. Targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat);
|
logger.info("ESExporter initialized. Targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat);
|
||||||
}
|
}
|
||||||
|
@ -92,13 +95,13 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
@Override
|
@Override
|
||||||
public void exportNodeStats(NodeStats nodeStats) {
|
public void exportNodeStats(NodeStats nodeStats) {
|
||||||
nodeStatsRenderer.reset(nodeStats);
|
nodeStatsRenderer.reset(nodeStats);
|
||||||
exportXContent("node_stats", nodeStatsRenderer);
|
exportXContent(nodeStatsRenderer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void exportShardStats(ShardStats[] shardStatsArray) {
|
public void exportShardStats(ShardStats[] shardStatsArray) {
|
||||||
shardStatsRenderer.reset(shardStatsArray);
|
shardStatsRenderer.reset(shardStatsArray);
|
||||||
exportXContent("shard_stats", shardStatsRenderer);
|
exportXContent(shardStatsRenderer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -112,8 +115,8 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
addXContentRendererToConnection(conn, "index_stats", indexStatsRenderer);
|
addXContentRendererToConnection(conn, indexStatsRenderer);
|
||||||
addXContentRendererToConnection(conn, "indices_stats", indicesStatsRenderer);
|
addXContentRendererToConnection(conn, indicesStatsRenderer);
|
||||||
sendCloseExportingConnection(conn);
|
sendCloseExportingConnection(conn);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.error("error sending data", e);
|
logger.error("error sending data", e);
|
||||||
|
@ -121,6 +124,13 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exportAnnotations(Annotation[] annotations) {
|
||||||
|
annotationsRenderer.reset(annotations);
|
||||||
|
exportXContent(annotationsRenderer);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private HttpURLConnection openExportingConnection() {
|
private HttpURLConnection openExportingConnection() {
|
||||||
if (!checkedForIndexTemplate) {
|
if (!checkedForIndexTemplate) {
|
||||||
if (!checkForIndexTemplate()) {
|
if (!checkForIndexTemplate()) {
|
||||||
|
@ -137,14 +147,14 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addXContentRendererToConnection(HttpURLConnection conn, String type,
|
private void addXContentRendererToConnection(HttpURLConnection conn,
|
||||||
MultiXContentRenderer renderer) throws IOException {
|
MultiXContentRenderer renderer) throws IOException {
|
||||||
OutputStream os = conn.getOutputStream();
|
OutputStream os = conn.getOutputStream();
|
||||||
// TODO: find a way to disable builder's substream flushing or something neat solution
|
// TODO: find a way to disable builder's substream flushing or something neat solution
|
||||||
for (int i = 0; i < renderer.length(); i++) {
|
for (int i = 0; i < renderer.length(); i++) {
|
||||||
XContentBuilder builder = XContentFactory.smileBuilder(os);
|
XContentBuilder builder = XContentFactory.smileBuilder(os);
|
||||||
builder.startObject().startObject("index")
|
builder.startObject().startObject("index")
|
||||||
.field("_index", getIndexName()).field("_type", type).endObject().endObject();
|
.field("_index", getIndexName()).field("_type", renderer.type(i)).endObject().endObject();
|
||||||
builder.flush();
|
builder.flush();
|
||||||
os.write(SmileXContent.smileXContent.streamSeparator());
|
os.write(SmileXContent.smileXContent.streamSeparator());
|
||||||
|
|
||||||
|
@ -168,18 +178,17 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void exportXContent(String type, MultiXContentRenderer xContentRenderer) {
|
private void exportXContent(MultiXContentRenderer xContentRenderer) {
|
||||||
if (xContentRenderer.length() == 0) {
|
if (xContentRenderer.length() == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug("exporting {}", type);
|
|
||||||
HttpURLConnection conn = openExportingConnection();
|
HttpURLConnection conn = openExportingConnection();
|
||||||
if (conn == null) {
|
if (conn == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
addXContentRendererToConnection(conn, type, xContentRenderer);
|
addXContentRendererToConnection(conn, xContentRenderer);
|
||||||
sendCloseExportingConnection(conn);
|
sendCloseExportingConnection(conn);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.error("error sending data", e);
|
logger.error("error sending data", e);
|
||||||
|
@ -300,6 +309,8 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
|
|
||||||
int length();
|
int length();
|
||||||
|
|
||||||
|
String type(int i);
|
||||||
|
|
||||||
void render(int index, XContentBuilder builder) throws IOException;
|
void render(int index, XContentBuilder builder) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -353,6 +364,11 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String type(int i) {
|
||||||
|
return "node_stats";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void render(int index, XContentBuilder builder) throws IOException {
|
public void render(int index, XContentBuilder builder) throws IOException {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
|
@ -379,6 +395,11 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
return stats == null ? 0 : stats.length;
|
return stats == null ? 0 : stats.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String type(int i) {
|
||||||
|
return "shard_stats";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void render(int index, XContentBuilder builder) throws IOException {
|
public void render(int index, XContentBuilder builder) throws IOException {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
|
@ -410,6 +431,11 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
return stats == null ? 0 : stats.length;
|
return stats == null ? 0 : stats.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String type(int i) {
|
||||||
|
return "index_stats";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void render(int index, XContentBuilder builder) throws IOException {
|
public void render(int index, XContentBuilder builder) throws IOException {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
|
@ -445,6 +471,11 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
return totalStats == null ? 0 : 1;
|
return totalStats == null ? 0 : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String type(int i) {
|
||||||
|
return "indices_stats";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void render(int index, XContentBuilder builder) throws IOException {
|
public void render(int index, XContentBuilder builder) throws IOException {
|
||||||
assert index == 0;
|
assert index == 0;
|
||||||
|
@ -461,5 +492,35 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class AnnotationsRenderer implements MultiXContentRenderer {
|
||||||
|
|
||||||
|
Annotation[] annotations;
|
||||||
|
ToXContent.Params xContentParams = ToXContent.EMPTY_PARAMS;
|
||||||
|
|
||||||
|
public void reset(Annotation[] annotations) {
|
||||||
|
this.annotations = annotations;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int length() {
|
||||||
|
return annotations == null ? 0 : annotations.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String type(int i) {
|
||||||
|
return annotations[i].type();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void render(int index, XContentBuilder builder) throws IOException {
|
||||||
|
builder.startObject();
|
||||||
|
addNodeInfo(builder, "node");
|
||||||
|
annotations[index].addXContentBody(builder, xContentParams);
|
||||||
|
builder.endObject();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats;
|
||||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||||
import org.elasticsearch.common.component.LifecycleComponent;
|
import org.elasticsearch.common.component.LifecycleComponent;
|
||||||
|
import org.elasticsearch.marvel.monitor.annotation.Annotation;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -23,4 +24,5 @@ public interface StatsExporter<T> extends LifecycleComponent<T> {
|
||||||
|
|
||||||
void exportIndicesStats(IndicesStatsResponse indicesStats);
|
void exportIndicesStats(IndicesStatsResponse indicesStats);
|
||||||
|
|
||||||
|
void exportAnnotations(Annotation[] annotations);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue