Enhanced local exporter

- fixed concurrency issues
- started to work on fixing the tests

Original commit: elastic/x-pack-elasticsearch@3cfc4d2cda
This commit is contained in:
uboness 2015-09-24 21:14:54 +02:00
parent a86bb7b140
commit 899f359946
13 changed files with 140 additions and 63 deletions

View File

@ -88,6 +88,18 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
}
}
public void stopCollection() {
if (exportingWorker != null) {
exportingWorker.collecting = false;
}
}
public void startCollection() {
if (exportingWorker != null) {
exportingWorker.collecting = true;
}
}
@Override
protected void doStart() {
for (Collector collector : collectors) {
@ -140,6 +152,7 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
class ExportingWorker implements Runnable {
volatile boolean closed = false;
volatile boolean collecting = true;
@Override
public void run() {
@ -162,9 +175,11 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
try {
for (Collector collector : collectors) {
logger.trace("collecting [{}]", collector.name());
Collection<MarvelDoc> docs = collector.collect();
if (docs != null) {
bulk.add(docs);
if (collecting) {
Collection<MarvelDoc> docs = collector.collect();
if (docs != null) {
bulk.add(docs);
}
}
if (closed) {
// Stop collecting if the worker is marked as closed
@ -172,7 +187,7 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> imple
}
}
} finally {
bulk.close(!closed);
bulk.close(!closed && collecting);
}
} catch (InterruptedException e) {

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.marvel.agent.exporter.ExportBulk;
import org.elasticsearch.marvel.agent.exporter.IndexNameResolver;
@ -20,12 +21,14 @@ import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
public class LocalBulk extends ExportBulk {
private final ESLogger logger;
private final Client client;
private final IndexNameResolver indexNameResolver;
private final RendererRegistry renderers;
@ -33,17 +36,24 @@ public class LocalBulk extends ExportBulk {
private BytesStreamOutput buffer = null;
private BulkRequestBuilder requestBuilder;
public LocalBulk(String name, Client client, IndexNameResolver indexNameResolver, RendererRegistry renderers) {
AtomicReference<State> state = new AtomicReference<>();
public LocalBulk(String name, ESLogger logger, Client client, IndexNameResolver indexNameResolver, RendererRegistry renderers) {
super(name);
this.logger = logger;
this.client = client;
this.indexNameResolver = indexNameResolver;
this.renderers = renderers;
state.set(State.ACTIVE);
}
@Override
public ExportBulk add(Collection<MarvelDoc> docs) throws Exception {
public synchronized ExportBulk add(Collection<MarvelDoc> docs) throws Exception {
for (MarvelDoc marvelDoc : docs) {
if (state.get() != State.ACTIVE) {
return this;
}
if (requestBuilder == null) {
requestBuilder = client.prepareBulk();
}
@ -81,13 +91,27 @@ public class LocalBulk extends ExportBulk {
@Override
public void flush() throws IOException {
if (requestBuilder == null) {
if (state.get() != State.ACTIVE || requestBuilder == null) {
return;
}
BulkResponse bulkResponse = requestBuilder.get();
if (bulkResponse.hasFailures()) {
throw new ElasticsearchException(bulkResponse.buildFailureMessage());
}
requestBuilder = null;
}
void terminate() {
state.set(State.TERMINATING);
synchronized (this) {
requestBuilder = null;
state.compareAndSet(State.TERMINATING, State.TERMINATED);
}
}
enum State {
ACTIVE,
TERMINATING,
TERMINATED
}
}

View File

@ -48,6 +48,8 @@ public class LocalExporter extends Exporter {
private final ClusterService clusterService;
private final RendererRegistry renderers;
private final LocalBulk bulk;
final @Nullable TimeValue bulkTimeout;
private final AtomicReference<State> state = new AtomicReference<>();
@ -72,11 +74,25 @@ public class LocalExporter extends Exporter {
bulkTimeout = config.settings().getAsTime(BULK_TIMEOUT_SETTING, null);
state.set(State.STARTING);
bulk = new LocalBulk(name(), logger, client, indexNameResolver, renderers);
}
@Override
public ExportBulk openBulk() {
if (!canExport()) {
return null;
}
return bulk;
}
@Override
public void close() {
if (state.compareAndSet(State.STARTING, State.STOPPING) || state.compareAndSet(State.STARTED, State.STOPPING)) {
try {
bulk.terminate();
} catch (Exception e) {
logger.error("failed to cleanly close open bulk for [{}] exporter", e, name());
}
state.set(State.STOPPED);
}
}
@ -196,7 +212,6 @@ public class LocalExporter extends Exporter {
// return false;
// }
}
//TODO this is erroneous
// the check may figure out that the existing version is too old and therefore
// it can't and won't update the template (prompting the user to delete the template).
@ -213,14 +228,6 @@ public class LocalExporter extends Exporter {
return true;
}
@Override
public ExportBulk openBulk() {
if (!canExport()) {
return null;
}
return new LocalBulk(name(), client, indexNameResolver, renderers);
}
public enum State {
STARTING,
STARTED,

View File

@ -7,6 +7,7 @@ package org.elasticsearch.marvel;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.RestTestCandidate;
import org.elasticsearch.test.rest.parser.RestTestParseException;
@ -14,7 +15,7 @@ import org.junit.Ignore;
import java.io.IOException;
@Ignore
@LuceneTestCase.AwaitsFix(bugUrl = "why do we have this rest test in the first place???")
public class MarvelRestIT extends ESRestTestCase {
public MarvelRestIT(@Name("yaml") RestTestCandidate testCandidate) {

View File

@ -96,7 +96,8 @@ public class ClusterStateCollectorTests extends AbstractCollectorTestCase {
}
}
client().admin().indices().prepareRefresh().get();
securedFlush();
securedRefresh();
for (int i = 0; i < nbIndices; i++) {
assertHitCount(client().prepareCount("test-" + i).get(), docsPerIndex[i]);
}

View File

@ -11,12 +11,9 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.license.plugin.LicensePlugin;
import org.elasticsearch.marvel.MarvelPlugin;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateCollector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMarvelDoc;
import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryCollector;
@ -24,41 +21,47 @@ import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMarvelDoc;
import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.SuppressLocalMode;
import org.elasticsearch.test.InternalTestCluster;
import org.hamcrest.Matchers;
import org.joda.time.format.DateTimeFormat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
// Transport Client instantiation also calls the marvel plugin, which then fails to find modules
@ClusterScope(transportClientRatio = 0.0, scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
@ESIntegTestCase.SuppressLocalMode
public class HttpExporterTests extends ESIntegTestCase {
@SuppressLocalMode
@ClusterScope(scope = TEST, transportClientRatio = 0.0, numDataNodes = 0, numClientNodes = 0)
public class HttpExporterTests extends MarvelIntegTestCase {
final static AtomicLong timeStampGenerator = new AtomicLong();
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LicensePlugin.class, MarvelPlugin.class);
protected boolean enableShield() {
return false;
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
@Before
public void init() throws Exception {
startCollection();
}
@After
public void cleanup() throws Exception {
stopCollection();
}
@Override
@ -229,7 +232,7 @@ public class HttpExporterTests extends ESIntegTestCase {
assertMarvelTemplateExists();
logger.debug("--> template exists");
}
}, 10, TimeUnit.SECONDS);
}, 30, TimeUnit.SECONDS);
}
@Test
@ -308,23 +311,6 @@ public class HttpExporterTests extends ESIntegTestCase {
}
}
private void assertMarvelTemplateExists() {
assertTrue("marvel template must exists", isTemplateExists("marvel"));
}
private void assertMarvelTemplateNotExists() {
assertFalse("marvel template must not exists", isTemplateExists("marvel"));
}
private boolean isTemplateExists(String templateName) {
for (IndexTemplateMetaData template : client().admin().indices().prepareGetTemplates(templateName).get().getIndexTemplates()) {
if (template.getName().equals(templateName)) {
return true;
}
}
return false;
}
static class TargetNode {
private final String name;
@ -333,7 +319,7 @@ public class HttpExporterTests extends ESIntegTestCase {
private final Client client;
private TargetNode(InternalTestCluster cluster) {
name = cluster.startNode();
name = cluster.startNode(Settings.builder().put(Node.HTTP_ENABLED, true));
address = cluster.getInstance(HttpServerTransport.class, name).boundAddress().publishAddress();
httpAddress = address.getHost() + ":" + address.getPort();
this.client = cluster.client(name);

View File

@ -12,6 +12,8 @@ import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.junit.After;
import org.junit.Before;
import java.util.Collection;
import java.util.Map;
@ -19,7 +21,7 @@ import java.util.Map;
import static org.hamcrest.Matchers.*;
@ClusterScope(scope = ESIntegTestCase.Scope.SUITE, randomDynamicTemplates = false, transportClientRatio = 0.0)
@ClusterScope(scope = ESIntegTestCase.Scope.TEST, randomDynamicTemplates = false, transportClientRatio = 0.0)
public abstract class AbstractRendererTestCase extends MarvelIntegTestCase {
@Override
@ -33,6 +35,16 @@ public abstract class AbstractRendererTestCase extends MarvelIntegTestCase {
.build();
}
@Before
public void init() throws Exception {
startCollection();
}
@After
public void cleanup() throws Exception {
stopCollection();
}
protected abstract Collection<String> collectors ();
/**

View File

@ -36,6 +36,7 @@ public class ClusterStateIT extends AbstractRendererTestCase {
logger.debug("--> searching for marvel documents of type [{}]", ClusterStateCollector.TYPE);
SearchResponse response = client().prepareSearch().setTypes(ClusterStateCollector.TYPE).get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
logger.debug("--> checking that every document contains the expected fields");
@ -63,7 +64,7 @@ public class ClusterStateIT extends AbstractRendererTestCase {
assertAcked(client().admin().indices().preparePutTemplate("marvel").setSource(HttpExporterUtils.loadDefaultTemplate()).execute().actionGet());
logger.debug("--> deleting all marvel indices");
cluster().wipeIndices(MarvelSettings.MARVEL_INDICES_PREFIX + "*");
deleteMarvelIndices();
logger.debug("--> checking for template existence");
assertMarvelTemplateExists();

View File

@ -18,7 +18,6 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;
@Seed("B3CB5D1CDFA878F7:888A4AA279DFFE81")

View File

@ -30,8 +30,10 @@ public class IndexStatsIT extends AbstractRendererTestCase {
public void testIndexStats() throws Exception {
logger.debug("--> creating some indices for future index stats");
final int nbIndices = randomIntBetween(1, 5);
String[] indices = new String[nbIndices];
for (int i = 0; i < nbIndices; i++) {
createIndex("stat" + i);
indices[i] = "stat" + i;
createIndex(indices[i]);
}
final long[] nbDocsPerIndex = new long[nbIndices];
@ -48,10 +50,12 @@ public class IndexStatsIT extends AbstractRendererTestCase {
assertBusy(new Runnable() {
@Override
public void run() {
securedFlush(indices);
securedRefresh();
for (int i = 0; i < nbIndices; i++) {
CountResponse count = client().prepareCount()
.setTypes(IndexStatsCollector.TYPE)
.setQuery(QueryBuilders.termQuery("index_stats.index", "stat" + i))
.setQuery(QueryBuilders.termQuery("index_stats.index", indices[i]))
.get();
assertThat(count.getCount(), greaterThan(0L));
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.marvel.agent.renderer.indices;
import com.carrotsearch.hppc.ObjectLongHashMap;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;

View File

@ -36,6 +36,7 @@ public class NodeStatsIT extends AbstractRendererTestCase {
logger.debug("--> searching for marvel documents of type [{}]", NodeStatsCollector.TYPE);
SearchResponse response = client().prepareSearch().setTypes(NodeStatsCollector.TYPE).get();
assertThat(response.getHits().getTotalHits(), greaterThan(0L));
logger.debug("--> checking that every document contains the expected fields");

View File

@ -13,6 +13,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.cache.IndexCacheModule;
import org.elasticsearch.license.plugin.LicensePlugin;
import org.elasticsearch.marvel.MarvelPlugin;
import org.elasticsearch.marvel.agent.AgentService;
import org.elasticsearch.marvel.agent.exporter.local.LocalExporter;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.plugins.Plugin;
@ -34,7 +35,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -48,7 +48,8 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
@Override
protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException {
if (shieldEnabled == null) {
shieldEnabled = true; // enableShield();
shieldEnabled = enableShield();
logger.info("--> shield {}", shieldEnabled ? "enabled" : "disabled");
}
return super.buildTestCluster(scope, seed);
}
@ -63,7 +64,6 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
if (shieldEnabled) {
ShieldSettings.apply(builder);
}
return builder.build();
}
@ -96,7 +96,19 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
* Override and returns {@code false} to force running without shield
*/
protected boolean enableShield() {
return randomBoolean();
return true; //randomBoolean();
}
protected void stopCollection() {
for (AgentService agent : internalCluster().getInstances(AgentService.class)) {
agent.stopCollection();
}
}
protected void startCollection() {
for (AgentService agent : internalCluster().getInstances(AgentService.class)) {
agent.startCollection();
}
}
protected void deleteMarvelIndices() {
@ -115,13 +127,14 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
}
protected void awaitMarvelDocsCount(Matcher<Long> matcher, String... types) throws Exception {
securedFlush();
securedRefresh();
assertBusy(new Runnable() {
@Override
public void run() {
assertMarvelDocsCount(matcher, types);
}
}, 5, TimeUnit.SECONDS);
});
}
protected void assertMarvelDocsCount(Matcher<Long> matcher, String... types) {
@ -169,6 +182,20 @@ public abstract class MarvelIntegTestCase extends ESIntegTestCase {
}
}
protected void securedFlush(String... indices) {
if (shieldEnabled) {
try {
flush(indices);
} catch (Exception e) {
if (!(e instanceof IndexNotFoundException)) {
throw e;
}
}
} else {
flush(indices);
}
}
/** Shield related settings */
public static class ShieldSettings {