From 8f6ceeae16733798cd27ae2384ed58780e2424d0 Mon Sep 17 00:00:00 2001 From: mibo Date: Wed, 8 Jul 2015 20:57:03 +0200 Subject: [PATCH] [OLINGO-708] Minor code clean up in TecAsyncSvc --- .../server/tecsvc/async/AsyncProcessor.java | 194 ++++++++++-------- .../tecsvc/async/TechnicalAsyncService.java | 127 ++++++++---- .../async/TechnicalStatusMonitorServlet.java | 31 +-- 3 files changed, 205 insertions(+), 147 deletions(-) diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java index 99b81d625..e6d662efd 100644 --- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java +++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java @@ -18,6 +18,14 @@ */ package org.apache.olingo.server.tecsvc.async; +import org.apache.olingo.commons.api.http.HttpHeader; +import org.apache.olingo.commons.api.http.HttpStatusCode; +import org.apache.olingo.server.api.ODataApplicationException; +import org.apache.olingo.server.api.ODataLibraryException; +import org.apache.olingo.server.api.ODataRequest; +import org.apache.olingo.server.api.ODataResponse; +import org.apache.olingo.server.api.processor.Processor; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -35,33 +43,35 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import org.apache.olingo.commons.api.http.HttpHeader; -import org.apache.olingo.commons.api.http.HttpStatusCode; -import org.apache.olingo.server.api.ODataApplicationException; -import org.apache.olingo.server.api.ODataLibraryException; -import org.apache.olingo.server.api.ODataRequest; -import org.apache.olingo.server.api.ODataResponse; -import org.apache.olingo.server.api.processor.Processor; - +/** + * Async processor "wraps" an Processor (or subclass of) to provide asynchronous support functionality + * in combination with the TechnicalAsyncService. + * + * @param "wrapped" Processor + */ public class AsyncProcessor { - private final MyInvocationHandler handler; - private final TechnicalAsyncService service; - private final T proxyProcessor; - private String location; - private String preferHeader; + private final ProcessorInvocationHandler handler; + private final TechnicalAsyncService service; + private final T proxyProcessor; + private String location; + private String preferHeader; - private static class MyInvocationHandler implements InvocationHandler { + /** + * InvocationHandler which is used as proxy for the Processor method. + */ + private static class ProcessorInvocationHandler implements InvocationHandler { private final Object wrappedInstance; private Method invokeMethod; private Object[] invokeParameters; + private ODataResponse processResponse; - public MyInvocationHandler(Object wrappedInstance) { + public ProcessorInvocationHandler(Object wrappedInstance) { this.wrappedInstance = wrappedInstance; } @Override public Object invoke(Object o, Method method, Object[] objects) throws Throwable { - if(Processor.class.isAssignableFrom(method.getDeclaringClass())) { + if (Processor.class.isAssignableFrom(method.getDeclaringClass())) { invokeMethod = method; invokeParameters = objects; } @@ -69,8 +79,6 @@ public class AsyncProcessor { return null; } - ODataResponse processResponse; - public Object process() throws InvocationTargetException, IllegalAccessException { processResponse = new ODataResponse(); replaceInvokeParameter(processResponse); @@ -82,7 +90,7 @@ public class AsyncProcessor { } public

void replaceInvokeParameter(P replacement) { - if(replacement == null) { + if (replacement == null) { return; } @@ -97,81 +105,107 @@ public class AsyncProcessor { invokeParameters = copy.toArray(); } - public ODataResponse getProcessResponse() { return processResponse; } + + Object getWrappedInstance() { + return this.wrappedInstance; + } } public AsyncProcessor(T processor, Class processorInterface, TechnicalAsyncService service) { - Class aClass = processor.getClass(); - Class[] interfaces = aClass.getInterfaces(); - handler = new MyInvocationHandler(processor); - Object proxyInstance = Proxy.newProxyInstance(aClass.getClassLoader(), interfaces, handler); - proxyProcessor = processorInterface.cast(proxyInstance); - this.service = service; - } + Class aClass = processor.getClass(); + Class[] interfaces = aClass.getInterfaces(); + handler = new ProcessorInvocationHandler(processor); + Object proxyInstance = Proxy.newProxyInstance(aClass.getClassLoader(), interfaces, handler); + proxyProcessor = processorInterface.cast(proxyInstance); + this.service = service; + } - public T prepareFor() { - return proxyProcessor; - } + public T prepareFor() { + return proxyProcessor; + } - public ODataRequest getRequest() { - return getParameter(ODataRequest.class); - } + public ODataRequest getRequest() { + return getParameter(ODataRequest.class); + } - public ODataResponse getResponse() { - return getParameter(ODataResponse.class); - } + public ODataResponse getResponse() { + return getParameter(ODataResponse.class); + } - public ODataResponse getProcessResponse() { - return handler.getProcessResponse(); - } + public ODataResponse getProcessResponse() { + return handler.getProcessResponse(); + } - private

P getParameter(Class

parameterClass) { - for (Object parameter : handler.getInvokeParameters()) { - if (parameter != null && parameterClass == parameter.getClass()) { - return parameterClass.cast(parameter); - } + private

P getParameter(Class

parameterClass) { + for (Object parameter : handler.getInvokeParameters()) { + if (parameter != null && parameterClass == parameter.getClass()) { + return parameterClass.cast(parameter); } - return null; } + return null; + } - public String processAsync() throws ODataApplicationException, ODataLibraryException { - preferHeader = getRequest().getHeader(HttpHeader.PREFER); - ODataRequest request = copyRequest(getRequest()); - handler.replaceInvokeParameter(request); - handler.replaceInvokeParameter(new ODataResponse()); - return service.processAsynchronous(this); - } + public String getPreferHeader() { + return preferHeader; + } - Object process() throws InvocationTargetException, IllegalAccessException { - return handler.process(); - } + public String getLocation() { + return location; + } - private ODataRequest copyRequest(ODataRequest request) throws ODataApplicationException { - ODataRequest req = new ODataRequest(); - req.setBody(copyRequestBody(request)); - req.setMethod(request.getMethod()); - req.setRawBaseUri(request.getRawBaseUri()); - req.setRawODataPath(request.getRawODataPath()); - req.setRawQueryPath(request.getRawQueryPath()); - req.setRawRequestUri(request.getRawRequestUri()); - req.setRawServiceResolutionUri(request.getRawServiceResolutionUri()); + public Class getProcessorClass() { + return handler.getWrappedInstance().getClass(); + } - for (Map.Entry> header : request.getAllHeaders().entrySet()) { - if(HttpHeader.PREFER.toLowerCase().equals( - header.getKey().toLowerCase())) { - preferHeader = header.getValue().get(0); - } else { - req.addHeader(header.getKey(), header.getValue()); - } + /** + * Start the asynchronous processing and returns the id for this process + * + * @return the id for this process + * @throws ODataApplicationException + * @throws ODataLibraryException + */ + public String processAsync() throws ODataApplicationException, ODataLibraryException { + preferHeader = getRequest().getHeader(HttpHeader.PREFER); + ODataRequest request = copyRequest(getRequest()); + handler.replaceInvokeParameter(request); + handler.replaceInvokeParameter(new ODataResponse()); + return service.processAsynchronous(this); + } + + Object process() throws InvocationTargetException, IllegalAccessException { + return handler.process(); + } + + void setLocation(String loc) { + this.location = loc; + } + + private ODataRequest copyRequest(ODataRequest request) throws ODataApplicationException { + ODataRequest req = new ODataRequest(); + req.setBody(copyRequestBody(request)); + req.setMethod(request.getMethod()); + req.setRawBaseUri(request.getRawBaseUri()); + req.setRawODataPath(request.getRawODataPath()); + req.setRawQueryPath(request.getRawQueryPath()); + req.setRawRequestUri(request.getRawRequestUri()); + req.setRawServiceResolutionUri(request.getRawServiceResolutionUri()); + + for (Map.Entry> header : request.getAllHeaders().entrySet()) { + if (HttpHeader.PREFER.toLowerCase().equals( + header.getKey().toLowerCase())) { + preferHeader = header.getValue().get(0); + } else { + req.addHeader(header.getKey(), header.getValue()); } - - return req; } + return req; + } + private InputStream copyRequestBody(ODataRequest request) throws ODataApplicationException { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); InputStream input = request.getBody(); @@ -193,16 +227,4 @@ public class AsyncProcessor { } return null; } - - public String getPreferHeader() { - return preferHeader; - } - - public String getLocation() { - return location; - } - - void setLocation(String loc) { - this.location = loc; - } - } +} diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java index 6a1cedec6..a7cfedf95 100644 --- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java +++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java @@ -18,20 +18,6 @@ */ package org.apache.olingo.server.tecsvc.async; -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - import org.apache.olingo.commons.api.ODataRuntimeException; import org.apache.olingo.commons.api.format.ContentType; import org.apache.olingo.commons.api.http.HttpHeader; @@ -46,17 +32,57 @@ import org.apache.olingo.server.api.serializer.SerializerException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +/** + * The TechnicalAsyncService provides asynchronous support for any Processor. + * To use it following steps are necessary: + *

    + *
  • Get the instance
  • + *
  • Create an instance of the Processor which should be wrapped for asynchronous support + * (do not forget to call the init(...) method on the processor)
  • + *
  • register the Processor instance via the register(...) method
  • + *
  • prepare the corresponding method with the request parameters via the + * prepareFor() method at the AsyncProcessor
  • + *
  • start the async processing via the processAsync() methods
  • + *
+ * A short code snippet is shown below: + *

+ * + * TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance(); + * TechnicalEntityProcessor processor = new TechnicalEntityProcessor(dataProvider, serviceMetadata); + * processor.init(odata, serviceMetadata); + * AsyncProcessor asyncProcessor = asyncService.register(processor, EntityProcessor.class); + * asyncProcessor.prepareFor().readEntity(request, response, uriInfo, requestedFormat); + * String location = asyncProcessor.processAsync(); + * + *

+ */ public class TechnicalAsyncService { public static final String TEC_ASYNC_SLEEP = "tec.sleep"; + public static final String STATUS_MONITOR_TOKEN = "status"; private static final Map LOCATION_2_ASYNC_RUNNER = Collections.synchronizedMap(new HashMap()); private static final ExecutorService ASYNC_REQUEST_EXECUTOR = Executors.newFixedThreadPool(10); private static final AtomicInteger ID_GENERATOR = new AtomicInteger(); - public static final String STATUS_MONITOR_TOKEN = "status"; - public AsyncProcessor register(T processor, Class processorInterface) { return new AsyncProcessor(processor, processorInterface, this); @@ -73,7 +99,6 @@ public class TechnicalAsyncService { updateHeader(response, HttpStatusCode.ACCEPTED, location); } - private static final class AsyncProcessorHolder { private static final TechnicalAsyncService INSTANCE = new TechnicalAsyncService(); } @@ -116,12 +141,27 @@ public class TechnicalAsyncService { } else { response.setStatus(HttpStatusCode.ACCEPTED.getStatusCode()); response.setHeader(HttpHeader.LOCATION, location); - String content = "In progress for async location = " + location; - writeToResponse(response, content); } } } + public void listQueue(HttpServletResponse response) { + StringBuilder sb = new StringBuilder(); + sb.append("

Queued requests

    "); + for (Map.Entry entry : LOCATION_2_ASYNC_RUNNER.entrySet()) { + AsyncProcessor asyncProcessor = entry.getValue().getDispatched(); + sb.append("
  • ID: ").append(entry.getKey()).append("
    ") + .append("Location: ").append(asyncProcessor.getLocation()).append("
    ") + .append("Processor: ").append(asyncProcessor.getProcessorClass().getSimpleName()).append("
    ") + .append("Finished: ").append(entry.getValue().isFinished()).append("
    ") + .append("
  • "); + } + sb.append("
"); + + writeToResponse(response, sb.toString()); + } + + private static void writeToResponse(HttpServletResponse response, InputStream input) throws IOException { copy(input, response.getOutputStream()); } @@ -154,36 +194,47 @@ public class TechnicalAsyncService { writeToResponse(response, odResponseStream); } - static void writeHttpResponse(final ODataResponse odResponse, final HttpServletResponse response) throws IOException { - response.setStatus(odResponse.getStatusCode()); - - for (Map.Entry entry : odResponse.getHeaders().entrySet()) { - response.setHeader(entry.getKey(), entry.getValue()); - } - - copy(odResponse.getContent(), response.getOutputStream()); - } +// static void copy(final InputStream input, final OutputStream output) { +// if(output == null || input == null) { +// return; +// } +// +// try { +// byte[] buffer = new byte[1024]; +// int n; +// while (-1 != (n = input.read(buffer))) { +// output.write(buffer, 0, n); +// } +// } catch (IOException e) { +// throw new ODataRuntimeException(e); +// } finally { +// closeStream(output); +// closeStream(input); +// } +// } static void copy(final InputStream input, final OutputStream output) { - if(output == null || input == null) { + if (output == null || input == null) { return; } try { - byte[] buffer = new byte[1024]; - int n; - while (-1 != (n = input.read(buffer))) { - output.write(buffer, 0, n); + ByteBuffer inBuffer = ByteBuffer.allocate(8192); + ReadableByteChannel ic = Channels.newChannel(input); + WritableByteChannel oc = Channels.newChannel(output); + while (ic.read(inBuffer) > 0) { + inBuffer.flip(); + oc.write(inBuffer); + inBuffer.rewind(); } } catch (IOException e) { - throw new ODataRuntimeException(e); + throw new ODataRuntimeException("Error on reading request content"); } finally { - closeStream(output); closeStream(input); + closeStream(output); } } - private static void closeStream(final Closeable closeable) { if (closeable != null) { try { @@ -194,7 +245,6 @@ public class TechnicalAsyncService { } } - private String createNewAsyncLocation(ODataRequest request) { int pos = request.getRawBaseUri().lastIndexOf("/") + 1; return request.getRawBaseUri().substring(0, pos) + STATUS_MONITOR_TOKEN + "/" + ID_GENERATOR.incrementAndGet(); @@ -204,6 +254,9 @@ public class TechnicalAsyncService { return request.getRequestURL().toString(); } + /** + * Runnable for the AsyncProcessor. + */ private static class AsyncRunner implements Runnable { private final AsyncProcessor dispatched; private int defaultSleepTimeInSeconds = 0; diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java index 8fd22cfc5..6ed37226b 100644 --- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java +++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java @@ -18,33 +18,14 @@ */ package org.apache.olingo.server.tecsvc.async; -import java.io.IOException; -import java.net.URI; -import java.util.Collections; -import java.util.List; -import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import javax.servlet.http.HttpSession; - -import org.apache.olingo.server.api.OData; -import org.apache.olingo.server.api.ODataHttpHandler; -import org.apache.olingo.server.api.ServiceMetadata; -import org.apache.olingo.server.api.edmx.EdmxReference; -import org.apache.olingo.server.api.edmx.EdmxReferenceInclude; -import org.apache.olingo.server.tecsvc.ETagSupport; -import org.apache.olingo.server.tecsvc.MetadataETagSupport; -import org.apache.olingo.server.tecsvc.data.DataProvider; -import org.apache.olingo.server.tecsvc.processor.TechnicalActionProcessor; -import org.apache.olingo.server.tecsvc.processor.TechnicalBatchProcessor; -import org.apache.olingo.server.tecsvc.processor.TechnicalEntityProcessor; -import org.apache.olingo.server.tecsvc.processor.TechnicalPrimitiveComplexProcessor; -import org.apache.olingo.server.tecsvc.provider.EdmTechProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; public class TechnicalStatusMonitorServlet extends HttpServlet { @@ -55,8 +36,10 @@ public class TechnicalStatusMonitorServlet extends HttpServlet { protected void service(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { try { - if(TechnicalAsyncService.getInstance().isStatusMonitorResource(request)) { - TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance(); + TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance(); + if("/list".equals(request.getPathInfo())) { + asyncService.listQueue(response); + } else if(asyncService.isStatusMonitorResource(request)) { asyncService.handle(request, response); } } catch (final Exception e) {