Added index and indices aggregate stats exported.

Added current version of dashboards

Original commit: elastic/x-pack-elasticsearch@657fea032e
This commit is contained in:
Boaz Leskes 2013-10-28 13:51:43 +01:00
parent 2adb58159f
commit 9604447d4d
3 changed files with 187 additions and 44 deletions

View File

@ -7,8 +7,10 @@ package org.elasticsearch.enterprise.monitor;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@ -28,6 +30,8 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
private final InternalIndicesService indicesService;
private final NodeService nodeService;
private final ClusterService clusterService;
private final Client client;
private volatile ExportingWorker exp;
private volatile Thread thread;
@ -37,11 +41,15 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
@Inject
public StatsExportersService(Settings settings, IndicesService indicesService,
NodeService nodeService, Discovery discovery) {
NodeService nodeService, ClusterService clusterService,
Client client,
Discovery discovery) {
super(settings);
this.indicesService = (InternalIndicesService) indicesService;
this.clusterService = clusterService;
this.nodeService = nodeService;
this.interval = componentSettings.getAsTime("interval", TimeValue.timeValueSeconds(5));
this.client = client;
StatsExporter esExporter = new ESExporter(settings.getComponentSettings(ESExporter.class), discovery);
this.exporters = ImmutableSet.of(esExporter);
@ -79,7 +87,7 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
@Override
public void run() {
while (!closed) {
// sleep first to allow node to complete initialization before collectiont the first start
// sleep first to allow node to complete initialization before collecting the first start
try {
Thread.sleep(interval.millis());
} catch (InterruptedException e) {
@ -96,7 +104,7 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
try {
e.exportNodeStats(nodeStats);
} catch (Throwable t) {
logger.error("StatsExporter {} has thrown an exception:", t, e.name());
logger.error("StatsExporter [{}] has thrown an exception:", t, e.name());
}
}
@ -108,9 +116,27 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
try {
e.exportShardStats(shardStatsArray);
} catch (Throwable t) {
logger.error("StatsExporter {} has thrown an exception:", t, e.name());
logger.error("StatsExporter [{}] has thrown an exception:", t, e.name());
}
}
if (clusterService.state().nodes().localNodeMaster()) {
logger.debug("local node is master, exporting aggregated stats");
IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().get();
for (StatsExporter e : exporters) {
try {
e.exportIndicesStats(indicesStatsResponse);
} catch (Throwable t) {
logger.error("StatsExporter [{}] has thrown an exception:", t, e.name());
}
}
}
} catch (Throwable t) {
logger.error("Background thread had an uncaught exception:", t);
}

View File

@ -6,8 +6,10 @@
package org.elasticsearch.enterprise.monitor.exporter;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.ImmutableMap;
@ -28,8 +30,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.smile.SmileXContent;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.node.service.NodeService;
import java.io.IOException;
import java.io.InputStream;
@ -57,6 +57,8 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
final NodeStatsRenderer nodeStatsRenderer;
final ShardStatsRenderer shardStatsRenderer;
final IndexStatsRenderer indexStatsRenderer;
final IndicesStatsRenderer indicesStatsRenderer;
public ESExporter(Settings settings, Discovery discovery) {
super(settings);
@ -75,6 +77,8 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
nodeStatsRenderer = new NodeStatsRenderer();
shardStatsRenderer = new ShardStatsRenderer();
indexStatsRenderer = new IndexStatsRenderer();
indicesStatsRenderer = new IndicesStatsRenderer();
logger.info("ESExporter initialized. Targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat);
}
@ -97,52 +101,86 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
exportXContent("shard_stats", shardStatsRenderer);
}
@Override
public void exportIndicesStats(IndicesStatsResponse indicesStats) {
Map<String, IndexStats> perIndexStats = indicesStats.getIndices();
indexStatsRenderer.reset(perIndexStats.values().toArray(new IndexStats[perIndexStats.size()]));
indicesStatsRenderer.reset(indicesStats.getTotal(), indicesStats.getPrimaries());
logger.debug("exporting index_stats + indices_stats");
HttpURLConnection conn = openExportingConnection();
if (conn == null) {
return;
}
try {
addXContentRendererToConnection(conn, "index_stats", indexStatsRenderer);
addXContentRendererToConnection(conn, "indices_stats", indicesStatsRenderer);
sendCloseExportingConnection(conn);
} catch (IOException e) {
logger.error("error sending data", e);
return;
}
}
private HttpURLConnection openExportingConnection() {
if (!checkedForIndexTemplate) {
if (!checkForIndexTemplate()) {
logger.debug("no template defined yet. skipping");
return null;
}
}
logger.trace("setting up an export connection");
HttpURLConnection conn = openConnection("POST", "/_bulk", XContentType.SMILE.restContentType());
if (conn == null) {
logger.error("could not connect to any configured elasticsearch instances: [{}]", hosts);
}
return conn;
}
private void addXContentRendererToConnection(HttpURLConnection conn, String type,
MultiXContentRenderer renderer) throws IOException {
OutputStream os = conn.getOutputStream();
// TODO: find a way to disable builder's substream flushing or something neat solution
for (int i = 0; i < renderer.length(); i++) {
XContentBuilder builder = XContentFactory.smileBuilder(os);
builder.startObject().startObject("index")
.field("_index", getIndexName()).field("_type", type).endObject().endObject();
builder.flush();
os.write(SmileXContent.smileXContent.streamSeparator());
builder = XContentFactory.smileBuilder(os);
builder.humanReadable(false);
renderer.render(i, builder);
builder.flush();
os.write(SmileXContent.smileXContent.streamSeparator());
}
}
private void sendCloseExportingConnection(HttpURLConnection conn) throws IOException {
logger.trace("sending exporting content");
OutputStream os = conn.getOutputStream();
os.close();
if (conn.getResponseCode() != 200) {
logConnectionError("remote target didn't respond with 200 OK", conn);
} else {
conn.getInputStream().close(); // close and release to connection pool.
}
}
private void exportXContent(String type, MultiXContentRenderer xContentRenderer) {
if (xContentRenderer.length() == 0) {
return;
}
if (!checkedForIndexTemplate) {
if (!checkForIndexTemplate()) {
logger.debug("no template defined yet. skipping");
return;
}
;
}
logger.debug("exporting {}", type);
HttpURLConnection conn = openConnection("POST", "/_bulk", XContentType.SMILE.restContentType());
HttpURLConnection conn = openExportingConnection();
if (conn == null) {
logger.error("could not connect to any configured elasticsearch instances: [{}]", hosts);
return;
}
try {
OutputStream os = conn.getOutputStream();
// TODO: find a way to disable builder's substream flushing or something neat solution
for (int i = 0; i < xContentRenderer.length(); i++) {
XContentBuilder builder = XContentFactory.smileBuilder(os);
builder.startObject().startObject("index")
.field("_index", getIndexName()).field("_type", type).endObject().endObject();
builder.flush();
os.write(SmileXContent.smileXContent.streamSeparator());
builder = XContentFactory.smileBuilder(os);
builder.humanReadable(false);
xContentRenderer.render(i, builder);
builder.flush();
os.write(SmileXContent.smileXContent.streamSeparator());
}
os.close();
if (conn.getResponseCode() != 200) {
logConnectionError("remote target didn't respond with 200 OK", conn);
} else {
conn.getInputStream().close(); // close and release to connection pool.
}
addXContentRendererToConnection(conn, type, xContentRenderer);
sendCloseExportingConnection(conn);
} catch (IOException e) {
logger.error("error sending data", e);
return;
@ -267,7 +305,11 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
private void addNodeInfo(XContentBuilder builder) throws IOException {
builder.startObject("node");
addNodeInfo(builder, "node");
}
private void addNodeInfo(XContentBuilder builder, String fieldname) throws IOException {
builder.startObject(fieldname);
DiscoveryNode node = discovery.localNode();
builder.field("id", node.id());
builder.field("name", node.name());
@ -352,5 +394,72 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
}
}
class IndexStatsRenderer implements MultiXContentRenderer {
IndexStats[] stats;
long collectionTime;
ToXContent.Params xContentParams = ToXContent.EMPTY_PARAMS;
public void reset(IndexStats[] stats) {
this.stats = stats;
collectionTime = System.currentTimeMillis();
}
@Override
public int length() {
return stats == null ? 0 : stats.length;
}
@Override
public void render(int index, XContentBuilder builder) throws IOException {
builder.startObject();
builder.field("@timestamp", defaultDatePrinter.print(collectionTime));
IndexStats indexStats = stats[index];
builder.field("index", indexStats.getIndex());
addNodeInfo(builder, "_source_node");
builder.startObject("primaries");
indexStats.getPrimaries().toXContent(builder, xContentParams);
builder.endObject();
builder.startObject("total");
indexStats.getTotal().toXContent(builder, xContentParams);
builder.endObject();
builder.endObject();
}
}
class IndicesStatsRenderer implements MultiXContentRenderer {
CommonStats totalStats;
CommonStats primariesStats;
long collectionTime;
ToXContent.Params xContentParams = ToXContent.EMPTY_PARAMS;
public void reset(CommonStats totalStats, CommonStats primariesStats) {
this.totalStats = totalStats;
this.primariesStats = primariesStats;
collectionTime = System.currentTimeMillis();
}
@Override
public int length() {
return totalStats == null ? 0 : 1;
}
@Override
public void render(int index, XContentBuilder builder) throws IOException {
assert index == 0;
builder.startObject();
builder.field("@timestamp", defaultDatePrinter.print(collectionTime));
addNodeInfo(builder, "_source_node");
builder.startObject("primaries");
primariesStats.toXContent(builder, xContentParams);
builder.endObject();
builder.startObject("total");
totalStats.toXContent(builder, xContentParams);
builder.endObject();
builder.endObject();
}
}
}

View File

@ -5,9 +5,14 @@
*/
package org.elasticsearch.enterprise.monitor.exporter;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.common.component.LifecycleComponent;
import java.util.Map;
public interface StatsExporter<T> extends LifecycleComponent<T> {
String name();
@ -15,4 +20,7 @@ public interface StatsExporter<T> extends LifecycleComponent<T> {
void exportNodeStats(NodeStats nodeStats);
void exportShardStats(ShardStats[] shardStatsArray);
void exportIndicesStats(IndicesStatsResponse indicesStats);
}