Mirror of https://github.com/honeymoose/OpenSearch.git (synced 2025-03-02 17:09:18 +00:00)
Marvel: Use mock web server in HttpExporterTests
Original commit: elastic/x-pack-elasticsearch@b69b28af90
This commit is contained in:
parent 0ebc6198ac
commit 2f1c88a633
@@ -42,6 +42,13 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.squareup.okhttp</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>2.3.0</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
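The new test dependency is OkHttp's MockWebServer, which lets the tests stand in for the remote Marvel endpoint: responses are queued up front and every request the exporter sends is recorded for later assertions. Below is a rough, self-contained sketch of that pattern, assuming the 2.x API used in this commit (enqueue, start, takeRequest, shutdown); the /status path, the response body, and the plain HttpURLConnection client are illustrative only, not part of the commit.

import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.RecordedRequest;

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class MockWebServerSketch {

    public static void main(String[] args) throws Exception {
        // Queue the canned response before the client sends anything.
        MockWebServer server = new MockWebServer();
        server.enqueue(new MockResponse().setResponseCode(200).setBody("{\"ok\":true}"));
        server.start(); // assumption: the no-arg overload picks a free ephemeral port

        // Point a plain HTTP client at the mock server, the way the tests point the exporter at it.
        URL url = new URL("http://" + server.getHostName() + ":" + server.getPort() + "/status");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        try (InputStream body = connection.getInputStream()) {
            System.out.println("response code: " + connection.getResponseCode());
        }

        // The server records each request, so a test can assert on method and path.
        RecordedRequest recorded = server.takeRequest();
        System.out.println(recorded.getMethod() + " " + recorded.getPath()); // GET /status

        server.shutdown();
    }
}

The new @Before and @After methods in HttpExporterTests follow the same lifecycle, except that they probe ports 9250-9299 explicitly and install a fail-fast QueueDispatcher.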
@@ -54,6 +54,7 @@ public class HttpExporter extends Exporter {
     public static final String HOST_SETTING = "host";
     public static final String CONNECTION_TIMEOUT_SETTING = "connection.timeout";
     public static final String CONNECTION_READ_TIMEOUT_SETTING = "connection.read_timeout";
+    public static final String CONNECTION_KEEP_ALIVE_SETTING = "connection.keep_alive";
     public static final String AUTH_USERNAME_SETTING = "auth.username";
     public static final String AUTH_PASSWORD_SETTING = "auth.password";
 
@@ -91,6 +92,7 @@ public class HttpExporter extends Exporter {
     /** Version of the built-in template **/
     final Version templateVersion;
 
+    boolean keepAlive;
     final ConnectionKeepAliveWorker keepAliveWorker;
     Thread keepAliveThread;
 
@@ -117,6 +119,7 @@ public class HttpExporter extends Exporter {
         String templateCheckTimeoutValue = config.settings().get(TEMPLATE_CHECK_TIMEOUT_SETTING, null);
         templateCheckTimeout = TimeValue.parseTimeValue(templateCheckTimeoutValue, null, settingFQN(TEMPLATE_CHECK_TIMEOUT_SETTING));
 
+        keepAlive = config.settings().getAsBoolean(CONNECTION_KEEP_ALIVE_SETTING, true);
         keepAliveWorker = new ConnectionKeepAliveWorker();
 
         sslSocketFactory = createSSLSocketFactory(config.settings().getAsSettings(SSL_SETTING));
@@ -511,9 +514,11 @@ public class HttpExporter extends Exporter {
     }
 
     protected void initKeepAliveThread() {
-        keepAliveThread = new Thread(keepAliveWorker, "marvel-exporter[" + config.name() + "][keep_alive]");
-        keepAliveThread.setDaemon(true);
-        keepAliveThread.start();
+        if (keepAlive) {
+            keepAliveThread = new Thread(keepAliveWorker, "marvel-exporter[" + config.name() + "][keep_alive]");
+            keepAliveThread.setDaemon(true);
+            keepAliveThread.start();
+        }
     }
 
 
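On the exporter side the change is small: the keep-alive worker thread is now started only when the new connection.keep_alive setting (default true) is enabled, and the new tests disable it via marvel.agent.exporters._http.connection.keep_alive: false. A stripped-down sketch of the guarded start follows; "worker" and "name" are placeholders for the exporter's ConnectionKeepAliveWorker and config.name(), which are not shown here.

// Minimal illustration of the guarded keep-alive start; not the exporter's actual class.
class KeepAliveSketch {
    private final boolean keepAlive;   // backed by the "connection.keep_alive" setting, default true
    private final Runnable worker;     // stands in for ConnectionKeepAliveWorker
    private final String name;         // stands in for config.name()
    private Thread keepAliveThread;

    KeepAliveSketch(boolean keepAlive, Runnable worker, String name) {
        this.keepAlive = keepAlive;
        this.worker = worker;
        this.name = name;
    }

    void initKeepAliveThread() {
        if (keepAlive) {               // previously the thread was started unconditionally
            keepAliveThread = new Thread(worker, "marvel-exporter[" + name + "][keep_alive]");
            keepAliveThread.setDaemon(true);
            keepAliveThread.start();
        }
    }
}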
@@ -5,134 +5,114 @@
 */
package org.elasticsearch.marvel.agent.exporter.http;

import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.QueueDispatcher;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMarvelDoc;
import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.SuppressLocalMode;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.hamcrest.Matchers;
import org.joda.time.format.DateTimeFormat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.BindException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Map;

import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

import org.apache.lucene.util.LuceneTestCase.AwaitsFix;

// Transport Client instantiation also calls the marvel plugin, which then fails to find modules
@SuppressLocalMode
@ClusterScope(scope = TEST, transportClientRatio = 0.0, numDataNodes = 0, numClientNodes = 0)
@AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/729")
@ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class HttpExporterTests extends MarvelIntegTestCase {

    final static AtomicLong timeStampGenerator = new AtomicLong();

    @Override
    protected boolean enableShield() {
        return false;
    }

    private int webPort;
    private MockWebServer webServer;

    @Before
    public void init() throws Exception {
        startCollection();
    public void startWebservice() throws Exception {
        for (webPort = 9250; webPort < 9300; webPort++) {
            try {
                webServer = new MockWebServer();
                QueueDispatcher dispatcher = new QueueDispatcher();
                dispatcher.setFailFast(true);
                webServer.setDispatcher(dispatcher);
                webServer.start(webPort);
                return;
            } catch (BindException be) {
                logger.warn("port [{}] was already in use trying next port", webPort);
            }
        }
        throw new ElasticsearchException("unable to find open port between 9200 and 9300");
    }

    @After
    public void cleanup() throws Exception {
        stopCollection();
    }

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return Settings.builder()
                .put(super.nodeSettings(nodeOrdinal))
                .put(Node.HTTP_ENABLED, true)
                .put("shield.enabled", false)
                .build();
        webServer.shutdown();
    }

    @Test
    public void testSimpleExport() throws Exception {
        TargetNode target = TargetNode.start(internalCluster());
    public void testExport() throws Exception {
        enqueueGetClusterVersionResponse(Version.CURRENT);
        enqueueResponse(404, "marvel template does not exist");
        enqueueResponse(201, "marvel template created");
        enqueueResponse(200, "successful bulk request ");

        Settings.Builder builder = Settings.builder()
                .put(MarvelSettings.INTERVAL, "-1")
                .put("marvel.agent.exporters._http.type", "http")
                .put("marvel.agent.exporters._http.host", target.httpAddress);
                .put("marvel.agent.exporters._http.host", webServer.getHostName() + ":" + webServer.getPort())
                .put("marvel.agent.exporters._http.connection.keep_alive", false);

        String agentNode = internalCluster().startNode(builder);
        ensureGreen();
        HttpExporter exporter = getExporter(agentNode);
        MarvelDoc doc = newRandomMarvelDoc();
        exporter.export(Collections.singletonList(doc));

        flush();
        refresh();
        assertThat(webServer.getRequestCount(), greaterThanOrEqualTo(4));

        SearchResponse response = client().prepareSearch(".marvel-es-*").setTypes(doc.type()).get();
        assertThat(response, notNullValue());
        assertThat(response.getHits().totalHits(), is(1L));
    }
        RecordedRequest recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("GET"));
        assertThat(recordedRequest.getPath(), equalTo("/"));

    @Test
    public void testTemplateAdditionDespiteOfLateClusterForming() throws Exception {
        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("GET"));
        assertThat(recordedRequest.getPath(), equalTo("/_template/marvel"));

        TargetNode target = TargetNode.start(internalCluster());
        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("PUT"));
        assertThat(recordedRequest.getPath(), equalTo("/_template/marvel"));
        assertThat(recordedRequest.getBody().readByteArray(), equalTo(MarvelTemplateUtils.loadDefaultTemplate()));

        Settings.Builder builder = Settings.builder()
                .put(MarvelSettings.STARTUP_DELAY, "200m")
                .put(Node.HTTP_ENABLED, true)
                .put("discovery.type", "zen")
                .put("discovery.zen.ping_timeout", "1s")
                .put("discovery.initial_state_timeout", "100ms")
                .put("discovery.zen.minimum_master_nodes", 2)
                .put("marvel.agent.exporters._http.type", "http")
                .put("marvel.agent.exporters._http.host", target.httpAddress)
                .put("marvel.agent.exporters._http." + HttpExporter.BULK_TIMEOUT_SETTING, "1s")
                .put("marvel.agent.exporters._http." + HttpExporter.TEMPLATE_CHECK_TIMEOUT_SETTING, "1s");

        String nodeName = internalCluster().startNode(builder);

        HttpExporter exporter = getExporter(nodeName);
        logger.info("exporting events while there is no cluster");
        exporter.export(Collections.singletonList(newRandomMarvelDoc()));

        logger.info("bringing up a second node");
        internalCluster().startNode(builder);
        ensureGreen();
        logger.info("exporting a second event");
        exporter.export(Collections.singletonList(newRandomMarvelDoc()));

        logger.info("verifying that template has been created");
        assertMarvelTemplateInstalled();
        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("POST"));
        assertThat(recordedRequest.getPath(), equalTo("/_bulk"));
    }

    @Test
    public void testDynamicHostChange() {

        // disable exporting to be able to use non valid hosts
        Settings.Builder builder = Settings.builder()
                .put(MarvelSettings.INTERVAL, "-1")
@@ -159,144 +139,222 @@ public class HttpExporterTests extends MarvelIntegTestCase {
    @Test
    public void testHostChangeReChecksTemplate() throws Exception {

        TargetNode targetNode = TargetNode.start(internalCluster());

        Settings.Builder builder = Settings.builder()
                .put(MarvelSettings.STARTUP_DELAY, "200m")
                .put(MarvelSettings.INTERVAL, "-1")
                .put("marvel.agent.exporters._http.type", "http")
                .put("marvel.agent.exporters._http.host", targetNode.httpAddress);
                .put("marvel.agent.exporters._http.host", webServer.getHostName() + ":" + webServer.getPort())
                .put("marvel.agent.exporters._http.connection.keep_alive", false);

        logger.info("--> starting node");

        enqueueGetClusterVersionResponse(Version.CURRENT);
        enqueueResponse(404, "marvel template does not exist");
        enqueueResponse(201, "marvel template created");
        enqueueResponse(200, "successful bulk request ");

        String agentNode = internalCluster().startNode(builder);

        logger.info("--> exporting data");
        HttpExporter exporter = getExporter(agentNode);

        logger.info("exporting an event");
        exporter.export(Collections.singletonList(newRandomMarvelDoc()));

        logger.info("removing the marvel template");
        assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get());
        assertMarvelTemplateMissing();
        assertThat(webServer.getRequestCount(), greaterThanOrEqualTo(4));

        assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
                Settings.builder().putArray("marvel.agent.exporters._http.host", exporter.hosts)).get());
        RecordedRequest recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("GET"));
        assertThat(recordedRequest.getPath(), equalTo("/"));

        // a new exporter is created on update, so we need to re-fetch it
        exporter = getExporter(agentNode);
        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("GET"));
        assertThat(recordedRequest.getPath(), equalTo("/_template/marvel"));

        logger.info("exporting a second event");
        exporter.export(Collections.singletonList(newRandomMarvelDoc()));
        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("PUT"));
        assertThat(recordedRequest.getPath(), equalTo("/_template/marvel"));
        assertThat(recordedRequest.getBody().readByteArray(), equalTo(MarvelTemplateUtils.loadDefaultTemplate()));

        logger.info("verifying that template has been created");
        assertMarvelTemplateInstalled();
    }
        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("POST"));
        assertThat(recordedRequest.getPath(), equalTo("/_bulk"));

    @Test
    public void testHostFailureChecksTemplate() throws Exception {
        logger.info("--> setting up another web server");
        MockWebServer secondWebServer = null;
        int secondWebPort;

        TargetNode target0 = TargetNode.start(internalCluster());
        assertThat(target0.name, is(internalCluster().getMasterName()));

        TargetNode target1 = TargetNode.start(internalCluster());

        // lets start node0 & node1 first, such that node0 will be the master (it's first to start)
        final String node0 = internalCluster().startNode(Settings.builder()
                .put(MarvelSettings.STARTUP_DELAY, "200m")
                .put("marvel.agent.exporters._http.type", "http")
                .putArray("marvel.agent.exporters._http.host", target0.httpAddress, target1.httpAddress));

        HttpExporter exporter = getExporter(node0);

        logger.info("--> exporting events to have new settings take effect");
        exporter.export(Collections.singletonList(newRandomMarvelDoc()));

        logger.info("verifying that template has been created");
        assertMarvelTemplateInstalled();

        logger.info("--> removing the marvel template");
        assertAcked(client().admin().indices().prepareDeleteTemplate("marvel").get());
        assertMarvelTemplateMissing();

        logger.info("--> shutting down target0");
        assertThat(target0.name, is(internalCluster().getMasterName())); // just to be sure it's still the master
        internalCluster().stopCurrentMasterNode();

        // we use assert busy node because url caching may cause the node failure to be only detected while sending the event
        assertBusy(new Runnable() {
            @Override
            public void run() {
                try {
                    for (secondWebPort = 9250; secondWebPort < 9300; secondWebPort++) {
                        try {
                            logger.info("--> exporting events from node0");
                            getExporter(node0).export(Collections.singletonList(newRandomMarvelDoc()));
                        } catch (Exception e) {
                            e.printStackTrace();
                            fail("failed to export event from node0");
                            secondWebServer = new MockWebServer();
                            QueueDispatcher dispatcher = new QueueDispatcher();
                            dispatcher.setFailFast(true);
                            secondWebServer.setDispatcher(dispatcher);
                            secondWebServer.start(secondWebPort);
                            break;
                        } catch (BindException be) {
                            logger.warn("port [{}] was already in use trying next port", secondWebPort);
                        }
                        logger.debug("--> checking for template");
                        assertMarvelTemplateInstalled();
                        logger.debug("--> template exists");
                    }
                }, 30, TimeUnit.SECONDS);

        assertNotNull("Unable to start the second mock web server", secondWebServer);

        assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
                Settings.builder().putArray("marvel.agent.exporters._http.host", secondWebServer.getHostName() + ":" + secondWebServer.getPort())).get());

        // a new exporter is created on update, so we need to re-fetch it
        exporter = getExporter(agentNode);

        enqueueGetClusterVersionResponse(secondWebServer, Version.CURRENT);
        enqueueResponse(secondWebServer, 404, "marvel template does not exist");
        enqueueResponse(secondWebServer, 201, "marvel template created");
        enqueueResponse(secondWebServer, 200, "successful bulk request ");

        logger.info("--> exporting a second event");
        exporter.export(Collections.singletonList(newRandomMarvelDoc()));

        assertThat(secondWebServer.getRequestCount(), greaterThanOrEqualTo(4));

        recordedRequest = secondWebServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("GET"));
        assertThat(recordedRequest.getPath(), equalTo("/"));

        recordedRequest = secondWebServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("GET"));
        assertThat(recordedRequest.getPath(), equalTo("/_template/marvel"));

        recordedRequest = secondWebServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("PUT"));
        assertThat(recordedRequest.getPath(), equalTo("/_template/marvel"));
        assertThat(recordedRequest.getBody().readByteArray(), equalTo(MarvelTemplateUtils.loadDefaultTemplate()));

        recordedRequest = secondWebServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("POST"));
        assertThat(recordedRequest.getPath(), equalTo("/_bulk"));

        } finally {
            if (secondWebServer != null) {
                secondWebServer.shutdown();
            }
        }
    }

    @Test
    public void testDynamicIndexFormatChange() throws Exception {

        TargetNode targetNode = TargetNode.start(internalCluster());

        Settings.Builder builder = Settings.builder()
                .put(MarvelSettings.STARTUP_DELAY, "200m")
                .put(MarvelSettings.INTERVAL, "-1")
                .put("marvel.agent.exporters._http.type", "http")
                .put("marvel.agent.exporters._http.host", targetNode.httpAddress);
                .put("marvel.agent.exporters._http.host", webServer.getHostName() + ":" + webServer.getPort())
                .put("marvel.agent.exporters._http.connection.keep_alive", false);

        String agentNode = internalCluster().startNode(builder);

        logger.info("exporting a first event");
        logger.info("--> exporting a first event");

        enqueueGetClusterVersionResponse(Version.CURRENT);
        enqueueResponse(404, "marvel template does not exist");
        enqueueResponse(201, "marvel template created");
        enqueueResponse(200, "successful bulk request ");

        HttpExporter exporter = getExporter(agentNode);

        MarvelDoc doc = newRandomMarvelDoc();
        exporter.export(Collections.singletonList(doc));

        assertThat(webServer.getRequestCount(), greaterThanOrEqualTo(4));

        RecordedRequest recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("GET"));
        assertThat(recordedRequest.getPath(), equalTo("/"));

        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("GET"));
        assertThat(recordedRequest.getPath(), equalTo("/_template/marvel"));

        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("PUT"));
        assertThat(recordedRequest.getPath(), equalTo("/_template/marvel"));
        assertThat(recordedRequest.getBody().readByteArray(), equalTo(MarvelTemplateUtils.loadDefaultTemplate()));

        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("POST"));
        assertThat(recordedRequest.getPath(), equalTo("/_bulk"));

        String indexName = exporter.indexNameResolver().resolve(doc);
        logger.info("checks that the index [{}] is created", indexName);
        assertTrue(client().admin().indices().prepareExists(indexName).get().isExists());
        logger.info("--> checks that the document in the bulk request is indexed in [{}]", indexName);

        byte[] bytes = recordedRequest.getBody().readByteArray();
        Map<String, Object> data = XContentHelper.convertToMap(new BytesArray(bytes), false).v2();
        Map<String, Object> index = (Map<String, Object>) data.get("index");
        assertThat(index.get("_index"), equalTo(indexName));

        String newTimeFormat = randomFrom("YY", "YYYY", "YYYY.MM", "YYYY-MM", "MM.YYYY", "MM");
        logger.info("updating index time format setting to {}", newTimeFormat);
        logger.info("--> updating index time format setting to {}", newTimeFormat);
        assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
                .put("marvel.agent.exporters._http.index.name.time_format", newTimeFormat)));

        exporter = getExporter(agentNode);

        logger.info("exporting a second event");
        logger.info("--> exporting a second event");

        enqueueGetClusterVersionResponse(Version.CURRENT);
        enqueueResponse(404, "marvel template does not exist");
        enqueueResponse(201, "marvel template created");
        enqueueResponse(200, "successful bulk request ");

        doc = newRandomMarvelDoc();
        exporter = getExporter(agentNode);
        exporter.export(Collections.singletonList(doc));

        String expectedMarvelIndex = MarvelSettings.MARVEL_INDICES_PREFIX
                + DateTimeFormat.forPattern(newTimeFormat).withZoneUTC().print(doc.timestamp());

        logger.info("checks that the index [{}] is created", expectedMarvelIndex);
        assertTrue(client().admin().indices().prepareExists(expectedMarvelIndex).get().isExists());
        assertThat(webServer.getRequestCount(), greaterThanOrEqualTo(4));

        logger.info("verifying that template has been created");
        assertMarvelTemplateInstalled();
        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("GET"));
        assertThat(recordedRequest.getPath(), equalTo("/"));

        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("GET"));
        assertThat(recordedRequest.getPath(), equalTo("/_template/marvel"));

        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("PUT"));
        assertThat(recordedRequest.getPath(), equalTo("/_template/marvel"));
        assertThat(recordedRequest.getBody().readByteArray(), equalTo(MarvelTemplateUtils.loadDefaultTemplate()));

        recordedRequest = webServer.takeRequest();
        assertThat(recordedRequest.getMethod(), equalTo("POST"));
        assertThat(recordedRequest.getPath(), equalTo("/_bulk"));

        logger.info("--> checks that the document in the bulk request is indexed in [{}]", expectedMarvelIndex);

        bytes = recordedRequest.getBody().readByteArray();
        data = XContentHelper.convertToMap(new BytesArray(bytes), false).v2();
        index = (Map<String, Object>) data.get("index");
        assertThat(index.get("_index"), equalTo(expectedMarvelIndex));
    }

    @Test
    public void testLoadRemoteClusterVersion() {

        TargetNode targetNode = TargetNode.start(internalCluster());
    public void testLoadRemoteClusterVersion() throws IOException {
        final String host = webServer.getHostName() + ":" + webServer.getPort();

        Settings.Builder builder = Settings.builder()
                .put(MarvelSettings.STARTUP_DELAY, "200m")
                .put(MarvelSettings.INTERVAL, "-1")
                .put("marvel.agent.exporters._http.type", "http")
                .put("marvel.agent.exporters._http.host", targetNode.httpAddress);
                .put("marvel.agent.exporters._http.host", host)
                .put("marvel.agent.exporters._http.connection.keep_alive", false);

        String agentNode = internalCluster().startNode(builder);

        HttpExporter exporter = getExporter(agentNode);

        logger.info("--> loading remote cluster version");
        Version resolved = exporter.loadRemoteClusterVersion(targetNode.httpAddress);
        enqueueGetClusterVersionResponse(Version.CURRENT);
        Version resolved = exporter.loadRemoteClusterVersion(host);
        assertTrue(resolved.equals(Version.CURRENT));

        final Version expected = randomFrom(Version.CURRENT, Version.V_0_18_0, Version.V_1_1_0, Version.V_1_2_5, Version.V_1_4_5, Version.V_1_6_0);
        enqueueGetClusterVersionResponse(expected);
        resolved = exporter.loadRemoteClusterVersion(host);
        assertTrue(resolved.equals(expected));
    }

    private HttpExporter getExporter(String nodeName) {
@@ -307,29 +365,26 @@ public class HttpExporterTests extends MarvelIntegTestCase {
    private MarvelDoc newRandomMarvelDoc() {
        if (randomBoolean()) {
            return new IndexRecoveryMarvelDoc(internalCluster().getClusterName(),
                    IndexRecoveryCollector.TYPE, timeStampGenerator.incrementAndGet(), new RecoveryResponse());
                    IndexRecoveryCollector.TYPE, System.currentTimeMillis(), new RecoveryResponse());
        } else {
            return new ClusterStateMarvelDoc(internalCluster().getClusterName(),
                    ClusterStateCollector.TYPE, timeStampGenerator.incrementAndGet(), ClusterState.PROTO, ClusterHealthStatus.GREEN);
                    ClusterStateCollector.TYPE, System.currentTimeMillis(), ClusterState.PROTO, ClusterHealthStatus.GREEN);
        }
    }

    static class TargetNode {
    private void enqueueGetClusterVersionResponse(Version v) throws IOException {
        enqueueGetClusterVersionResponse(webServer, v);
    }

        private final String name;
        private final TransportAddress address;
        private final String httpAddress;
        private final Client client;
    private void enqueueGetClusterVersionResponse(MockWebServer mockWebServer, Version v) throws IOException {
        mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(jsonBuilder().startObject().startObject("version").field("number", v.number()).endObject().endObject().bytes().toUtf8()));
    }

        private TargetNode(InternalTestCluster cluster) {
            name = cluster.startNode(Settings.builder().put(Node.HTTP_ENABLED, true));
            address = cluster.getInstance(HttpServerTransport.class, name).boundAddress().publishAddress();
            httpAddress = address.getHost() + ":" + address.getPort();
            this.client = cluster.client(name);
        }
    private void enqueueResponse(int responseCode, String body) throws IOException {
        enqueueResponse(webServer, responseCode, body);
    }

        static TargetNode start(InternalTestCluster cluster) {
            return new TargetNode(cluster);
        }
    private void enqueueResponse(MockWebServer mockWebServer, int responseCode, String body) throws IOException {
        mockWebServer.enqueue(new MockResponse().setResponseCode(responseCode).setBody(body));
    }
}
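For reference, the enqueueGetClusterVersionResponse helper above answers the exporter's initial GET / with the minimal JSON the exporter needs to read the remote version, and enqueueResponse covers the template check, template creation, and bulk responses. A hedged sketch of what the queued responses boil down to follows; the hard-coded "2.0.0" version string and the no-arg start() call are assumptions for illustration, not part of the commit.

import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;

public class EnqueueVersionSketch {

    public static void main(String[] args) throws Exception {
        MockWebServer server = new MockWebServer();
        server.start(); // assumption: no-arg start() on the 2.x API

        // Equivalent JSON to what jsonBuilder() produces in the helper:
        // startObject().startObject("version").field("number", v.number()).endObject().endObject()
        String versionBody = "{\"version\":{\"number\":\"2.0.0\"}}";
        server.enqueue(new MockResponse().setResponseCode(200).setBody(versionBody));

        // The later expectations queued by the tests follow the same shape:
        server.enqueue(new MockResponse().setResponseCode(404).setBody("marvel template does not exist"));
        server.enqueue(new MockResponse().setResponseCode(201).setBody("marvel template created"));
        server.enqueue(new MockResponse().setResponseCode(200).setBody("successful bulk request"));

        server.shutdown();
    }
}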