Monitoring: Change ExportBulk so that it has states

Original commit: elastic/x-pack-elasticsearch@8dc55dc0d2
This commit is contained in:
Tanguy Leroux 2016-04-04 17:47:00 +02:00
parent b65787f7dd
commit 4df6f0f701
4 changed files with 109 additions and 51 deletions

View File

@ -6,13 +6,16 @@
package org.elasticsearch.marvel.agent.exporter; package org.elasticsearch.marvel.agent.exporter;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* * An export bulk holds one of more documents until it got flushed. The {@link ExportBulk#flush()} usually triggers the exporting of the
* documents to their final destination.
*/ */
public abstract class ExportBulk { public abstract class ExportBulk {
protected final String name; protected final String name;
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZING);
public ExportBulk(String name) { public ExportBulk(String name) {
this.name = name; this.name = name;
@ -23,36 +26,73 @@ public abstract class ExportBulk {
return name; return name;
} }
public abstract ExportBulk add(Collection<MonitoringDoc> docs) throws ExportException; /**
* Add documents to the exporting bulk
public abstract void flush() throws ExportException; */
public void add(Collection<MonitoringDoc> docs) throws ExportException {
public final void close(boolean flush) throws ExportException { if (state.get() == State.INITIALIZING) {
ExportException exception = null; doAdd(docs);
if (flush) {
flush();
} }
}
// now closing protected abstract void doAdd(Collection<MonitoringDoc> docs) throws ExportException;
try {
onClose(); /**
} catch (Exception e) { * Flush the exporting bulk
*/
public void flush() throws ExportException {
if (state.compareAndSet(State.INITIALIZING, State.FLUSHING)) {
doFlush();
}
}
protected abstract void doFlush();
/**
* Close the exporting bulk
*/
public void close(boolean flush) throws ExportException {
if (state.getAndSet(State.CLOSED) != State.CLOSED) {
ExportException exception = null;
try {
if (flush) {
doFlush();
}
} catch (ExportException e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
} finally {
try {
doClose();
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = new ExportException("Exception when closing export bulk", e);
}
}
}
// rethrow exception
if (exception != null) { if (exception != null) {
exception.addSuppressed(e); throw exception;
} else {
exception = new ExportException("Exception when closing export bulk", e);
} }
} }
// rethrow exception
if (exception != null) {
throw exception;
}
} }
protected void onClose() throws Exception { protected abstract void doClose() throws ExportException;
protected boolean isClosed() {
return state.get() == State.CLOSED;
} }
/**
* This class holds multiple export bulks exposed as a single compound bulk.
*/
public static class Compound extends ExportBulk { public static class Compound extends ExportBulk {
private final Collection<ExportBulk> bulks; private final Collection<ExportBulk> bulks;
@ -63,7 +103,7 @@ public abstract class ExportBulk {
} }
@Override @Override
public ExportBulk add(Collection<MonitoringDoc> docs) throws ExportException { protected void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
ExportException exception = null; ExportException exception = null;
for (ExportBulk bulk : bulks) { for (ExportBulk bulk : bulks) {
try { try {
@ -78,11 +118,10 @@ public abstract class ExportBulk {
if (exception != null) { if (exception != null) {
throw exception; throw exception;
} }
return this;
} }
@Override @Override
public void flush() throws ExportException { protected void doFlush() {
ExportException exception = null; ExportException exception = null;
for (ExportBulk bulk : bulks) { for (ExportBulk bulk : bulks) {
try { try {
@ -100,11 +139,13 @@ public abstract class ExportBulk {
} }
@Override @Override
protected void onClose() throws Exception { protected void doClose() throws ExportException {
ExportException exception = null; ExportException exception = null;
for (ExportBulk bulk : bulks) { for (ExportBulk bulk : bulks) {
try { try {
bulk.onClose(); // We can close without flushing since doFlush()
// would have been called by the parent class
bulk.close(false);
} catch (ExportException e) { } catch (ExportException e) {
if (exception == null) { if (exception == null) {
exception = new ExportException("failed to close export bulks"); exception = new ExportException("failed to close export bulks");
@ -117,4 +158,10 @@ public abstract class ExportBulk {
} }
} }
} }
private enum State {
INITIALIZING,
FLUSHING,
CLOSED
}
} }

View File

@ -237,8 +237,7 @@ public class HttpExporter extends Exporter {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void sendCloseExportingConnection(HttpURLConnection conn) throws IOException { private void sendCloseExportingConnection(HttpURLConnection conn) throws IOException {
logger.trace("sending content"); logger.trace("sending content");
OutputStream os = conn.getOutputStream(); closeExportingConnection(conn);
os.close();
if (conn.getResponseCode() != 200) { if (conn.getResponseCode() != 200) {
logConnectionError("remote target didn't respond with 200 OK", conn); logConnectionError("remote target didn't respond with 200 OK", conn);
return; return;
@ -263,6 +262,12 @@ public class HttpExporter extends Exporter {
} }
} }
private void closeExportingConnection(HttpURLConnection connection) throws IOException {
try (OutputStream os = connection.getOutputStream()) {
logger.debug("closing exporting connection [{}]", connection);
}
}
/** /**
* open a connection to any host, validating it has the template installed if needed * open a connection to any host, validating it has the template installed if needed
* *
@ -701,7 +706,7 @@ public class HttpExporter extends Exporter {
} }
@Override @Override
public Bulk add(Collection<MonitoringDoc> docs) throws ExportException { public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
try { try {
if ((docs != null) && (!docs.isEmpty())) { if ((docs != null) && (!docs.isEmpty())) {
if (connection == null) { if (connection == null) {
@ -731,14 +736,13 @@ public class HttpExporter extends Exporter {
} catch (Exception e) { } catch (Exception e) {
throw new ExportException("failed to add documents to export bulk [{}]", name); throw new ExportException("failed to add documents to export bulk [{}]", name);
} }
return this;
} }
@Override @Override
public void flush() throws ExportException { public void doFlush() throws ExportException {
if (connection != null) { if (connection != null) {
try { try {
flush(connection); sendCloseExportingConnection(connection);
} catch (Exception e) { } catch (Exception e) {
throw new ExportException("failed to flush export bulk [{}]", e, name); throw new ExportException("failed to flush export bulk [{}]", e, name);
} finally { } finally {
@ -747,8 +751,17 @@ public class HttpExporter extends Exporter {
} }
} }
private void flush(HttpURLConnection connection) throws IOException { @Override
sendCloseExportingConnection(connection); protected void doClose() throws ExportException {
if (connection != null) {
try {
closeExportingConnection(connection);
} catch (Exception e) {
throw new ExportException("failed to close export bulk [{}]", e, name);
} finally {
connection = null;
}
}
} }
} }

View File

@ -20,18 +20,16 @@ import org.elasticsearch.marvel.support.init.proxy.MonitoringClientProxy;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* LocalBulk exports monitoring data in the local cluster using bulk requests. Its usage is not thread safe since the * LocalBulk exports monitoring data in the local cluster using bulk requests. Its usage is not thread safe since the
* {@link LocalBulk#add(Collection)}, {@link LocalBulk#flush()} and {@link LocalBulk#onClose()} methods are not synchronized. * {@link LocalBulk#add(Collection)}, {@link LocalBulk#flush()} and {@link LocalBulk#doClose()} methods are not synchronized.
*/ */
public class LocalBulk extends ExportBulk { public class LocalBulk extends ExportBulk {
private final ESLogger logger; private final ESLogger logger;
private final MonitoringClientProxy client; private final MonitoringClientProxy client;
private final ResolversRegistry resolvers; private final ResolversRegistry resolvers;
private final AtomicBoolean closed;
private BulkRequestBuilder requestBuilder; private BulkRequestBuilder requestBuilder;
@ -41,16 +39,15 @@ public class LocalBulk extends ExportBulk {
this.logger = logger; this.logger = logger;
this.client = client; this.client = client;
this.resolvers = resolvers; this.resolvers = resolvers;
this.closed = new AtomicBoolean(false);
} }
@Override @Override
public ExportBulk add(Collection<MonitoringDoc> docs) throws ExportException { public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
ExportException exception = null; ExportException exception = null;
for (MonitoringDoc doc : docs) { for (MonitoringDoc doc : docs) {
if (closed.get()) { if (isClosed()) {
return this; return;
} }
if (requestBuilder == null) { if (requestBuilder == null) {
requestBuilder = client.prepareBulk(); requestBuilder = client.prepareBulk();
@ -77,13 +74,11 @@ public class LocalBulk extends ExportBulk {
if (exception != null) { if (exception != null) {
throw exception; throw exception;
} }
return this;
} }
@Override @Override
public void flush() throws ExportException { public void doFlush() throws ExportException {
if (closed.get() || requestBuilder == null || requestBuilder.numberOfActions() == 0) { if (requestBuilder == null || requestBuilder.numberOfActions() == 0 || isClosed()) {
return; return;
} }
try { try {
@ -114,8 +109,8 @@ public class LocalBulk extends ExportBulk {
} }
@Override @Override
protected void onClose() throws Exception { protected void doClose() throws ExportException {
if (closed.compareAndSet(false, true)) { if (isClosed() == false) {
requestBuilder = null; requestBuilder = null;
} }
} }

View File

@ -413,13 +413,16 @@ public class ExportersTests extends ESTestCase {
} }
@Override @Override
public ExportBulk add(Collection<MonitoringDoc> docs) throws ExportException { protected void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
count.addAndGet(docs.size()); count.addAndGet(docs.size());
return this;
} }
@Override @Override
public void flush() throws ExportException { protected void doFlush() {
}
@Override
protected void doClose() throws ExportException {
} }
AtomicInteger getCount() { AtomicInteger getCount() {