Add LocalExporter implementation

Original commit: elastic/x-pack-elasticsearch@36a683294c
This commit is contained in:
Tanguy Leroux 2015-09-23 18:43:09 +02:00 committed by uboness
parent 1c5e03a239
commit 415f6eda79
8 changed files with 460 additions and 24 deletions

View File

@ -36,8 +36,12 @@ public abstract class Exporter {
return false;
}
public abstract void start();
public abstract void export(Collection<MarvelDoc> marvelDocs) throws Exception;
public abstract void stop();
public abstract void close();
protected String settingFQN(String setting) {

View File

@ -48,10 +48,46 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
@Override
protected void doStart() {
exporters = initExporters(settings.getAsSettings(EXPORTERS_SETTING));
ElasticsearchException exception = null;
for (Exporter exporter : exporters) {
try {
exporter.start();
} catch (Exception e) {
logger.error("exporter [{}] failed to start", e, exporter.name());
if (exception == null) {
exception = new ElasticsearchException("failed to start exporters");
}
exception.addSuppressed(e);
}
}
if (exception != null) {
throw exception;
}
}
@Override
protected void doStop() {
ElasticsearchException exception = null;
for (Exporter exporter : exporters) {
try {
exporter.stop();
} catch (Exception e) {
logger.error("exporter [{}] failed to stop", e, exporter.name());
if (exception == null) {
exception = new ElasticsearchException("failed to stop exporters");
}
exception.addSuppressed(e);
}
}
if (exception != null) {
throw exception;
}
}
@Override
protected void doClose() {
ElasticsearchException exception = null;
for (Exporter exporter : exporters) {
try {
@ -69,10 +105,6 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
}
}
@Override
protected void doClose() {
}
public Exporter getExporter(String name) {
return exporters.get(name);
}
@ -109,8 +141,10 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
.put(updatedSettings)
.build());
existing.close(logger);
exporters.start(logger);
}
// TODO only rebuild the exporters that need to be updated according to settings
CurrentExporters initExporters(Settings settings) {
Set<String> singletons = new HashSet<>();
Map<String, Exporter> exporters = new HashMap<>();
@ -184,6 +218,16 @@ public class Exporters extends AbstractLifecycleComponent<Exporters> implements
return exporters.get(name);
}
void start(ESLogger logger) {
for (Exporter exporter : exporters.values()) {
try {
exporter.start();
} catch (Exception e) {
logger.error("failed to start exporter [{}]", e, exporter.name());
}
}
}
void close(ESLogger logger) {
for (Exporter exporter : exporters.values()) {
try {

View File

@ -73,10 +73,10 @@ public class HttpExporter extends Exporter {
public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd";
/** Minimum supported version of the remote template **/
static final Version MIN_SUPPORTED_TEMPLATE_VERSOIN = Version.V_2_0_0_beta2;
public static final Version MIN_SUPPORTED_TEMPLATE_VERSION = Version.V_2_0_0_beta2;
/** Minimum supported version of the remote marvel cluster **/
static final Version MIN_SUPPORTED_CLUSTER_VERSION = Version.V_2_0_0_beta2;
public static final Version MIN_SUPPORTED_CLUSTER_VERSION = Version.V_2_0_0_beta2;
volatile String[] hosts;
final TimeValue connectionTimeout;
@ -153,6 +153,16 @@ public class HttpExporter extends Exporter {
MarvelSettings.MARVEL_INDICES_PREFIX, indexTimeFormat, templateVersion);
}
@Override
public void start() {
}
@Override
public void stop() {
}
@Override
public void export(Collection<MarvelDoc> marvelDocs) throws Exception {
HttpURLConnection connection = openExportingConnection();
@ -466,11 +476,11 @@ public class HttpExporter extends Exporter {
logger.warn("marvel template version cannot be found: template will be updated to version [{}]", templateVersion);
} else {
if (remoteVersion.before(MIN_SUPPORTED_TEMPLATE_VERSOIN)) {
if (remoteVersion.before(MIN_SUPPORTED_TEMPLATE_VERSION)) {
logger.error("marvel template version [{}] is below the minimum compatible version [{}] on host [{}]: "
+ "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)",
remoteVersion, MIN_SUPPORTED_TEMPLATE_VERSOIN, host);
remoteVersion, MIN_SUPPORTED_TEMPLATE_VERSION, host);
return false;
}

View File

@ -22,7 +22,7 @@ import java.util.regex.Pattern;
public class HttpExporterUtils {
public static final String MARVEL_TEMPLATE_FILE = "/marvel_index_template.json";
static final String MARVEL_VERSION_FIELD = "marvel_version";
public static final String MARVEL_VERSION_FIELD = "marvel_version";
static final String VERSION_FIELD = "number";
public static URL parseHostWithPath(String host, String path) throws URISyntaxException, MalformedURLException {

View File

@ -5,13 +5,46 @@
*/
package org.elasticsearch.marvel.agent.exporter.local;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.MarvelDoc;
import org.elasticsearch.marvel.agent.exporter.http.HttpExporterUtils;
import org.elasticsearch.marvel.agent.renderer.Renderer;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.shield.SecuredClient;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_CLUSTER_VERSION;
import static org.elasticsearch.marvel.agent.exporter.http.HttpExporter.MIN_SUPPORTED_TEMPLATE_VERSION;
/**
*
@ -20,34 +53,353 @@ public class LocalExporter extends Exporter {
public static final String TYPE = "local";
private final Client client;
public static final String INDEX_TEMPLATE_NAME = "marvel";
public LocalExporter(Exporter.Config config, SecuredClient client) {
public static final String QUEUE_SIZE_SETTING = "queue_max_size";
public static final String BULK_SIZE_SETTING = "bulk_size";
public static final String BULK_FLUSH_INTERVAL_SETTING = "bulk_flush_interval";
public static final String INDEX_NAME_TIME_FORMAT_SETTING = "index.name.time_format";
public static final int DEFAULT_MAX_QUEUE_SIZE = 1000;
public static final int DEFAULT_BULK_SIZE = 1000;
public static final int MAX_BULK_SIZE = 10000;
public static final TimeValue DEFAULT_BULK_FLUSH_INTERVAL = TimeValue.timeValueSeconds(1);
public static final String DEFAULT_INDEX_NAME_TIME_FORMAT = "YYYY.MM.dd";
private final Client client;
private final ClusterService clusterService;
private final RendererRegistry registry;
private final QueueConsumer queueConsumer;
private final DateTimeFormatter indexTimeFormatter;
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
private final LinkedBlockingQueue<IndexRequest> queue;
/**
* Version of the built-in template
**/
private final Version builtInTemplateVersion;
public LocalExporter(Exporter.Config config, SecuredClient client, ClusterService clusterService, RendererRegistry registry) {
super(TYPE, config);
this.client = client;
this.clusterService = clusterService;
this.registry = registry;
this.queueConsumer = new QueueConsumer(EsExecutors.threadName(config.settings(), "marvel-queue-consumer-" + config.name()));
int maxQueueSize = config.settings().getAsInt(QUEUE_SIZE_SETTING, DEFAULT_MAX_QUEUE_SIZE);
if (maxQueueSize <= 0) {
logger.warn("invalid value [{}] for setting [{}]. using default value [{}]", maxQueueSize, QUEUE_SIZE_SETTING, DEFAULT_MAX_QUEUE_SIZE);
maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
}
this.queue = new LinkedBlockingQueue<>(maxQueueSize);
String indexTimeFormat = config.settings().get(INDEX_NAME_TIME_FORMAT_SETTING, DEFAULT_INDEX_NAME_TIME_FORMAT);
try {
indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat).withZoneUTC();
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("invalid marvel index name time format [" + indexTimeFormat + "] set for [" + settingFQN(INDEX_NAME_TIME_FORMAT_SETTING) + "]", e);
}
// Checks that the built-in template is versioned
builtInTemplateVersion = HttpExporterUtils.parseTemplateVersion(HttpExporterUtils.loadDefaultTemplate());
if (builtInTemplateVersion == null) {
throw new IllegalStateException("unable to find built-in template version");
}
}
@Override
public void export(Collection<MarvelDoc> marvelDocs) {
public void start() {
if (state.compareAndSet(State.INITIALIZED, State.STARTING)) {
queueConsumer.start();
state.set(State.STARTED);
}
}
public void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING) || state.compareAndSet(State.EXPORTING, State.STOPPING)) {
try {
queueConsumer.interrupt();
} finally {
state.set(State.STOPPED);
}
}
}
@Override
public void close() {
if (state.get() != State.STOPPED) {
stop();
}
}
private boolean canExport() {
if (state.get() == State.EXPORTING) {
return true;
}
if (state.get() != State.STARTED) {
return false;
}
ClusterState clusterState = clusterState();
if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .marvel-es-
// indices but they may not have been restored from the cluster state on disk
logger.debug("exporter [{}] waiting until gateway has recovered from disk", name());
return false;
}
Version clusterVersion = clusterVersion();
if ((clusterVersion == null) || clusterVersion.before(MIN_SUPPORTED_CLUSTER_VERSION)) {
logger.error("cluster version [" + clusterVersion + "] is not supported, please use a cluster with minimum version [" + MIN_SUPPORTED_CLUSTER_VERSION + "]");
state.set(State.FAILED);
return false;
}
Version templateVersion = templateVersion();
if (clusterService.state().nodes().localNodeMaster() == false) {
if (templateVersion == null) {
logger.debug("marvel index template [{}] does not exist, so service cannot start", INDEX_TEMPLATE_NAME);
return false;
}
if (clusterState.routingTable().index(indexName()).allPrimaryShardsActive() == false) {
logger.debug("marvel index [{}] has some primary shards not yet started, so service cannot start", indexName());
return false;
}
} else if (shouldUpdateTemplate(templateVersion, builtInTemplateVersion)) {
putTemplate(config.settings().getAsSettings("template.settings"));
}
logger.debug("exporter [{}] can now export marvel data", name());
state.set(State.EXPORTING);
return true;
}
ClusterState clusterState() {
return client.admin().cluster().prepareState().get().getState();
}
Version clusterVersion() {
return Version.CURRENT;
}
Version templateVersion() {
for (IndexTemplateMetaData template : client.admin().indices().prepareGetTemplates(INDEX_TEMPLATE_NAME).get().getIndexTemplates()) {
if (template.getName().equals(INDEX_TEMPLATE_NAME)) {
String version = template.settings().get("index." + HttpExporterUtils.MARVEL_VERSION_FIELD);
if (Strings.hasLength(version)) {
return Version.fromString(version);
}
}
}
return null;
}
boolean shouldUpdateTemplate(Version current, Version expected) {
// Always update a template even if its version is not found
if (current == null) {
return true;
}
// Never update a template in an unknown version
if (expected == null) {
return false;
}
// Never update a very old template
if (current.before(MIN_SUPPORTED_TEMPLATE_VERSION)) {
logger.error("marvel template version [{}] is below the minimum compatible version [{}]: "
+ "please manually update the marvel template to a more recent version"
+ "and delete the current active marvel index (don't forget to back up it first if needed)",
current, MIN_SUPPORTED_TEMPLATE_VERSION);
return false;
}
// Always update a template to the last up-to-date version
if (expected.after(current)) {
logger.info("marvel template version will be updated to a newer version [current:{}, expected:{}]", current, expected);
return true;
// When the template is up-to-date, force an update for snapshot versions only
} else if (expected.equals(current)) {
logger.debug("marvel template version is up-to-date [current:{}, expected:{}]", current, expected);
return expected.snapshot();
// Never update a template that is newer than the expected one
} else {
logger.debug("marvel template version is newer than the one required by the marvel agent [current:{}, expected:{}]", current, expected);
return false;
}
}
void putTemplate(Settings customSettings) {
try (InputStream is = getClass().getResourceAsStream("/marvel_index_template.json")) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
final byte[] template = out.toByteArray();
PutIndexTemplateRequest request = new PutIndexTemplateRequest(INDEX_TEMPLATE_NAME).source(template);
if (customSettings != null && customSettings.names().size() > 0) {
Settings updatedSettings = Settings.builder()
.put(request.settings())
.put(customSettings)
.build();
request.settings(updatedSettings);
}
assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!";
PutIndexTemplateResponse response = client.admin().indices().putTemplate(request).actionGet();
if (!response.isAcknowledged()) {
throw new IllegalStateException("failed to put marvel index template");
}
} catch (Exception e) {
throw new IllegalStateException("failed to update marvel index template", e);
}
}
@Override
public void export(Collection<MarvelDoc> marvelDocs) {
if (marvelDocs == null) {
logger.debug("no marvel documents to export");
return;
}
if (canExport() == false) {
logger.debug("exporter [{}] can not export data", name());
return;
}
BytesStreamOutput buffer = null;
for (MarvelDoc marvelDoc : marvelDocs) {
try {
IndexRequestBuilder request = client.prepareIndex();
if (marvelDoc.index() != null) {
request.setIndex(marvelDoc.index());
} else {
request.setIndex(indexName());
}
if (marvelDoc.type() != null) {
request.setType(marvelDoc.type());
}
if (marvelDoc.id() != null) {
request.setId(marvelDoc.id());
}
// Get the appropriate renderer in order to render the MarvelDoc
Renderer renderer = registry.renderer(marvelDoc.type());
if (renderer == null) {
logger.warn("unable to render marvel document of type [{}]. no renderer found in registry", marvelDoc.type());
return;
}
if (buffer == null) {
buffer = new BytesStreamOutput();
}
renderer.render(marvelDoc, XContentType.SMILE, buffer);
request.setSource(buffer.bytes().toBytes());
queue.add(request.request());
} catch (IOException e) {
logger.error("failed to export marvel data", e);
} finally {
if (buffer != null) {
buffer.reset();
}
}
}
}
String indexName() {
return MarvelSettings.MARVEL_INDICES_PREFIX + indexTimeFormatter.print(System.currentTimeMillis());
}
public static class Factory extends Exporter.Factory<LocalExporter> {
private final SecuredClient client;
private final RendererRegistry registry;
private final ClusterService clusterService;
@Inject
public Factory(SecuredClient client) {
public Factory(SecuredClient client, ClusterService clusterService, RendererRegistry registry) {
super(TYPE, true);
this.client = client;
this.clusterService = clusterService;
this.registry = registry;
}
@Override
public LocalExporter create(Config config) {
return new LocalExporter(config, client);
return new LocalExporter(config, client, clusterService, registry);
}
}
class QueueConsumer extends Thread {
private volatile boolean running = true;
QueueConsumer(String name) {
super(name);
setDaemon(true);
}
@Override
public void run() {
try (BulkProcessor bulkProcessor = createBulkProcessor(config)) {
while (running) {
try {
IndexRequest request = queue.take();
if (request != null) {
bulkProcessor.add(request);
}
} catch (InterruptedException e) {
logger.debug("marvel queue consumer interrupted, flushing bulk processor", e);
bulkProcessor.flush();
running = false;
Thread.currentThread().interrupt();
} catch (Exception e) {
// log the exception and keep going
logger.warn("failed to index marvel documents from queue", e);
}
}
}
}
private BulkProcessor createBulkProcessor(Config config) {
int bulkSize = Math.min(config.settings().getAsInt(BULK_SIZE_SETTING, DEFAULT_BULK_SIZE), MAX_BULK_SIZE);
bulkSize = (bulkSize < 1) ? DEFAULT_BULK_SIZE : bulkSize;
TimeValue interval = config.settings().getAsTime(BULK_FLUSH_INTERVAL_SETTING, DEFAULT_BULK_FLUSH_INTERVAL);
interval = (interval.millis() < 1) ? DEFAULT_BULK_FLUSH_INTERVAL : interval;
return BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.debug("executing [{}] bulk index requests", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
logger.info("failed to bulk index marvel documents: [{}]", response.buildFailureMessage());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("failed to bulk index marvel documents: [{}]", failure, failure.getMessage());
}
}).setName("marvel-bulk-processor-" + config.name())
.setBulkActions(bulkSize)
.setFlushInterval(interval)
.setConcurrentRequests(1)
.build();
}
}
public enum State {
INITIALIZED,
STARTING,
STARTED,
EXPORTING,
STOPPING,
STOPPED,
FAILED
}
}

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.marvel.shield;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesAction;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.shield.User;
import org.elasticsearch.shield.authz.Permission;
@ -20,7 +22,8 @@ public class MarvelInternalUserHolder {
static final String[] ROLE_NAMES = new String[] { "__marvel_role" };
public static final Permission.Global.Role ROLE = Permission.Global.Role.builder(ROLE_NAMES[0])
.cluster(Privilege.Cluster.action("indices:admin/template/put"))
.cluster(Privilege.Cluster.action(PutIndexTemplateAction.NAME))
.cluster(Privilege.Cluster.action(GetIndexTemplatesAction.NAME))
// we need all monitoring access
.cluster(Privilege.Cluster.MONITOR)

View File

@ -284,8 +284,8 @@ public class SecuredClient implements Client {
@Inject
public SecuredClient(Client client, MarvelShieldIntegration shieldIntegration) {
this.client = client;
this.admin = new Admin(client);
this.shieldIntegration = shieldIntegration;
this.admin = new Admin(this.client, this.shieldIntegration);
}
@Override
@ -666,16 +666,20 @@ public class SecuredClient implements Client {
static class IndicesAdmin implements IndicesAdminClient {
private final ElasticsearchClient client;
private final MarvelShieldIntegration shieldIntegration;
public IndicesAdmin(ElasticsearchClient client) {
public IndicesAdmin(ElasticsearchClient client, MarvelShieldIntegration shieldIntegration) {
this.client = client;
this.shieldIntegration = shieldIntegration;
}
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder> action, Request request) {
shieldIntegration.bindInternalMarvelUser(request);
return this.client.execute(action, request);
}
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
shieldIntegration.bindInternalMarvelUser(request);
this.client.execute(action, request, listener);
}
@ -1106,16 +1110,20 @@ public class SecuredClient implements Client {
static class ClusterAdmin implements ClusterAdminClient {
private final ElasticsearchClient client;
private final MarvelShieldIntegration shieldIntegration;
public ClusterAdmin(ElasticsearchClient client) {
public ClusterAdmin(ElasticsearchClient client, MarvelShieldIntegration shieldIntegration) {
this.client = client;
this.shieldIntegration = shieldIntegration;
}
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder> action, Request request) {
shieldIntegration.bindInternalMarvelUser(request);
return this.client.execute(action, request);
}
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
shieldIntegration.bindInternalMarvelUser(request);
this.client.execute(action, request, listener);
}
@ -1369,9 +1377,9 @@ public class SecuredClient implements Client {
private final ClusterAdmin clusterAdmin;
private final IndicesAdmin indicesAdmin;
public Admin(ElasticsearchClient client) {
this.clusterAdmin = new ClusterAdmin(client);
this.indicesAdmin = new IndicesAdmin(client);
public Admin(ElasticsearchClient client, MarvelShieldIntegration shieldIntegration) {
this.clusterAdmin = new ClusterAdmin(client, shieldIntegration);
this.indicesAdmin = new IndicesAdmin(client, shieldIntegration);
}
public ClusterAdminClient cluster() {

View File

@ -10,6 +10,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.marvel.agent.exporter.local.LocalExporter;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.shield.MarvelSettingsFilter;
import org.elasticsearch.marvel.shield.SecuredClient;
import org.elasticsearch.node.settings.NodeSettingsService;
@ -38,11 +39,14 @@ public class ExportersTests extends ESTestCase {
public void init() throws Exception {
factories = new HashMap<>();
SecuredClient securedClient = mock(SecuredClient.class);
when(securedClient.settings()).thenReturn(Settings.EMPTY);
clusterService = mock(ClusterService.class);
// we always need to have the local exporter as it serves as the default one
factories.put(LocalExporter.TYPE, new LocalExporter.Factory(mock(SecuredClient.class)));
factories.put(LocalExporter.TYPE, new LocalExporter.Factory(securedClient, clusterService, mock(RendererRegistry.class)));
settingsFilter = mock(MarvelSettingsFilter.class);
clusterService = mock(ClusterService.class);
nodeSettingsService = mock(NodeSettingsService.class);
exporters = new Exporters(Settings.EMPTY, factories, settingsFilter, clusterService, nodeSettingsService);
}
@ -239,9 +243,10 @@ public class ExportersTests extends ESTestCase {
exporters.export(docsList);
verify(exporters.getExporter("_name0"), times(1)).masterOnly();
verify(exporters.getExporter("_name0"), times(1)).start();
verify(exporters.getExporter("_name0"), times(1)).export(docsList);
verify(exporters.getExporter("_name1"), times(1)).masterOnly();
verifyNoMoreInteractions(exporters.getExporter("_name1"));
verify(exporters.getExporter("_name1"), times(1)).start();
}
static class TestFactory extends Exporter.Factory<TestFactory.TestExporter> {
@ -261,10 +266,20 @@ public class ExportersTests extends ESTestCase {
super(type, config);
}
@Override
public void start() {
}
@Override
public void export(Collection<MarvelDoc> marvelDocs) throws Exception {
}
@Override
public void stop() {
}
@Override
public void close() {
}