SOLR-14942: Move request registration to ContentStreamHandlerBase (#2112)

This addresses review feedback from David Smiley on Jira. It moves the request registration to the ContentStreamHandlerBase class instead of doing a hack-ish instanceof check inside HttpSolrCall.
This commit is contained in:
Shalin Shekhar Mangar 2020-12-02 10:11:23 +05:30 committed by GitHub
parent feb897a962
commit d99c1667a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 72 deletions

View File

@ -22,6 +22,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.loader.ContentStreamLoader; import org.apache.solr.handler.loader.ContentStreamLoader;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.SolrCoreState;
import org.apache.solr.update.processor.UpdateRequestProcessor; import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain; import org.apache.solr.update.processor.UpdateRequestProcessorChain;
@ -47,38 +48,54 @@ public abstract class ContentStreamHandlerBase extends RequestHandlerBase {
@Override @Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
SolrParams params = req.getParams(); /*
UpdateRequestProcessorChain processorChain = We track update requests so that we can preserve consistency by waiting for them to complete
req.getCore().getUpdateProcessorChain(params); on a node shutdown and then immediately trigger a leader election without waiting for the core to close.
See how the SolrCoreState#pauseUpdatesAndAwaitInflightRequests() method is used in CoreContainer#shutdown()
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
Also see https://issues.apache.org/jira/browse/SOLR-14942 for details on why we do not care for
other kinds of requests.
*/
SolrCoreState solrCoreState = req.getCore().getSolrCoreState();
if (!solrCoreState.registerInFlightUpdate()) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Updates are temporarily paused for core: " + req.getCore().getName());
}
try { try {
ContentStreamLoader documentLoader = newLoader(req, processor); SolrParams params = req.getParams();
UpdateRequestProcessorChain processorChain =
req.getCore().getUpdateProcessorChain(params);
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
try {
ContentStreamLoader documentLoader = newLoader(req, processor);
Iterable<ContentStream> streams = req.getContentStreams(); Iterable<ContentStream> streams = req.getContentStreams();
if (streams == null) { if (streams == null) {
if (!RequestHandlerUtils.handleCommit(req, processor, params, false) && !RequestHandlerUtils.handleRollback(req, processor, params, false)) { if (!RequestHandlerUtils.handleCommit(req, processor, params, false) && !RequestHandlerUtils.handleRollback(req, processor, params, false)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream"); throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream");
}
} else {
for (ContentStream stream : streams) {
documentLoader.load(req, rsp, stream, processor);
}
// Perhaps commit from the parameters
RequestHandlerUtils.handleCommit(req, processor, params, false);
RequestHandlerUtils.handleRollback(req, processor, params, false);
} }
} else { } finally {
// finish the request
for (ContentStream stream : streams) { try {
documentLoader.load(req, rsp, stream, processor); processor.finish();
} finally {
processor.close();
} }
// Perhaps commit from the parameters
RequestHandlerUtils.handleCommit(req, processor, params, false);
RequestHandlerUtils.handleRollback(req, processor, params, false);
} }
} finally { } finally {
// finish the request solrCoreState.deregisterInFlightUpdate();
try {
processor.finish();
} finally {
processor.close();
}
} }
} }

View File

@ -86,7 +86,6 @@ import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrConfig; import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ContentStreamHandlerBase; import org.apache.solr.handler.ContentStreamHandlerBase;
import org.apache.solr.handler.UpdateRequestHandler;
import org.apache.solr.logging.MDCLoggingContext; import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.request.LocalSolrQueryRequest; import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
@ -551,57 +550,39 @@ public class HttpSolrCall {
remoteQuery(coreUrl + path, resp); remoteQuery(coreUrl + path, resp);
return RETURN; return RETURN;
case PROCESS: case PROCESS:
/* final Method reqMethod = Method.getMethod(req.getMethod());
We track update requests so that we can preserve consistency by waiting for them to complete HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
on a node shutdown and then immediately trigger a leader election without waiting for the core to close. // unless we have been explicitly told not to, do cache validation
See how the SolrCoreState#pauseUpdatesAndAwaitInflightRequests() method is used in CoreContainer#shutdown() // if we fail cache validation, execute the query
if (config.getHttpCachingConfig().isNever304() ||
Also see https://issues.apache.org/jira/browse/SOLR-14942 for details on why we do not care for !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
other kinds of requests. SolrQueryResponse solrRsp = new SolrQueryResponse();
*/ /* even for HEAD requests, we need to execute the handler to
if (handler instanceof UpdateRequestHandler && !core.getSolrCoreState().registerInFlightUpdate()) { * ensure we don't get an error (and to make sure the correct
throw new SolrException(ErrorCode.SERVER_ERROR, "Updates are temporarily paused for core: " + core.getName()); * QueryResponseWriter is selected and we get the correct
} * Content-Type)
try { */
final Method reqMethod = Method.getMethod(req.getMethod()); SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp, action));
HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod); mustClearSolrRequestInfo = true;
// unless we have been explicitly told not to, do cache validation execute(solrRsp);
// if we fail cache validation, execute the query if (shouldAudit()) {
if (config.getHttpCachingConfig().isNever304() || EventType eventType = solrRsp.getException() == null ? EventType.COMPLETED : EventType.ERROR;
!HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) { if (shouldAudit(eventType)) {
SolrQueryResponse solrRsp = new SolrQueryResponse(); cores.getAuditLoggerPlugin().doAudit(
/* even for HEAD requests, we need to execute the handler to new AuditEvent(eventType, req, getAuthCtx(), solrReq.getRequestTimer().getTime(), solrRsp.getException()));
* ensure we don't get an error (and to make sure the correct
* QueryResponseWriter is selected and we get the correct
* Content-Type)
*/
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp, action));
mustClearSolrRequestInfo = true;
execute(solrRsp);
if (shouldAudit()) {
EventType eventType = solrRsp.getException() == null ? EventType.COMPLETED : EventType.ERROR;
if (shouldAudit(eventType)) {
cores.getAuditLoggerPlugin().doAudit(
new AuditEvent(eventType, req, getAuthCtx(), solrReq.getRequestTimer().getTime(), solrRsp.getException()));
}
} }
HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
while (headers.hasNext()) {
Map.Entry<String, String> entry = headers.next();
resp.addHeader(entry.getKey(), entry.getValue());
}
QueryResponseWriter responseWriter = getResponseWriter();
if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
writeResponse(solrRsp, responseWriter, reqMethod);
} }
return RETURN; HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
} finally { Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
if (handler instanceof UpdateRequestHandler) { while (headers.hasNext()) {
// every registered request must also be de-registered Map.Entry<String, String> entry = headers.next();
core.getSolrCoreState().deregisterInFlightUpdate(); resp.addHeader(entry.getKey(), entry.getValue());
} }
QueryResponseWriter responseWriter = getResponseWriter();
if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
writeResponse(solrRsp, responseWriter, reqMethod);
} }
return RETURN;
default: return action; default: return action;
} }
} catch (Throwable ex) { } catch (Throwable ex) {