Extended ESExporter infrastructure to use index templates and multiple hosts

Original commit: elastic/x-pack-elasticsearch@d0f4837110
This commit is contained in:
Boaz Leskes 2013-09-30 17:01:09 +02:00
parent 318b0e7b88
commit 41c7e045a7
4 changed files with 183 additions and 75 deletions

View File

@ -22,7 +22,7 @@ public class Plugin extends AbstractPlugin {
@Override
public String name() {
return "Dash";
return "Elasticsearch enterprise - monitor";
}
@Override

View File

@ -4,25 +4,24 @@
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.enterprise.monitor;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.enterprise.monitor.exporter.ESExporter;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.enterprise.monitor.exporter.ESExporter;
import org.elasticsearch.enterprise.monitor.exporter.StatsExporter;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InternalIndicesService;
import org.elasticsearch.node.service.NodeService;
import java.util.Collection;
import java.util.List;
public class StatsExportersService extends AbstractLifecycleComponent<StatsExportersService> {
@ -52,7 +51,7 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
e.start();
this.exp = new ExportingWorker();
this.thread = new Thread(exp, EsExecutors.threadName(settings, "dash"));
this.thread = new Thread(exp, EsExecutors.threadName(settings, "monitor"));
this.thread.setDaemon(true);
this.thread.start();
}
@ -92,18 +91,17 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
}
}
// logger.warn("Collecting shard stats");
// List<ShardStats> shardStatsList = indicesService.shardLevelStats(CommonStatsFlags.ALL);
//
// logger.debug("Exporting shards stats");
// for (StatsExporter e : exporters) {
// try {
// for (ShardStats shardStats : shardStatsList)
// e.exportShardStats(shardStats);
// } catch (Throwable t) {
// logger.error("StatsExporter {} has thrown an exception:", t, e.name());
// }
// }
logger.debug("Collecting shard stats");
ShardStats[] shardStatsArray = indicesService.shardStats(CommonStatsFlags.ALL);
logger.debug("Exporting shards stats");
for (StatsExporter e : exporters) {
try {
e.exportShardStats(shardStatsArray);
} catch (Throwable t) {
logger.error("StatsExporter {} has thrown an exception:", t, e.name());
}
}
} catch (Throwable t) {
logger.error("Background thread had an uncaught exception:", t);
}

View File

@ -4,78 +4,68 @@
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.enterprise.monitor.exporter;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
import org.elasticsearch.common.joda.time.format.ISODateTimeFormat;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.smile.SmileXContent;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.util.*;
public class ESExporter extends AbstractLifecycleComponent<ESExporter> implements StatsExporter<ESExporter> {
final String targetHost;
final int targetPort;
final String[] hosts;
final String indexPrefix;
final DateTimeFormatter indexTimeFormatter;
final int timeout;
final String targetPathPrefix;
final ClusterName clusterName;
final ESLogger logger = ESLoggerFactory.getLogger(ESExporter.class.getName());
final ToXContent.Params xContentParams;
public final static DateTimeFormatter defaultDatePrinter = ISODateTimeFormat.dateTime().withZone(DateTimeZone.UTC);
boolean checkedForIndexTemplate = false;
public ESExporter(Settings settings, ClusterName clusterName) {
super(settings);
this.clusterName = clusterName;
// TODO: move to a single settings.
targetHost = settings.get("target.host", "localhost");
targetPort = settings.getAsInt("target.post", 9200);
String targetIndexPrefix = settings.get("target.index.prefix", "");
hosts = settings.getAsArray("hosts", new String[]{"localhost:9200"});
indexPrefix = settings.get("index.prefix", "es_monitor");
String indexTimeFormat = settings.get("index.timeformat", "YYYY.MM.dd");
indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat);
try {
if (!targetIndexPrefix.isEmpty()) targetIndexPrefix += targetIndexPrefix + "_";
targetPathPrefix = "/"+ URLEncoder.encode(targetIndexPrefix,"UTF-8") + URLEncoder.encode(clusterName.value(),"UTF-8");
timeout = (int) settings.getAsTime("timeout", new TimeValue(6000)).seconds();
} catch (UnsupportedEncodingException e) {
throw new ElasticSearchException("Can't encode target url", e);
}
xContentParams = new ToXContent.MapParams(
ImmutableMap.of("load_average_format", "hash"));
xContentParams = new ToXContent.MapParams(ImmutableMap.of("human_readable", "false"));
logger.info("ESExporter initialized. Targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat);
logger.info("ESExporter initialized. Target: {}:{} Index prefix set to {}", targetHost, targetPort, targetIndexPrefix );
// explode early on broken settings
getTargetURL("test");
}
private URL getTargetURL(String type) {
try {
String path = String.format("%1$s%2$tY.%2$tm.%2$td/%3$s", targetPathPrefix, new Date(), type);
return new URL("http", targetHost, targetPort, path);
} catch (MalformedURLException e) {
throw new ElasticSearchIllegalArgumentException("Target settings result in a malformed url");
}
}
@Override
@ -86,36 +76,61 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
@Override
public void exportNodeStats(NodeStats nodeStats) {
exportXContent("nodestats", nodeStats);
exportXContent("nodestats", new ToXContent[]{nodeStats}, nodeStats.getTimestamp());
}
@Override
public void exportShardStats(ShardStats shardStats) {
//exportXContent("shardstats", shardStats);
public void exportShardStats(ShardStats[] shardStatsArray) {
exportXContent("shardstats", shardStatsArray, System.currentTimeMillis());
}
private void exportXContent(String type,ToXContent xContent) {
URL url = getTargetURL(type);
logger.debug("Exporting {} to {}", type, url);
HttpURLConnection conn;
try {
conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setDoOutput(true);
conn.setRequestProperty("Content-Type", XContentType.SMILE.restContentType());
OutputStream os = conn.getOutputStream();
XContentBuilder builder = XContentFactory.smileBuilder(os);
private void exportXContent(String type, ToXContent[] xContentArray, long collectionTimestamp) {
if (xContentArray == null || xContentArray.length == 0) {
return;
}
builder.startObject();
xContent.toXContent(builder, xContentParams);
builder.endObject();
builder.close();
if (conn.getResponseCode() != 201) {
logger.error("Remote target didn't respond with 201 Created");
if (!checkedForIndexTemplate) {
if (!checkForIndexTemplate()) {
logger.debug("no template defined yet. skipping");
return;
}
conn.getInputStream().close(); // close and release to connection pool.
;
}
logger.debug("Exporting {}", type);
HttpURLConnection conn = openConnection("POST", "/_bulk", XContentType.SMILE.restContentType());
if (conn == null) {
logger.error("Could not connect to any configured elasticsearch instances: [{}]", hosts);
return;
}
try {
OutputStream os = conn.getOutputStream();
// TODO: find a way to disable builder's substream flushing.
for (ToXContent xContent : xContentArray) {
XContentBuilder builder = XContentFactory.smileBuilder(os);
builder.startObject().startObject("index")
.field("_index", getIndexName()).field("_type", type).endObject().endObject();
builder.flush();
os.write(SmileXContent.smileXContent.streamSeparator());
builder = XContentFactory.smileBuilder(os);
builder.humanReadable(false);
builder.startObject();
builder.field("@timestamp", defaultDatePrinter.print(collectionTimestamp));
xContent.toXContent(builder, xContentParams);
builder.endObject();
builder.flush();
os.write(SmileXContent.smileXContent.streamSeparator());
}
os.close();
if (conn.getResponseCode() != 200) {
logConnectionError("remote target didn't respond with 200 OK", conn);
} else {
conn.getInputStream().close(); // close and release to connection pool.
}
} catch (IOException e) {
logger.error("Error connecting to target", e);
@ -137,4 +152,99 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
protected void doClose() throws ElasticSearchException {
}
private String getIndexName() {
return indexPrefix + "-" + indexTimeFormatter.print(System.currentTimeMillis());
}
private HttpURLConnection openConnection(String method, String uri) {
return openConnection(method, uri, null);
}
private HttpURLConnection openConnection(String method, String uri, String contentType) {
for (String host : hosts) {
try {
URL templateUrl = new URL("http://" + host + uri);
HttpURLConnection conn = (HttpURLConnection) templateUrl.openConnection();
conn.setRequestMethod(method);
conn.setConnectTimeout(timeout);
if (contentType != null) {
conn.setRequestProperty("Content-Type", XContentType.SMILE.restContentType());
}
conn.setUseCaches(false);
if (method.equalsIgnoreCase("POST") || method.equalsIgnoreCase("PUT")) {
conn.setDoOutput(true);
}
conn.connect();
return conn;
} catch (IOException e) {
logger.error("error connecting to [{}]: {}", host, e);
}
}
return null;
}
private boolean checkForIndexTemplate() {
try {
String templateName = "enterprise.monitor." + indexPrefix;
logger.debug("checking of target has template [{}]", templateName);
// DO HEAD REQUEST, when elasticsearch supports it
HttpURLConnection conn = openConnection("GET", "/_template/" + templateName);
if (conn == null) {
logger.error("Could not connect to any configured elasticsearch instances: [{}]", hosts);
return false;
}
boolean hasTemplate = conn.getResponseCode() == 200;
// nothing there, lets create it
if (!hasTemplate) {
logger.debug("no template found in elasticsearch for [{}]. Adding...", templateName);
conn = openConnection("PUT", "/_template/" + templateName, XContentType.SMILE.restContentType());
OutputStream os = conn.getOutputStream();
XContentBuilder builder = XContentFactory.smileBuilder(os);
builder.startObject();
builder.field("template", indexPrefix + "*");
builder.startObject("mappings").startObject("_default_");
builder.startArray("dynamic_templates").startObject().startObject("string_fields")
.field("match", "*")
.field("match_mapping_type", "string")
.startObject("mapping").field("index", "not_analyzed").endObject()
.endObject().endObject().endArray();
builder.endObject().endObject(); // mapping + root object.
builder.close();
os.close();
if (conn.getResponseCode() != 200) {
logConnectionError("error adding index template to elasticsearch", conn);
}
conn.getInputStream().close(); // close and release to connection pool.
}
checkedForIndexTemplate = true;
} catch (IOException e) {
logger.error("Error when checking/adding metrics template to elasticsearch", e);
return false;
}
return true;
}
private void logConnectionError(String msg, HttpURLConnection conn) {
InputStream inputStream = conn.getErrorStream();
java.util.Scanner s = new java.util.Scanner(inputStream, "UTF-8").useDelimiter("\\A");
String err = s.hasNext() ? s.next() : "";
try {
logger.error("{} response code [{} {}]. content: {}", msg, conn.getResponseCode(), conn.getResponseMessage(), err);
} catch (IOException e) {
logger.error("connection had an error while reporting the error. tough life.");
}
}
}

View File

@ -14,5 +14,5 @@ public interface StatsExporter<T> extends LifecycleComponent<T> {
void exportNodeStats(NodeStats nodeStats);
void exportShardStats(ShardStats shardStats);
void exportShardStats(ShardStats[] shardStatsArray);
}