Handle missing logstash index exceptions (#63753)
This commit updates the APIs in the logstash plugin to handle IndexNotFoundExceptions that are returned by client calls. Until we have the creation of this index in place, we need to handle this case and not let the exception propagate out of the API. Backport of #63698
This commit is contained in:
parent
1dbd3a90ae
commit
4d6daa6e40
|
@ -6,6 +6,7 @@
|
|||
|
||||
package org.elasticsearch.xpack.logstash.action;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteResponse.Result;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
|
@ -14,6 +15,7 @@ import org.elasticsearch.action.support.WriteRequest;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.OriginSettingClient;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.logstash.Logstash;
|
||||
|
@ -39,8 +41,17 @@ public class TransportDeletePipelineAction extends HandledTransportAction<Delete
|
|||
.execute(
|
||||
ActionListener.wrap(
|
||||
deleteResponse -> listener.onResponse(new DeletePipelineResponse(deleteResponse.getResult() == Result.DELETED)),
|
||||
listener::onFailure
|
||||
e -> handleFailure(e, listener)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private void handleFailure(Exception e, ActionListener<DeletePipelineResponse> listener) {
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
if (cause instanceof IndexNotFoundException) {
|
||||
listener.onResponse(new DeletePipelineResponse(false));
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ package org.elasticsearch.xpack.logstash.action;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.get.MultiGetItemResponse;
|
||||
|
@ -23,6 +24,7 @@ import org.elasticsearch.client.OriginSettingClient;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
|
@ -37,6 +39,7 @@ import java.util.Map;
|
|||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.LOGSTASH_MANAGEMENT_ORIGIN;
|
||||
|
||||
public class TransportGetPipelineAction extends HandledTransportAction<GetPipelineRequest, GetPipelineResponse> {
|
||||
|
@ -82,7 +85,7 @@ public class TransportGetPipelineAction extends HandledTransportAction<GetPipeli
|
|||
}
|
||||
};
|
||||
handleSearchResponse(searchResponse, pipelineSources, clearScroll, listener);
|
||||
}, listener::onFailure));
|
||||
}, e -> handleFailure(e, listener)));
|
||||
} else if (request.ids().size() == 1) {
|
||||
client.prepareGet(Logstash.LOGSTASH_CONCRETE_INDEX_NAME, "_doc", request.ids().get(0))
|
||||
.setFetchSource(true)
|
||||
|
@ -96,7 +99,7 @@ public class TransportGetPipelineAction extends HandledTransportAction<GetPipeli
|
|||
} else {
|
||||
listener.onResponse(new GetPipelineResponse(org.elasticsearch.common.collect.Map.of()));
|
||||
}
|
||||
}, listener::onFailure));
|
||||
}, e -> handleFailure(e, listener)));
|
||||
} else {
|
||||
MultiGetRequestBuilder requestBuilder = client.prepareMultiGet();
|
||||
for (String id : request.ids()) {
|
||||
|
@ -113,7 +116,16 @@ public class TransportGetPipelineAction extends HandledTransportAction<GetPipeli
|
|||
.collect(Collectors.toMap(GetResponse::getId, GetResponse::getSourceAsBytesRef))
|
||||
)
|
||||
);
|
||||
}, listener::onFailure));
|
||||
}, e -> handleFailure(e, listener)));
|
||||
}
|
||||
}
|
||||
|
||||
private void handleFailure(Exception e, ActionListener<GetPipelineResponse> listener) {
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
if (cause instanceof IndexNotFoundException) {
|
||||
listener.onResponse(new GetPipelineResponse(emptyMap()));
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.logstash.action;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.client.NoOpClient;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class TransportDeletePipelineActionTests extends ESTestCase {
|
||||
|
||||
public void testDeletePipelineWithMissingIndex() throws Exception {
|
||||
try (Client client = getFailureClient(new IndexNotFoundException("missing .logstash"))) {
|
||||
final TransportDeletePipelineAction action = new TransportDeletePipelineAction(
|
||||
mock(TransportService.class),
|
||||
mock(ActionFilters.class),
|
||||
client
|
||||
);
|
||||
final DeletePipelineRequest request = new DeletePipelineRequest(randomAlphaOfLength(4));
|
||||
final PlainActionFuture<DeletePipelineResponse> future = new PlainActionFuture<>();
|
||||
action.doExecute(null, request, future);
|
||||
assertThat(future.get().isDeleted(), is(false));
|
||||
}
|
||||
}
|
||||
|
||||
private Client getFailureClient(Exception e) {
|
||||
return new NoOpClient(getTestName()) {
|
||||
@Override
|
||||
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
|
||||
ActionType<Response> action,
|
||||
Request request,
|
||||
ActionListener<Response> listener
|
||||
) {
|
||||
if (randomBoolean()) {
|
||||
listener.onFailure(new RemoteTransportException("failed on other node", e));
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -17,14 +17,20 @@ import org.elasticsearch.action.get.GetResponse;
|
|||
import org.elasticsearch.action.get.MultiGetItemResponse;
|
||||
import org.elasticsearch.action.get.MultiGetResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.MockLogAppender;
|
||||
import org.elasticsearch.test.client.NoOpClient;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.Matchers.anEmptyMap;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
@ -96,6 +102,21 @@ public class TransportGetPipelineActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testMissingIndexHandling() throws Exception {
|
||||
try (Client failureClient = getFailureClient(new IndexNotFoundException("foo"))) {
|
||||
final TransportGetPipelineAction action = new TransportGetPipelineAction(
|
||||
mock(TransportService.class),
|
||||
mock(ActionFilters.class),
|
||||
failureClient
|
||||
);
|
||||
final List<String> pipelines = randomList(0, 10, () -> randomAlphaOfLengthBetween(1, 8));
|
||||
final GetPipelineRequest request = new GetPipelineRequest(pipelines);
|
||||
PlainActionFuture<GetPipelineResponse> future = new PlainActionFuture<>();
|
||||
action.doExecute(null, request, future);
|
||||
assertThat(future.get().pipelines(), anEmptyMap());
|
||||
}
|
||||
}
|
||||
|
||||
private Client getMockClient(ActionResponse response) {
|
||||
return new NoOpClient(getTestName()) {
|
||||
@Override
|
||||
|
@ -109,4 +130,21 @@ public class TransportGetPipelineActionTests extends ESTestCase {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
private Client getFailureClient(Exception e) {
|
||||
return new NoOpClient(getTestName()) {
|
||||
@Override
|
||||
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
|
||||
ActionType<Response> action,
|
||||
Request request,
|
||||
ActionListener<Response> listener
|
||||
) {
|
||||
if (randomBoolean()) {
|
||||
listener.onFailure(new RemoteTransportException("failed on other node", e));
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue