[Monitoring] Ignore _bulk if Collection is Disabled (elastic/x-pack-elasticsearch#3910)
This blocks incoming requests from Kibana, Logstash, and Beats when X-Pack monitoring is effectively disabled by setting `xpack.monitoring.collection.interval: -1`. Original commit: elastic/x-pack-elasticsearch@016a9472f1
This commit is contained in:
parent
00dec27d9f
commit
176411e55e
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.monitoring.action;
|
|||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -23,16 +24,18 @@ public class MonitoringBulkResponse extends ActionResponse {
|
|||
|
||||
private long tookInMillis;
|
||||
private Error error;
|
||||
private boolean ignored;
|
||||
|
||||
public MonitoringBulkResponse() {
|
||||
}
|
||||
|
||||
public MonitoringBulkResponse(long tookInMillis) {
|
||||
this(tookInMillis, null);
|
||||
public MonitoringBulkResponse(final long tookInMillis, final boolean ignored) {
|
||||
this.tookInMillis = tookInMillis;
|
||||
this.ignored = ignored;
|
||||
}
|
||||
|
||||
public MonitoringBulkResponse(long tookInMillis, Error error) {
|
||||
this.tookInMillis = tookInMillis;
|
||||
public MonitoringBulkResponse(final long tookInMillis, final Error error) {
|
||||
this(tookInMillis, false);
|
||||
this.error = error;
|
||||
}
|
||||
|
||||
|
@ -44,15 +47,30 @@ public class MonitoringBulkResponse extends ActionResponse {
|
|||
return tookInMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if the request was ignored.
|
||||
*
|
||||
* @return {@code true} if the request was ignored because collection was disabled.
|
||||
*/
|
||||
public boolean isIgnored() {
|
||||
return ignored;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns HTTP status
|
||||
*
|
||||
* <ul>
|
||||
* <li>{@link RestStatus#OK} if monitoring bulk request was successful</li>
|
||||
* <li>{@link RestStatus#ACCEPTED} if monitoring bulk request was ignored because collection is disabled</li>
|
||||
* <li>{@link RestStatus#INTERNAL_SERVER_ERROR} if monitoring bulk request was partially successful or failed completely</li>
|
||||
* </ul>
|
||||
*/
|
||||
public RestStatus status() {
|
||||
return error == null ? RestStatus.OK : RestStatus.INTERNAL_SERVER_ERROR;
|
||||
if (error == null) {
|
||||
return ignored ? RestStatus.ACCEPTED : RestStatus.OK;
|
||||
}
|
||||
|
||||
return RestStatus.INTERNAL_SERVER_ERROR;
|
||||
}
|
||||
|
||||
public Error getError() {
|
||||
|
@ -64,6 +82,10 @@ public class MonitoringBulkResponse extends ActionResponse {
|
|||
super.readFrom(in);
|
||||
tookInMillis = in.readVLong();
|
||||
error = in.readOptionalWriteable(Error::new);
|
||||
|
||||
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
|
||||
ignored = in.readBoolean();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -71,6 +93,10 @@ public class MonitoringBulkResponse extends ActionResponse {
|
|||
super.writeTo(out);
|
||||
out.writeVLong(tookInMillis);
|
||||
out.writeOptionalWriteable(error);
|
||||
|
||||
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
|
||||
out.writeBoolean(ignored);
|
||||
}
|
||||
}
|
||||
|
||||
public static class Error implements Writeable, ToXContentObject {
|
||||
|
|
|
@ -90,7 +90,7 @@ public class MonitoringService extends AbstractLifecycleComponent {
|
|||
return interval;
|
||||
}
|
||||
|
||||
boolean isMonitoringActive() {
|
||||
public boolean isMonitoringActive() {
|
||||
return isStarted()
|
||||
&& interval != null
|
||||
&& interval.millis() >= MIN_INTERVAL.millis();
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkDoc;
|
|||
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkRequest;
|
||||
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkResponse;
|
||||
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.MonitoringService;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.BytesReferenceMonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.Exporters;
|
||||
|
||||
|
@ -35,21 +36,30 @@ public class TransportMonitoringBulkAction extends HandledTransportAction<Monito
|
|||
|
||||
private final ClusterService clusterService;
|
||||
private final Exporters exportService;
|
||||
private final MonitoringService monitoringService;
|
||||
|
||||
@Inject
|
||||
public TransportMonitoringBulkAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
TransportService transportService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Exporters exportService) {
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Exporters exportService,
|
||||
MonitoringService monitoringService) {
|
||||
super(settings, MonitoringBulkAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
|
||||
MonitoringBulkRequest::new);
|
||||
this.clusterService = clusterService;
|
||||
this.exportService = exportService;
|
||||
this.monitoringService = monitoringService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(MonitoringBulkRequest request, ActionListener<MonitoringBulkResponse> listener) {
|
||||
clusterService.state().blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
|
||||
|
||||
// ignore incoming bulk requests when collection is disabled in ES
|
||||
if (monitoringService.isMonitoringActive() == false) {
|
||||
listener.onResponse(new MonitoringBulkResponse(0, true));
|
||||
return;
|
||||
}
|
||||
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
final String cluster = clusterService.state().metaData().clusterUUID();
|
||||
|
||||
|
@ -138,7 +148,7 @@ public class TransportMonitoringBulkAction extends HandledTransportAction<Monito
|
|||
}
|
||||
|
||||
private static MonitoringBulkResponse response(final long start) {
|
||||
return new MonitoringBulkResponse(took(start));
|
||||
return new MonitoringBulkResponse(took(start), false);
|
||||
}
|
||||
|
||||
private static MonitoringBulkResponse response(final long start, final Exception e) {
|
||||
|
|
|
@ -106,6 +106,7 @@ public class RestMonitoringBulkAction extends MonitoringRestHandler {
|
|||
builder.startObject();
|
||||
{
|
||||
builder.field("took", response.getTookInMillis());
|
||||
builder.field("ignored", response.isIgnored());
|
||||
|
||||
final MonitoringBulkResponse.Error error = response.getError();
|
||||
builder.field("errors", error != null);
|
||||
|
|
|
@ -25,12 +25,19 @@ public class MonitoringBulkResponseTests extends ESTestCase {
|
|||
|
||||
public void testResponseStatus() {
|
||||
final long took = Math.abs(randomLong());
|
||||
MonitoringBulkResponse response = new MonitoringBulkResponse(took);
|
||||
|
||||
MonitoringBulkResponse response = new MonitoringBulkResponse(took, false);
|
||||
|
||||
assertThat(response.getTookInMillis(), equalTo(took));
|
||||
assertThat(response.getError(), is(nullValue()));
|
||||
assertThat(response.status(), equalTo(RestStatus.OK));
|
||||
|
||||
response = new MonitoringBulkResponse(took, true);
|
||||
|
||||
assertThat(response.getTookInMillis(), equalTo(took));
|
||||
assertThat(response.getError(), is(nullValue()));
|
||||
assertThat(response.status(), equalTo(RestStatus.ACCEPTED));
|
||||
|
||||
ExportException exception = new ExportException(randomAlphaOfLength(10));
|
||||
response = new MonitoringBulkResponse(took, new MonitoringBulkResponse.Error(exception));
|
||||
|
||||
|
@ -44,7 +51,7 @@ public class MonitoringBulkResponseTests extends ESTestCase {
|
|||
for (int i = 0; i < iterations; i++) {
|
||||
MonitoringBulkResponse response;
|
||||
if (randomBoolean()) {
|
||||
response = new MonitoringBulkResponse(Math.abs(randomLong()));
|
||||
response = new MonitoringBulkResponse(Math.abs(randomLong()), randomBoolean());
|
||||
} else {
|
||||
Exception exception = randomFrom(
|
||||
new ExportException(randomAlphaOfLength(5), new IllegalStateException(randomAlphaOfLength(5))),
|
||||
|
@ -69,6 +76,12 @@ public class MonitoringBulkResponseTests extends ESTestCase {
|
|||
} else {
|
||||
assertThat(response2.getError(), is(notNullValue()));
|
||||
}
|
||||
|
||||
if (version.onOrAfter(Version.V_6_3_0)) {
|
||||
assertThat(response2.isIgnored(), is(response.isIgnored()));
|
||||
} else {
|
||||
assertThat(response2.isIgnored(), is(false));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
|
|||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.TaskAwareRequest;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -37,6 +38,7 @@ import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkDoc;
|
|||
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkRequest;
|
||||
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkResponse;
|
||||
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.MonitoringService;
|
||||
import org.elasticsearch.xpack.monitoring.MonitoringTestUtils;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.BytesReferenceMonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.Exporters;
|
||||
|
@ -56,6 +58,7 @@ import static org.hamcrest.Matchers.hasToString;
|
|||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
|
@ -81,6 +84,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
private IndexNameExpressionResolver resolver;
|
||||
private XPackLicenseState licenseState;
|
||||
private TaskManager taskManager;
|
||||
private final MonitoringService monitoringService = mock(MonitoringService.class);
|
||||
|
||||
@Before
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -105,7 +109,8 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
when(clusterService.state()).thenReturn(ClusterState.builder(ClusterName.DEFAULT).blocks(clusterBlock).build());
|
||||
|
||||
final TransportMonitoringBulkAction action = new TransportMonitoringBulkAction(Settings.EMPTY, threadPool, clusterService,
|
||||
transportService, filters, resolver, exporters);
|
||||
transportService, filters, resolver, exporters,
|
||||
monitoringService);
|
||||
|
||||
final MonitoringBulkRequest request = randomRequest();
|
||||
final ExecutionException e = expectThrows(ExecutionException.class, () -> action.execute(request).get());
|
||||
|
@ -113,9 +118,35 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
assertThat(e, hasToString(containsString("ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/2/no master]")));
|
||||
}
|
||||
|
||||
public void testExecuteEmptyRequest() {
|
||||
public void testExecuteIgnoresRequestWhenCollectionIsDisabled() throws Exception {
|
||||
when(clusterService.state()).thenReturn(ClusterState.builder(ClusterName.DEFAULT).build());
|
||||
when(monitoringService.isMonitoringActive()).thenReturn(false);
|
||||
|
||||
final TransportMonitoringBulkAction action = new TransportMonitoringBulkAction(Settings.EMPTY, threadPool, clusterService,
|
||||
transportService, filters, resolver, exporters);
|
||||
transportService, filters, resolver, exporters,
|
||||
monitoringService);
|
||||
|
||||
final MonitoringBulkDoc doc = mock(MonitoringBulkDoc.class);
|
||||
when(doc.getSource()).thenReturn(new BytesArray("test"));
|
||||
|
||||
final MonitoringBulkRequest request = new MonitoringBulkRequest();
|
||||
request.add(doc);
|
||||
|
||||
final MonitoringBulkResponse response = action.execute(request).get();
|
||||
|
||||
assertThat(response.status(), is(RestStatus.ACCEPTED));
|
||||
assertThat(response.isIgnored(), is(true));
|
||||
assertThat(response.getTookInMillis(), is(0L));
|
||||
assertThat(response.getError(), nullValue());
|
||||
}
|
||||
|
||||
public void testExecuteEmptyRequest() {
|
||||
// it validates the request before it tries to execute it
|
||||
when(monitoringService.isMonitoringActive()).thenReturn(randomBoolean());
|
||||
|
||||
final TransportMonitoringBulkAction action = new TransportMonitoringBulkAction(Settings.EMPTY, threadPool, clusterService,
|
||||
transportService, filters, resolver, exporters,
|
||||
monitoringService);
|
||||
|
||||
final MonitoringBulkRequest request = new MonitoringBulkRequest();
|
||||
final ExecutionException e = expectThrows(ExecutionException.class, () -> action.execute(request).get());
|
||||
|
@ -125,6 +156,8 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testExecuteRequest() throws Exception {
|
||||
when(monitoringService.isMonitoringActive()).thenReturn(true);
|
||||
|
||||
final DiscoveryNode discoveryNode = new DiscoveryNode("_id", new TransportAddress(TransportAddress.META_ADDRESS, 9300), CURRENT);
|
||||
when(clusterService.localNode()).thenReturn(discoveryNode);
|
||||
|
||||
|
@ -176,7 +209,8 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
}).when(exporters).export(any(Collection.class), any(ActionListener.class));
|
||||
|
||||
final TransportMonitoringBulkAction action = new TransportMonitoringBulkAction(Settings.EMPTY, threadPool, clusterService,
|
||||
transportService, filters, resolver, exporters);
|
||||
transportService, filters, resolver, exporters,
|
||||
monitoringService);
|
||||
action.execute(request).get();
|
||||
|
||||
verify(exporters).export(any(Collection.class), any(ActionListener.class));
|
||||
|
|
|
@ -66,7 +66,7 @@ public class LocalExporterIntegTests extends LocalExporterIntegTestCase {
|
|||
super();
|
||||
}
|
||||
|
||||
private void stopMonitoring() throws Exception {
|
||||
private void stopMonitoring() {
|
||||
// Now disabling the monitoring service, so that no more collection are started
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
|
||||
Settings.builder().putNull(MonitoringService.INTERVAL.getKey())
|
||||
|
@ -87,7 +87,9 @@ public class LocalExporterIntegTests extends LocalExporterIntegTestCase {
|
|||
indexRandom(true, indexRequestBuilders);
|
||||
}
|
||||
|
||||
Settings.Builder exporterSettings = Settings.builder()
|
||||
// start the monitoring service so that _xpack/monitoring/_bulk is not ignored
|
||||
final Settings.Builder exporterSettings = Settings.builder()
|
||||
.put(MonitoringService.INTERVAL.getKey(), 3L, TimeUnit.SECONDS)
|
||||
.put("xpack.monitoring.exporters._local.enabled", true)
|
||||
.put("xpack.monitoring.exporters._local.cluster_alerts.management.enabled", false);
|
||||
|
||||
|
@ -124,11 +126,6 @@ public class LocalExporterIntegTests extends LocalExporterIntegTestCase {
|
|||
checkMonitoringDocs();
|
||||
}
|
||||
|
||||
// monitoring service is started
|
||||
exporterSettings = Settings.builder()
|
||||
.put(MonitoringService.INTERVAL.getKey(), 3L, TimeUnit.SECONDS);
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(exporterSettings));
|
||||
|
||||
final int numNodes = internalCluster().getNodeNames().length;
|
||||
assertBusy(() -> {
|
||||
assertThat(client().admin().indices().prepareExists(".monitoring-*").get().isExists(), is(true));
|
||||
|
|
|
@ -113,12 +113,23 @@ public class RestMonitoringBulkActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testNoErrors() throws Exception {
|
||||
final MonitoringBulkResponse response = new MonitoringBulkResponse(randomLong());
|
||||
final MonitoringBulkResponse response = new MonitoringBulkResponse(randomLong(), false);
|
||||
final FakeRestRequest request = createRestRequest(randomSystemId(), TEMPLATE_VERSION, "10s");
|
||||
final RestResponse restResponse = getRestBuilderListener(request).buildResponse(response);
|
||||
|
||||
assertThat(restResponse.status(), is(RestStatus.OK));
|
||||
assertThat(restResponse.content().utf8ToString(), is("{\"took\":" + response.getTookInMillis() + ",\"errors\":false}"));
|
||||
assertThat(restResponse.content().utf8ToString(),
|
||||
is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":false,\"errors\":false}"));
|
||||
}
|
||||
|
||||
public void testNoErrorsButIgnored() throws Exception {
|
||||
final MonitoringBulkResponse response = new MonitoringBulkResponse(randomLong(), true);
|
||||
final FakeRestRequest request = createRestRequest(randomSystemId(), TEMPLATE_VERSION, "10s");
|
||||
final RestResponse restResponse = getRestBuilderListener(request).buildResponse(response);
|
||||
|
||||
assertThat(restResponse.status(), is(RestStatus.ACCEPTED));
|
||||
assertThat(restResponse.content().utf8ToString(),
|
||||
is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":true,\"errors\":false}"));
|
||||
}
|
||||
|
||||
public void testWithErrors() throws Exception {
|
||||
|
@ -137,7 +148,7 @@ public class RestMonitoringBulkActionTests extends ESTestCase {
|
|||
|
||||
assertThat(restResponse.status(), is(RestStatus.INTERNAL_SERVER_ERROR));
|
||||
assertThat(restResponse.content().utf8ToString(),
|
||||
is("{\"took\":" + response.getTookInMillis() + ",\"errors\":true,\"error\":" + errorJson + "}"));
|
||||
is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":false,\"errors\":true,\"error\":" + errorJson + "}"));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue