diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java index feb3a1b3007..8255bd0e40c 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/ExportBulk.java @@ -6,13 +6,16 @@ package org.elasticsearch.marvel.agent.exporter; import java.util.Collection; +import java.util.concurrent.atomic.AtomicReference; /** - * + * An export bulk holds one of more documents until it got flushed. The {@link ExportBulk#flush()} usually triggers the exporting of the + * documents to their final destination. */ public abstract class ExportBulk { protected final String name; + private final AtomicReference state = new AtomicReference<>(State.INITIALIZING); public ExportBulk(String name) { this.name = name; @@ -23,36 +26,73 @@ public abstract class ExportBulk { return name; } - public abstract ExportBulk add(Collection docs) throws ExportException; - - public abstract void flush() throws ExportException; - - public final void close(boolean flush) throws ExportException { - ExportException exception = null; - if (flush) { - flush(); + /** + * Add documents to the exporting bulk + */ + public void add(Collection docs) throws ExportException { + if (state.get() == State.INITIALIZING) { + doAdd(docs); } + } - // now closing - try { - onClose(); - } catch (Exception e) { + protected abstract void doAdd(Collection docs) throws ExportException; + + /** + * Flush the exporting bulk + */ + public void flush() throws ExportException { + if (state.compareAndSet(State.INITIALIZING, State.FLUSHING)) { + doFlush(); + } + } + + protected abstract void doFlush(); + + /** + * Close the exporting bulk + */ + public void close(boolean flush) throws ExportException { + if (state.getAndSet(State.CLOSED) != State.CLOSED) { + + ExportException exception = null; + try { + if (flush) { + doFlush(); + } + } catch (ExportException e) { + if (exception != null) { + exception.addSuppressed(e); + } else { + exception = e; + } + } finally { + try { + doClose(); + } catch (Exception e) { + if (exception != null) { + exception.addSuppressed(e); + } else { + exception = new ExportException("Exception when closing export bulk", e); + } + } + } + + // rethrow exception if (exception != null) { - exception.addSuppressed(e); - } else { - exception = new ExportException("Exception when closing export bulk", e); + throw exception; } } - - // rethrow exception - if (exception != null) { - throw exception; - } } - protected void onClose() throws Exception { + protected abstract void doClose() throws ExportException; + + protected boolean isClosed() { + return state.get() == State.CLOSED; } + /** + * This class holds multiple export bulks exposed as a single compound bulk. + */ public static class Compound extends ExportBulk { private final Collection bulks; @@ -63,7 +103,7 @@ public abstract class ExportBulk { } @Override - public ExportBulk add(Collection docs) throws ExportException { + protected void doAdd(Collection docs) throws ExportException { ExportException exception = null; for (ExportBulk bulk : bulks) { try { @@ -78,11 +118,10 @@ public abstract class ExportBulk { if (exception != null) { throw exception; } - return this; } @Override - public void flush() throws ExportException { + protected void doFlush() { ExportException exception = null; for (ExportBulk bulk : bulks) { try { @@ -100,11 +139,13 @@ public abstract class ExportBulk { } @Override - protected void onClose() throws Exception { + protected void doClose() throws ExportException { ExportException exception = null; for (ExportBulk bulk : bulks) { try { - bulk.onClose(); + // We can close without flushing since doFlush() + // would have been called by the parent class + bulk.close(false); } catch (ExportException e) { if (exception == null) { exception = new ExportException("failed to close export bulks"); @@ -117,4 +158,10 @@ public abstract class ExportBulk { } } } + + private enum State { + INITIALIZING, + FLUSHING, + CLOSED + } } diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java index 872673ff443..0bf61765478 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/http/HttpExporter.java @@ -237,8 +237,7 @@ public class HttpExporter extends Exporter { @SuppressWarnings("unchecked") private void sendCloseExportingConnection(HttpURLConnection conn) throws IOException { logger.trace("sending content"); - OutputStream os = conn.getOutputStream(); - os.close(); + closeExportingConnection(conn); if (conn.getResponseCode() != 200) { logConnectionError("remote target didn't respond with 200 OK", conn); return; @@ -263,6 +262,12 @@ public class HttpExporter extends Exporter { } } + private void closeExportingConnection(HttpURLConnection connection) throws IOException { + try (OutputStream os = connection.getOutputStream()) { + logger.debug("closing exporting connection [{}]", connection); + } + } + /** * open a connection to any host, validating it has the template installed if needed * @@ -701,7 +706,7 @@ public class HttpExporter extends Exporter { } @Override - public Bulk add(Collection docs) throws ExportException { + public void doAdd(Collection docs) throws ExportException { try { if ((docs != null) && (!docs.isEmpty())) { if (connection == null) { @@ -731,14 +736,13 @@ public class HttpExporter extends Exporter { } catch (Exception e) { throw new ExportException("failed to add documents to export bulk [{}]", name); } - return this; } @Override - public void flush() throws ExportException { + public void doFlush() throws ExportException { if (connection != null) { try { - flush(connection); + sendCloseExportingConnection(connection); } catch (Exception e) { throw new ExportException("failed to flush export bulk [{}]", e, name); } finally { @@ -747,8 +751,17 @@ public class HttpExporter extends Exporter { } } - private void flush(HttpURLConnection connection) throws IOException { - sendCloseExportingConnection(connection); + @Override + protected void doClose() throws ExportException { + if (connection != null) { + try { + closeExportingConnection(connection); + } catch (Exception e) { + throw new ExportException("failed to close export bulk [{}]", e, name); + } finally { + connection = null; + } + } } } diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java index 2b739d324ec..84e51538398 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/agent/exporter/local/LocalBulk.java @@ -20,18 +20,16 @@ import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy; import java.util.Arrays; import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; /** * LocalBulk exports monitoring data in the local cluster using bulk requests. Its usage is not thread safe since the - * {@link LocalBulk#add(Collection)}, {@link LocalBulk#flush()} and {@link LocalBulk#onClose()} methods are not synchronized. + * {@link LocalBulk#add(Collection)}, {@link LocalBulk#flush()} and {@link LocalBulk#doClose()} methods are not synchronized. */ public class LocalBulk extends ExportBulk { private final ESLogger logger; private final MonitoringClientProxy client; private final ResolversRegistry resolvers; - private final AtomicBoolean closed; private BulkRequestBuilder requestBuilder; @@ -41,16 +39,15 @@ public class LocalBulk extends ExportBulk { this.logger = logger; this.client = client; this.resolvers = resolvers; - this.closed = new AtomicBoolean(false); } @Override - public ExportBulk add(Collection docs) throws ExportException { + public void doAdd(Collection docs) throws ExportException { ExportException exception = null; for (MonitoringDoc doc : docs) { - if (closed.get()) { - return this; + if (isClosed()) { + return; } if (requestBuilder == null) { requestBuilder = client.prepareBulk(); @@ -77,13 +74,11 @@ public class LocalBulk extends ExportBulk { if (exception != null) { throw exception; } - - return this; } @Override - public void flush() throws ExportException { - if (closed.get() || requestBuilder == null || requestBuilder.numberOfActions() == 0) { + public void doFlush() throws ExportException { + if (requestBuilder == null || requestBuilder.numberOfActions() == 0 || isClosed()) { return; } try { @@ -114,8 +109,8 @@ public class LocalBulk extends ExportBulk { } @Override - protected void onClose() throws Exception { - if (closed.compareAndSet(false, true)) { + protected void doClose() throws ExportException { + if (isClosed() == false) { requestBuilder = null; } } diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java index 9a46c3e727d..39d2b631bb0 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/agent/exporter/ExportersTests.java @@ -413,13 +413,16 @@ public class ExportersTests extends ESTestCase { } @Override - public ExportBulk add(Collection docs) throws ExportException { + protected void doAdd(Collection docs) throws ExportException { count.addAndGet(docs.size()); - return this; } @Override - public void flush() throws ExportException { + protected void doFlush() { + } + + @Override + protected void doClose() throws ExportException { } AtomicInteger getCount() {