Make AgentService.stopCollection wait till things are stopped

Closes elastic/elasticsearch#1848

Original commit: elastic/x-pack-elasticsearch@bc1f9b203f
This commit is contained in:
Boaz Leskes 2016-03-30 13:30:48 +02:00
parent c41fc7dc1d
commit 15e9edc2f4
1 changed files with 47 additions and 25 deletions

View File

@ -8,12 +8,14 @@ package org.elasticsearch.marvel.agent;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.marvel.MarvelSettings; import org.elasticsearch.marvel.MarvelSettings;
import org.elasticsearch.marvel.agent.collector.Collector; import org.elasticsearch.marvel.agent.collector.Collector;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStatsCollector;
@ -28,6 +30,7 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Locale; import java.util.Locale;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* The {@code AgentService} is a service that does the work of publishing the details to the monitoring cluster. * The {@code AgentService} is a service that does the work of publishing the details to the monitoring cluster.
@ -100,15 +103,18 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
} }
} }
/** stop collection and exporting. this method blocks until all background activity is guaranteed to be stopped */
public void stopCollection() { public void stopCollection() {
if (exportingWorker != null) { final ExportingWorker worker = this.exportingWorker;
exportingWorker.collecting = false; if (worker != null) {
worker.stopCollecting();
} }
} }
public void startCollection() { public void startCollection() {
if (exportingWorker != null) { final ExportingWorker worker = this.exportingWorker;
exportingWorker.collecting = true; if (worker != null) {
worker.collecting = true;
} }
} }
@ -164,6 +170,8 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
volatile boolean closed = false; volatile boolean closed = false;
volatile boolean collecting = true; volatile boolean collecting = true;
final ReleasableLock collectionLock = new ReleasableLock(new ReentrantLock(false));
@Override @Override
public void run() { public void run() {
while (!closed) { while (!closed) {
@ -175,29 +183,13 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
continue; continue;
} }
if (logger.isTraceEnabled()) { try (Releasable ignore = collectionLock.acquire()) {
logger.trace("collecting data - collectors [{}]", Strings.collectionToCommaDelimitedString(collectors));
}
Collection<MonitoringDoc> docs = new ArrayList<>(); Collection<MonitoringDoc> docs = collect();
for (Collector collector : collectors) {
if (collecting) {
Collection<MonitoringDoc> result = collector.collect();
if (result != null) {
logger.trace("adding [{}] collected docs from [{}] collector", result.size(), collector.name());
docs.addAll(result);
} else {
logger.trace("skipping collected docs from [{}] collector", collector.name());
}
}
if (closed) {
// Stop collecting if the worker is marked as closed
break;
}
}
if ((docs.isEmpty() == false) && (closed == false)) { if ((docs.isEmpty() == false) && (closed == false)) {
exporters.export(docs); exporters.export(docs);
}
} }
} catch (ExportException e) { } catch (ExportException e) {
@ -211,5 +203,35 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
} }
logger.debug("worker shutdown"); logger.debug("worker shutdown");
} }
/** stop collection and exporting. this method will be block until background collection is actually stopped */
public void stopCollecting() {
collecting = false;
collectionLock.acquire().close();
}
private Collection<MonitoringDoc> collect() {
if (logger.isTraceEnabled()) {
logger.trace("collecting data - collectors [{}]", Strings.collectionToCommaDelimitedString(collectors));
}
Collection<MonitoringDoc> docs = new ArrayList<>();
for (Collector collector : collectors) {
if (collecting) {
Collection<MonitoringDoc> result = collector.collect();
if (result != null) {
logger.trace("adding [{}] collected docs from [{}] collector", result.size(), collector.name());
docs.addAll(result);
} else {
logger.trace("skipping collected docs from [{}] collector", collector.name());
}
}
if (closed) {
// Stop collecting if the worker is marked as closed
break;
}
}
return docs;
}
} }
} }