Allow for plugins to register REST filter (better support with async execution and some renaming), closes #1658.

This commit is contained in:
Shay Banon 2012-02-02 20:19:15 +02:00
parent a784f9fcdf
commit 81e6ff5162
4 changed files with 171 additions and 61 deletions

View File

@ -51,6 +51,8 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
private final boolean disableSites;
private final PluginSiteFilter pluginSiteFilter = new PluginSiteFilter();
@Inject
public HttpServer(Settings settings, Environment environment, HttpServerTransport transport,
RestController restController,
@ -111,33 +113,33 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) {
if (request.rawPath().startsWith("/_plugin/")) {
for (RestPreProcessor preProcessor : restController.preProcessors()) {
if (!preProcessor.handleExternal()) {
continue;
}
if (!preProcessor.process(request, channel)) {
return;
}
}
handlePluginSite(request, channel);
RestFilterChain filterChain = restController.filterChain(pluginSiteFilter);
filterChain.continueProcessing(request, channel);
return;
}
if (!restController.dispatchRequest(request, channel)) {
if (request.method() == RestRequest.Method.OPTIONS) {
// when we have OPTIONS request, simply send OK by default (with the Access Control Origin header which gets automatically added)
StringRestResponse response = new StringRestResponse(OK);
channel.sendResponse(response);
} else {
channel.sendResponse(new StringRestResponse(BAD_REQUEST, "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]"));
}
restController.dispatchRequest(request, channel);
}
class PluginSiteFilter extends RestFilter {
@Override
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) {
handlePluginSite((HttpRequest) request, (HttpChannel) channel);
}
}
private void handlePluginSite(HttpRequest request, HttpChannel channel) {
void handlePluginSite(HttpRequest request, HttpChannel channel) {
if (disableSites) {
channel.sendResponse(new StringRestResponse(FORBIDDEN));
return;
}
if (request.method() == RestRequest.Method.OPTIONS) {
// when we have OPTIONS request, simply send OK by default (with the Access Control Origin header which gets automatically added)
StringRestResponse response = new StringRestResponse(OK);
channel.sendResponse(response);
return;
}
if (request.method() != RestRequest.Method.GET) {
channel.sendResponse(new StringRestResponse(FORBIDDEN));
return;

View File

@ -21,6 +21,8 @@ package org.elasticsearch.rest;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.path.PathTrie;
@ -31,6 +33,9 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
import static org.elasticsearch.rest.RestStatus.OK;
/**
*
*/
@ -43,8 +48,10 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
private final PathTrie<RestHandler> headHandlers = new PathTrie<RestHandler>(RestUtils.REST_DECODER);
private final PathTrie<RestHandler> optionsHandlers = new PathTrie<RestHandler>(RestUtils.REST_DECODER);
private final RestHandlerFilter handlerFilter = new RestHandlerFilter();
// non volatile since the assumption is that pre processors are registered on startup
private RestPreProcessor[] preProcessors = new RestPreProcessor[0];
private RestFilter[] filters = new RestFilter[0];
@Inject
public RestController(Settings settings) {
@ -61,22 +68,25 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
@Override
protected void doClose() throws ElasticSearchException {
for (RestFilter filter : filters) {
filter.close();
}
}
/**
* Registers a pre processor to be executed before the rest request is actually handled.
*/
public synchronized void registerPreProcessor(RestPreProcessor preProcessor) {
RestPreProcessor[] copy = new RestPreProcessor[preProcessors.length + 1];
System.arraycopy(preProcessors, 0, copy, 0, preProcessors.length);
copy[preProcessors.length] = preProcessor;
Arrays.sort(copy, new Comparator<RestPreProcessor>() {
public synchronized void registerFilter(RestFilter preProcessor) {
RestFilter[] copy = new RestFilter[filters.length + 1];
System.arraycopy(filters, 0, copy, 0, filters.length);
copy[filters.length] = preProcessor;
Arrays.sort(copy, new Comparator<RestFilter>() {
@Override
public int compare(RestPreProcessor o1, RestPreProcessor o2) {
public int compare(RestFilter o1, RestFilter o2) {
return o2.order() - o1.order();
}
});
preProcessors = copy;
filters = copy;
}
/**
@ -107,30 +117,55 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
}
}
public RestPreProcessor[] preProcessors() {
return preProcessors;
/**
* Returns a filter chain (if needed) to execute. If this method returns null, simply execute
* as usual.
*/
@Nullable
public RestFilterChain filterChainOrNull(RestFilter executionFilter) {
if (filters.length == 0) {
return null;
}
return new ControllerFilterChain(executionFilter);
}
public boolean dispatchRequest(final RestRequest request, final RestChannel channel) {
try {
for (RestPreProcessor preProcessor : preProcessors) {
if (!preProcessor.process(request, channel)) {
return true;
/**
* Returns a filter chain with the final filter being the provided filter.
*/
public RestFilterChain filterChain(RestFilter executionFilter) {
return new ControllerFilterChain(executionFilter);
}
public void dispatchRequest(final RestRequest request, final RestChannel channel) {
if (filters.length == 0) {
try {
executeHandler(request, channel);
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response for uri [" + request.uri() + "]", e1);
}
}
final RestHandler handler = getHandler(request);
if (handler == null) {
return false;
}
} else {
ControllerFilterChain filterChain = new ControllerFilterChain(handlerFilter);
filterChain.continueProcessing(request, channel);
}
}
void executeHandler(RestRequest request, RestChannel channel) {
final RestHandler handler = getHandler(request);
if (handler != null) {
handler.handleRequest(request, channel);
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response for uri [" + request.uri() + "]", e1);
} else {
if (request.method() == RestRequest.Method.OPTIONS) {
// when we have OPTIONS request, simply send OK by default (with the Access Control Origin header which gets automatically added)
StringRestResponse response = new StringRestResponse(OK);
channel.sendResponse(response);
} else {
channel.sendResponse(new StringRestResponse(BAD_REQUEST, "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]"));
}
}
return true;
}
private RestHandler getHandler(RestRequest request) {
@ -159,4 +194,45 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
// my_index/my_type/http%3A%2F%2Fwww.google.com
return request.rawPath();
}
class ControllerFilterChain implements RestFilterChain {
private final RestFilter executionFilter;
private volatile int index;
ControllerFilterChain(RestFilter executionFilter) {
this.executionFilter = executionFilter;
}
@Override
public void continueProcessing(RestRequest request, RestChannel channel) {
try {
int loc = index;
if (loc > filters.length) {
throw new ElasticSearchIllegalStateException("filter continueProcessing was called more than expected");
} else if (loc == filters.length) {
executionFilter.process(request, channel, this);
} else {
RestFilter preProcessor = filters[loc];
preProcessor.process(request, channel, this);
}
index++;
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response for uri [" + request.uri() + "]", e1);
}
}
}
}
class RestHandlerFilter extends RestFilter {
@Override
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) {
executeHandler(request, channel);
}
}
}

View File

@ -19,30 +19,30 @@
package org.elasticsearch.rest;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.component.CloseableComponent;
/**
* Rest pre processor allowing to pre process REST requests.
* <p/>
* Experimental interface.
* A filter allowing to filter rest operations.
*/
public interface RestPreProcessor {
public abstract class RestFilter implements CloseableComponent {
/**
* Optionally, the order the processor will work on. Execution is done from lowest value to highest.
* It is a good practice to allow to configure this for the relevant processor.
* Optionally, the order of the filter. Execution is done from lowest value to highest.
* It is a good practice to allow to configure this for the relevant filter.
*/
int order();
public int order() {
return 0;
}
@Override
public void close() throws ElasticSearchException {
// a no op
}
/**
* Should this processor also process external (non REST) requests, like plugin site requests.
* Process the rest request. Using the channel to send a response, or the filter chain to continue
* processing the request.
*/
boolean handleExternal();
/**
* Process the request, returning <tt>false</tt> if no further processing should be done. Note,
* make sure to send a response if returning <tt>false</tt>, otherwise, no response will be sent.
* <p/>
* It is recommended that the process method will not do blocking calls, or heavily cache data
* if a blocking call is done.
*/
boolean process(RestRequest request, RestChannel channel);
public abstract void process(RestRequest request, RestChannel channel, RestFilterChain filterChain);
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest;
/**
* A filter chain allowing to continue and process the rest request.
*/
public interface RestFilterChain {
/**
* Continue processing the request. Should only be called if a response has not been sent
* through the channel.
*/
void continueProcessing(RestRequest request, RestChannel channel);
}