Merge branch 'master' into shield-kibana-auth

Original commit: elastic/x-pack-elasticsearch@f7a15fb35c
This commit is contained in:
Lukas Olson 2015-12-10 14:33:19 -07:00
commit 0087126649
46 changed files with 1340 additions and 463 deletions

View File

@ -4,15 +4,11 @@
* https://github.com/elastic/x-plugins/issues/724
*/
apply plugin: 'elasticsearch.standalone-test'
apply plugin: 'elasticsearch.messy-test'
dependencies {
testCompile project(path: ':x-plugins:elasticsearch:x-pack', configuration: 'testArtifacts')
testCompile project(path: ':modules:lang-groovy', configuration: 'runtime')
}
// TODO: remove this, its because gradle does not bring in plugin-metadata for lang-groovy
// into the test classpath: if it did, then things will work
test {
systemProperty 'tests.security.manager', 'false'
// some tests depend on both groovy and mustache! this is really bad!
testCompile project(path: ':modules:lang-mustache', configuration: 'runtime')
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.groovy.GroovyPlugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.sort.SortOrder;
@ -48,6 +49,7 @@ public class IndexActionIntegrationTests extends AbstractWatcherIntegrationTestC
protected List<Class<? extends Plugin>> pluginTypes() {
List<Class<? extends Plugin>> types = super.pluginTypes();
types.add(GroovyPlugin.class);
types.add(MustachePlugin.class);
return types;
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.groovy.GroovyPlugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.watcher.test.WatcherTestUtils;
@ -51,6 +52,7 @@ public class TransformIntegrationTests extends AbstractWatcherIntegrationTestCas
protected List<Class<? extends Plugin>> pluginTypes() {
List<Class<? extends Plugin>> types = super.pluginTypes();
types.add(GroovyPlugin.class);
types.add(MustachePlugin.class);
return types;
}

View File

@ -0,0 +1,11 @@
/*
* Messy tests that depend on mustache directly. Fix these!
*/
apply plugin: 'elasticsearch.messy-test'
dependencies {
testCompile project(path: ':x-plugins:elasticsearch:x-pack', configuration: 'testArtifacts')
testCompile project(path: ':modules:lang-mustache', configuration: 'runtime')
}

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.test.integration;
package org.elasticsearch.messy.tests;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequest;
@ -13,12 +13,11 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.script.Template;
import org.elasticsearch.script.mustache.MustacheScriptEngineService;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.condition.compare.CompareCondition;
@ -35,6 +34,9 @@ import org.elasticsearch.watcher.trigger.schedule.support.MonthTimes;
import org.elasticsearch.watcher.trigger.schedule.support.WeekTimes;
import org.elasticsearch.watcher.watch.WatchStore;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
@ -57,6 +59,14 @@ import static org.hamcrest.Matchers.*;
*/
public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
@Override
protected List<Class<? extends Plugin>> pluginTypes() {
List<Class<? extends Plugin>> types = new ArrayList<>();
types.addAll(super.pluginTypes());
types.add(MustachePlugin.class);
return types;
}
public void testIndexWatch() throws Exception {
WatcherClient watcherClient = watcherClient();
createIndex("idx");

View File

@ -3,11 +3,13 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.actions.email;
package org.elasticsearch.messy.tests;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.watcher.actions.email.service.EmailTemplate;
@ -18,6 +20,8 @@ import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
import org.junit.After;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -47,6 +51,14 @@ public class EmailActionIntegrationTests extends AbstractWatcherIntegrationTestC
public void cleanup() throws Exception {
server.stop();
}
@Override
protected List<Class<? extends Plugin>> pluginTypes() {
List<Class<? extends Plugin>> types = new ArrayList<>();
types.addAll(super.pluginTypes());
types.add(MustachePlugin.class);
return types;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {

View File

@ -3,17 +3,23 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.history;
package org.elasticsearch.messy.tests;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.watcher.execution.ExecutionState;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
@ -31,6 +37,15 @@ import static org.hamcrest.Matchers.notNullValue;
* not analyzed so they can be used in aggregations
*/
public class HistoryTemplateSearchInputMappingsTests extends AbstractWatcherIntegrationTestCase {
@Override
protected List<Class<? extends Plugin>> pluginTypes() {
List<Class<? extends Plugin>> types = new ArrayList<>();
types.addAll(super.pluginTypes());
types.add(MustachePlugin.class);
return types;
}
@Override
protected boolean timeWarped() {
return true; // just to have better control over the triggers

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.search;
package org.elasticsearch.messy.tests;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
import org.elasticsearch.action.search.SearchRequest;
@ -17,7 +17,9 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.script.Template;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase;
@ -27,6 +29,9 @@ import org.elasticsearch.watcher.actions.ExecutableActions;
import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition;
import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.search.ExecutableSearchInput;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.input.search.SearchInputFactory;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
@ -46,6 +51,7 @@ import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@ -68,6 +74,15 @@ import static org.joda.time.DateTimeZone.UTC;
*/
@ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class SearchInputTests extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Collection<Class<? extends Plugin>> types = new ArrayList<>();
types.addAll(super.nodePlugins());
types.add(MustachePlugin.class);
return types;
}
private final static String TEMPLATE_QUERY = "{\"query\":{\"filtered\":{\"query\":{\"match\":{\"event_type\":{\"query\":\"a\"," +
"\"type\":\"boolean\"}}},\"filter\":{\"range\":{\"_timestamp\":" +
"{\"from\":\"{{ctx.trigger.scheduled_time}}||-{{seconds_param}}\",\"to\":\"{{ctx.trigger.scheduled_time}}\"," +

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.search;
package org.elasticsearch.messy.tests;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
import org.elasticsearch.action.search.SearchRequest;
@ -23,7 +23,9 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.script.Template;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase;
@ -39,6 +41,9 @@ import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.text.TextTemplate;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.transform.TransformBuilders;
import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransformFactory;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
@ -55,6 +60,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@ -88,6 +94,15 @@ import static org.joda.time.DateTimeZone.UTC;
*/
@ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class SearchTransformTests extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Collection<Class<? extends Plugin>> types = new ArrayList<>();
types.addAll(super.nodePlugins());
types.add(MustachePlugin.class);
return types;
}
@Override
public Settings nodeSettings(int nodeOrdinal) {
final Path tempDir = createTempDir();

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.integration;
package org.elasticsearch.messy.tests;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
@ -11,8 +11,10 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.cache.query.terms.TermsLookup;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.Template;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.script.mustache.MustacheScriptEngineService;
import org.elasticsearch.shield.authc.support.SecuredString;
import org.elasticsearch.test.ShieldIntegTestCase;
@ -20,14 +22,25 @@ import org.elasticsearch.test.ShieldSettingsSource;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
@ShieldIntegTestCase.AwaitsFix(bugUrl = "clean up test to not use mustache templates, otherwise needs many resources here")
public class ShieldCachePermissionTests extends ShieldIntegTestCase {
static final String READ_ONE_IDX_USER = "read_user";
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Collection<Class<? extends Plugin>> types = new ArrayList<>();
types.addAll(super.nodePlugins());
types.add(MustachePlugin.class);
return types;
}
@Override
public String configUsers() {

View File

@ -3,11 +3,13 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.actions;
package org.elasticsearch.messy.tests;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.condition.compare.CompareCondition;
import org.elasticsearch.watcher.execution.ExecutionState;
@ -18,6 +20,8 @@ import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@ -36,6 +40,15 @@ import static org.hamcrest.Matchers.is;
/**
*/
public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTestCase {
@Override
protected List<Class<? extends Plugin>> pluginTypes() {
List<Class<? extends Plugin>> types = new ArrayList<>();
types.addAll(super.pluginTypes());
types.add(MustachePlugin.class);
return types;
}
private IndexResponse indexTestDoc() {
createIndex("actions", "events");
ensureGreen("actions", "events");

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
/**
* This package contains tests that use mustache to test what looks
* to be unrelated functionality, or functionality that should be
* tested with a mock instead. Instead of doing an epic battle
* with these tests, they are temporarily moved here to the mustache
* module's tests, but that is likely not where they belong. Please
* help by cleaning them up and we can remove this package!
*
* <ul>
* <li>If the test is testing templating integration with another core subsystem,
* fix it to use a mock instead, so it can be in the core tests again</li>
* <li>If the test is just being lazy, and does not really need templating to test
* something, clean it up!</li>
* </ul>
*/
// renames that took place:
// renamed: x-pack/watcher/src/test/java/org/elasticsearch/watcher/test/integration/BasicWatcherTests.java -> qa/messy-test-xpack-with-mustache/src/test/java/org/elasticsearch/messy/tests/BasicWatcherTests.java
// renamed: x-pack/watcher/src/test/java/org/elasticsearch/watcher/actions/email/EmailActionIntegrationTests.java -> qa/messy-test-xpack-with-mustache/src/test/java/org/elasticsearch/messy/tests/EmailActionIntegrationTests.java
// renamed: x-pack/watcher/src/test/java/org/elasticsearch/watcher/history/HistoryTemplateSearchInputMappingsTests.java -> qa/messy-test-xpack-with-mustache/src/test/java/org/elasticsearch/messy/tests/HistoryTemplateSearchInputMappingsTests.java
// renamed: x-pack/watcher/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java -> qa/messy-test-xpack-with-mustache/src/test/java/org/elasticsearch/messy/tests/SearchInputTests.java
// renamed: x-pack/watcher/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java -> qa/messy-test-xpack-with-mustache/src/test/java/org/elasticsearch/messy/tests/SearchTransformTests.java
// renamed: x-pack/shield/src/test/java/org/elasticsearch/integration/ShieldCachePermissionTests.java -> qa/messy-test-xpack-with-mustache/src/test/java/org/elasticsearch/messy/tests/ShieldCachePermissionTests.java
// renamed: x-pack/watcher/src/test/java/org/elasticsearch/watcher/actions/TimeThrottleIntegrationTests.java -> qa/messy-test-xpack-with-mustache/src/test/java/org/elasticsearch/messy/tests/TimeThrottleIntegrationTests.java
package org.elasticsearch.messy.tests;

View File

@ -33,11 +33,12 @@ dependencies {
compile 'com.google.guava:guava:16.0.1' // needed by watcher and shield tests for jimfs
compile 'com.google.code.findbugs:jsr305:3.0.1' // TODO: remove this
compile 'com.sun.mail:javax.mail:1.5.3'
compile 'javax.activation:activation:1.1.1'
// fork of mustache: https://github.com/elastic/x-plugins/issues/1116
compile 'com.github.spullara.mustache.java:compiler:0.9.1' // TODO: remove this
testCompile 'org.subethamail:subethasmtp:3.1.7'
// common test deps
testCompile 'org.elasticsearch:securemock:1.1'
testCompile 'org.elasticsearch:securemock:1.2'
testCompile 'org.slf4j:slf4j-log4j12:1.6.2'
testCompile 'org.slf4j:slf4j-api:1.6.2'

View File

@ -53,9 +53,17 @@ public class IndexStatsCollector extends AbstractCollector<IndexStatsCollector>
List<MarvelDoc> results = new ArrayList<>(1);
try {
IndicesStatsResponse indicesStats = client.admin().indices().prepareStats()
.setRefresh(true)
.setIndices(marvelSettings.indices())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.clear()
.setDocs(true)
.setFieldData(true)
.setIndexing(true)
.setMerge(true)
.setSearch(true)
.setSegments(true)
.setStore(true)
.setRefresh(true)
.get(marvelSettings.indexStatsTimeout());
long timestamp = System.currentTimeMillis();

View File

@ -54,7 +54,11 @@ public class IndicesStatsCollector extends AbstractCollector<IndicesStatsCollect
IndicesStatsResponse indicesStats = client.admin().indices().prepareStats()
.setIndices(marvelSettings.indices())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setRefresh(true)
.clear()
.setDocs(true)
.setIndexing(true)
.setSearch(true)
.setStore(true)
.get(marvelSettings.indicesStatsTimeout());
return Collections.singletonList(new IndicesStatsMarvelDoc(clusterUUID(), TYPE, System.currentTimeMillis(), indicesStats));

View File

@ -7,6 +7,7 @@ package org.elasticsearch.marvel.agent.collector.node;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.bootstrap.BootstrapInfo;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -69,7 +70,7 @@ public class NodeStatsCollector extends AbstractCollector<NodeStatsCollector> {
protected Collection<MarvelDoc> doCollect() throws Exception {
List<MarvelDoc> results = new ArrayList<>(1);
NodeStats nodeStats = nodeService.stats();
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.ALL, true, true, true, true, true, false, false, false,false, false);
// Here we are calling directly the DiskThresholdDecider to retrieve the high watermark value
// It would be nicer to use a settings API like documented in #6732

View File

@ -5,10 +5,7 @@
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
@ -26,7 +23,6 @@ public abstract class Exporter {
public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format";
public static final String BULK_TIMEOUT_SETTING = "bulk.timeout";
public static final Version MIN_SUPPORTED_TEMPLATE_VERSION = Version.V_2_0_0_beta2;
public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd";
protected final String type;

View File

@ -11,6 +11,7 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.marvel.support.VersionUtils;
import java.io.ByteArrayOutputStream;
@ -25,7 +26,7 @@ public final class MarvelTemplateUtils {
public static final String MARVEL_TEMPLATE_FILE = "/marvel_index_template.json";
public static final String INDEX_TEMPLATE_NAME = ".marvel-es";
public static final String MARVEL_VERSION_FIELD = "marvel_version";
public static final Version MIN_SUPPORTED_TEMPLATE_VERSION = Version.V_2_0_0_beta2;
private MarvelTemplateUtils() {
}
@ -43,10 +44,18 @@ public final class MarvelTemplateUtils {
}
}
public static Version loadDefaultTemplateVersion() {
return parseTemplateVersion(loadDefaultTemplate());
}
public static Version templateVersion(IndexTemplateMetaData templateMetaData) {
String version = templateMetaData.settings().get("index." + MARVEL_VERSION_FIELD);
if (Strings.hasLength(version)) {
return Version.fromString(version);
try {
return Version.fromString(version);
} catch (IllegalArgumentException e) {
return null;
}
}
return null;
}
@ -57,10 +66,52 @@ public final class MarvelTemplateUtils {
}
public static Version parseTemplateVersion(byte[] template) {
return VersionUtils.parseVersion(MARVEL_VERSION_FIELD, template);
try {
return VersionUtils.parseVersion(MARVEL_VERSION_FIELD, template);
} catch (IllegalArgumentException e) {
return null;
}
}
public static Version parseTemplateVersion(String template) {
return VersionUtils.parseVersion(MARVEL_VERSION_FIELD, template);
public static boolean installedTemplateVersionIsSufficient(Version installed) {
// null indicates couldn't parse the version from the installed template, this means it is probably too old or invalid...
if (installed == null) {
return false;
}
// ensure the template is not too old
if (installed.before(MIN_SUPPORTED_TEMPLATE_VERSION)) {
return false;
}
// We do not enforce that versions are equivalent to the current version as we may be in a rolling upgrade scenario
// and until a master is elected with the new version, data nodes that have been upgraded will not be able to ship
// data. This means that there is an implication that the new shippers will ship data correctly even with an old template.
// There is also no upper bound and we rely on elasticsearch nodes not being able to connect to each other across major
// versions
return true;
}
public static boolean installedTemplateVersionMandatesAnUpdate(Version current, Version installed, ESLogger logger, String exporterName) {
if (installed == null) {
logger.debug("exporter [{}] - currently installed marvel template is missing a version - installing a new one [{}]", exporterName, current);
return true;
}
// Never update a very old template
if (installed.before(MIN_SUPPORTED_TEMPLATE_VERSION)) {
return false;
}
// Always update a template to the last up-to-date version
if (current.after(installed)) {
logger.debug("exporter [{}] - currently installed marvel template version [{}] will be updated to a newer version [{}]", exporterName, installed, current);
return true;
// When the template is up-to-date, do not update
} else if (current.equals(installed)) {
logger.debug("exporter [{}] - currently installed marvel template version [{}] is up-to-date", exporterName, installed);
return false;
// Never update a template that is newer than the expected one
} else {
logger.debug("exporter [{}] - currently installed marvel template version [{}] is newer than the one required [{}]... keeping it.", exporterName, installed, current);
return false;
}
}
}

View File

@ -8,9 +8,11 @@ package org.elasticsearch.marvel.agent.exporter.http;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -18,6 +20,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
@ -40,9 +43,10 @@ import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.*;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.installedTemplateVersionIsSufficient;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate;
/**
*
@ -88,7 +92,6 @@ public class HttpExporter extends Exporter {
volatile boolean checkedAndUploadedIndexTemplate = false;
volatile boolean supportedClusterVersion = false;
/** Version of the built-in template **/
final Version templateVersion;
@ -126,7 +129,7 @@ public class HttpExporter extends Exporter {
hostnameVerification = config.settings().getAsBoolean(SSL_HOSTNAME_VERIFICATION_SETTING, true);
// Checks that the built-in template is versioned
templateVersion = MarvelTemplateUtils.parseTemplateVersion(MarvelTemplateUtils.loadDefaultTemplate());
templateVersion = MarvelTemplateUtils.loadDefaultTemplateVersion();
if (templateVersion == null) {
throw new IllegalStateException("unable to find built-in template version");
}
@ -381,6 +384,37 @@ public class HttpExporter extends Exporter {
* @return true if template exists or was uploaded successfully.
*/
private boolean checkAndUploadIndexTemplate(final String host) {
byte[] installedTemplate;
try {
installedTemplate = findMarvelTemplate(host);
} catch (Exception e) {
logger.debug("http exporter [{}] - exception when loading the existing marvel template on host[{}]", e, name(), host);
return false;
}
// if we cannot find a template or a compatible template, we'll install one in / update it.
if (installedTemplate == null) {
logger.debug("http exporter [{}] - could not find existing marvel template, installing a new one", name());
return putTemplate(host);
}
Version installedTemplateVersion = MarvelTemplateUtils.parseTemplateVersion(installedTemplate);
if (installedTemplateVersionMandatesAnUpdate(templateVersion, installedTemplateVersion, logger, name())) {
logger.debug("http exporter [{}] - installing new marvel template [{}], replacing [{}]", name(), templateVersion, installedTemplateVersion);
return putTemplate(host);
} else if (!installedTemplateVersionIsSufficient(installedTemplateVersion)) {
logger.error("http exporter [{}] - marvel template version [{}] is below the minimum compatible version [{}]. "
+ "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)",
name(), installedTemplateVersion, MarvelTemplateUtils.MIN_SUPPORTED_TEMPLATE_VERSION);
// we're not going to do anything with the template.. it's too old, and the schema might
// be too different than what this version of marvel/es can work with. For this reason we're
// not going to export any data, to avoid mapping conflicts.
return false;
}
return true;
}
private byte[] findMarvelTemplate(String host) throws IOException {
String url = "_template/" + MarvelTemplateUtils.INDEX_TEMPLATE_NAME;
if (templateCheckTimeout != null) {
url += "?timeout=" + templateCheckTimeout;
@ -388,40 +422,28 @@ public class HttpExporter extends Exporter {
HttpURLConnection connection = null;
try {
logger.debug("checking if marvel template exists on the marvel cluster");
logger.debug("http exporter [{}] - checking if marvel template exists on the marvel cluster", name());
connection = openConnection(host, "GET", url, null);
if (connection == null) {
logger.debug("no available connection to check marvel template existence");
return false;
throw new IOException("no available connection to check marvel template existence");
}
byte[] remoteTemplate = null;
// 200 means that the template has been found, 404 otherwise
if (connection.getResponseCode() == 200) {
logger.debug("marvel template found, checking its version");
logger.debug("marvel template found");
byte[] remoteTemplate;
try (InputStream is = connection.getInputStream()) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
remoteTemplate = out.toByteArray();
}
if ((remoteTemplate == null) || (remoteTemplate.length == 0)) {
logger.error("unable to load remote marvel template on host [{}]", host);
return false;
}
Version remoteVersion = MarvelTemplateUtils.parseTemplateVersion(remoteTemplate);
logger.debug("detected existing remote template in version [{}] on host [{}]", remoteVersion, host);
if ((remoteVersion != null) && (remoteVersion.onOrAfter(MIN_SUPPORTED_TEMPLATE_VERSION))) {
logger.debug("remote template in version [{}] is compatible with the min. supported version [{}]", remoteVersion, MIN_SUPPORTED_TEMPLATE_VERSION);
return true;
}
}
} catch (IOException e) {
logger.error("failed to verify the marvel template to [{}]:\n{}", host, e.getMessage());
return false;
return remoteTemplate;
} catch (Exception e) {
logger.error("http exporter [{}] - failed to verify the marvel template to [{}]:\n{}", name(), host, e.getMessage());
throw e;
} finally {
if (connection != null) {
try {
@ -431,28 +453,30 @@ public class HttpExporter extends Exporter {
}
}
}
}
boolean putTemplate(String host) {
HttpURLConnection connection = null;
try {
connection = openConnection(host, "PUT", url, XContentType.JSON.restContentType());
connection = openConnection(host, "PUT", "_template/" + MarvelTemplateUtils.INDEX_TEMPLATE_NAME, XContentType.JSON.restContentType());
if (connection == null) {
logger.debug("no available connection to update marvel template");
logger.debug("http exporter [{}] - no available connection to update marvel template", name());
return false;
}
logger.debug("loading marvel pre-configured template");
logger.debug("http exporter [{}] - loading marvel pre-configured template", name());
byte[] template = MarvelTemplateUtils.loadDefaultTemplate();
// Uploads the template and closes the outputstream
Streams.copy(template, connection.getOutputStream());
if (connection.getResponseCode() != 200 && connection.getResponseCode() != 201) {
logConnectionError("error adding the marvel template to [" + host + "]", connection);
return false;
}
logger.info("marvel template updated to version [{}]", templateVersion);
logger.info("http exporter [{}] - marvel template updated to version [{}]", name(), templateVersion);
} catch (IOException e) {
logger.error("failed to update the marvel template to [{}]:\n{}", host, e.getMessage());
logger.error("http exporter [{}] - failed to update the marvel template to [{}]:\n{}", name(), host, e.getMessage());
return false;
} finally {
@ -465,9 +489,111 @@ public class HttpExporter extends Exporter {
}
}
if (config.settings().getAsBoolean("update_mappings", true)) {
updateMappings(host, MarvelSettings.MARVEL_DATA_INDEX_NAME);
updateMappings(host, indexNameResolver().resolve(System.currentTimeMillis()));
}
return true;
}
// TODO: Remove this method once marvel indices are versioned (v 2.2.0)
void updateMappings(String host, String index) {
logger.trace("http exporter [{}] - updating mappings for index [{}]", name(), index);
// Parse the default template to get its mappings
PutIndexTemplateRequest template = new PutIndexTemplateRequest().source(MarvelTemplateUtils.loadDefaultTemplate());
if ((template == null) || (template.mappings() == null) || (template.mappings().isEmpty())) {
return;
}
Set<String> indexMappings = new HashSet<>();
HttpURLConnection connection = null;
try {
connection = openConnection(host, "GET", "/" + index + "/_mapping", XContentType.JSON.restContentType());
if (connection == null) {
logger.debug("http exporter [{}] - no available connection to get index mappings", name());
return;
}
if (connection.getResponseCode() == 404) {
logger.trace("http exporter [{}] - index [{}] does not exist", name(), index);
return;
} else if (connection.getResponseCode() == 200) {
try (InputStream is = connection.getInputStream()) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
Map<String, Object> mappings = XContentHelper.convertToMap(new BytesArray(out.toByteArray()), false).v2();
if ((mappings.get(index) != null) && (mappings.get(index) instanceof Map)) {
Map m = (Map) ((Map) mappings.get(index)).get("mappings");
if (m != null) {
indexMappings = m.keySet();
}
}
}
} else {
logConnectionError("http exporter [" + name() +"] - failed to get mappings for index [" + index + "] on host [" + host + "]", connection);
return;
}
} catch (Exception e) {
logger.error("http exporter [{}] - failed to update the marvel template to [{}]:\n{}", name(), host, e.getMessage());
return;
} finally {
if (connection != null) {
try {
connection.getInputStream().close();
} catch (IOException e) {
// Ignore
}
}
}
// Iterates over document types defined in the default template
for (String type : template.mappings().keySet()) {
if (indexMappings.contains(type)) {
logger.trace("http exporter [{}] - type [{} already exists in mapping of index [{}]", name(), type, index);
continue;
}
logger.trace("http exporter [{}] - adding type [{}] to index [{}] mappings", name(), type, index);
updateMappingForType(host, index, type, template.mappings().get(type));
}
}
void updateMappingForType(String host, String index, String type, String mappingSource) {
logger.trace("http exporter [{}] - updating index [{}] mappings for type [{}] on host [{}]", name(), index, type, host);
HttpURLConnection connection = null;
try {
connection = openConnection(host, "PUT", "/" + index + "/_mapping/" + type, XContentType.JSON.restContentType());
if (connection == null) {
logger.debug("http exporter [{}] - no available connection to update index mapping", name());
return;
}
// Uploads the template and closes the outputstream
Streams.copy(Strings.toUTF8Bytes(mappingSource), connection.getOutputStream());
if (connection.getResponseCode() != 200 && connection.getResponseCode() != 201) {
logConnectionError("http exporter [" + name() +"] - mapping of index [" + index + "] failed to be updated for type [" + type + "] on host [" + host + "]", connection);
return;
}
logger.trace("http exporter [{}] - mapping of index [{}] updated for type [{}]", name(), index, type);
} catch (Exception e) {
logger.error("http exporter [{}] - failed to update mapping of index [{}] for type [{}]", name(), index, type);
} finally {
if (connection != null) {
try {
connection.getInputStream().close();
} catch (IOException e) {
// Ignore
}
}
}
}
private void logConnectionError(String msg, HttpURLConnection conn) {
InputStream inputStream = conn.getErrorStream();
String err = "";

View File

@ -7,6 +7,10 @@ package org.elasticsearch.marvel.agent.exporter.local;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.client.Client;
@ -15,18 +19,20 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.shield.SecuredClient;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.installedTemplateVersionIsSufficient;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate;
/**
*
@ -42,11 +48,21 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
private volatile LocalBulk bulk;
private volatile boolean active = true;
/** Version of the built-in template **/
private final Version templateVersion;
public LocalExporter(Exporter.Config config, Client client, ClusterService clusterService, RendererRegistry renderers) {
super(TYPE, config);
this.client = client;
this.clusterService = clusterService;
this.renderers = renderers;
// Checks that the built-in template is versioned
templateVersion = MarvelTemplateUtils.loadDefaultTemplateVersion();
if (templateVersion == null) {
throw new IllegalStateException("unable to find built-in template version");
}
bulk = resolveBulk(clusterService.state(), bulk);
clusterService.add(this);
}
@ -122,7 +138,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
return null;
}
Version installedTemplateVersion = MarvelTemplateUtils.templateVersion(installedTemplate);
if (!installedTemplateVersionIsSufficient(Version.CURRENT, installedTemplateVersion)) {
if (!installedTemplateVersionIsSufficient(installedTemplateVersion)) {
logger.debug("local exporter [{}] - cannot start. the currently installed marvel template (version [{}]) is incompatible with the " +
"current elasticsearch version [{}]. waiting until the template is updated", name(), installedTemplateVersion, Version.CURRENT);
return null;
@ -138,21 +154,21 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
// if we cannot find a template or a compatible template, we'll install one in / update it.
if (installedTemplate == null) {
logger.debug("local exporter [{}] - could not find existing marvel template, installing a new one", name());
putTemplate(config.settings().getAsSettings("template.settings"));
putTemplate();
// we'll get that template on the next cluster state update
return null;
}
Version installedTemplateVersion = MarvelTemplateUtils.templateVersion(installedTemplate);
if (installedTemplateVersionMandatesAnUpdate(Version.CURRENT, installedTemplateVersion)) {
logger.debug("local exporter [{}] - installing new marvel template [{}], replacing [{}]", name(), Version.CURRENT, installedTemplateVersion);
putTemplate(config.settings().getAsSettings("template.settings"));
if (installedTemplateVersionMandatesAnUpdate(templateVersion, installedTemplateVersion, logger, name())) {
logger.debug("local exporter [{}] - installing new marvel template [{}], replacing [{}]", name(), templateVersion, installedTemplateVersion);
putTemplate();
// we'll get that template on the next cluster state update
return null;
} else if (!installedTemplateVersionIsSufficient(Version.CURRENT, installedTemplateVersion)) {
} else if (!installedTemplateVersionIsSufficient(installedTemplateVersion)) {
logger.error("local exporter [{}] - marvel template version [{}] is below the minimum compatible version [{}]. "
+ "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)",
name(), installedTemplateVersion, MIN_SUPPORTED_TEMPLATE_VERSION);
name(), installedTemplateVersion, MarvelTemplateUtils.MIN_SUPPORTED_TEMPLATE_VERSION);
// we're not going to do anything with the template.. it's too old, and the schema might
// be too different than what this version of marvel/es can work with. For this reason we're
// not going to export any data, to avoid mapping conflicts.
@ -163,86 +179,91 @@ public class LocalExporter extends Exporter implements ClusterStateListener {
return currentBulk != null ? currentBulk : new LocalBulk(name(), logger, client, indexNameResolver, renderers);
}
boolean installedTemplateVersionIsSufficient(Version current, Version installed) {
// null indicates couldn't parse the version from the installed template, this means it is probably too old or invalid...
if (installed == null) {
return false;
}
// ensure the template is not too old
if (installed.before(MIN_SUPPORTED_TEMPLATE_VERSION)) {
return false;
}
void putTemplate() {
PutIndexTemplateRequest request = new PutIndexTemplateRequest(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).source(MarvelTemplateUtils.loadDefaultTemplate());
assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!";
// We do not enforce that versions are equivalent to the current version as we may be in a rolling upgrade scenario
// and until a master is elected with the new version, data nodes that have been upgraded will not be able to ship
// data. This means that there is an implication that the new shippers will ship data correctly even with an old template.
// There is also no upper bound and we rely on elasticsearch nodes not being able to connect to each other across major
// versions
return true;
}
// async call, so we won't block cluster event thread
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
@Override
public void onResponse(PutIndexTemplateResponse response) {
if (response.isAcknowledged()) {
logger.trace("local exporter [{}] - successfully installed marvel template", name());
boolean installedTemplateVersionMandatesAnUpdate(Version current, Version installed) {
if (installed == null) {
logger.debug("local exporter [{}] - currently installed marvel template is missing a version - installing a new one [{}]", name(), current);
return true;
}
// Never update a very old template
if (installed.before(MIN_SUPPORTED_TEMPLATE_VERSION)) {
return false;
}
// Always update a template to the last up-to-date version
if (current.after(installed)) {
logger.debug("local exporter [{}] - currently installed marvel template version [{}] will be updated to a newer version [{}]", name(), installed, current);
return true;
// When the template is up-to-date, force an update for snapshot versions only
} else if (current.equals(installed)) {
logger.debug("local exporter [{}] - currently installed marvel template version [{}] is up-to-date", name(), installed);
return installed.snapshot() && !current.snapshot();
// Never update a template that is newer than the expected one
} else {
logger.debug("local exporter [{}] - currently installed marvel template version [{}] is newer than the one required [{}]... keeping it.", name(), installed, current);
return false;
}
}
void putTemplate(Settings customSettings) {
try (InputStream is = getClass().getResourceAsStream("/marvel_index_template.json")) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
final byte[] template = out.toByteArray();
PutIndexTemplateRequest request = new PutIndexTemplateRequest(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).source(template);
if (customSettings != null && customSettings.names().size() > 0) {
Settings updatedSettings = Settings.builder()
.put(request.settings())
.put(customSettings)
// making sure we override any other template that may apply
.put("order", Integer.MAX_VALUE)
.build();
request.settings(updatedSettings);
if (config.settings().getAsBoolean("update_mappings", true)) {
updateMappings(MarvelSettings.MARVEL_DATA_INDEX_NAME);
updateMappings(indexNameResolver().resolve(System.currentTimeMillis()));
}
} else {
logger.error("local exporter [{}] - failed to update marvel index template", name());
}
}
assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!";
@Override
public void onFailure(Throwable throwable) {
logger.error("local exporter [{}] - failed to update marvel index template", throwable, name());
}
});
}
// async call, so we won't block cluster event thread
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
@Override
public void onResponse(PutIndexTemplateResponse response) {
if (response.isAcknowledged()) {
logger.trace("local exporter [{}] - successfully installed marvel template", name());
} else {
logger.error("local exporter [{}] - failed to update marvel index template", name());
// TODO: Remove this method once marvel indices are versioned (v 2.2.0)
void updateMappings(String index) {
logger.trace("local exporter [{}] - updating mappings for index [{}]", name(), index);
// Parse the default template to get its mappings
PutIndexTemplateRequest template = new PutIndexTemplateRequest().source(MarvelTemplateUtils.loadDefaultTemplate());
if ((template == null) || (template.mappings() == null) || (template.mappings().isEmpty())) {
return;
}
// async call, so we won't block cluster event thread
client.admin().indices().getMappings(new GetMappingsRequest().indices(index), new ActionListener<GetMappingsResponse>() {
@Override
public void onResponse(GetMappingsResponse response) {
ImmutableOpenMap<String, MappingMetaData> indexMappings = response.getMappings().get(index);
if (indexMappings != null) {
// Iterates over document types defined in the default template
for (String type : template.mappings().keySet()) {
if (indexMappings.get(type) != null) {
logger.trace("local exporter [{}] - type [{} already exists in mapping of index [{}]", name(), type, index);
continue;
}
logger.trace("local exporter [{}] - adding type [{}] to index [{}] mappings", name(), type, index);
updateMappingForType(index, type, template.mappings().get(type));
}
}
}
@Override
public void onFailure(Throwable throwable) {
logger.error("local exporter [{}] - failed to update marvel index template", throwable, name());
@Override
public void onFailure(Throwable e) {
if (e instanceof IndexNotFoundException) {
logger.trace("local exporter [{}] - index [{}] not found, unable to update mappings", name(), index);
} else {
logger.error("local exporter [{}] - failed to get mappings for index [{}]", name(), index);
}
});
}
});
}
} catch (Exception e) {
throw new IllegalStateException("failed to update marvel index template", e);
}
void updateMappingForType(String index, String type, String mappingSource) {
logger.trace("local exporter [{}] - updating index [{}] mappings for type [{}]", name(), index, type);
client.admin().indices().putMapping(new PutMappingRequest(index).type(type).source(mappingSource), new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse response) {
if (response.isAcknowledged()) {
logger.trace("local exporter [{}] - mapping of index [{}] updated for type [{}]", name(), index, type);
} else {
logger.trace("local exporter [{}] - mapping of index [{}] failed to be updated for type [{}]", name(), index, type);
}
}
@Override
public void onFailure(Throwable e) {
logger.error("local exporter [{}] - failed to update mapping of index [{}] for type [{}]", name(), index, type);
}
});
}
public static class Factory extends Exporter.Factory<LocalExporter> {

View File

@ -0,0 +1,261 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.collector.Collector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterInfoCollector;
import org.elasticsearch.marvel.agent.collector.node.NodeStatsCollector;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.VersionUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
@ClusterScope(scope = TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public abstract class AbstractExporterTemplateTestCase extends MarvelIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(MarvelSettings.INTERVAL, "-1");
for (Map.Entry<String, String> setting : exporterSettings().getAsMap().entrySet()) {
settings.put("marvel.agent.exporters._exporter." + setting.getKey(), setting.getValue());
}
return settings.build();
}
protected abstract Settings exporterSettings();
protected abstract void deleteTemplate() throws Exception;
protected abstract void putTemplate(String version) throws Exception;
protected abstract void createMarvelIndex(String index) throws Exception;
protected abstract void assertTemplateUpdated(Version version) throws Exception;
protected abstract void assertTemplateNotUpdated(Version version) throws Exception;
protected abstract void assertMappingsUpdated(String... indices) throws Exception;
protected abstract void assertMappingsNotUpdated(String... indices) throws Exception;
protected abstract void assertIndicesNotCreated() throws Exception;
public void testCreateWhenNoExistingTemplate() throws Exception {
internalCluster().startNode();
deleteTemplate();
doExporting();
logger.debug("--> template does not exist: it should have been created in the current version");
assertTemplateUpdated(currentVersion());
doExporting();
logger.debug("--> mappings should be up-to-date");
assertMappingsUpdated(currentIndices());
}
public void testUpdateWhenExistingTemplateHasNoVersion() throws Exception {
internalCluster().startNode();
putTemplate("");
doExporting();
logger.debug("--> existing template does not have a version: it should be updated to the current version");
assertTemplateUpdated(currentVersion());
doExporting();
logger.debug("--> mappings should be up-to-date");
assertMappingsUpdated(currentIndices());
}
public void testUpdateWhenExistingTemplateHasWrongVersion() throws Exception {
internalCluster().startNode();
putTemplate(randomAsciiOfLength(5));
doExporting();
logger.debug("--> existing template has a wrong version: it should be updated to the current version");
assertTemplateUpdated(currentVersion());
doExporting();
logger.debug("--> mappings should be up-to-date");
assertMappingsUpdated(currentIndices());
}
public void testNoUpdateWhenExistingTemplateIsTooOld() throws Exception {
internalCluster().startNode();
putTemplate(VersionUtils.getFirstVersion().number());
doExporting();
logger.debug("--> existing template is too old: it should not be updated");
assertTemplateNotUpdated(VersionUtils.getFirstVersion());
doExporting();
logger.debug("--> existing template is too old: no data is exported");
assertIndicesNotCreated();
}
public void testUpdateWhenExistingTemplateIsOld() throws Exception {
internalCluster().startNode();
putTemplate(VersionUtils.getPreviousVersion(currentVersion()).number());
doExporting();
logger.debug("--> existing template is old but supported: it should be updated to the current version");
assertTemplateUpdated(currentVersion());
doExporting();
logger.debug("--> mappings should be up-to-date");
assertMappingsUpdated(currentIndices());
}
public void testUpdateWhenExistingTemplateIsUpToDate() throws Exception {
internalCluster().startNode();
putTemplate(currentVersion().toString());
doExporting();
logger.debug("--> existing template has the same version: it should not be updated");
assertTemplateNotUpdated(currentVersion());
doExporting();
logger.debug("--> mappings should not have been updated");
assertMappingsNotUpdated(currentIndices());
}
public void testMappingsUpdate() throws Exception {
boolean updateMappings = randomBoolean();
logger.debug("--> update_mappings is {}", updateMappings);
internalCluster().startNode(Settings.builder().put("marvel.agent.exporters._exporter.update_mappings", updateMappings));
logger.debug("--> putting a template with a very old version so that it will not be updated");
putTemplate(VersionUtils.getFirstVersion().toString());
logger.debug("--> creating marvel data index");
createMarvelIndex(MarvelSettings.MARVEL_DATA_INDEX_NAME);
logger.debug("--> creating a cold marvel index");
createMarvelIndex(coldIndex());
logger.debug("--> creating an active marvel index");
createMarvelIndex(hotIndex());
logger.debug("--> all indices have a old mapping now");
assertMappingsNotUpdated(coldIndex(), hotIndex(), MarvelSettings.MARVEL_DATA_INDEX_NAME);
logger.debug("--> updating the template with a previous version, so that it will be updated when exporting documents");
putTemplate(VersionUtils.getPreviousVersion(currentVersion()).number());
doExporting();
logger.debug("--> existing template is old: it should be updated to the current version");
assertTemplateUpdated(currentVersion());
logger.debug("--> cold marvel index: mappings should not have been updated");
assertMappingsNotUpdated(coldIndex());
if (updateMappings) {
logger.debug("--> marvel indices: mappings should be up-to-date");
assertMappingsUpdated(MarvelSettings.MARVEL_DATA_INDEX_NAME, hotIndex());
} else {
logger.debug("--> marvel indices: mappings should bnot have been updated");
assertMappingsNotUpdated(MarvelSettings.MARVEL_DATA_INDEX_NAME, hotIndex());
}
}
protected void doExporting() throws Exception {
List<MarvelDoc> docs = new ArrayList<>();
for (Class<? extends Collector> collectorClass : Arrays.asList(ClusterInfoCollector.class, NodeStatsCollector.class)) {
Collector collector = internalCluster().getInstance(collectorClass);
docs.addAll(collector.collect());
}
exporter().export(docs);
}
private Exporter exporter() {
Exporters exporters = internalCluster().getInstance(Exporters.class);
return exporters.iterator().next();
}
private Version currentVersion() {
return MarvelTemplateUtils.loadDefaultTemplateVersion();
}
private String[] currentIndices() {
return new String[]{hotIndex(), MarvelSettings.MARVEL_DATA_INDEX_NAME};
}
private String coldIndex() {
return exporter().indexNameResolver().resolve(new DateTime(2012, 3, 10, 0, 0, DateTimeZone.UTC).getMillis());
}
private String hotIndex() {
return exporter().indexNameResolver().resolve(System.currentTimeMillis());
}
/** Generates a template that looks like an old one **/
protected static BytesReference generateTemplateSource(String version) throws IOException {
return jsonBuilder().startObject()
.field("template", ".marvel-es-*")
.startObject("settings")
.field("index.number_of_shards", 1)
.field("index.number_of_replicas", 1)
.field("index.mapper.dynamic", false)
.field(MarvelTemplateUtils.MARVEL_VERSION_FIELD, version)
.endObject()
.startObject("mappings")
.startObject("_default_")
.startObject("_all")
.field("enabled", false)
.endObject()
.field("date_detection", false)
.startObject("properties")
.startObject("cluster_uuid")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("timestamp")
.field("type", "date")
.field("format", "date_time")
.endObject()
.endObject()
.endObject()
.startObject("cluster_info")
.field("enabled", false)
.endObject()
.startObject("node_stats")
.startObject("properties")
.startObject("node_stats")
.field("type", "object")
.endObject()
.endObject()
.endObject()
.endObject().bytes();
}
}

View File

@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.marvel.support.VersionUtils;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.MARVEL_VERSION_FIELD;
public class MarvelTemplateUtilsTests extends ESTestCase {
public void testLoadTemplate() {
byte[] template = MarvelTemplateUtils.loadDefaultTemplate();
assertNotNull(template);
assertThat(template.length, Matchers.greaterThan(0));
}
public void testParseTemplateVersionFromByteArrayTemplate() throws IOException {
byte[] template = MarvelTemplateUtils.loadDefaultTemplate();
assertNotNull(template);
Version version = MarvelTemplateUtils.parseTemplateVersion(template);
assertNotNull(version);
}
public void testParseTemplateVersionFromStringTemplate() throws IOException {
List<String> templates = new ArrayList<>();
templates.add("{\"marvel_version\": \"1.4.0.Beta1\"}");
templates.add("{\"marvel_version\": \"1.6.2-SNAPSHOT\"}");
templates.add("{\"marvel_version\": \"1.7.1\"}");
templates.add("{\"marvel_version\": \"2.0.0-beta1\"}");
templates.add("{\"marvel_version\": \"2.0.0\"}");
templates.add("{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }");
for (String template : templates) {
Version version = MarvelTemplateUtils.parseTemplateVersion(Strings.toUTF8Bytes(template));
assertNotNull(version);
}
Version version = MarvelTemplateUtils.parseTemplateVersion(Strings.toUTF8Bytes("{\"marvel.index_format\": \"7\"}"));
assertNull(version);
}
public void testParseVersion() throws IOException {
assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0-beta1\"}"));
assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0\"}"));
assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel_version\": \"1.5.2\"}"));
assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }"));
assertNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel.index_format\": \"7\"}"));
assertNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD + "unkown", "{\"marvel_version\": \"1.5.2\"}"));
}
public void testTemplateVersionMandatesAnUpdate() {
// Version is unknown
assertTrue(MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, null, logger, "unit-test"));
// Version is too old
Version unsupported = org.elasticsearch.test.VersionUtils.getPreviousVersion(MarvelTemplateUtils.MIN_SUPPORTED_TEMPLATE_VERSION);
assertFalse(MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, unsupported, logger, "unit-test"));
// Version is old but supported
assertTrue(MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, MarvelTemplateUtils.MIN_SUPPORTED_TEMPLATE_VERSION, logger, "unit-test"));
// Version is up to date
assertFalse(MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, Version.CURRENT, logger, "unit-test"));
// Version is up to date
Version previous = org.elasticsearch.test.VersionUtils.getPreviousVersion(Version.CURRENT);
assertFalse(MarvelTemplateUtils.installedTemplateVersionMandatesAnUpdate(previous, Version.CURRENT, logger, "unit-test"));
}
public void testTemplateVersionIsSufficient() {
// Version is unknown
assertFalse(MarvelTemplateUtils.installedTemplateVersionIsSufficient(null));
// Version is too old
Version unsupported = org.elasticsearch.test.VersionUtils.getPreviousVersion(MarvelTemplateUtils.MIN_SUPPORTED_TEMPLATE_VERSION);
assertFalse(MarvelTemplateUtils.installedTemplateVersionIsSufficient(unsupported));
// Version is OK
assertTrue(MarvelTemplateUtils.installedTemplateVersionIsSufficient(Version.CURRENT));
}
}

View File

@ -0,0 +1,251 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter.http;
import com.squareup.okhttp.mockwebserver.Dispatcher;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.marvel.agent.exporter.AbstractExporterTemplateTestCase;
import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.net.BindException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.Is.is;
public class HttpExporterTemplateTests extends AbstractExporterTemplateTestCase {
private MockWebServer webServer;
private MockServerDispatcher dispatcher;
@Before
public void startWebServer() throws Exception {
for (int webPort = 9250; webPort < 9300; webPort++) {
try {
webServer = new MockWebServer();
dispatcher = new MockServerDispatcher();
webServer.setDispatcher(dispatcher);
webServer.start(webPort);
return;
} catch (BindException be) {
logger.warn("port [{}] was already in use trying next port", webPort);
}
}
throw new ElasticsearchException("unable to find open port between 9200 and 9300");
}
@After
public void stopWebServer() throws Exception {
webServer.shutdown();
}
@Override
protected Settings exporterSettings() {
return Settings.builder()
.put("type", "http")
.put("host", webServer.getHostName() + ":" + webServer.getPort())
.put("connection.keep_alive", false)
.put(Exporter.INDEX_NAME_TIME_FORMAT_SETTING, "YYYY")
.build();
}
@Override
protected void deleteTemplate() {
dispatcher.setTemplate(null);
}
@Override
protected void putTemplate(String version) throws Exception {
dispatcher.setTemplate(generateTemplateSource(version).toBytes());
}
@Override
protected void createMarvelIndex(String index) throws Exception {
dispatcher.addIndex(index);
}
@Override
protected void assertTemplateUpdated(Version version) {
// Checks that a PUT Template request has been made
assertThat(dispatcher.hasRequest("PUT", "/_template/" + MarvelTemplateUtils.INDEX_TEMPLATE_NAME), is(true));
// Checks that the current template has the expected version
assertThat(MarvelTemplateUtils.parseTemplateVersion(dispatcher.getTemplate()), equalTo(version));
}
@Override
protected void assertTemplateNotUpdated(Version version) throws Exception {
// Checks that no PUT Template request has been made
assertThat(dispatcher.hasRequest("PUT", "/_template/" + MarvelTemplateUtils.INDEX_TEMPLATE_NAME), is(false));
// Checks that the current template has the expected version
assertThat(MarvelTemplateUtils.parseTemplateVersion(dispatcher.getTemplate()), equalTo(version));
}
@Override
protected void assertIndicesNotCreated() throws Exception {
// Checks that no Bulk request has been made
assertThat(dispatcher.hasRequest("POST", "/_bulk"), is(false));
assertThat(dispatcher.mappings.size(), equalTo(0));
}
@Override
protected void assertMappingsUpdated(String... indices) throws Exception {
// Load the mappings of the old template
Set<String> oldMappings = new PutIndexTemplateRequest().source(generateTemplateSource(null)).mappings().keySet();
// Load the mappings of the latest template
Set<String> newMappings = new PutIndexTemplateRequest().source(generateTemplateSource(null)).mappings().keySet();
newMappings.removeAll(oldMappings);
for (String index : indices) {
for (String mapping : newMappings) {
// Checks that a PUT Mapping request has been made for every type that was not in the old template
assertThat(dispatcher.hasRequest("PUT", "/" + index + "/_mapping/" + mapping), equalTo(true));
}
}
}
@Override
protected void assertMappingsNotUpdated(String... indices) throws Exception {
for (String index : indices) {
// Checks that no PUT Template request has been made
assertThat(dispatcher.hasRequest("PUT", "/" + index + "/_mapping/"), is(false));
}
}
class MockServerDispatcher extends Dispatcher {
private final MockResponse OK = newResponse(200, "");
private final MockResponse NOT_FOUND = newResponse(404, "");
private final Set<String> requests = new HashSet<>();
private final Map<String, Set<String>> mappings = new HashMap<>();
private byte[] template;
@Override
public MockResponse dispatch(RecordedRequest request) throws InterruptedException {
synchronized (this) {
final String requestLine = request.getRequestLine();
requests.add(requestLine);
switch (requestLine) {
// Cluster version
case "GET / HTTP/1.1":
return newResponse(200, "{\"version\": {\"number\": \"" + Version.CURRENT.number() + "\"}}");
// Template
case "GET /_template/.marvel-es HTTP/1.1":
return (template == null) ? NOT_FOUND : newResponse(200, new BytesArray(template).toUtf8());
case "PUT /_template/.marvel-es HTTP/1.1":
this.template = request.getBody().readByteArray();
return OK;
// Bulk
case "POST /_bulk HTTP/1.1":
return OK;
default:
String[] paths = Strings.splitStringToArray(request.getPath(), '/');
// Index Mappings
if ((paths != null) && (paths.length > 0) && ("_mapping".equals(paths[1]))) {
if (!mappings.containsKey(paths[0])) {
// Index does not exist
return NOT_FOUND;
}
// Get index mappings
if ("GET".equals(request.getMethod())) {
try {
// Builds a fake mapping response
XContentBuilder builder = jsonBuilder().startObject().startObject(paths[0]).startObject("mappings");
for (String type : mappings.get(paths[0])) {
builder.startObject(type).endObject();
}
builder.endObject().endObject().endObject();
return newResponse(200, builder.bytes().toUtf8());
} catch (IOException e) {
return newResponse(500, e.getMessage());
}
// Put index mapping
} else if ("PUT".equals(request.getMethod()) && paths.length > 2) {
Set<String> types = mappings.get(paths[0]);
if (types == null) {
types = new HashSet<>();
}
types.add(paths[2]);
return OK;
}
}
break;
}
return newResponse(500, "MockServerDispatcher does not support: " + request.getRequestLine());
}
}
MockResponse newResponse(int code, String body) {
return new MockResponse().setResponseCode(code).setBody(body);
}
void setTemplate(byte[] template) {
synchronized (this) {
this.template = template;
}
}
byte[] getTemplate() {
return template;
}
void addIndex(String index) {
synchronized (this) {
if (template != null) {
// Simulate the use of the index template when creating an index
mappings.put(index, new HashSet<>(new PutIndexTemplateRequest().source(template).mappings().keySet()));
} else {
mappings.put(index, null);
}
}
}
int countRequests(String method, String path) {
int count = 0;
for (String request : requests) {
if (request.startsWith(method + " " + path)) {
count += 1;
}
}
return count;
}
boolean hasRequest(String method, String path) {
return countRequests(method, path) > 0;
}
}
}

View File

@ -13,7 +13,6 @@ import okio.Buffer;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
@ -88,7 +87,8 @@ public class HttpExporterTests extends MarvelIntegTestCase {
.put(MarvelSettings.INTERVAL, "-1")
.put("marvel.agent.exporters._http.type", "http")
.put("marvel.agent.exporters._http.host", webServer.getHostName() + ":" + webServer.getPort())
.put("marvel.agent.exporters._http.connection.keep_alive", false);
.put("marvel.agent.exporters._http.connection.keep_alive", false)
.put("marvel.agent.exporters._http.update_mappings", false);
String agentNode = internalCluster().startNode(builder);
HttpExporter exporter = getExporter(agentNode);
@ -142,56 +142,14 @@ public class HttpExporterTests extends MarvelIntegTestCase {
assertThat(getExporter(nodeName).hosts, Matchers.arrayContaining("test3"));
}
public void testTemplateUpdate() throws Exception {
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.INTERVAL, "-1")
.put("marvel.agent.exporters._http.type", "http")
.put("marvel.agent.exporters._http.host", webServer.getHostName() + ":" + webServer.getPort())
.put("marvel.agent.exporters._http.connection.keep_alive", false);
logger.info("--> starting node");
enqueueGetClusterVersionResponse(Version.CURRENT);
enqueueResponse(404, "marvel template does not exist");
enqueueResponse(201, "marvel template created");
enqueueResponse(200, "successful bulk request ");
String agentNode = internalCluster().startNode(builder);
logger.info("--> exporting data");
HttpExporter exporter = getExporter(agentNode);
final int nbDocs = randomIntBetween(1, 25);
exporter.export(newRandomMarvelDocs(nbDocs));
assertThat(webServer.getRequestCount(), equalTo(4));
RecordedRequest recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/"));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es"));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es"));
assertThat(recordedRequest.getBody().readByteArray(), equalTo(MarvelTemplateUtils.loadDefaultTemplate()));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("POST"));
assertThat(recordedRequest.getPath(), equalTo("/_bulk"));
assertBulkRequest(recordedRequest.getBody(), nbDocs);
}
public void testHostChangeReChecksTemplate() throws Exception {
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.INTERVAL, "-1")
.put("marvel.agent.exporters._http.type", "http")
.put("marvel.agent.exporters._http.host", webServer.getHostName() + ":" + webServer.getPort())
.put("marvel.agent.exporters._http.connection.keep_alive", false);
.put("marvel.agent.exporters._http.connection.keep_alive", false)
.put("marvel.agent.exporters._http.update_mappings", false);
logger.info("--> starting node");
@ -287,44 +245,6 @@ public class HttpExporterTests extends MarvelIntegTestCase {
}
}
public void testUnsupportedTemplateVersion() throws Exception {
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.INTERVAL, "-1")
.put("marvel.agent.exporters._http.type", "http")
.put("marvel.agent.exporters._http.host", webServer.getHostName() + ":" + webServer.getPort())
.put("marvel.agent.exporters._http.connection.keep_alive", false);
logger.info("--> starting node");
enqueueGetClusterVersionResponse(Version.CURRENT);
// returning a fake template with an unsupported version
Version unsupportedVersion = randomFrom(Version.V_0_18_0, Version.V_1_0_0, Version.V_1_4_0);
enqueueResponse(200, XContentHelper.toString(Settings.builder().put("index.marvel_version", unsupportedVersion.toString()).build()));
String agentNode = internalCluster().startNode(builder);
logger.info("--> exporting data");
HttpExporter exporter = getExporter(agentNode);
assertThat(exporter.supportedClusterVersion, is(false));
exporter.export(Collections.singletonList(newRandomMarvelDoc()));
assertThat(exporter.supportedClusterVersion, is(true));
assertThat(webServer.getRequestCount(), equalTo(3));
RecordedRequest recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/"));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es"));
recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/.marvel-es"));
assertThat(recordedRequest.getBody().readByteArray(), equalTo(MarvelTemplateUtils.loadDefaultTemplate()));
}
public void testUnsupportedClusterVersion() throws Exception {
Settings.Builder builder = Settings.builder()
.put(MarvelSettings.INTERVAL, "-1")
@ -357,7 +277,8 @@ public class HttpExporterTests extends MarvelIntegTestCase {
.put(MarvelSettings.INTERVAL, "-1")
.put("marvel.agent.exporters._http.type", "http")
.put("marvel.agent.exporters._http.host", webServer.getHostName() + ":" + webServer.getPort())
.put("marvel.agent.exporters._http.connection.keep_alive", false);
.put("marvel.agent.exporters._http.connection.keep_alive", false)
.put("marvel.agent.exporters._http.update_mappings", false);
String agentNode = internalCluster().startNode(builder);

View File

@ -5,64 +5,16 @@
*/
package org.elasticsearch.marvel.agent.exporter.http;
import org.elasticsearch.Version;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.support.VersionUtils;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils.MARVEL_VERSION_FIELD;
import static org.hamcrest.CoreMatchers.equalTo;
public class HttpExporterUtilsTests extends ESTestCase {
public void testLoadTemplate() {
byte[] template = MarvelTemplateUtils.loadDefaultTemplate();
assertNotNull(template);
assertThat(template.length, Matchers.greaterThan(0));
}
public void testParseTemplateVersionFromByteArrayTemplate() throws IOException {
byte[] template = MarvelTemplateUtils.loadDefaultTemplate();
assertNotNull(template);
Version version = MarvelTemplateUtils.parseTemplateVersion(template);
assertNotNull(version);
}
public void testParseTemplateVersionFromStringTemplate() throws IOException {
List<String> templates = new ArrayList<>();
templates.add("{\"marvel_version\": \"1.4.0.Beta1\"}");
templates.add("{\"marvel_version\": \"1.6.2-SNAPSHOT\"}");
templates.add("{\"marvel_version\": \"1.7.1\"}");
templates.add("{\"marvel_version\": \"2.0.0-beta1\"}");
templates.add("{\"marvel_version\": \"2.0.0\"}");
templates.add("{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }");
for (String template : templates) {
Version version = MarvelTemplateUtils.parseTemplateVersion(template);
assertNotNull(version);
}
Version version = MarvelTemplateUtils.parseTemplateVersion("{\"marvel.index_format\": \"7\"}");
assertNull(version);
}
public void testParseVersion() throws IOException {
assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0-beta1\"}"));
assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel_version\": \"2.0.0\"}"));
assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel_version\": \"1.5.2\"}"));
assertNotNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{ \"template\": \".marvel*\", \"settings\": { \"marvel_version\": \"2.0.0-beta1-SNAPSHOT\", \"index.number_of_shards\": 1 } }"));
assertNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD, "{\"marvel.index_format\": \"7\"}"));
assertNull(VersionUtils.parseVersion(MARVEL_VERSION_FIELD + "unkown", "{\"marvel_version\": \"1.5.2\"}"));
}
public void testHostParsing() throws MalformedURLException, URISyntaxException {
URL url = HttpExporterUtils.parseHostWithPath("localhost:9200", "");

View File

@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.exporter.local;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.marvel.agent.exporter.AbstractExporterTemplateTestCase;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
public class LocalExporterTemplateTests extends AbstractExporterTemplateTestCase {
@Override
protected Settings exporterSettings() {
return Settings.builder().put("type", LocalExporter.TYPE).build();
}
@Override
protected Set<String> excludeTemplates() {
// Always delete the template between tests
return Collections.emptySet();
}
@Override
protected void deleteTemplate() throws Exception {
waitNoPendingTasksOnAll();
assertAcked(client().admin().indices().prepareDeleteTemplate(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).get());
}
@Override
protected void putTemplate(String version) throws Exception {
waitNoPendingTasksOnAll();
assertAcked(client().admin().indices().preparePutTemplate(MarvelTemplateUtils.INDEX_TEMPLATE_NAME).setSource(generateTemplateSource(version)).get());
}
@Override
protected void createMarvelIndex(String index) throws Exception {
waitNoPendingTasksOnAll();
createIndex(index);
}
@Override
protected void assertTemplateUpdated(Version version) throws Exception {
waitNoPendingTasksOnAll();
awaitMarvelTemplateInstalled(version);
}
@Override
protected void assertTemplateNotUpdated(Version version) throws Exception {
waitNoPendingTasksOnAll();
awaitMarvelTemplateInstalled(version);
}
private void assertMappings(byte[] reference, String... indices) throws Exception {
waitNoPendingTasksOnAll();
Map<String, String> mappings = new PutIndexTemplateRequest().source(reference).mappings();
assertBusy(new Runnable() {
@Override
public void run() {
for (String index : indices) {
GetMappingsResponse response = client().admin().indices().prepareGetMappings(index).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get();
ImmutableOpenMap<String, MappingMetaData> indexMappings = response.getMappings().get(index);
assertNotNull(indexMappings);
assertThat(indexMappings.size(), equalTo(mappings.size()));
for (String mapping : mappings.keySet()) {
// We just check that mapping type exists, we don't verify its content
assertThat("mapping type " + mapping + " should exist in index " + index, indexMappings.get(mapping), notNullValue());
}
}
}
});
}
@Override
protected void assertMappingsUpdated(String... indices) throws Exception {
assertMappings(MarvelTemplateUtils.loadDefaultTemplate(), indices);
}
@Override
protected void assertMappingsNotUpdated(String... indices) throws Exception {
assertMappings(generateTemplateSource(null).toBytes(), indices);
}
@Override
protected void assertIndicesNotCreated() throws Exception {
waitNoPendingTasksOnAll();
try {
assertThat(client().admin().indices().prepareExists(MarvelSettings.MARVEL_INDICES_PREFIX + "*").get().isExists(), is(false));
} catch (IndexNotFoundException e) {
// with shield we might get that if wildcards were resolved to no indices
if (!shieldEnabled) {
throw e;
}
}
}
}

View File

@ -10,17 +10,9 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryCollector;
@ -29,13 +21,11 @@ import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.joda.time.format.DateTimeFormat;
import org.junit.After;
@ -43,14 +33,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.marvel.agent.exporter.Exporter.MIN_SUPPORTED_TEMPLATE_VERSION;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.*;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class LocalExporterTests extends MarvelIntegTestCase {
@ -105,105 +92,17 @@ public class LocalExporterTests extends MarvelIntegTestCase {
public void testTemplateCreation() throws Exception {
internalCluster().startNode(Settings.builder()
.put("marvel.agent.exporters._local.type", LocalExporter.TYPE)
.put("marvel.agent.exporters._local.template.settings.index.number_of_replicas", 0)
.build());
securedEnsureGreen();
LocalExporter exporter = getLocalExporter("_local");
assertTrue(exporter.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, null));
// start collecting
updateMarvelInterval(3L, TimeUnit.SECONDS);
waitForMarvelIndices();
// lets wait until the marvel template will be installed
awaitMarvelTemplateInstalled();
awaitMarvelDocsCount(greaterThan(0L));
assertThat(getCurrentlyInstalledTemplateVersion(), is(Version.CURRENT));
}
public void testTemplateUpdate() throws Exception {
internalCluster().startNode(Settings.builder()
.put("marvel.agent.exporters._local.type", LocalExporter.TYPE)
.put("marvel.agent.exporters._local.template.settings.index.number_of_replicas", 0)
.build());
securedEnsureGreen();
LocalExporter exporter = getLocalExporter("_local");
Version fakeVersion = MIN_SUPPORTED_TEMPLATE_VERSION;
assertThat(exporter.installedTemplateVersionMandatesAnUpdate(Version.CURRENT, fakeVersion), is(true));
// start collecting
updateMarvelInterval(3L, TimeUnit.SECONDS);
waitForMarvelIndices();
// first, lets wait for the marvel template to be installed
awaitMarvelTemplateInstalled();
// stop collecting before cluster restart
updateMarvelInterval(-1, TimeUnit.SECONDS);
wipeMarvelIndices();
// now lets update the template with an old one and then restart the cluster
exporter.putTemplate(Settings.builder().put(MarvelTemplateUtils.MARVEL_VERSION_FIELD, fakeVersion.toString()).build());
logger.debug("full cluster restart");
final CountDownLatch latch = new CountDownLatch(1);
internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
@Override
public void doAfterNodes(int n, Client client) throws Exception {
latch.countDown();
}
});
if (!latch.await(30, TimeUnit.SECONDS)) {
fail("waited too long (at least 30 seconds) for the cluster to restart");
}
// start collecting again
updateMarvelInterval(3L, TimeUnit.SECONDS);
waitForMarvelIndices();
// now that the cluster is restarting, lets wait for the new template version to be installed
awaitMarvelTemplateInstalled(Version.CURRENT);
}
public void testUnsupportedTemplateVersion() throws Exception {
Exporter.Config config = new Exporter.Config("_name", Settings.EMPTY, Settings.builder()
.put("type", "local").build());
Client client = mock(Client.class);
ClusterService clusterService = mock(ClusterService.class);
boolean master = randomBoolean();
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(localNode.masterNode()).thenReturn(master);
when(clusterService.localNode()).thenReturn(localNode);
RendererRegistry renderers = mock(RendererRegistry.class);
LocalExporter exporter = spy(new LocalExporter(config, client, clusterService, renderers));
// creating a cluster state mock that holds unsupported template version
Version unsupportedVersion = randomFrom(Version.V_0_18_0, Version.V_1_0_0, Version.V_1_4_0);
IndexTemplateMetaData template = mock(IndexTemplateMetaData.class);
when(template.settings()).thenReturn(Settings.builder().put("index.marvel_version", unsupportedVersion.toString()).build());
MetaData metaData = mock(MetaData.class);
when(metaData.getTemplates()).thenReturn(ImmutableOpenMap.<String, IndexTemplateMetaData>builder().fPut(MarvelTemplateUtils.INDEX_TEMPLATE_NAME, template).build());
ClusterBlocks blocks = mock(ClusterBlocks.class);
when(blocks.hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)).thenReturn(false);
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getMetaData()).thenReturn(metaData);
when(clusterState.blocks()).thenReturn(blocks);
when(clusterService.state()).thenReturn(clusterState);
assertThat(exporter.resolveBulk(clusterState, null), nullValue());
verifyZeroInteractions(client);
if (master) {
verify(exporter, times(1)).installedTemplateVersionMandatesAnUpdate(Version.CURRENT, unsupportedVersion);
}
verify(exporter, times(1)).installedTemplateVersionIsSufficient(Version.CURRENT, unsupportedVersion);
}
public void testIndexTimestampFormat() throws Exception {
long time = System.currentTimeMillis();
String timeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM");
@ -241,40 +140,6 @@ public class LocalExporterTests extends MarvelIntegTestCase {
awaitIndexExists(indexName);
}
public void testInstalledTemplateVersionChecking() throws Exception {
Exporter.Config config = new Exporter.Config("_name", Settings.EMPTY, Settings.builder()
.put("type", "local").build());
Client client = mock(Client.class);
ClusterService clusterService = mock(ClusterService.class);
boolean master = randomBoolean();
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(localNode.masterNode()).thenReturn(master);
when(clusterService.localNode()).thenReturn(localNode);
RendererRegistry renderers = mock(RendererRegistry.class);
LocalExporter exporter = new LocalExporter(config, client, clusterService, renderers);
assertTrue("current template version should always be sufficient", exporter.installedTemplateVersionIsSufficient(Version.CURRENT, Version.CURRENT));
Version version = Version.fromId(Version.CURRENT.id + 1000000);
assertTrue("future versions should be considered sufficient in case of a rolling upgrade scenario",
exporter.installedTemplateVersionIsSufficient(Version.CURRENT, version));
// make sure we test at least one snapshot and non-snapshot
String versionStr = "2.0.1";
if (randomBoolean()) {
versionStr += "-SNAPSHOT";
}
Version version1 = Version.fromString(versionStr);
assertTrue("snapshots should not matter", exporter.installedTemplateVersionIsSufficient(version1, version1));
// test the minimum version
assertTrue("minimum template version should always be sufficient", exporter.installedTemplateVersionIsSufficient(Version.CURRENT, Exporter.MIN_SUPPORTED_TEMPLATE_VERSION));
// test a version below the minimum version
assertFalse("version below minimum should not be sufficient", exporter.installedTemplateVersionIsSufficient(Version.CURRENT, Version.V_2_0_0_beta1));
assertFalse("null version should not be sufficient", exporter.installedTemplateVersionIsSufficient(Version.CURRENT, null));
}
public void testLocalExporterFlush() throws Exception {
internalCluster().startNode(Settings.builder()
.put("marvel.agent.exporters._local.type", LocalExporter.TYPE)

View File

@ -39,7 +39,6 @@ public class ClusterStateTests extends MarvelIntegTestCase {
.put(MarvelSettings.INTERVAL, "-1")
.put(MarvelSettings.COLLECTORS, ClusterStateCollector.NAME)
.put("marvel.agent.exporters.default_local.type", "local")
.put("marvel.agent.exporters.default_local.template.settings.index.number_of_replicas", 0)
.build();
}

View File

@ -9,7 +9,6 @@ import org.elasticsearch.action.admin.cluster.stats.ClusterStatsNodes;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.marvel.agent.collector.indices.IndexStatsCollector;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.search.SearchHit;
@ -33,7 +32,6 @@ public class ClusterStatsTests extends MarvelIntegTestCase {
.put(MarvelSettings.INTERVAL, "-1")
.put(MarvelSettings.COLLECTORS, ClusterStatsCollector.NAME)
.put("marvel.agent.exporters.default_local.type", "local")
.put("marvel.agent.exporters.default_local.template.settings.index.number_of_replicas", 0)
.build();
}

View File

@ -38,7 +38,6 @@ public class IndexRecoveryTests extends MarvelIntegTestCase {
.put(MarvelSettings.INDICES, INDEX_PREFIX + "*")
.put(MarvelSettings.COLLECTORS, IndexRecoveryCollector.NAME)
.put("marvel.agent.exporters.default_local.type", "local")
.put("marvel.agent.exporters.default_local.template.settings.index.number_of_replicas", 0)
.build();
}

View File

@ -30,7 +30,6 @@ public class IndexStatsTests extends MarvelIntegTestCase {
.put(MarvelSettings.INTERVAL, "-1")
.put(MarvelSettings.COLLECTORS, IndexStatsCollector.NAME)
.put("marvel.agent.exporters.default_local.type", "local")
.put("marvel.agent.exporters.default_local.template.settings.index.number_of_replicas", 0)
.build();
}

View File

@ -30,7 +30,6 @@ public class IndicesStatsTests extends MarvelIntegTestCase {
.put(MarvelSettings.INTERVAL, "-1")
.put(MarvelSettings.COLLECTORS, IndicesStatsCollector.NAME)
.put("marvel.agent.exporters.default_local.type", "local")
.put("marvel.agent.exporters.default_local.template.settings.index.number_of_replicas", 0)
.build();
}

View File

@ -31,7 +31,6 @@ public class MultiNodesStatsTests extends MarvelIntegTestCase {
.put(super.nodeSettings(nodeOrdinal))
.put(MarvelSettings.INTERVAL, "-1")
.put("marvel.agent.exporters.default_local.type", "local")
.put("marvel.agent.exporters.default_local.template.settings.index.number_of_replicas", 0)
.build();
}

View File

@ -31,7 +31,6 @@ public class NodeStatsTests extends MarvelIntegTestCase {
.put(MarvelSettings.INTERVAL, "-1")
.put(MarvelSettings.COLLECTORS, NodeStatsCollector.NAME)
.put("marvel.agent.exporters.default_local.type", LocalExporter.TYPE)
.put("marvel.agent.exporters.default_local.template.settings.index.number_of_replicas", 0)
.build();
}

View File

@ -45,7 +45,6 @@ public class ShardsTests extends MarvelIntegTestCase {
.put(MarvelSettings.COLLECTORS, ShardsCollector.NAME)
.put(MarvelSettings.INDICES, INDEX_PREFIX + "*")
.put("marvel.agent.exporters.default_local.type", "local")
.put("marvel.agent.exporters.default_local.template.settings.index.number_of_replicas", 0)
.build();
}

View File

@ -39,7 +39,10 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -110,9 +113,7 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
* Override and returns {@code false} to force running without shield
*/
protected boolean enableShield() {
boolean r = randomBoolean();
logger.info("--> shield is{}", r);
return r;
return randomBoolean();
}
protected void stopCollection() {
@ -170,15 +171,15 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
}, 30, TimeUnit.SECONDS);
}
protected void ensureMarvelIndicesGreen() {
protected void ensureMarvelIndicesYellow() {
if (shieldEnabled) {
try {
ensureGreen(".marvel-es-*");
ensureYellow(".marvel-es-*");
} catch (IndexNotFoundException e) {
// might happen with shield...
}
} else {
ensureGreen(".marvel-es-*");
ensureYellow(".marvel-es-*");
}
}
@ -257,7 +258,7 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
assertBusy(new Runnable() {
@Override
public void run() {
ensureMarvelIndicesGreen();
ensureMarvelIndicesYellow();
}
});
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.shield.action.authc.cache.ClearRealmCacheAction;
import org.elasticsearch.shield.action.authc.cache.TransportClearRealmCacheAction;
import org.elasticsearch.shield.audit.AuditTrailModule;
import org.elasticsearch.shield.audit.index.IndexAuditUserHolder;
import org.elasticsearch.shield.audit.logfile.LoggingAuditTrail;
import org.elasticsearch.shield.authc.AuthenticationModule;
import org.elasticsearch.shield.authc.Realms;
import org.elasticsearch.shield.authc.support.SecuredString;
@ -52,8 +53,6 @@ import org.elasticsearch.xpack.XPackPlugin;
import java.nio.file.Path;
import java.util.*;
import java.security.AccessController;
import java.security.PrivilegedAction;
/**
*
@ -121,7 +120,16 @@ public class ShieldPlugin extends Plugin {
@Override
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
if (enabled && clientMode == false) {
return Arrays.<Class<? extends LifecycleComponent>>asList(ShieldLicensee.class, InternalCryptoService.class, FileRolesStore.class, Realms.class, IPFilter.class);
List<Class<? extends LifecycleComponent>> list = new ArrayList<>();
if (AuditTrailModule.fileAuditLoggingEnabled(settings)) {
list.add(LoggingAuditTrail.class);
}
list.add(ShieldLicensee.class);
list.add(InternalCryptoService.class);
list.add(FileRolesStore.class);
list.add(Realms.class);
list.add(IPFilter.class);
return list;
}
return Collections.emptyList();
}

View File

@ -70,11 +70,11 @@ public class AuditTrailModule extends AbstractShieldModule.Node {
return settings.getAsBoolean("shield.audit.enabled", false);
}
public static boolean indexAuditLoggingEnabled(Settings settings) {
public static boolean fileAuditLoggingEnabled(Settings settings) {
if (auditingEnabled(settings)) {
String[] outputs = settings.getAsArray("shield.audit.outputs");
String[] outputs = settings.getAsArray("shield.audit.outputs", new String[] { LoggingAuditTrail.NAME });
for (String output : outputs) {
if (output.equals(IndexAuditTrail.NAME)) {
if (output.equals(LoggingAuditTrail.NAME)) {
return true;
}
}

View File

@ -5,6 +5,9 @@
*/
package org.elasticsearch.shield.audit.logfile;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
@ -33,14 +36,15 @@ import static org.elasticsearch.shield.audit.AuditUtil.restRequestContent;
/**
*
*/
public class LoggingAuditTrail implements AuditTrail {
public class LoggingAuditTrail extends AbstractLifecycleComponent<LoggingAuditTrail> implements AuditTrail {
public static final String NAME = "logfile";
private final String prefix;
private final ESLogger logger;
private final Transport transport;
private String prefix;
@Override
public String name() {
return NAME;
@ -48,19 +52,43 @@ public class LoggingAuditTrail implements AuditTrail {
@Inject
public LoggingAuditTrail(Settings settings, Transport transport) {
this(resolvePrefix(settings, transport), transport, Loggers.getLogger(LoggingAuditTrail.class));
this(settings, transport, Loggers.getLogger(LoggingAuditTrail.class));
}
LoggingAuditTrail(Settings settings, Transport transport, ESLogger logger) {
this(resolvePrefix(settings, transport), transport, logger);
this("", settings, transport, logger);
}
LoggingAuditTrail(String prefix, Transport transport, ESLogger logger) {
LoggingAuditTrail(String prefix, Settings settings, Transport transport, ESLogger logger) {
super(settings);
this.logger = logger;
this.prefix = prefix;
this.transport = transport;
}
@Override
protected void doStart() {
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
prefix = resolvePrefix(settings, transport);
} else {
transport.addLifecycleListener(new LifecycleListener() {
@Override
public void afterStart() {
prefix = resolvePrefix(settings, transport);
}
});
}
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
@Override
public void anonymousAccessDenied(String action, TransportMessage<?> message) {
String indices = indicesString(message);

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
@ -110,6 +111,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
.put("shield.audit.logfile.prefix.emit_node_name", randomBoolean())
.build();
transport = mock(Transport.class);
when(transport.lifecycleState()).thenReturn(Lifecycle.State.STARTED);
when(transport.boundAddress()).thenReturn(new BoundTransportAddress(new TransportAddress[] { DummyTransportAddress.INSTANCE }, DummyTransportAddress.INSTANCE));
prefix = LoggingAuditTrail.resolvePrefix(settings, transport);
}
@ -117,7 +119,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
public void testAnonymousAccessDeniedTransport() throws Exception {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
TransportMessage message = randomBoolean() ? new MockMessage() : new MockIndicesRequest();
String origins = LoggingAuditTrail.originAttributes(message, transport);
auditTrail.anonymousAccessDenied("_action", message);
@ -153,7 +155,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
auditTrail.anonymousAccessDenied(request);
switch (level) {
case ERROR:
@ -173,7 +175,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
public void testAuthenticationFailed() throws Exception {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
TransportMessage message = randomBoolean() ? new MockMessage() : new MockIndicesRequest();
String origins = LoggingAuditTrail.originAttributes(message, transport);;
auditTrail.authenticationFailed(new MockToken(), "_action", message);
@ -201,7 +203,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
public void testAuthenticationFailedNoToken() throws Exception {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
TransportMessage message = randomBoolean() ? new MockMessage() : new MockIndicesRequest();
String origins = LoggingAuditTrail.originAttributes(message, transport);;
auditTrail.authenticationFailed("_action", message);
@ -234,7 +236,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
when(request.uri()).thenReturn("_uri");
String expectedMessage = prepareRestContent(request);
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
auditTrail.authenticationFailed(new MockToken(), request);
switch (level) {
case ERROR:
@ -257,7 +259,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
when(request.uri()).thenReturn("_uri");
String expectedMessage = prepareRestContent(request);
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
auditTrail.authenticationFailed(request);
switch (level) {
case ERROR:
@ -275,7 +277,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
public void testAuthenticationFailedRealm() throws Exception {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
TransportMessage message = randomBoolean() ? new MockMessage() : new MockIndicesRequest();
String origins = LoggingAuditTrail.originAttributes(message, transport);;
auditTrail.authenticationFailed("_realm", new MockToken(), "_action", message);
@ -304,7 +306,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
when(request.uri()).thenReturn("_uri");
String expectedMessage = prepareRestContent(request);
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
auditTrail.authenticationFailed("_realm", new MockToken(), request);
switch (level) {
case ERROR:
@ -322,7 +324,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
public void testAccessGranted() throws Exception {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
TransportMessage message = randomBoolean() ? new MockMessage() : new MockIndicesRequest();
String origins = LoggingAuditTrail.originAttributes(message, transport);
boolean runAs = randomBoolean();
@ -360,7 +362,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
public void testAccessGrantedInternalSystemAction() throws Exception {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
TransportMessage message = randomBoolean() ? new MockMessage() : new MockIndicesRequest();
String origins = LoggingAuditTrail.originAttributes(message, transport);
auditTrail.accessGranted(User.SYSTEM, "internal:_action", message);
@ -384,7 +386,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
public void testAccessGrantedInternalSystemActionNonSystemUser() throws Exception {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
TransportMessage message = randomBoolean() ? new MockMessage() : new MockIndicesRequest();
String origins = LoggingAuditTrail.originAttributes(message, transport);
boolean runAs = randomBoolean();
@ -422,7 +424,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
public void testAccessDenied() throws Exception {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
TransportMessage message = randomBoolean() ? new MockMessage() : new MockIndicesRequest();
String origins = LoggingAuditTrail.originAttributes(message, transport);
boolean runAs = randomBoolean();
@ -461,7 +463,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
String origins = LoggingAuditTrail.originAttributes(message, transport);
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
auditTrail.tamperedRequest(action, message);
switch (level) {
case ERROR:
@ -498,7 +500,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
String userInfo = runAs ? "principal=[running as], run_by_principal=[_username]" : "principal=[_username]";
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
auditTrail.tamperedRequest(user, action, message);
switch (level) {
case ERROR:
@ -524,7 +526,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
public void testConnectionDenied() throws Exception {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
InetAddress inetAddress = InetAddress.getLoopbackAddress();
ShieldIpFilterRule rule = new ShieldIpFilterRule(false, "_all");
auditTrail.connectionDenied(inetAddress, "default", rule);
@ -544,7 +546,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
public void testConnectionGranted() throws Exception {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
InetAddress inetAddress = InetAddress.getLoopbackAddress();
ShieldIpFilterRule rule = IPFilter.DEFAULT_PROFILE_ACCEPT_ALL;
auditTrail.connectionGranted(inetAddress, "default", rule);
@ -566,7 +568,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
public void testRunAsGranted() throws Exception {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
TransportMessage message = new MockMessage();
String origins = LoggingAuditTrail.originAttributes(message, transport);
User user = new User.Simple("_username", new String[]{"r1"}, new User.Simple("running as", new String[] {"r2"}));
@ -589,7 +591,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
public void testRunAsDenied() throws Exception {
for (Level level : Level.values()) {
CapturingLogger logger = new CapturingLogger(level);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger);
LoggingAuditTrail auditTrail = new LoggingAuditTrail(settings, transport, logger).start();
TransportMessage message = new MockMessage();
String origins = LoggingAuditTrail.originAttributes(message, transport);
User user = new User.Simple("_username", new String[]{"r1"}, new User.Simple("running as", new String[] {"r2"}));

View File

@ -122,6 +122,9 @@ public class ShieldSettingsSource extends ClusterDiscoveryConfiguration.UnicastZ
.put("marvel.enabled", false)
.put("shield.audit.enabled", randomBoolean())
.put("shield.audit.logfile.prefix.emit_node_host_address", randomBoolean())
.put("shield.audit.logfile.prefix.emit_node_host_name", randomBoolean())
.put("shield.audit.logfile.prefix.emit_node_name", randomBoolean())
.put(InternalCryptoService.FILE_SETTING, writeFile(folder, "system_key", systemKey))
.put("shield.authc.realms.esusers.type", ESUsersRealm.TYPE)
.put("shield.authc.realms.esusers.order", 0)

View File

@ -12,10 +12,12 @@ indices:admin/analyze[s]
indices:admin/cache/clear[n]
indices:admin/forcemerge[n]
indices:admin/flush[s]
indices:admin/flush[s][p]
indices:admin/flush[s][r]
indices:admin/mappings/fields/get[index]
indices:admin/mappings/fields/get[index][s]
indices:admin/refresh[s]
indices:admin/refresh[s][p]
indices:admin/refresh[s][r]
indices:admin/upgrade
indices:admin/upgrade[n]
@ -42,8 +44,13 @@ indices:data/read/search[phase/query]
indices:data/read/suggest[s]
indices:data/read/tv[s]
indices:data/write/bulk[s]
indices:data/write/bulk[s][p]
indices:data/write/bulk[s][r]
indices:data/write/delete
indices:data/write/delete[p]
indices:data/write/delete[r]
indices:data/write/index
indices:data/write/index[p]
indices:data/write/index[r]
indices:data/write/update[s]
indices:monitor/recovery[n]

View File

@ -4,4 +4,11 @@ grant {
// needed to set expert SSL options, etc
permission java.lang.RuntimePermission "setFactory";
// needed when sending emails for javax.activation
// otherwise a classnotfound exception is thrown due to trying
// to load the class with the application class loader
permission java.lang.RuntimePermission "setContextClassLoader";
permission java.lang.RuntimePermission "getClassLoader";
};

View File

@ -99,6 +99,7 @@ public class Account {
}
transport.connect(config.smtp.host, config.smtp.port, user, password);
ClassLoader contextClassLoader = null;
try {
MimeMessage message = profile.toMimeMessage(email, session);
String mid = message.getMessageID();
@ -108,6 +109,17 @@ public class Account {
// we need to add it back
message.setHeader(Profile.MESSAGE_ID_HEADER, mid);
}
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// unprivileged code such as scripts do not have SpecialPermission
sm.checkPermission(new SpecialPermission());
}
contextClassLoader = AccessController.doPrivileged((PrivilegedAction<ClassLoader>) () -> Thread.currentThread().getContextClassLoader());
// if we cannot get the context class loader, changing does not make sense, as we run into the danger of not being able to change it back
if (contextClassLoader != null) {
setContextClassLoader(this.getClass().getClassLoader());
}
transport.sendMessage(message, message.getAllRecipients());
} finally {
try {
@ -115,10 +127,25 @@ public class Account {
} catch (MessagingException me) {
logger.error("failed to close email transport for account [" + config.name + "]");
}
if (contextClassLoader != null) {
setContextClassLoader(contextClassLoader);
}
}
return email;
}
private void setContextClassLoader(final ClassLoader classLoader) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// unprivileged code such as scripts do not have SpecialPermission
sm.checkPermission(new SpecialPermission());
}
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
Thread.currentThread().setContextClassLoader(classLoader);
return null;
});
}
static class Config {
static final String SMTP_SETTINGS_PREFIX = "mail.smtp.";

View File

@ -183,11 +183,6 @@ public class WatcherXContentParser implements XContentParser {
return parser.numberType();
}
@Override
public boolean estimatedNumberType() {
return parser.estimatedNumberType();
}
@Override
public short shortValue(boolean coerce) throws IOException {
return parser.shortValue(coerce);

View File

@ -1,5 +1,10 @@
---
"Basic array_compare watch":
- skip:
version: " - "
reason: Remove direct dependency on mustache (or at least it should be using xmustache!!!!)
- do:
cluster.health:
wait_for_status: yellow