diff --git a/src/main/java/org/elasticsearch/enterprise/monitor/Plugin.java b/src/main/java/org/elasticsearch/enterprise/monitor/Plugin.java index 90c30d36f39..e4a7371cac0 100644 --- a/src/main/java/org/elasticsearch/enterprise/monitor/Plugin.java +++ b/src/main/java/org/elasticsearch/enterprise/monitor/Plugin.java @@ -22,7 +22,7 @@ public class Plugin extends AbstractPlugin { @Override public String name() { - return "Dash"; + return "Elasticsearch enterprise - monitor"; } @Override diff --git a/src/main/java/org/elasticsearch/enterprise/monitor/StatsExportersService.java b/src/main/java/org/elasticsearch/enterprise/monitor/StatsExportersService.java index 8d2b32363f8..342ee6e5db0 100644 --- a/src/main/java/org/elasticsearch/enterprise/monitor/StatsExportersService.java +++ b/src/main/java/org/elasticsearch/enterprise/monitor/StatsExportersService.java @@ -4,25 +4,24 @@ * you may not use this file except in compliance with the Elastic License. */ package org.elasticsearch.enterprise.monitor; -import org.elasticsearch.common.collect.ImmutableSet; -import org.elasticsearch.enterprise.monitor.exporter.ESExporter; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.enterprise.monitor.exporter.ESExporter; import org.elasticsearch.enterprise.monitor.exporter.StatsExporter; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InternalIndicesService; import org.elasticsearch.node.service.NodeService; import java.util.Collection; -import java.util.List; public class StatsExportersService extends AbstractLifecycleComponent { @@ -52,7 +51,7 @@ public class StatsExportersService extends AbstractLifecycleComponent shardStatsList = indicesService.shardLevelStats(CommonStatsFlags.ALL); -// -// logger.debug("Exporting shards stats"); -// for (StatsExporter e : exporters) { -// try { -// for (ShardStats shardStats : shardStatsList) -// e.exportShardStats(shardStats); -// } catch (Throwable t) { -// logger.error("StatsExporter {} has thrown an exception:", t, e.name()); -// } -// } + logger.debug("Collecting shard stats"); + ShardStats[] shardStatsArray = indicesService.shardStats(CommonStatsFlags.ALL); + + logger.debug("Exporting shards stats"); + for (StatsExporter e : exporters) { + try { + e.exportShardStats(shardStatsArray); + } catch (Throwable t) { + logger.error("StatsExporter {} has thrown an exception:", t, e.name()); + } + } } catch (Throwable t) { logger.error("Background thread had an uncaught exception:", t); } diff --git a/src/main/java/org/elasticsearch/enterprise/monitor/exporter/ESExporter.java b/src/main/java/org/elasticsearch/enterprise/monitor/exporter/ESExporter.java index b537ac2beed..65d41fae289 100644 --- a/src/main/java/org/elasticsearch/enterprise/monitor/exporter/ESExporter.java +++ b/src/main/java/org/elasticsearch/enterprise/monitor/exporter/ESExporter.java @@ -4,78 +4,68 @@ * you may not use this file except in compliance with the Elastic License. */ package org.elasticsearch.enterprise.monitor.exporter; -import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.joda.time.DateTimeZone; +import org.elasticsearch.common.joda.time.format.DateTimeFormat; +import org.elasticsearch.common.joda.time.format.DateTimeFormatter; +import org.elasticsearch.common.joda.time.format.ISODateTimeFormat; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.smile.SmileXContent; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; -import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; -import java.net.MalformedURLException; import java.net.URL; -import java.net.URLEncoder; -import java.util.*; public class ESExporter extends AbstractLifecycleComponent implements StatsExporter { - final String targetHost; - final int targetPort; + final String[] hosts; + final String indexPrefix; + final DateTimeFormatter indexTimeFormatter; + final int timeout; - final String targetPathPrefix; final ClusterName clusterName; final ESLogger logger = ESLoggerFactory.getLogger(ESExporter.class.getName()); final ToXContent.Params xContentParams; + public final static DateTimeFormatter defaultDatePrinter = ISODateTimeFormat.dateTime().withZone(DateTimeZone.UTC); + + boolean checkedForIndexTemplate = false; + public ESExporter(Settings settings, ClusterName clusterName) { super(settings); this.clusterName = clusterName; - // TODO: move to a single settings. - targetHost = settings.get("target.host", "localhost"); - targetPort = settings.getAsInt("target.post", 9200); - String targetIndexPrefix = settings.get("target.index.prefix", ""); + hosts = settings.getAsArray("hosts", new String[]{"localhost:9200"}); + indexPrefix = settings.get("index.prefix", "es_monitor"); + String indexTimeFormat = settings.get("index.timeformat", "YYYY.MM.dd"); + indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat); - try { - if (!targetIndexPrefix.isEmpty()) targetIndexPrefix += targetIndexPrefix + "_"; - targetPathPrefix = "/"+ URLEncoder.encode(targetIndexPrefix,"UTF-8") + URLEncoder.encode(clusterName.value(),"UTF-8"); + timeout = (int) settings.getAsTime("timeout", new TimeValue(6000)).seconds(); - } catch (UnsupportedEncodingException e) { - throw new ElasticSearchException("Can't encode target url", e); - } + xContentParams = new ToXContent.MapParams( + ImmutableMap.of("load_average_format", "hash")); - xContentParams = new ToXContent.MapParams(ImmutableMap.of("human_readable", "false")); + logger.info("ESExporter initialized. Targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat); - - logger.info("ESExporter initialized. Target: {}:{} Index prefix set to {}", targetHost, targetPort, targetIndexPrefix ); - // explode early on broken settings - getTargetURL("test"); - - } - - private URL getTargetURL(String type) { - try { - String path = String.format("%1$s%2$tY.%2$tm.%2$td/%3$s", targetPathPrefix, new Date(), type); - return new URL("http", targetHost, targetPort, path); - } catch (MalformedURLException e) { - throw new ElasticSearchIllegalArgumentException("Target settings result in a malformed url"); - } } @Override @@ -86,36 +76,61 @@ public class ESExporter extends AbstractLifecycleComponent implement @Override public void exportNodeStats(NodeStats nodeStats) { - exportXContent("nodestats", nodeStats); + exportXContent("nodestats", new ToXContent[]{nodeStats}, nodeStats.getTimestamp()); } @Override - public void exportShardStats(ShardStats shardStats) { - //exportXContent("shardstats", shardStats); + public void exportShardStats(ShardStats[] shardStatsArray) { + exportXContent("shardstats", shardStatsArray, System.currentTimeMillis()); } - private void exportXContent(String type,ToXContent xContent) { - URL url = getTargetURL(type); - logger.debug("Exporting {} to {}", type, url); - HttpURLConnection conn; - try { - conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod("POST"); - conn.setDoOutput(true); - conn.setRequestProperty("Content-Type", XContentType.SMILE.restContentType()); - OutputStream os = conn.getOutputStream(); - XContentBuilder builder = XContentFactory.smileBuilder(os); + private void exportXContent(String type, ToXContent[] xContentArray, long collectionTimestamp) { + if (xContentArray == null || xContentArray.length == 0) { + return; + } - builder.startObject(); - xContent.toXContent(builder, xContentParams); - builder.endObject(); - - builder.close(); - - if (conn.getResponseCode() != 201) { - logger.error("Remote target didn't respond with 201 Created"); + if (!checkedForIndexTemplate) { + if (!checkForIndexTemplate()) { + logger.debug("no template defined yet. skipping"); + return; } - conn.getInputStream().close(); // close and release to connection pool. + ; + } + + logger.debug("Exporting {}", type); + HttpURLConnection conn = openConnection("POST", "/_bulk", XContentType.SMILE.restContentType()); + if (conn == null) { + logger.error("Could not connect to any configured elasticsearch instances: [{}]", hosts); + return; + } + try { + OutputStream os = conn.getOutputStream(); + // TODO: find a way to disable builder's substream flushing. + for (ToXContent xContent : xContentArray) { + XContentBuilder builder = XContentFactory.smileBuilder(os); + builder.startObject().startObject("index") + .field("_index", getIndexName()).field("_type", type).endObject().endObject(); + builder.flush(); + os.write(SmileXContent.smileXContent.streamSeparator()); + + builder = XContentFactory.smileBuilder(os); + builder.humanReadable(false); + builder.startObject(); + builder.field("@timestamp", defaultDatePrinter.print(collectionTimestamp)); + xContent.toXContent(builder, xContentParams); + builder.endObject(); + builder.flush(); + os.write(SmileXContent.smileXContent.streamSeparator()); + + } + os.close(); + + if (conn.getResponseCode() != 200) { + logConnectionError("remote target didn't respond with 200 OK", conn); + } else { + conn.getInputStream().close(); // close and release to connection pool. + } + } catch (IOException e) { logger.error("Error connecting to target", e); @@ -137,4 +152,99 @@ public class ESExporter extends AbstractLifecycleComponent implement protected void doClose() throws ElasticSearchException { } + + private String getIndexName() { + return indexPrefix + "-" + indexTimeFormatter.print(System.currentTimeMillis()); + + } + + + private HttpURLConnection openConnection(String method, String uri) { + return openConnection(method, uri, null); + } + + private HttpURLConnection openConnection(String method, String uri, String contentType) { + for (String host : hosts) { + try { + URL templateUrl = new URL("http://" + host + uri); + HttpURLConnection conn = (HttpURLConnection) templateUrl.openConnection(); + conn.setRequestMethod(method); + conn.setConnectTimeout(timeout); + if (contentType != null) { + conn.setRequestProperty("Content-Type", XContentType.SMILE.restContentType()); + } + conn.setUseCaches(false); + if (method.equalsIgnoreCase("POST") || method.equalsIgnoreCase("PUT")) { + conn.setDoOutput(true); + } + conn.connect(); + + return conn; + } catch (IOException e) { + logger.error("error connecting to [{}]: {}", host, e); + } + } + + return null; + } + + private boolean checkForIndexTemplate() { + try { + + + String templateName = "enterprise.monitor." + indexPrefix; + + logger.debug("checking of target has template [{}]", templateName); + // DO HEAD REQUEST, when elasticsearch supports it + HttpURLConnection conn = openConnection("GET", "/_template/" + templateName); + if (conn == null) { + logger.error("Could not connect to any configured elasticsearch instances: [{}]", hosts); + return false; + } + + boolean hasTemplate = conn.getResponseCode() == 200; + + // nothing there, lets create it + if (!hasTemplate) { + logger.debug("no template found in elasticsearch for [{}]. Adding...", templateName); + conn = openConnection("PUT", "/_template/" + templateName, XContentType.SMILE.restContentType()); + OutputStream os = conn.getOutputStream(); + XContentBuilder builder = XContentFactory.smileBuilder(os); + builder.startObject(); + builder.field("template", indexPrefix + "*"); + builder.startObject("mappings").startObject("_default_"); + builder.startArray("dynamic_templates").startObject().startObject("string_fields") + .field("match", "*") + .field("match_mapping_type", "string") + .startObject("mapping").field("index", "not_analyzed").endObject() + .endObject().endObject().endArray(); + builder.endObject().endObject(); // mapping + root object. + builder.close(); + os.close(); + + if (conn.getResponseCode() != 200) { + logConnectionError("error adding index template to elasticsearch", conn); + } + conn.getInputStream().close(); // close and release to connection pool. + + } + checkedForIndexTemplate = true; + } catch (IOException e) { + logger.error("Error when checking/adding metrics template to elasticsearch", e); + return false; + } + return true; + } + + private void logConnectionError(String msg, HttpURLConnection conn) { + InputStream inputStream = conn.getErrorStream(); + java.util.Scanner s = new java.util.Scanner(inputStream, "UTF-8").useDelimiter("\\A"); + String err = s.hasNext() ? s.next() : ""; + try { + logger.error("{} response code [{} {}]. content: {}", msg, conn.getResponseCode(), conn.getResponseMessage(), err); + } catch (IOException e) { + logger.error("connection had an error while reporting the error. tough life."); + } + } } + diff --git a/src/main/java/org/elasticsearch/enterprise/monitor/exporter/StatsExporter.java b/src/main/java/org/elasticsearch/enterprise/monitor/exporter/StatsExporter.java index 9e841848c60..93875ef978b 100644 --- a/src/main/java/org/elasticsearch/enterprise/monitor/exporter/StatsExporter.java +++ b/src/main/java/org/elasticsearch/enterprise/monitor/exporter/StatsExporter.java @@ -14,5 +14,5 @@ public interface StatsExporter extends LifecycleComponent { void exportNodeStats(NodeStats nodeStats); - void exportShardStats(ShardStats shardStats); + void exportShardStats(ShardStats[] shardStatsArray); }