diff --git a/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataRequest.java b/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataRequest.java index d18f67ecc..ea4874946 100644 --- a/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataRequest.java +++ b/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataRequest.java @@ -20,6 +20,7 @@ package org.apache.olingo.server.api; import java.io.InputStream; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -97,6 +98,15 @@ public class ODataRequest { return values == null ? null : values.get(0); } + /** + * Gets all headers. + * @return an unmodifiable Map of header names/values + */ + public Map> getAllHeaders() { + return Collections.unmodifiableMap(headers); + } + + /** * Gets the body of the request. * @return the request payload as {@link InputStream} or null diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/TechnicalServlet.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/TechnicalServlet.java index 9c1d50286..c88e0e7a0 100644 --- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/TechnicalServlet.java +++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/TechnicalServlet.java @@ -35,6 +35,7 @@ import org.apache.olingo.server.api.ODataHttpHandler; import org.apache.olingo.server.api.ServiceMetadata; import org.apache.olingo.server.api.edmx.EdmxReference; import org.apache.olingo.server.api.edmx.EdmxReferenceInclude; +import org.apache.olingo.server.tecsvc.async.TechnicalAsyncService; import org.apache.olingo.server.tecsvc.data.DataProvider; import org.apache.olingo.server.tecsvc.processor.TechnicalActionProcessor; import org.apache.olingo.server.tecsvc.processor.TechnicalBatchProcessor; @@ -63,6 +64,12 @@ public class TechnicalServlet extends HttpServlet { protected void service(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { try { + if(TechnicalAsyncService.getInstance().isStatusMonitorResource(request)) { + TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance(); + asyncService.handle(request, response); + return; + } + OData odata = OData.newInstance(); EdmxReference reference = new EdmxReference(URI.create("../v4.0/cs02/vocabularies/Org.OData.Core.V1.xml")); reference.addInclude(new EdmxReferenceInclude("Org.OData.Core.V1", "Core")); diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/TechnicalServletContextListener.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/TechnicalServletContextListener.java new file mode 100644 index 000000000..71aef570b --- /dev/null +++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/TechnicalServletContextListener.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.olingo.server.tecsvc; + +import javax.servlet.ServletContextEvent; +import javax.servlet.ServletContextListener; + +import org.apache.olingo.server.tecsvc.async.TechnicalAsyncService; + +/** + */ +public class TechnicalServletContextListener implements ServletContextListener { + @Override + public void contextInitialized(ServletContextEvent servletContextEvent) { + } + + @Override + public void contextDestroyed(ServletContextEvent servletContextEvent) { + TechnicalAsyncService.getInstance().shutdownThreadPool(); + } +} diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java new file mode 100644 index 000000000..5f3fb103b --- /dev/null +++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.olingo.server.tecsvc.async; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.olingo.commons.api.ODataRuntimeException; +import org.apache.olingo.commons.api.http.HttpHeader; +import org.apache.olingo.commons.api.http.HttpStatusCode; +import org.apache.olingo.server.api.ODataApplicationException; +import org.apache.olingo.server.api.ODataLibraryException; +import org.apache.olingo.server.api.ODataRequest; +import org.apache.olingo.server.api.ODataResponse; +import org.apache.olingo.server.api.processor.Processor; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +public class TechnicalAsyncService { + + public static final String TEC_ASYNC_SLEEP = "tec.sleep"; + + private static final Map LOCATION_2_ASYNC_RUNNER = + Collections.synchronizedMap(new HashMap()); + private static final ExecutorService ASYNC_REQUEST_EXECUTOR = Executors.newFixedThreadPool(10); + private static final AtomicInteger ID_GENERATOR = new AtomicInteger(); + public static final String STATUS_MONITOR_TOKEN = "async"; + + private static class MyInvocationHandler implements InvocationHandler { + private final Object wrappedInstance; + private Method invokeMethod; + private Object[] invokeParameters; + + public MyInvocationHandler(Object wrappedInstance) { + this.wrappedInstance = wrappedInstance; + } + + @Override + public Object invoke(Object o, Method method, Object[] objects) throws Throwable { + if(Processor.class.isAssignableFrom(method.getDeclaringClass())) { + invokeMethod = method; + invokeParameters = objects; + } + + return null; + } + + ODataResponse processResponse; + + public Object process() throws InvocationTargetException, IllegalAccessException { + processResponse = new ODataResponse(); + replaceInvokeParameter(processResponse); + return invokeMethod.invoke(wrappedInstance, invokeParameters); + } + + public Object[] getInvokeParameters() { + return invokeParameters; + } + + public

void replaceInvokeParameter(P replacement) { + if(replacement == null) { + return; + } + + List copy = new ArrayList(); + for (Object parameter : invokeParameters) { + if (replacement.getClass() == parameter.getClass()) { + copy.add(replacement); + } else { + copy.add(parameter); + } + } + invokeParameters = copy.toArray(); + } + + private

P getParameter(Class

parameterClass) { + for (Object parameter : invokeParameters) { + if (parameter != null && parameterClass == parameter.getClass()) { + return parameterClass.cast(parameter); + } + } + return null; + } + + public ODataResponse getProcessResponse() { + return processResponse; + } + } + + public class AsyncProcessor { + private final MyInvocationHandler handler; + private final TechnicalAsyncService service; + private final Object proxyInstance; + private final T proxyProcessor; + private String location; + private String preferHeader; + + public AsyncProcessor(T processor, Class processorInterface, TechnicalAsyncService service) { + Class aClass = processor.getClass(); + Class[] interfaces = aClass.getInterfaces(); + handler = new MyInvocationHandler(processor); + proxyInstance = Proxy.newProxyInstance(aClass.getClassLoader(), interfaces, handler); + proxyProcessor = processorInterface.cast(proxyInstance); + this.service = service; + } + + public T prepareFor() { + return proxyProcessor; + } + + public ODataRequest getRequest() { + return getParameter(ODataRequest.class); + } + + public ODataResponse getResponse() { + return getParameter(ODataResponse.class); + } + + public ODataResponse getProcessResponse() { + return handler.getProcessResponse(); + } + + private

P getParameter(Class

parameterClass) { + for (Object parameter : handler.getInvokeParameters()) { + if (parameter != null && parameterClass == parameter.getClass()) { + return parameterClass.cast(parameter); + } + } + return null; + } + + public String processAsync() throws ODataApplicationException, ODataLibraryException { + preferHeader = getRequest().getHeader(HttpHeader.PREFER); + ODataRequest request = copyRequest(getRequest()); + handler.replaceInvokeParameter(request); + handler.replaceInvokeParameter(new ODataResponse()); + return service.processAsynchronous(this); + } + + private Object process() throws InvocationTargetException, IllegalAccessException { + return handler.process(); + } + + private ODataRequest copyRequest(ODataRequest request) { + ODataRequest req = new ODataRequest(); + req.setBody(request.getBody()); + req.setMethod(request.getMethod()); + req.setRawBaseUri(request.getRawBaseUri()); + req.setRawODataPath(request.getRawODataPath()); + req.setRawQueryPath(request.getRawQueryPath()); + req.setRawRequestUri(request.getRawRequestUri()); + req.setRawServiceResolutionUri(request.getRawServiceResolutionUri()); + + for (Map.Entry> header : request.getAllHeaders().entrySet()) { + if(HttpHeader.PREFER.toLowerCase().equals( + header.getKey().toLowerCase())) { + preferHeader = header.getValue().get(0); + } else { + req.addHeader(header.getKey(), header.getValue()); + } + } + + return req; + } + + public String getPreferHeader() { + return preferHeader; + } + + public String getLocation() { + return location; + } + + private void setLocation(String loc) { + this.location = loc; + } + } + + + public AsyncProcessor register(T processor, Class processorInterface) { + return new AsyncProcessor(processor, processorInterface, this); + } + + + private static final class AsyncProcessorHolder { + private static final TechnicalAsyncService INSTANCE = new TechnicalAsyncService(); + } + + public static TechnicalAsyncService getInstance() { + return AsyncProcessorHolder.INSTANCE; + } + + public void shutdownThreadPool() { + ASYNC_REQUEST_EXECUTOR.shutdown(); + } + + public boolean isStatusMonitorResource(HttpServletRequest request) { + return request.getRequestURI() != null && request.getRequestURI().contains(STATUS_MONITOR_TOKEN); + } + + private String processAsynchronous(AsyncProcessor dispatchedProcessor) + throws ODataApplicationException, ODataLibraryException { + // use executor thread pool + String location = createNewAsyncLocation(dispatchedProcessor.getRequest()); + dispatchedProcessor.setLocation(location); + AsyncRunner run = new AsyncRunner(dispatchedProcessor); + LOCATION_2_ASYNC_RUNNER.put(location, run); + ASYNC_REQUEST_EXECUTOR.execute(run); + // + return location; + } + + public void status(ODataRequest request, ODataResponse response) + throws ODataApplicationException, ODataLibraryException { + + } + + public void cancel(ODataRequest request, ODataResponse response) + throws ODataApplicationException, ODataLibraryException { + + } + + public void handle(HttpServletRequest request, HttpServletResponse response) { + String location = getAsyncLocation(request); + AsyncRunner runner = LOCATION_2_ASYNC_RUNNER.get(location); + + if(runner == null) { + response.setStatus(HttpStatusCode.NOT_FOUND.getStatusCode()); + } else { + if(runner.isFinished()) { + ODataResponse wrapResult = runner.getDispatched().getProcessResponse(); + convertToHttp(response, wrapResult); + LOCATION_2_ASYNC_RUNNER.remove(location); + } else { + response.setStatus(HttpStatusCode.ACCEPTED.getStatusCode()); + response.setHeader(HttpHeader.LOCATION, location); + String content = "In progress for async location = " + location; + writeToResponse(response, content); + } + } + } + + private void writeToResponse(HttpServletResponse response, String content) { + OutputStream output = null; + try { + output = response.getOutputStream(); + output.write(content.getBytes()); + } catch (IOException e) { + throw new ODataRuntimeException(e); + } finally { + closeStream(output); + } + } + + static void convertToHttp(final HttpServletResponse response, final ODataResponse odResponse) { + response.setStatus(odResponse.getStatusCode()); + + for (Map.Entry entry : odResponse.getHeaders().entrySet()) { + response.setHeader(entry.getKey(), entry.getValue()); + } + + InputStream input = odResponse.getContent(); + if (input != null) { + OutputStream output = null; + try { + output = response.getOutputStream(); + byte[] buffer = new byte[1024]; + int n; + while (-1 != (n = input.read(buffer))) { + output.write(buffer, 0, n); + } + } catch (IOException e) { + throw new ODataRuntimeException(e); + } finally { + closeStream(output); + closeStream(input); + } + } + } + + private static void closeStream(final Closeable closeable) { + if (closeable != null) { + try { + closeable.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + + private String createNewAsyncLocation(ODataRequest request) { + return request.getRawBaseUri() + "/" + STATUS_MONITOR_TOKEN + request.getRawODataPath() + + "?" + STATUS_MONITOR_TOKEN + "=" + ID_GENERATOR.incrementAndGet(); + } + + private String getAsyncLocation(HttpServletRequest request) { + return "http://localhost:8080" + request.getRequestURI() + "?" + request.getQueryString(); + } + + private String getAsyncQueryPart(ODataRequest request) { + String rawQueryPath = request.getRawQueryPath(); + if(rawQueryPath != null) { + Matcher m = Pattern.compile("(" + STATUS_MONITOR_TOKEN + "=\\d*)").matcher(rawQueryPath); + if(m.find()) { + return m.group(); + } + } + return ""; + } + + private static class AsyncRunner implements Runnable { + private final AsyncProcessor dispatched; + private int defaultSleepTimeInSeconds = 0; + private Exception exception; + boolean finished = false; + + public AsyncRunner(AsyncProcessor wrap) { + this(wrap, 0); + } + + public AsyncRunner(AsyncProcessor wrap, int defaultSleepTimeInSeconds) { + this.dispatched = wrap; + if(defaultSleepTimeInSeconds > 0) { + this.defaultSleepTimeInSeconds = defaultSleepTimeInSeconds; + } + } + + @Override + public void run() { + try { + int sleep = getSleepTime(dispatched); + TimeUnit.SECONDS.sleep(sleep); + dispatched.process(); + } catch (Exception e) { + exception = e; + } + finished = true; + } + + private int getSleepTime(AsyncProcessor wrap) { +// String preferHeader = wrap.getRequest().getHeader(HttpHeader.PREFER); + String preferHeader = wrap.getPreferHeader(); + Matcher matcher = Pattern.compile("(" + TEC_ASYNC_SLEEP + + "=)(\\d*)").matcher(preferHeader); + if (matcher.find()) { + String waitTimeAsString = matcher.group(2); + return Integer.parseInt(waitTimeAsString); + } + return defaultSleepTimeInSeconds; + } + + public Exception getException() { + return exception; + } + + public boolean isFinished() { + return finished; + } + + public AsyncProcessor getDispatched() { + return dispatched; + } + } +} diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java index 9844d71ae..de332c1a4 100644 --- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java +++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java @@ -60,6 +60,7 @@ import org.apache.olingo.server.api.uri.queryoption.CountOption; import org.apache.olingo.server.api.uri.queryoption.ExpandOption; import org.apache.olingo.server.api.uri.queryoption.IdOption; import org.apache.olingo.server.api.uri.queryoption.SelectOption; +import org.apache.olingo.server.tecsvc.async.TechnicalAsyncService; import org.apache.olingo.server.tecsvc.data.DataProvider; import org.apache.olingo.server.tecsvc.data.RequestValidator; import org.apache.olingo.server.tecsvc.processor.queryoptions.ExpandSystemQueryOptionHandler; @@ -143,6 +144,25 @@ public class TechnicalEntityProcessor extends TechnicalProcessor HttpStatusCode.NOT_IMPLEMENTED.getStatusCode(), Locale.ROOT); } checkRequestFormat(requestFormat); + + // + if(odata.createPreferences(request.getHeaders(HttpHeader.PREFER)).hasRespondAsync()) { + TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance(); + TechnicalEntityProcessor processor = new TechnicalEntityProcessor(dataProvider, serviceMetadata); + processor.init(odata, serviceMetadata); + TechnicalAsyncService.AsyncProcessor asyncProcessor = + asyncService.register(processor, EntityProcessor.class); + asyncProcessor.prepareFor().createEntity(request, response, uriInfo, requestFormat, responseFormat); + String location = asyncProcessor.processAsync(); + // + response.setStatusCode(HttpStatusCode.ACCEPTED.getStatusCode()); + response.setHeader(HttpHeader.LOCATION, location); + // + return; + } + // + + final UriResourceEntitySet resourceEntitySet = (UriResourceEntitySet) uriInfo.getUriResourceParts().get(0); final EdmEntitySet edmEntitySet = resourceEntitySet.getEntitySet(); final EdmEntityType edmEntityType = edmEntitySet.getEntityType(); @@ -167,7 +187,7 @@ public class TechnicalEntityProcessor extends TechnicalProcessor final Return returnPreference = odata.createPreferences(request.getHeaders(HttpHeader.PREFER)).getReturn(); if (returnPreference == null || returnPreference == Return.REPRESENTATION) { response.setContent(serializeEntity(entity, edmEntitySet, edmEntityType, responseFormat, expand, null) - .getContent()); + .getContent()); response.setHeader(HttpHeader.CONTENT_TYPE, responseFormat.toContentTypeString()); response.setStatusCode(HttpStatusCode.CREATED.getStatusCode()); } else { @@ -217,13 +237,13 @@ public class TechnicalEntityProcessor extends TechnicalProcessor request.getRawBaseUri()).validate(edmEntitySet, changedEntity); dataProvider.update(request.getRawBaseUri(), edmEntitySet, entity, changedEntity, - request.getMethod() == HttpMethod.PATCH, false); + request.getMethod() == HttpMethod.PATCH, false); final Return returnPreference = odata.createPreferences(request.getHeaders(HttpHeader.PREFER)).getReturn(); if (returnPreference == null || returnPreference == Return.REPRESENTATION) { response.setStatusCode(HttpStatusCode.OK.getStatusCode()); response.setContent(serializeEntity(entity, edmEntitySet, edmEntityType, responseFormat, null, null) - .getContent()); + .getContent()); response.setHeader(HttpHeader.CONTENT_TYPE, responseFormat.toContentTypeString()); } else { response.setStatusCode(HttpStatusCode.NO_CONTENT.getStatusCode()); @@ -337,7 +357,7 @@ public class TechnicalEntityProcessor extends TechnicalProcessor final Entity entity = readEntity(uriInfo, true); final UriResourceNavigation navigationProperty = getLastNavigation(uriInfo); dataProvider.createReference(entity, navigationProperty.getProperty(), references.getEntityReferences().get(0), - request.getRawBaseUri()); + request.getRawBaseUri()); response.setStatusCode(HttpStatusCode.NO_CONTENT.getStatusCode()); } @@ -377,6 +397,23 @@ public class TechnicalEntityProcessor extends TechnicalProcessor private void readEntity(final ODataRequest request, final ODataResponse response, final UriInfo uriInfo, final ContentType requestedFormat, final boolean isReference) throws ODataApplicationException, ODataLibraryException { + // + if(odata.createPreferences(request.getHeaders(HttpHeader.PREFER)).hasRespondAsync()) { + TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance(); + TechnicalEntityProcessor processor = new TechnicalEntityProcessor(dataProvider, serviceMetadata); + processor.init(odata, serviceMetadata); + TechnicalAsyncService.AsyncProcessor asyncProcessor = + asyncService.register(processor, EntityProcessor.class); + asyncProcessor.prepareFor().readEntity(request, response, uriInfo, requestedFormat); + String location = asyncProcessor.processAsync(); + // + response.setStatusCode(HttpStatusCode.ACCEPTED.getStatusCode()); + response.setHeader(HttpHeader.LOCATION, location); + // + return; + } + // + final EdmEntitySet edmEntitySet = getEdmEntitySet(uriInfo); final EdmEntityType edmEntityType = edmEntitySet == null ? (EdmEntityType) ((UriResourcePartTyped) uriInfo.getUriResourceParts() diff --git a/lib/server-tecsvc/src/main/webapp/WEB-INF/web.xml b/lib/server-tecsvc/src/main/webapp/WEB-INF/web.xml index 3c68d1644..216d4fdca 100644 --- a/lib/server-tecsvc/src/main/webapp/WEB-INF/web.xml +++ b/lib/server-tecsvc/src/main/webapp/WEB-INF/web.xml @@ -39,4 +39,7 @@ /odata.svc/* + + org.apache.olingo.server.tecsvc.TechnicalServletContextListener +