Log any errors in reindex task

Does so by introducing TaskListener which is just like ActionListener but
gets the Task as each parameter. Unlike ActionListener which is used
_everywhere_ you can only use TaskListener directly with TransportAction.
TransportAction under the covers uses an ActionListener implemetation that
closes over the task to call the TaskListener.
This commit is contained in:
Nik Everett 2016-01-29 10:33:08 -05:00
parent 0aeeef87d2
commit ab79ff73af
6 changed files with 138 additions and 69 deletions

View File

@ -21,18 +21,16 @@ package org.elasticsearch.action;
/**
* A listener for action responses or failures.
*
*
*/
public interface ActionListener<Response> {
/**
* A response handler.
* Handle action response. This response may constitute a failure or a
* success but it is up to the listener to make that decision.
*/
void onResponse(Response response);
/**
* A failure handler.
* A failure caused by an exception at some phase of the task.
*/
void onFailure(Throwable e);
}

View File

@ -1,47 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
/**
* An ActionListener that does nothing. Used when we need a listener but don't
* care to listen for the result.
*/
public final class NoopActionListener<Response> implements ActionListener<Response> {
/**
* Get the instance of NoopActionListener cast appropriately.
*/
@SuppressWarnings("unchecked") // Safe because we do nothing with the type.
public static <Response> ActionListener<Response> instance() {
return (ActionListener<Response>) INSTANCE;
}
private static final NoopActionListener<Object> INSTANCE = new NoopActionListener<Object>();
private NoopActionListener() {
}
@Override
public void onResponse(Response response) {
}
@Override
public void onFailure(Throwable e) {
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskListener;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
@ -67,24 +68,38 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
}
public final Task execute(Request request, ActionListener<Response> listener) {
Task task = taskManager.register("transport", actionName, request);
if (task == null) {
execute(null, request, listener);
} else {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}
return execute(request, new TaskListener<Response>() {
@Override
public void onResponse(Task task, Response response) {
listener.onResponse(response);
}
@Override
public void onFailure(Throwable e) {
@Override
public void onFailure(Task task, Throwable e) {
listener.onFailure(e);
}
});
}
public final Task execute(Request request, TaskListener<Response> listener) {
Task task = taskManager.register("transport", actionName, request);
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
if (task != null) {
taskManager.unregister(task);
listener.onFailure(e);
}
});
}
listener.onResponse(task, response);
}
@Override
public void onFailure(Throwable e) {
if (task != null) {
taskManager.unregister(task);
}
listener.onFailure(task, e);
}
});
return task;
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tasks;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
/**
* An TaskListener that just logs the response. Used when we need a listener but
* don't care to listen for the result.
*/
public final class LoggingTaskListener<Response> implements TaskListener<Response> {
private final static ESLogger logger = Loggers.getLogger(LoggingTaskListener.class);
/**
* Get the instance of NoopActionListener cast appropriately.
*/
@SuppressWarnings("unchecked") // Safe because we only toString the response
public static <Response> TaskListener<Response> instance() {
return (TaskListener<Response>) INSTANCE;
}
private static final LoggingTaskListener<Object> INSTANCE = new LoggingTaskListener<Object>();
private LoggingTaskListener() {
}
@Override
public void onResponse(Task task, Response response) {
logger.info("{} finished with response {}", task.getId(), response);
}
@Override
public void onFailure(Task task, Throwable e) {
logger.warn("{} failed with exception", e, task.getId());
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tasks;
/**
* Listener for Task success or failure.
*/
public interface TaskListener<Response> {
/**
* Handle task response. This response may constitute a failure or a success
* but it is up to the listener to make that decision.
*
* @param task
* the task being executed. May be null if the action doesn't
* create a task
* @param response
* the response from the action that executed the task
*/
void onResponse(Task task, Response response);
/**
* A failure caused by an exception at some phase of the task.
*
* @param task
* the task being executed. May be null if the action doesn't
* create a task
* @param e
* the failure
*/
void onFailure(Task task, Throwable e);
}

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.NoopActionListener;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
@ -36,6 +35,7 @@ import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task;
public abstract class AbstractBaseReindexRestHandler<Request extends ActionRequest<Request>, Response extends BulkIndexByScrollResponse, TA extends TransportAction<Request, Response>>
@ -67,7 +67,7 @@ public abstract class AbstractBaseReindexRestHandler<Request extends ActionReque
channel.sendResponse(new BytesRestResponse(channel, validationException));
return;
}
Task task = action.execute(internalRequest, NoopActionListener.instance());
Task task = action.execute(internalRequest, LoggingTaskListener.instance());
sendTask(channel, task);
}