Asynchronous export of monitoring data (elastic/x-pack-elasticsearch#718)
This commit removes the blocking invocation of bulk requests in monitoring, as in some cases this can lead to exhaustion of the generic threadpool, which effectively prevents the node from operating normally. One behavior change made by this commit is that the MonitoringService will no longer wait indefinitely when closing, as this can lead to a node blocking forever and never shutting down. Instead a wait of 10 seconds is added, which aligns with the security index audit trail's behavior on shutdown. relates elastic/x-pack-elasticsearch#715 Original commit: elastic/x-pack-elasticsearch@5ba7f49aab
This commit is contained in:
parent
58827dd433
commit
72248adcbb
|
@ -113,9 +113,9 @@ public class Monitoring implements ActionPlugin {
|
|||
// TODO: https://github.com/elastic/x-plugins/issues/3117 (remove dynamic need with static exporters)
|
||||
final SSLService dynamicSSLService = sslService.createDynamicSSLService();
|
||||
Map<String, Exporter.Factory> exporterFactories = new HashMap<>();
|
||||
exporterFactories.put(HttpExporter.TYPE, config -> new HttpExporter(config, dynamicSSLService));
|
||||
exporterFactories.put(HttpExporter.TYPE, config -> new HttpExporter(config, dynamicSSLService, threadPool.getThreadContext()));
|
||||
exporterFactories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, clusterService, cleanerService));
|
||||
final Exporters exporters = new Exporters(settings, exporterFactories, clusterService);
|
||||
final Exporters exporters = new Exporters(settings, exporterFactories, clusterService, threadPool.getThreadContext());
|
||||
|
||||
Set<Collector> collectors = new HashSet<>();
|
||||
collectors.add(new IndicesStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.monitoring;
|
|||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -19,12 +20,12 @@ import org.elasticsearch.xpack.monitoring.exporter.Exporters;
|
|||
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
|
@ -108,7 +109,7 @@ public class MonitoringService extends AbstractLifecycleComponent {
|
|||
@Override
|
||||
protected void doClose() {
|
||||
logger.debug("monitoring service is closing");
|
||||
closeExecution();
|
||||
monitor.close();
|
||||
|
||||
for (Exporter exporter : exporters) {
|
||||
try {
|
||||
|
@ -139,14 +140,6 @@ public class MonitoringService extends AbstractLifecycleComponent {
|
|||
}
|
||||
}
|
||||
|
||||
void closeExecution() {
|
||||
try {
|
||||
monitor.close();
|
||||
} catch (IOException e) {
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to close monitoring execution"), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link MonitoringExecution} is a scheduled {@link Runnable} that periodically checks if monitoring
|
||||
* data can be collected and exported. It runs at a given interval corresponding to the monitoring
|
||||
|
@ -197,22 +190,21 @@ public class MonitoringService extends AbstractLifecycleComponent {
|
|||
}
|
||||
}
|
||||
if (isMonitoringActive()) {
|
||||
exporters.export(results);
|
||||
exporters.export(results, ActionListener.wrap(r -> semaphore.release(), this::onFailure));
|
||||
} else {
|
||||
semaphore.release();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn("monitoring execution failed", e);
|
||||
semaphore.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRejection(Exception e) {
|
||||
logger.warn("monitoring execution has been rejected", e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAfter() {
|
||||
semaphore.release();
|
||||
}
|
||||
});
|
||||
|
@ -224,10 +216,13 @@ public class MonitoringService extends AbstractLifecycleComponent {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
public void close() {
|
||||
try {
|
||||
// Block until the lock can be acquired
|
||||
semaphore.acquire();
|
||||
// Block until the lock can be acquired or 10s. The timed try acquire is necessary as there may be a failure that causes
|
||||
// the semaphore to not get released and then the node will hang forever on shutdown
|
||||
if (semaphore.tryAcquire(10L, TimeUnit.SECONDS) == false) {
|
||||
logger.warn("monitoring execution did not complete after waiting for 10s");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.Exporters;
|
||||
|
@ -106,22 +105,26 @@ public class TransportMonitoringBulkAction extends HandledTransportAction<Monito
|
|||
*/
|
||||
void executeExport(final Collection<MonitoringDoc> docs, final long startTimeNanos,
|
||||
final ActionListener<MonitoringBulkResponse> listener) {
|
||||
threadPool.generic().execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
exportService.export(docs);
|
||||
listener.onResponse(new MonitoringBulkResponse(buildTookInMillis(startTimeNanos)));
|
||||
try {
|
||||
exportService.export(docs, ActionListener.wrap(
|
||||
r -> listener.onResponse(response(startTimeNanos)),
|
||||
e -> listener.onResponse(response(startTimeNanos, e))));
|
||||
} catch (Exception e) {
|
||||
listener.onResponse(response(startTimeNanos, e));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onResponse(new MonitoringBulkResponse(buildTookInMillis(startTimeNanos), new MonitoringBulkResponse.Error(e)));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private long buildTookInMillis(long startTimeNanos) {
|
||||
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
|
||||
private MonitoringBulkResponse response(final long start) {
|
||||
return new MonitoringBulkResponse(took(start));
|
||||
}
|
||||
|
||||
private MonitoringBulkResponse response(final long start, final Exception e) {
|
||||
return new MonitoringBulkResponse(took(start), new MonitoringBulkResponse.Error(e));
|
||||
}
|
||||
|
||||
private long took(final long start) {
|
||||
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -5,21 +5,30 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.monitoring.exporter;
|
||||
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.xpack.common.IteratingActionListener;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
/**
|
||||
* An export bulk holds one of more documents until it got flushed. The {@link ExportBulk#flush()} usually triggers the exporting of the
|
||||
* documents to their final destination.
|
||||
* An export bulk holds one or more documents until it gets flushed. The {@link ExportBulk#flush(ActionListener)} usually triggers the
|
||||
* exporting of the documents to their final destination.
|
||||
*/
|
||||
public abstract class ExportBulk {
|
||||
|
||||
protected final String name;
|
||||
protected final ThreadContext threadContext;
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZING);
|
||||
|
||||
public ExportBulk(String name) {
|
||||
public ExportBulk(String name, ThreadContext threadContext) {
|
||||
this.name = Objects.requireNonNull(name);
|
||||
this.threadContext = Objects.requireNonNull(threadContext);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -45,51 +54,61 @@ public abstract class ExportBulk {
|
|||
/**
|
||||
* Flush the exporting bulk
|
||||
*/
|
||||
public void flush() throws ExportException {
|
||||
public void flush(ActionListener<Void> listener) {
|
||||
if (state.compareAndSet(State.INITIALIZING, State.FLUSHING)) {
|
||||
doFlush();
|
||||
doFlush(listener);
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void doFlush();
|
||||
protected abstract void doFlush(ActionListener<Void> listener);
|
||||
|
||||
/**
|
||||
* Close the exporting bulk
|
||||
*/
|
||||
public void close(boolean flush) throws ExportException {
|
||||
public void close(boolean flush, ActionListener<Void> listener) {
|
||||
if (state.getAndSet(State.CLOSED) != State.CLOSED) {
|
||||
|
||||
ExportException exception = null;
|
||||
try {
|
||||
if (flush) {
|
||||
doFlush();
|
||||
}
|
||||
} catch (ExportException e) {
|
||||
if (exception != null) {
|
||||
exception.addSuppressed(e);
|
||||
flushAndClose(listener);
|
||||
} else {
|
||||
exception = e;
|
||||
doClose(listener);
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
doClose();
|
||||
} catch (Exception e) {
|
||||
if (exception != null) {
|
||||
exception.addSuppressed(e);
|
||||
} else {
|
||||
exception = new ExportException("Exception when closing export bulk", e);
|
||||
}
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
|
||||
// rethrow exception
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
private void flushAndClose(ActionListener<Void> listener) {
|
||||
doFlush(new ActionListener<Void>() {
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
doClose(listener);
|
||||
}
|
||||
|
||||
protected abstract void doClose() throws ExportException;
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// we need to close in spite of the failure, but we will return the failure
|
||||
doClose(new ActionListener<Void>() {
|
||||
|
||||
private final ExportException exportException = new ExportException("Exception when closing export bulk", e);
|
||||
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
listener.onFailure(exportException);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
exportException.addSuppressed(e);
|
||||
listener.onFailure(exportException);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected abstract void doClose(ActionListener<Void> listener);
|
||||
|
||||
protected boolean isClosed() {
|
||||
return state.get() == State.CLOSED;
|
||||
|
@ -100,10 +119,10 @@ public abstract class ExportBulk {
|
|||
*/
|
||||
public static class Compound extends ExportBulk {
|
||||
|
||||
private final Collection<ExportBulk> bulks;
|
||||
private final List<ExportBulk> bulks;
|
||||
|
||||
public Compound(Collection<ExportBulk> bulks) {
|
||||
super("all");
|
||||
public Compound(List<ExportBulk> bulks, ThreadContext threadContext) {
|
||||
super("all", threadContext);
|
||||
this.bulks = bulks;
|
||||
}
|
||||
|
||||
|
@ -126,41 +145,72 @@ public abstract class ExportBulk {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doFlush() {
|
||||
ExportException exception = null;
|
||||
for (ExportBulk bulk : bulks) {
|
||||
try {
|
||||
bulk.flush();
|
||||
} catch (ExportException e) {
|
||||
if (exception == null) {
|
||||
exception = new ExportException("failed to flush export bulks", e);
|
||||
protected void doFlush(ActionListener<Void> listener) {
|
||||
final SetOnce<ExportException> exceptionRef = new SetOnce<>();
|
||||
final BiConsumer<ExportBulk, ActionListener<Void>> bulkBiConsumer = (exportBulk, iteratingListener) -> {
|
||||
// for every export bulk we flush and pass back the response, which should always be
|
||||
// null. When we have an exception, we wrap the first and then add suppressed exceptions
|
||||
exportBulk.flush(ActionListener.wrap(iteratingListener::onResponse, e -> {
|
||||
if (exceptionRef.get() == null) {
|
||||
exceptionRef.set(new ExportException("failed to flush export bulks", e));
|
||||
} else if (e instanceof ExportException) {
|
||||
exceptionRef.get().addExportException((ExportException) e);
|
||||
} else {
|
||||
exceptionRef.get().addSuppressed(e);
|
||||
}
|
||||
exception.addExportException(e);
|
||||
// this is tricky to understand but basically we suppress the exception for use
|
||||
// later on and call the passed in listener so that iteration continues
|
||||
iteratingListener.onResponse(null);
|
||||
}));
|
||||
};
|
||||
IteratingActionListener<Void, ExportBulk> iteratingActionListener =
|
||||
new IteratingActionListener<>(newExceptionHandlingListener(exceptionRef, listener), bulkBiConsumer, bulks,
|
||||
threadContext);
|
||||
iteratingActionListener.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose(ActionListener<Void> listener) {
|
||||
final SetOnce<ExportException> exceptionRef = new SetOnce<>();
|
||||
final BiConsumer<ExportBulk, ActionListener<Void>> bulkBiConsumer = (exportBulk, iteratingListener) -> {
|
||||
// for every export bulk we close and pass back the response, which should always be
|
||||
// null. When we have an exception, we wrap the first and then add suppressed exceptions
|
||||
exportBulk.doClose(ActionListener.wrap(iteratingListener::onResponse, e -> {
|
||||
if (exceptionRef.get() == null) {
|
||||
exceptionRef.set(new ExportException("failed to close export bulks", e));
|
||||
} else if (e instanceof ExportException) {
|
||||
exceptionRef.get().addExportException((ExportException) e);
|
||||
} else {
|
||||
exceptionRef.get().addSuppressed(e);
|
||||
}
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
// this is tricky to understand but basically we suppress the exception for use
|
||||
// later on and call the passed in listener so that iteration continues
|
||||
iteratingListener.onResponse(null);
|
||||
}));
|
||||
};
|
||||
IteratingActionListener<Void, ExportBulk> iteratingActionListener =
|
||||
new IteratingActionListener<>(newExceptionHandlingListener(exceptionRef, listener), bulkBiConsumer, bulks,
|
||||
threadContext);
|
||||
iteratingActionListener.run();
|
||||
}
|
||||
|
||||
private static ActionListener<Void> newExceptionHandlingListener(SetOnce<ExportException> exceptionRef,
|
||||
ActionListener<Void> listener) {
|
||||
return new ActionListener<Void>() {
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
if (exceptionRef.get() == null) {
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
listener.onFailure(exceptionRef.get());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws ExportException {
|
||||
ExportException exception = null;
|
||||
for (ExportBulk bulk : bulks) {
|
||||
try {
|
||||
// We can close without flushing since doFlush()
|
||||
// would have been called by the parent class
|
||||
bulk.close(false);
|
||||
} catch (ExportException e) {
|
||||
if (exception == null) {
|
||||
exception = new ExportException("failed to close export bulks");
|
||||
}
|
||||
exception.addExportException(e);
|
||||
}
|
||||
}
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,11 +8,13 @@ package org.elasticsearch.xpack.monitoring.exporter;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsException;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.local.LocalExporter;
|
||||
|
||||
|
@ -23,6 +25,7 @@ import java.util.HashSet;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -32,12 +35,15 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
|
|||
|
||||
private final Map<String, Exporter.Factory> factories;
|
||||
private final AtomicReference<Map<String, Exporter>> exporters;
|
||||
private final ThreadContext threadContext;
|
||||
|
||||
public Exporters(Settings settings, Map<String, Exporter.Factory> factories, ClusterService clusterService) {
|
||||
public Exporters(Settings settings, Map<String, Exporter.Factory> factories, ClusterService clusterService,
|
||||
ThreadContext threadContext) {
|
||||
super(settings);
|
||||
|
||||
this.factories = factories;
|
||||
this.exporters = new AtomicReference<>(emptyMap());
|
||||
this.threadContext = Objects.requireNonNull(threadContext);
|
||||
|
||||
clusterService.getClusterSettings().addSettingsUpdateConsumer(MonitoringSettings.EXPORTERS_SETTINGS, this::setExportersSetting);
|
||||
}
|
||||
|
@ -100,7 +106,7 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
|
|||
(Supplier<?>) () -> new ParameterizedMessage("exporter [{}] failed to open exporting bulk", exporter.name()), e);
|
||||
}
|
||||
}
|
||||
return bulks.isEmpty() ? null : new ExportBulk.Compound(bulks);
|
||||
return bulks.isEmpty() ? null : new ExportBulk.Compound(bulks, threadContext);
|
||||
}
|
||||
|
||||
Map<String, Exporter> initExporters(Settings settings) {
|
||||
|
@ -154,20 +160,32 @@ public class Exporters extends AbstractLifecycleComponent implements Iterable<Ex
|
|||
/**
|
||||
* Exports a collection of monitoring documents using the configured exporters
|
||||
*/
|
||||
public void export(Collection<MonitoringDoc> docs) throws ExportException {
|
||||
public void export(Collection<MonitoringDoc> docs, ActionListener<Void> listener) throws ExportException {
|
||||
if (this.lifecycleState() != Lifecycle.State.STARTED) {
|
||||
throw new ExportException("Export service is not started");
|
||||
}
|
||||
if (docs != null && docs.size() > 0) {
|
||||
listener.onFailure(new ExportException("Export service is not started"));
|
||||
} else if (docs != null && docs.size() > 0) {
|
||||
final ExportBulk bulk = openBulk();
|
||||
|
||||
if (bulk != null) {
|
||||
final AtomicReference<ExportException> exceptionRef = new AtomicReference<>();
|
||||
try {
|
||||
bulk.add(docs);
|
||||
} catch (ExportException e) {
|
||||
exceptionRef.set(e);
|
||||
} finally {
|
||||
bulk.close(lifecycleState() == Lifecycle.State.STARTED);
|
||||
bulk.close(lifecycleState() == Lifecycle.State.STARTED, ActionListener.wrap(r -> {
|
||||
if (exceptionRef.get() == null) {
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
listener.onFailure(exceptionRef.get());
|
||||
}
|
||||
}, listener::onFailure));
|
||||
}
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,10 +12,12 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.XContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
@ -57,8 +59,8 @@ class HttpExportBulk extends ExportBulk {
|
|||
private byte[] payload = null;
|
||||
|
||||
HttpExportBulk(final String name, final RestClient client, final Map<String, String> parameters,
|
||||
final ResolversRegistry registry) {
|
||||
super(name);
|
||||
final ResolversRegistry registry, ThreadContext threadContext) {
|
||||
super(name, threadContext);
|
||||
|
||||
this.client = client;
|
||||
this.params = parameters;
|
||||
|
@ -85,9 +87,9 @@ class HttpExportBulk extends ExportBulk {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void doFlush() throws ExportException {
|
||||
public void doFlush(ActionListener<Void> listener) throws ExportException {
|
||||
if (payload == null) {
|
||||
throw new ExportException("unable to send documents because none were loaded for export bulk [{}]", name);
|
||||
listener.onFailure(new ExportException("unable to send documents because none were loaded for export bulk [{}]", name));
|
||||
} else if (payload.length != 0) {
|
||||
final HttpEntity body = new ByteArrayEntity(payload, ContentType.APPLICATION_JSON);
|
||||
|
||||
|
@ -95,13 +97,15 @@ class HttpExportBulk extends ExportBulk {
|
|||
|
||||
// free the memory
|
||||
payload = null;
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
protected void doClose(ActionListener<Void> listener) {
|
||||
// nothing serious to do at this stage
|
||||
assert payload == null;
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
private byte[] toBulkBytes(final MonitoringDoc doc) throws IOException {
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.logging.Loggers;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsException;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
|
||||
|
@ -157,6 +158,7 @@ public class HttpExporter extends Exporter {
|
|||
private final HttpResource resource;
|
||||
|
||||
private final ResolversRegistry resolvers;
|
||||
private final ThreadContext threadContext;
|
||||
|
||||
/**
|
||||
* Create an {@link HttpExporter}.
|
||||
|
@ -165,8 +167,8 @@ public class HttpExporter extends Exporter {
|
|||
* @param sslService The SSL Service used to create the SSL Context necessary for TLS / SSL communication
|
||||
* @throws SettingsException if any setting is malformed
|
||||
*/
|
||||
public HttpExporter(final Config config, final SSLService sslService) {
|
||||
this(config, sslService, new NodeFailureListener());
|
||||
public HttpExporter(final Config config, final SSLService sslService, final ThreadContext threadContext) {
|
||||
this(config, sslService, threadContext, new NodeFailureListener());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -177,8 +179,8 @@ public class HttpExporter extends Exporter {
|
|||
* @param listener The node failure listener used to notify an optional sniffer and resources
|
||||
* @throws SettingsException if any setting is malformed
|
||||
*/
|
||||
HttpExporter(final Config config, final SSLService sslService, final NodeFailureListener listener) {
|
||||
this(config, createRestClient(config, sslService, listener), listener);
|
||||
HttpExporter(final Config config, final SSLService sslService, final ThreadContext threadContext, final NodeFailureListener listener) {
|
||||
this(config, createRestClient(config, sslService, listener), threadContext, listener);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -189,8 +191,8 @@ public class HttpExporter extends Exporter {
|
|||
* @param listener The node failure listener used to notify an optional sniffer and resources
|
||||
* @throws SettingsException if any setting is malformed
|
||||
*/
|
||||
HttpExporter(final Config config, final RestClient client, final NodeFailureListener listener) {
|
||||
this(config, client, createSniffer(config, client, listener), listener, new ResolversRegistry(config.settings()));
|
||||
HttpExporter(final Config config, final RestClient client, final ThreadContext threadContext, final NodeFailureListener listener) {
|
||||
this(config, client, createSniffer(config, client, listener), threadContext, listener, new ResolversRegistry(config.settings()));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -202,9 +204,9 @@ public class HttpExporter extends Exporter {
|
|||
* @param resolvers The resolver registry used to load templates and resolvers
|
||||
* @throws SettingsException if any setting is malformed
|
||||
*/
|
||||
HttpExporter(final Config config, final RestClient client, @Nullable final Sniffer sniffer, final NodeFailureListener listener,
|
||||
final ResolversRegistry resolvers) {
|
||||
this(config, client, sniffer, listener, resolvers, createResources(config, resolvers));
|
||||
HttpExporter(final Config config, final RestClient client, @Nullable final Sniffer sniffer, final ThreadContext threadContext,
|
||||
final NodeFailureListener listener, final ResolversRegistry resolvers) {
|
||||
this(config, client, sniffer, threadContext, listener, resolvers, createResources(config, resolvers));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -218,8 +220,8 @@ public class HttpExporter extends Exporter {
|
|||
* @param resource Blocking HTTP resource to prevent bulks until all requirements are met
|
||||
* @throws SettingsException if any setting is malformed
|
||||
*/
|
||||
HttpExporter(final Config config, final RestClient client, @Nullable final Sniffer sniffer, final NodeFailureListener listener,
|
||||
final ResolversRegistry resolvers, final HttpResource resource) {
|
||||
HttpExporter(final Config config, final RestClient client, @Nullable final Sniffer sniffer, final ThreadContext threadContext,
|
||||
final NodeFailureListener listener, final ResolversRegistry resolvers, final HttpResource resource) {
|
||||
super(config);
|
||||
|
||||
this.client = Objects.requireNonNull(client);
|
||||
|
@ -227,6 +229,7 @@ public class HttpExporter extends Exporter {
|
|||
this.resolvers = resolvers;
|
||||
this.resource = resource;
|
||||
this.defaultParams = createDefaultParams(config);
|
||||
this.threadContext = threadContext;
|
||||
|
||||
// mark resources as dirty after any node failure
|
||||
listener.setResource(resource);
|
||||
|
@ -565,7 +568,7 @@ public class HttpExporter extends Exporter {
|
|||
public HttpExportBulk openBulk() {
|
||||
// block until all resources are verified to exist
|
||||
if (resource.checkAndPublishIfDirty(client)) {
|
||||
return new HttpExportBulk(settingFQN(config), client, defaultParams, resolvers);
|
||||
return new HttpExportBulk(settingFQN(config), client, defaultParams, resolvers, threadContext);
|
||||
}
|
||||
|
||||
return null;
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
package org.elasticsearch.xpack.monitoring.exporter.local;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
|
@ -25,7 +26,8 @@ import static org.elasticsearch.xpack.monitoring.exporter.Exporter.EXPORT_PIPELI
|
|||
|
||||
/**
|
||||
* LocalBulk exports monitoring data in the local cluster using bulk requests. Its usage is not thread safe since the
|
||||
* {@link LocalBulk#add(Collection)}, {@link LocalBulk#flush()} and {@link LocalBulk#doClose()} methods are not synchronized.
|
||||
* {@link LocalBulk#add(Collection)}, {@link LocalBulk#flush(org.elasticsearch.action.ActionListener)} and
|
||||
* {@link LocalBulk#doClose(ActionListener)} methods are not synchronized.
|
||||
*/
|
||||
public class LocalBulk extends ExportBulk {
|
||||
|
||||
|
@ -38,7 +40,7 @@ public class LocalBulk extends ExportBulk {
|
|||
|
||||
|
||||
public LocalBulk(String name, Logger logger, InternalClient client, ResolversRegistry resolvers, boolean usePipeline) {
|
||||
super(name);
|
||||
super(name, client.threadPool().getThreadContext());
|
||||
this.logger = logger;
|
||||
this.client = client;
|
||||
this.resolvers = resolvers;
|
||||
|
@ -87,25 +89,26 @@ public class LocalBulk extends ExportBulk {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void doFlush() throws ExportException {
|
||||
public void doFlush(ActionListener<Void> listener) {
|
||||
if (requestBuilder == null || requestBuilder.numberOfActions() == 0 || isClosed()) {
|
||||
return;
|
||||
}
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
try {
|
||||
logger.trace("exporter [{}] - exporting {} documents", name, requestBuilder.numberOfActions());
|
||||
BulkResponse bulkResponse = requestBuilder.get();
|
||||
|
||||
requestBuilder.execute(ActionListener.wrap(bulkResponse -> {
|
||||
if (bulkResponse.hasFailures()) {
|
||||
throwExportException(bulkResponse.getItems());
|
||||
throwExportException(bulkResponse.getItems(), listener);
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ExportException("failed to flush export bulk [{}]", e, name);
|
||||
}, e -> listener.onFailure(new ExportException("failed to flush export bulk [{}]", e, name))));
|
||||
} finally {
|
||||
requestBuilder = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void throwExportException(BulkItemResponse[] bulkItemResponses) {
|
||||
void throwExportException(BulkItemResponse[] bulkItemResponses, ActionListener<Void> listener) {
|
||||
ExportException exception = new ExportException("bulk [{}] reports failures when exporting documents", name);
|
||||
|
||||
Arrays.stream(bulkItemResponses)
|
||||
|
@ -114,14 +117,17 @@ public class LocalBulk extends ExportBulk {
|
|||
.forEach(exception::addExportException);
|
||||
|
||||
if (exception.hasExportExceptions()) {
|
||||
throw exception;
|
||||
listener.onFailure(exception);
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws ExportException {
|
||||
protected void doClose(ActionListener<Void> listener) {
|
||||
if (isClosed() == false) {
|
||||
requestBuilder = null;
|
||||
}
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,12 +5,14 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.monitoring;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.xpack.ml.action.CloseJobAction.Response;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.ExportException;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.Exporters;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
|
||||
|
@ -124,12 +126,13 @@ public class MonitoringServiceTests extends ESTestCase {
|
|||
private final AtomicInteger exports = new AtomicInteger(0);
|
||||
|
||||
CountingExporter() {
|
||||
super(Settings.EMPTY, Collections.emptyMap(), clusterService);
|
||||
super(Settings.EMPTY, Collections.emptyMap(), clusterService, threadPool.getThreadContext());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void export(Collection<MonitoringDoc> docs) throws ExportException {
|
||||
public void export(Collection<MonitoringDoc> docs, ActionListener<Void> listener) {
|
||||
exports.incrementAndGet();
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
int getExportsCount() {
|
||||
|
@ -159,13 +162,16 @@ public class MonitoringServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void export(Collection<MonitoringDoc> docs) throws ExportException {
|
||||
super.export(docs);
|
||||
public void export(Collection<MonitoringDoc> docs, ActionListener<Void> listener) {
|
||||
super.export(docs, ActionListener.wrap(r -> {
|
||||
try {
|
||||
latch.await();
|
||||
listener.onResponse(null);
|
||||
} catch (InterruptedException e) {
|
||||
throw new ExportException("BlockingExporter failed", e);
|
||||
listener.onFailure(new ExportException("BlockingExporter failed", e));
|
||||
}
|
||||
}, listener::onFailure));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.monitoring.action;
|
|||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -261,12 +262,13 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
private final Collection<MonitoringDoc> exported = ConcurrentCollections.newConcurrentSet();
|
||||
|
||||
CapturingExporters() {
|
||||
super(Settings.EMPTY, Collections.emptyMap(), clusterService);
|
||||
super(Settings.EMPTY, Collections.emptyMap(), clusterService, threadPool.getThreadContext());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void export(Collection<MonitoringDoc> docs) throws ExportException {
|
||||
public synchronized void export(Collection<MonitoringDoc> docs, ActionListener<Void> listener) throws ExportException {
|
||||
exported.addAll(docs);
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
public Collection<MonitoringDoc> getExported() {
|
||||
|
@ -282,13 +284,14 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
private final Consumer<Collection<? extends MonitoringDoc>> consumer;
|
||||
|
||||
ConsumingExporters(Consumer<Collection<? extends MonitoringDoc>> consumer) {
|
||||
super(Settings.EMPTY, Collections.emptyMap(), clusterService);
|
||||
super(Settings.EMPTY, Collections.emptyMap(), clusterService, threadPool.getThreadContext());
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void export(Collection<MonitoringDoc> docs) throws ExportException {
|
||||
public synchronized void export(Collection<MonitoringDoc> docs, ActionListener<Void> listener) throws ExportException {
|
||||
consumer.accept(docs);
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,13 +6,17 @@
|
|||
package org.elasticsearch.xpack.monitoring.exporter;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsException;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
|
||||
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.local.LocalExporter;
|
||||
|
@ -50,21 +54,27 @@ public class ExportersTests extends ESTestCase {
|
|||
private Map<String, Exporter.Factory> factories;
|
||||
private ClusterService clusterService;
|
||||
private ClusterSettings clusterSettings;
|
||||
private ThreadContext threadContext;
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
factories = new HashMap<>();
|
||||
|
||||
InternalClient client = mock(InternalClient.class);
|
||||
Client client = mock(Client.class);
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
threadContext = new ThreadContext(Settings.EMPTY);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
||||
InternalClient internalClient = new InternalClient(Settings.EMPTY, threadPool, client);
|
||||
clusterService = mock(ClusterService.class);
|
||||
clusterSettings = new ClusterSettings(Settings.EMPTY,
|
||||
new HashSet<>(Arrays.asList(MonitoringSettings.INTERVAL, MonitoringSettings.EXPORTERS_SETTINGS)));
|
||||
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
|
||||
|
||||
// we always need to have the local exporter as it serves as the default one
|
||||
factories.put(LocalExporter.TYPE, config -> new LocalExporter(config, client, clusterService, mock(CleanerService.class)));
|
||||
factories.put(LocalExporter.TYPE, config -> new LocalExporter(config, internalClient, clusterService, mock(CleanerService.class)));
|
||||
|
||||
exporters = new Exporters(Settings.EMPTY, factories, clusterService);
|
||||
exporters = new Exporters(Settings.EMPTY, factories, clusterService, threadContext);
|
||||
}
|
||||
|
||||
public void testInitExportersDefault() throws Exception {
|
||||
|
@ -165,7 +175,7 @@ public class ExportersTests extends ESTestCase {
|
|||
clusterSettings = new ClusterSettings(nodeSettings, new HashSet<>(Arrays.asList(MonitoringSettings.EXPORTERS_SETTINGS)));
|
||||
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
|
||||
|
||||
exporters = new Exporters(nodeSettings, factories, clusterService) {
|
||||
exporters = new Exporters(nodeSettings, factories, clusterService, threadContext) {
|
||||
@Override
|
||||
Map<String, Exporter> initExporters(Settings settings) {
|
||||
settingsHolder.set(settings);
|
||||
|
@ -215,9 +225,9 @@ public class ExportersTests extends ESTestCase {
|
|||
settings.put("xpack.monitoring.exporters._name" + String.valueOf(i) + ".type", "record");
|
||||
}
|
||||
|
||||
factories.put("record", CountingExporter::new);
|
||||
factories.put("record", (s) -> new CountingExporter(s, threadContext));
|
||||
|
||||
Exporters exporters = new Exporters(settings.build(), factories, clusterService);
|
||||
Exporters exporters = new Exporters(settings.build(), factories, clusterService, threadContext);
|
||||
exporters.start();
|
||||
|
||||
final Thread[] threads = new Thread[3 + randomInt(7)];
|
||||
|
@ -249,12 +259,9 @@ public class ExportersTests extends ESTestCase {
|
|||
docs.add(new MonitoringDoc(MonitoredSystem.ES.getSystem(), Version.CURRENT.toString()));
|
||||
}
|
||||
barrier.await(10, TimeUnit.SECONDS);
|
||||
try {
|
||||
exporters.export(docs);
|
||||
logger.debug("--> thread [{}] successfully exported {} documents", threadNum, threadDocs);
|
||||
} catch (Exception e) {
|
||||
logger.debug("--> thread [{}] failed to export {} documents", threadNum, threadDocs);
|
||||
}
|
||||
exporters.export(docs, ActionListener.wrap(
|
||||
r -> logger.debug("--> thread [{}] successfully exported {} documents", threadNum, threadDocs),
|
||||
e -> logger.debug("--> thread [{}] failed to export {} documents", threadNum, threadDocs)));
|
||||
|
||||
}
|
||||
}, "export_thread_" + i);
|
||||
|
@ -318,14 +325,16 @@ public class ExportersTests extends ESTestCase {
|
|||
|
||||
private static final AtomicInteger count = new AtomicInteger(0);
|
||||
private List<CountingBulk> bulks = new CopyOnWriteArrayList<>();
|
||||
private final ThreadContext threadContext;
|
||||
|
||||
CountingExporter(Config config) {
|
||||
CountingExporter(Config config, ThreadContext threadContext) {
|
||||
super(config);
|
||||
this.threadContext = threadContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExportBulk openBulk() {
|
||||
CountingBulk bulk = new CountingBulk(config.type() + "#" + count.getAndIncrement());
|
||||
CountingBulk bulk = new CountingBulk(config.type() + "#" + count.getAndIncrement(), threadContext);
|
||||
bulks.add(bulk);
|
||||
return bulk;
|
||||
}
|
||||
|
@ -347,8 +356,8 @@ public class ExportersTests extends ESTestCase {
|
|||
|
||||
private final AtomicInteger count = new AtomicInteger();
|
||||
|
||||
CountingBulk(String name) {
|
||||
super(name);
|
||||
CountingBulk(String name, ThreadContext threadContext) {
|
||||
super(name, threadContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -357,11 +366,13 @@ public class ExportersTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doFlush() {
|
||||
protected void doFlush(ActionListener<Void> listener) {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws ExportException {
|
||||
protected void doClose(ActionListener<Void> listener) {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
int getCount() {
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.action.DocWriteRequest;
|
|||
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
|
@ -550,7 +551,9 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
|
|||
|
||||
// Wait for exporting bulks to be ready to export
|
||||
assertBusy(() -> exporters.forEach(exporter -> assertThat(exporter.openBulk(), notNullValue())));
|
||||
exporters.export(docs);
|
||||
PlainActionFuture<Void> future = new PlainActionFuture<>();
|
||||
exporters.export(docs, future);
|
||||
future.get();
|
||||
}
|
||||
|
||||
private HttpExporter getExporter(String nodeName) {
|
||||
|
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.client.sniff.Sniffer;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsException;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.Exporter.Config;
|
||||
|
@ -51,6 +52,7 @@ import static org.mockito.Mockito.when;
|
|||
public class HttpExporterTests extends ESTestCase {
|
||||
|
||||
private final SSLService sslService = mock(SSLService.class);
|
||||
private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
|
||||
public void testExporterWithBlacklistedHeaders() {
|
||||
final String blacklistedHeader = randomFrom(HttpExporter.BLACKLISTED_HEADERS);
|
||||
|
@ -67,7 +69,8 @@ public class HttpExporterTests extends ESTestCase {
|
|||
|
||||
final Config config = createConfig(builder.build());
|
||||
|
||||
final SettingsException exception = expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService));
|
||||
final SettingsException exception =
|
||||
expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService, threadContext));
|
||||
|
||||
assertThat(exception.getMessage(), equalTo(expected));
|
||||
}
|
||||
|
@ -86,7 +89,8 @@ public class HttpExporterTests extends ESTestCase {
|
|||
|
||||
final Config config = createConfig(builder.build());
|
||||
|
||||
final SettingsException exception = expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService));
|
||||
final SettingsException exception =
|
||||
expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService, threadContext));
|
||||
|
||||
assertThat(exception.getMessage(), equalTo(expected));
|
||||
}
|
||||
|
@ -101,7 +105,8 @@ public class HttpExporterTests extends ESTestCase {
|
|||
|
||||
final Config config = createConfig(builder.build());
|
||||
|
||||
final SettingsException exception = expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService));
|
||||
final SettingsException exception = expectThrows(SettingsException.class,
|
||||
() -> new HttpExporter(config, sslService, threadContext));
|
||||
|
||||
assertThat(exception.getMessage(), equalTo(expected));
|
||||
}
|
||||
|
@ -121,7 +126,8 @@ public class HttpExporterTests extends ESTestCase {
|
|||
|
||||
final Config config = createConfig(builder.build());
|
||||
|
||||
final SettingsException exception = expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService));
|
||||
final SettingsException exception =
|
||||
expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService, threadContext));
|
||||
|
||||
assertThat(exception.getMessage(), equalTo("missing required setting [xpack.monitoring.exporters._http.host]"));
|
||||
}
|
||||
|
@ -133,7 +139,8 @@ public class HttpExporterTests extends ESTestCase {
|
|||
|
||||
final Config config = createConfig(builder.build());
|
||||
|
||||
final SettingsException exception = expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService));
|
||||
final SettingsException exception =
|
||||
expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService, threadContext));
|
||||
|
||||
assertThat(exception.getMessage(),
|
||||
equalTo("[xpack.monitoring.exporters._http.host] must use a consistent scheme: http or https"));
|
||||
|
@ -158,7 +165,8 @@ public class HttpExporterTests extends ESTestCase {
|
|||
|
||||
final Config config = createConfig(builder.build());
|
||||
|
||||
final SettingsException exception = expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService));
|
||||
final SettingsException exception =
|
||||
expectThrows(SettingsException.class, () -> new HttpExporter(config, sslService, threadContext));
|
||||
|
||||
assertThat(exception.getMessage(), equalTo("[xpack.monitoring.exporters._http.host] invalid host: [" + invalidHost + "]"));
|
||||
}
|
||||
|
@ -173,7 +181,7 @@ public class HttpExporterTests extends ESTestCase {
|
|||
|
||||
final Config config = createConfig(builder.build());
|
||||
|
||||
new HttpExporter(config, sslService).close();
|
||||
new HttpExporter(config, sslService, threadContext).close();
|
||||
}
|
||||
|
||||
public void testCreateRestClient() throws IOException {
|
||||
|
@ -365,7 +373,7 @@ public class HttpExporterTests extends ESTestCase {
|
|||
final ResolversRegistry resolvers = mock(ResolversRegistry.class);
|
||||
final HttpResource resource = new MockHttpResource(exporterName(), true, PublishableHttpResource.CheckResponse.ERROR, false);
|
||||
|
||||
try (HttpExporter exporter = new HttpExporter(config, client, sniffer, listener, resolvers, resource)) {
|
||||
try (HttpExporter exporter = new HttpExporter(config, client, sniffer, threadContext, listener, resolvers, resource)) {
|
||||
verify(listener).setResource(resource);
|
||||
|
||||
assertThat(exporter.openBulk(), nullValue());
|
||||
|
@ -381,7 +389,7 @@ public class HttpExporterTests extends ESTestCase {
|
|||
// sometimes dirty to start with and sometimes not; but always succeeds on checkAndPublish
|
||||
final HttpResource resource = new MockHttpResource(exporterName(), randomBoolean());
|
||||
|
||||
try (HttpExporter exporter = new HttpExporter(config, client, sniffer, listener, resolvers, resource)) {
|
||||
try (HttpExporter exporter = new HttpExporter(config, client, sniffer, threadContext, listener, resolvers, resource)) {
|
||||
verify(listener).setResource(resource);
|
||||
|
||||
final HttpExportBulk bulk = exporter.openBulk();
|
||||
|
@ -406,7 +414,7 @@ public class HttpExporterTests extends ESTestCase {
|
|||
doThrow(randomFrom(new IOException("expected"), new RuntimeException("expected"))).when(client).close();
|
||||
}
|
||||
|
||||
new HttpExporter(config, client, sniffer, listener, resolvers, resource).close();
|
||||
new HttpExporter(config, client, sniffer, threadContext, listener, resolvers, resource).close();
|
||||
|
||||
// order matters; sniffer must close first
|
||||
if (sniffer != null) {
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.monitoring.exporter.local;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -132,15 +133,16 @@ public class LocalExporterTemplateTests extends MonitoringIntegTestCase {
|
|||
|
||||
protected void doExporting() throws Exception {
|
||||
// TODO: these should be unit tests, not using guice (copied from now-deleted AbstractExporterTemplateTestCase)
|
||||
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
|
||||
XPackLicenseState licenseState = internalCluster().getInstance(XPackLicenseState.class);
|
||||
LicenseService licenseService = internalCluster().getInstance(LicenseService.class);
|
||||
InternalClient client = internalCluster().getInstance(InternalClient.class);
|
||||
final String node = randomFrom(internalCluster().getNodeNames());
|
||||
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node);
|
||||
XPackLicenseState licenseState = internalCluster().getInstance(XPackLicenseState.class, node);
|
||||
LicenseService licenseService = internalCluster().getInstance(LicenseService.class, node);
|
||||
InternalClient client = internalCluster().getInstance(InternalClient.class, node);
|
||||
Collector collector = new ClusterStatsCollector(clusterService.getSettings(), clusterService,
|
||||
new MonitoringSettings(clusterService.getSettings(), clusterService.getClusterSettings()),
|
||||
licenseState, client, licenseService);
|
||||
|
||||
Exporters exporters = internalCluster().getInstance(Exporters.class);
|
||||
Exporters exporters = internalCluster().getInstance(Exporters.class, node);
|
||||
assertNotNull(exporters);
|
||||
|
||||
Exporter exporter = exporters.getExporter("_exporter");
|
||||
|
@ -148,7 +150,9 @@ public class LocalExporterTemplateTests extends MonitoringIntegTestCase {
|
|||
// Wait for exporting bulks to be ready to export
|
||||
Runnable busy = () -> assertThat(exporter.openBulk(), notNullValue());
|
||||
assertBusy(busy);
|
||||
exporters.export(collector.collect());
|
||||
PlainActionFuture<Void> future = new PlainActionFuture<>();
|
||||
exporters.export(collector.collect(), future);
|
||||
future.get();
|
||||
}
|
||||
|
||||
private String dataTemplateName() {
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -166,8 +167,9 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
|
|||
|
||||
LocalBulk bulk = (LocalBulk) exporter.openBulk();
|
||||
bulk.add(Collections.singletonList(newRandomMonitoringDoc()));
|
||||
bulk.close(true);
|
||||
|
||||
PlainActionFuture<Void> future = new PlainActionFuture<>();
|
||||
bulk.close(true, future);
|
||||
future.get();
|
||||
} catch (ElasticsearchException e) {
|
||||
assertThat(e.getMessage(), containsString("failed to flush export bulk [_local]"));
|
||||
assertThat(e.getCause(), instanceOf(ExportException.class));
|
||||
|
@ -188,7 +190,9 @@ public class LocalExporterTests extends MonitoringIntegTestCase {
|
|||
|
||||
// Wait for exporting bulks to be ready to export
|
||||
assertBusy(() -> exporters.forEach(exporter -> assertThat(exporter.openBulk(), notNullValue())));
|
||||
exporters.export(docs);
|
||||
PlainActionFuture<Void> future = new PlainActionFuture<>();
|
||||
exporters.export(docs, future);
|
||||
future.get();
|
||||
}
|
||||
|
||||
private LocalExporter getLocalExporter(String name) throws Exception {
|
||||
|
|
Loading…
Reference in New Issue