Marvel: Add Recovery collector

closes elastic/elasticsearch-marvelelastic/elasticsearch#452

Original commit: elastic/x-pack-elasticsearch@0cfb9b5c57
This commit is contained in:
Tanguy Leroux 2015-07-30 16:22:33 +02:00
parent 7ec8a7ab27
commit 7f42d04bf2
11 changed files with 523 additions and 1 deletions

View File

@ -9,6 +9,7 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.marvel.agent.collector.indices.IndexStatsCollector;
import org.elasticsearch.marvel.agent.collector.node.NodeStatsCollector;
@ -25,6 +26,7 @@ public class CollectorModule extends AbstractModule {
registerCollector(ClusterStatsCollector.class);
registerCollector(ClusterStateCollector.class);
registerCollector(NodeStatsCollector.class);
registerCollector(IndexRecoveryCollector.class);
}
@Override

View File

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.collector.indices;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.agent.collector.AbstractCollector;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.settings.MarvelSettingsService;
import java.util.Collection;
/**
* Collector for the Recovery API.
* <p/>
* This collector runs on the master node only and collects a {@link IndexRecoveryMarvelDoc} document
* for every index that has on-going shard recoveries.
*/
public class IndexRecoveryCollector extends AbstractCollector<IndexRecoveryCollector> {
public static final String NAME = "index-recovery-collector";
public static final String TYPE = "marvel_index_recovery";
private final Client client;
@Inject
public IndexRecoveryCollector(Settings settings, ClusterService clusterService,
ClusterName clusterName, MarvelSettingsService marvelSettings, Client client) {
super(settings, NAME, clusterService, clusterName, marvelSettings);
this.client = client;
}
@Override
protected boolean masterOnly() {
return true;
}
@Override
protected Collection<MarvelDoc> doCollect() throws Exception {
ImmutableList.Builder<MarvelDoc> results = ImmutableList.builder();
RecoveryResponse recoveryResponse = client.admin().indices().prepareRecoveries()
.setActiveOnly(marvelSettings.recoveryActiveOnly())
.get(marvelSettings.recoveryTimeout());
if (recoveryResponse.hasRecoveries()) {
results.add(buildMarvelDoc(clusterName.value(), TYPE, System.currentTimeMillis(), recoveryResponse));
}
return results.build();
}
protected MarvelDoc buildMarvelDoc(String clusterName, String type, long timestamp, RecoveryResponse recoveryResponse) {
return IndexRecoveryMarvelDoc.createMarvelDoc(clusterName, type, timestamp, recoveryResponse);
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.collector.indices;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
public class IndexRecoveryMarvelDoc extends MarvelDoc<IndexRecoveryMarvelDoc.Payload> {
private final Payload payload;
public IndexRecoveryMarvelDoc(String clusterName, String type, long timestamp, Payload payload) {
super(clusterName, type, timestamp);
this.payload = payload;
}
@Override
public IndexRecoveryMarvelDoc.Payload payload() {
return payload;
}
public static IndexRecoveryMarvelDoc createMarvelDoc(String clusterName, String type, long timestamp,
RecoveryResponse recoveryResponse) {
return new IndexRecoveryMarvelDoc(clusterName, type, timestamp, new Payload(recoveryResponse));
}
public static class Payload {
RecoveryResponse recoveryResponse;
public Payload(RecoveryResponse recoveryResponse) {
this.recoveryResponse = recoveryResponse;
}
public RecoveryResponse getRecoveryResponse() {
return recoveryResponse;
}
}
}

View File

@ -9,10 +9,12 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.marvel.agent.collector.indices.IndexStatsCollector;
import org.elasticsearch.marvel.agent.collector.node.NodeStatsCollector;
import org.elasticsearch.marvel.agent.renderer.cluster.ClusterStateRenderer;
import org.elasticsearch.marvel.agent.renderer.cluster.ClusterStatsRenderer;
import org.elasticsearch.marvel.agent.renderer.indices.IndexRecoveryRenderer;
import org.elasticsearch.marvel.agent.renderer.indices.IndexStatsRenderer;
import org.elasticsearch.marvel.agent.renderer.node.NodeStatsRenderer;
@ -44,6 +46,9 @@ public class RendererModule extends AbstractModule {
bind(NodeStatsRenderer.class).asEagerSingleton();
mbinder.addBinding(NodeStatsCollector.TYPE).to(NodeStatsRenderer.class);
bind(IndexRecoveryRenderer.class).asEagerSingleton();
mbinder.addBinding(IndexRecoveryCollector.TYPE).to(IndexRecoveryRenderer.class);
for (Map.Entry<String, Class<? extends Renderer>> entry : renderers.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
mbinder.addBinding(entry.getKey()).to(entry.getValue());

View File

@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.renderer.indices;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMarvelDoc;
import org.elasticsearch.marvel.agent.renderer.AbstractRenderer;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class IndexRecoveryRenderer extends AbstractRenderer<IndexRecoveryMarvelDoc> {
public IndexRecoveryRenderer() {
super(null, false);
}
@Override
protected void doRender(IndexRecoveryMarvelDoc marvelDoc, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(Fields.INDEX_RECOVERY);
IndexRecoveryMarvelDoc.Payload payload = marvelDoc.payload();
if (payload != null) {
RecoveryResponse recovery = payload.getRecoveryResponse();
if (recovery != null) {
builder.startArray(Fields.SHARDS);
Map<String, List<ShardRecoveryResponse>> shards = recovery.shardResponses();
if (shards != null) {
for (Map.Entry<String, List<ShardRecoveryResponse>> shard : shards.entrySet()) {
List<ShardRecoveryResponse> indexShards = shard.getValue();
if (indexShards != null) {
for (ShardRecoveryResponse indexShard : indexShards) {
builder.startObject();
builder.field(Fields.INDEX_NAME, shard.getKey());
indexShard.toXContent(builder, params);
builder.endObject();
}
}
}
}
builder.endArray();
}
}
builder.endObject();
}
static final class Fields {
static final XContentBuilderString INDEX_RECOVERY = new XContentBuilderString("index_recovery");
static final XContentBuilderString SHARDS = new XContentBuilderString("shards");
static final XContentBuilderString INDEX_NAME = new XContentBuilderString("index_name");
}
}

View File

@ -35,6 +35,12 @@ public class MarvelSettingsService extends AbstractComponent implements NodeSett
final TimeValueSetting clusterStateTimeout = MarvelSetting.timeSetting(PREFIX + "cluster.state.timeout", TimeValue.timeValueMinutes(10),
"Timeout value when collecting the cluster state (default to 10m)");
final TimeValueSetting recoveryTimeout = MarvelSetting.timeSetting(PREFIX + "index.recovery.timeout", TimeValue.timeValueMinutes(10),
"Timeout value when collecting the recovery information (default to 10m)");
final MarvelSetting.BooleanSetting recoveryActiveOnly = MarvelSetting.booleanSetting(PREFIX + "index.recovery.active_only", Boolean.FALSE,
"Flag to indicate if only active recoveries should be collected (default to false: all recoveries are collected)");
MarvelSettingsService(Settings clusterSettings) {
super(clusterSettings);
@ -43,6 +49,8 @@ public class MarvelSettingsService extends AbstractComponent implements NodeSett
builder.add(indexStatsTimeout);
builder.add(indices);
builder.add(clusterStateTimeout);
builder.add(recoveryTimeout);
builder.add(recoveryActiveOnly);
this.settings = builder.build();
logger.trace("initializing marvel settings:");
@ -92,4 +100,12 @@ public class MarvelSettingsService extends AbstractComponent implements NodeSett
public TimeValue clusterStateTimeout() {
return clusterStateTimeout.getValue();
}
public TimeValue recoveryTimeout() {
return recoveryTimeout.getValue();
}
public boolean recoveryActiveOnly() {
return recoveryActiveOnly.getValue();
}
}

View File

@ -142,6 +142,13 @@
}
}
}
},
"marvel_index_recovery": {
"properties": {
"shards": {
"type": "object"
}
}
}
}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.collector.indices;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.settings.MarvelSettingsService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*;
@Slow
@ElasticsearchIntegrationTest.ClusterScope(numDataNodes = 0)
public class IndexRecoveryCollectorTests extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return super.nodeSettings(nodeOrdinal);
}
@Test
public void testIndexRecoveryCollector() throws Exception {
final String indexName = "test";
logger.info("--> start first node");
final String node1 = internalCluster().startNode();
ensureYellow();
logger.info("--> collect index recovery data");
Collection<MarvelDoc> results = newIndexRecoveryCollector().doCollect();
logger.info("--> no indices created, expecting 0 marvel documents");
assertNotNull(results);
assertThat(results, is(empty()));
logger.info("--> create index on node: {}", node1);
assertAcked(prepareCreate(indexName, 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0)));
ensureGreen(indexName);
logger.info("--> indexing sample data");
final int numDocs = between(50, 150);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex(indexName, "foo").setSource("value", randomInt()).get();
}
flushAndRefresh(indexName);
assertHitCount(client().prepareCount(indexName).get(), numDocs);
ByteSizeValue storeSize = client().admin().indices().prepareStats(indexName).get().getTotal().getStore().getSize();
logger.info("--> start another node with very low recovery settings");
internalCluster().startNode(settingsBuilder()
.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, 1)
.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, 1)
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, storeSize.bytes() / 10, ByteSizeUnit.BYTES)
.put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, storeSize.bytes() / 10, ByteSizeUnit.BYTES)
);
logger.info("--> wait for at least 1 shard relocation");
results = assertBusy(new Callable<Collection<MarvelDoc>>() {
@Override
public Collection<MarvelDoc> call() throws Exception {
RecoveryResponse response = client().admin().indices().prepareRecoveries().setActiveOnly(true).get();
assertTrue(response.hasRecoveries());
List<ShardRecoveryResponse> shardResponses = response.shardResponses().get(indexName);
assertFalse(shardResponses.isEmpty());
boolean foundRelocation = false;
for (ShardRecoveryResponse shardResponse : shardResponses) {
if (RecoveryState.Type.RELOCATION.equals(shardResponse.recoveryState().getType())) {
foundRelocation = true;
break;
}
}
assertTrue("found at least one relocation", foundRelocation);
logger.info("--> collect index recovery data");
return newIndexRecoveryCollector().doCollect();
}
});
logger.info("--> we should have at least 1 shard in relocation state");
assertNotNull(results);
assertThat(results, hasSize(1));
MarvelDoc marvelDoc = results.iterator().next();
assertNotNull(marvelDoc);
assertThat(marvelDoc, instanceOf(IndexRecoveryMarvelDoc.class));
IndexRecoveryMarvelDoc indexRecoveryMarvelDoc = (IndexRecoveryMarvelDoc) marvelDoc;
assertThat(indexRecoveryMarvelDoc.clusterName(), equalTo(client().admin().cluster().prepareHealth().get().getClusterName()));
assertThat(indexRecoveryMarvelDoc.timestamp(), greaterThan(0L));
assertThat(indexRecoveryMarvelDoc.type(), equalTo(IndexRecoveryCollector.TYPE));
IndexRecoveryMarvelDoc.Payload payload = indexRecoveryMarvelDoc.payload();
assertNotNull(payload);
RecoveryResponse recovery = payload.getRecoveryResponse();
assertNotNull(recovery);
Map<String, List<ShardRecoveryResponse>> shards = recovery.shardResponses();
assertThat(shards.size(), greaterThan(0));
for (Map.Entry<String, List<ShardRecoveryResponse>> shard : shards.entrySet()) {
List<ShardRecoveryResponse> shardRecoveries = shard.getValue();
assertNotNull(shardRecoveries);
assertThat(shardRecoveries.size(), greaterThan(0));
for (ShardRecoveryResponse shardRecovery : shardRecoveries) {
assertThat(shardRecovery.recoveryState().getType(), equalTo(RecoveryState.Type.RELOCATION));
}
}
}
private IndexRecoveryCollector newIndexRecoveryCollector() {
return new IndexRecoveryCollector(internalCluster().getInstance(Settings.class),
internalCluster().getInstance(ClusterService.class),
internalCluster().getInstance(ClusterName.class),
internalCluster().getInstance(MarvelSettingsService.class),
client());
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.renderer.indices;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMarvelDoc;
import org.elasticsearch.marvel.agent.renderer.Renderer;
import org.elasticsearch.marvel.agent.renderer.RendererTestUtils;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class IndexRecoveryRendererTests extends ElasticsearchTestCase {
private static final String SAMPLE_FILE = "/samples/marvel_index_recovery.json";
@Test
public void testIndexRecoveryRenderer() throws Exception {
logger.debug("--> creating the index recovery marvel document");
String indexName = "index-0";
DiscoveryNode source = new DiscoveryNode("node-src", DummyTransportAddress.INSTANCE, Version.CURRENT);
DiscoveryNode target = new DiscoveryNode("node-tgt", DummyTransportAddress.INSTANCE, Version.CURRENT);
List<ShardRecoveryResponse> shards = new ArrayList<>();
// Shard 0
ShardRecoveryResponse shard0 = new ShardRecoveryResponse();
shard0.recoveryState(new RecoveryState(new ShardId(indexName, 0), true, RecoveryState.Type.RELOCATION, source, target));
shards.add(shard0);
// Shard 1
ShardRecoveryResponse shard1 = new ShardRecoveryResponse();
shard1.recoveryState(new RecoveryState(new ShardId(indexName, 1), true, RecoveryState.Type.STORE, source, target));
shards.add(shard1);
Map<String, List<ShardRecoveryResponse>> shardResponses = new HashMap<>(1);
shardResponses.put(indexName, shards);
RecoveryResponse recoveryResponse = new RecoveryResponse(2, 2, 2, false, shardResponses, null);
IndexRecoveryMarvelDoc marvelDoc = IndexRecoveryMarvelDoc.createMarvelDoc("test", "marvel_index_recovery", 1437580442979L, recoveryResponse);
logger.debug("--> rendering the document");
Renderer renderer = new IndexRecoveryRenderer();
String result = RendererTestUtils.renderAsJSON(marvelDoc, renderer);
logger.debug("--> loading sample document from file {}", SAMPLE_FILE);
String expected = Streams.copyToStringFromClasspath(SAMPLE_FILE);
logger.debug("--> comparing both documents, they must be identical");
RendererTestUtils.assertJSONStructureAndValues(result, expected);
}
}

View File

@ -24,7 +24,7 @@ public class IndexStatsRendererTests extends ElasticsearchTestCase {
@Test
public void testIndexStatsRenderer() throws Exception {
logger.debug("--> creating the cluster stats marvel document");
logger.debug("--> creating the index stats marvel document");
IndexStatsMarvelDoc marvelDoc = IndexStatsMarvelDoc.createMarvelDoc("test", "marvel_index_stats", 1437580442979L,
new IndexStats("index-0", new ShardStats[0]) {
@Override

View File

@ -0,0 +1,110 @@
{
"cluster_name": "test",
"timestamp": "2015-07-22T15:54:02.979Z",
"index_recovery": {
"shards": [
{
"index_name": "books",
"id": 0,
"type": "STORE",
"stage": "DONE",
"primary": true,
"start_time_in_millis": 1438264984291,
"total_time_in_millis": 111,
"source": {
"id": "hMwZHqa2RJuBWJ4rCaygeA",
"host": "portable",
"transport_address": "inet[/127.0.0.1:9300]",
"ip": "127.0.1.1",
"name": ""
},
"target": {
"id": "hMwZHqa2RJuBWJ4rCaygeA",
"host": "portable",
"transport_address": "inet[/127.0.0.1:9300]",
"ip": "127.0.1.1",
"name": ""
},
"index": {
"size": {
"total_in_bytes": 127,
"reused_in_bytes": 127,
"recovered_in_bytes": 0,
"percent": "100.0%"
},
"files": {
"total": 1,
"reused": 1,
"recovered": 0,
"percent": "100.0%"
},
"total_time_in_millis": 8,
"source_throttle_time_in_millis": 0,
"target_throttle_time_in_millis": 0
},
"translog": {
"recovered": 0,
"total": -1,
"percent": "-1.0%",
"total_on_start": -1,
"total_time_in_millis": 101
},
"verify_index": {
"check_index_time_in_millis": 0,
"total_time_in_millis": 0
}
},
{
"index_name": "dvds",
"id": 0,
"type": "STORE",
"stage": "DONE",
"primary": true,
"start_time_in_millis": 1438264984279,
"total_time_in_millis": 124,
"source": {
"id": "hMwZHqa2RJuBWJ4rCaygeA",
"host": "portable",
"transport_address": "inet[/127.0.0.1:9300]",
"ip": "127.0.1.1",
"name": ""
},
"target": {
"id": "hMwZHqa2RJuBWJ4rCaygeA",
"host": "portable",
"transport_address": "inet[/127.0.0.1:9300]",
"ip": "127.0.1.1",
"name": ""
},
"index": {
"size": {
"total_in_bytes": 127,
"reused_in_bytes": 127,
"recovered_in_bytes": 0,
"percent": "100.0%"
},
"files": {
"total": 1,
"reused": 1,
"recovered": 0,
"percent": "100.0%"
},
"total_time_in_millis": 19,
"source_throttle_time_in_millis": 0,
"target_throttle_time_in_millis": 0
},
"translog": {
"recovered": 0,
"total": -1,
"percent": "-1.0%",
"total_on_start": -1,
"total_time_in_millis": 101
},
"verify_index": {
"check_index_time_in_millis": 0,
"total_time_in_millis": 0
}
}
]
}
}