[OLINGO-708] Minor code clean up in TecAsyncSvc

This commit is contained in:
mibo 2015-07-08 20:57:03 +02:00
parent 31e3a8bd0a
commit 8f6ceeae16
3 changed files with 205 additions and 147 deletions

View File

@ -18,6 +18,14 @@
*/
package org.apache.olingo.server.tecsvc.async;
import org.apache.olingo.commons.api.http.HttpHeader;
import org.apache.olingo.commons.api.http.HttpStatusCode;
import org.apache.olingo.server.api.ODataApplicationException;
import org.apache.olingo.server.api.ODataLibraryException;
import org.apache.olingo.server.api.ODataRequest;
import org.apache.olingo.server.api.ODataResponse;
import org.apache.olingo.server.api.processor.Processor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -35,33 +43,35 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.olingo.commons.api.http.HttpHeader;
import org.apache.olingo.commons.api.http.HttpStatusCode;
import org.apache.olingo.server.api.ODataApplicationException;
import org.apache.olingo.server.api.ODataLibraryException;
import org.apache.olingo.server.api.ODataRequest;
import org.apache.olingo.server.api.ODataResponse;
import org.apache.olingo.server.api.processor.Processor;
/**
* Async processor "wraps" an Processor (or subclass of) to provide asynchronous support functionality
* in combination with the TechnicalAsyncService.
*
* @param <T> "wrapped" Processor
*/
public class AsyncProcessor<T extends Processor> {
private final MyInvocationHandler handler;
private final TechnicalAsyncService service;
private final T proxyProcessor;
private String location;
private String preferHeader;
private final ProcessorInvocationHandler handler;
private final TechnicalAsyncService service;
private final T proxyProcessor;
private String location;
private String preferHeader;
private static class MyInvocationHandler implements InvocationHandler {
/**
* InvocationHandler which is used as proxy for the Processor method.
*/
private static class ProcessorInvocationHandler implements InvocationHandler {
private final Object wrappedInstance;
private Method invokeMethod;
private Object[] invokeParameters;
private ODataResponse processResponse;
public MyInvocationHandler(Object wrappedInstance) {
public ProcessorInvocationHandler(Object wrappedInstance) {
this.wrappedInstance = wrappedInstance;
}
@Override
public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
if(Processor.class.isAssignableFrom(method.getDeclaringClass())) {
if (Processor.class.isAssignableFrom(method.getDeclaringClass())) {
invokeMethod = method;
invokeParameters = objects;
}
@ -69,8 +79,6 @@ public class AsyncProcessor<T extends Processor> {
return null;
}
ODataResponse processResponse;
public Object process() throws InvocationTargetException, IllegalAccessException {
processResponse = new ODataResponse();
replaceInvokeParameter(processResponse);
@ -82,7 +90,7 @@ public class AsyncProcessor<T extends Processor> {
}
public <P> void replaceInvokeParameter(P replacement) {
if(replacement == null) {
if (replacement == null) {
return;
}
@ -97,81 +105,107 @@ public class AsyncProcessor<T extends Processor> {
invokeParameters = copy.toArray();
}
public ODataResponse getProcessResponse() {
return processResponse;
}
Object getWrappedInstance() {
return this.wrappedInstance;
}
}
public AsyncProcessor(T processor, Class<T> processorInterface, TechnicalAsyncService service) {
Class<? extends Processor> aClass = processor.getClass();
Class[] interfaces = aClass.getInterfaces();
handler = new MyInvocationHandler(processor);
Object proxyInstance = Proxy.newProxyInstance(aClass.getClassLoader(), interfaces, handler);
proxyProcessor = processorInterface.cast(proxyInstance);
this.service = service;
}
Class<? extends Processor> aClass = processor.getClass();
Class[] interfaces = aClass.getInterfaces();
handler = new ProcessorInvocationHandler(processor);
Object proxyInstance = Proxy.newProxyInstance(aClass.getClassLoader(), interfaces, handler);
proxyProcessor = processorInterface.cast(proxyInstance);
this.service = service;
}
public T prepareFor() {
return proxyProcessor;
}
public T prepareFor() {
return proxyProcessor;
}
public ODataRequest getRequest() {
return getParameter(ODataRequest.class);
}
public ODataRequest getRequest() {
return getParameter(ODataRequest.class);
}
public ODataResponse getResponse() {
return getParameter(ODataResponse.class);
}
public ODataResponse getResponse() {
return getParameter(ODataResponse.class);
}
public ODataResponse getProcessResponse() {
return handler.getProcessResponse();
}
public ODataResponse getProcessResponse() {
return handler.getProcessResponse();
}
private <P> P getParameter(Class<P> parameterClass) {
for (Object parameter : handler.getInvokeParameters()) {
if (parameter != null && parameterClass == parameter.getClass()) {
return parameterClass.cast(parameter);
}
private <P> P getParameter(Class<P> parameterClass) {
for (Object parameter : handler.getInvokeParameters()) {
if (parameter != null && parameterClass == parameter.getClass()) {
return parameterClass.cast(parameter);
}
return null;
}
return null;
}
public String processAsync() throws ODataApplicationException, ODataLibraryException {
preferHeader = getRequest().getHeader(HttpHeader.PREFER);
ODataRequest request = copyRequest(getRequest());
handler.replaceInvokeParameter(request);
handler.replaceInvokeParameter(new ODataResponse());
return service.processAsynchronous(this);
}
public String getPreferHeader() {
return preferHeader;
}
Object process() throws InvocationTargetException, IllegalAccessException {
return handler.process();
}
public String getLocation() {
return location;
}
private ODataRequest copyRequest(ODataRequest request) throws ODataApplicationException {
ODataRequest req = new ODataRequest();
req.setBody(copyRequestBody(request));
req.setMethod(request.getMethod());
req.setRawBaseUri(request.getRawBaseUri());
req.setRawODataPath(request.getRawODataPath());
req.setRawQueryPath(request.getRawQueryPath());
req.setRawRequestUri(request.getRawRequestUri());
req.setRawServiceResolutionUri(request.getRawServiceResolutionUri());
public Class<?> getProcessorClass() {
return handler.getWrappedInstance().getClass();
}
for (Map.Entry<String, List<String>> header : request.getAllHeaders().entrySet()) {
if(HttpHeader.PREFER.toLowerCase().equals(
header.getKey().toLowerCase())) {
preferHeader = header.getValue().get(0);
} else {
req.addHeader(header.getKey(), header.getValue());
}
/**
* Start the asynchronous processing and returns the id for this process
*
* @return the id for this process
* @throws ODataApplicationException
* @throws ODataLibraryException
*/
public String processAsync() throws ODataApplicationException, ODataLibraryException {
preferHeader = getRequest().getHeader(HttpHeader.PREFER);
ODataRequest request = copyRequest(getRequest());
handler.replaceInvokeParameter(request);
handler.replaceInvokeParameter(new ODataResponse());
return service.processAsynchronous(this);
}
Object process() throws InvocationTargetException, IllegalAccessException {
return handler.process();
}
void setLocation(String loc) {
this.location = loc;
}
private ODataRequest copyRequest(ODataRequest request) throws ODataApplicationException {
ODataRequest req = new ODataRequest();
req.setBody(copyRequestBody(request));
req.setMethod(request.getMethod());
req.setRawBaseUri(request.getRawBaseUri());
req.setRawODataPath(request.getRawODataPath());
req.setRawQueryPath(request.getRawQueryPath());
req.setRawRequestUri(request.getRawRequestUri());
req.setRawServiceResolutionUri(request.getRawServiceResolutionUri());
for (Map.Entry<String, List<String>> header : request.getAllHeaders().entrySet()) {
if (HttpHeader.PREFER.toLowerCase().equals(
header.getKey().toLowerCase())) {
preferHeader = header.getValue().get(0);
} else {
req.addHeader(header.getKey(), header.getValue());
}
return req;
}
return req;
}
private InputStream copyRequestBody(ODataRequest request) throws ODataApplicationException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
InputStream input = request.getBody();
@ -193,16 +227,4 @@ public class AsyncProcessor<T extends Processor> {
}
return null;
}
public String getPreferHeader() {
return preferHeader;
}
public String getLocation() {
return location;
}
void setLocation(String loc) {
this.location = loc;
}
}
}

View File

@ -18,20 +18,6 @@
*/
package org.apache.olingo.server.tecsvc.async;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.olingo.commons.api.ODataRuntimeException;
import org.apache.olingo.commons.api.format.ContentType;
import org.apache.olingo.commons.api.http.HttpHeader;
@ -46,17 +32,57 @@ import org.apache.olingo.server.api.serializer.SerializerException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* The TechnicalAsyncService provides asynchronous support for any Processor.
* To use it following steps are necessary:
* <ul>
* <li>Get the instance</li>
* <li>Create an instance of the Processor which should be wrapped for asynchronous support
* (do not forget to call the <code>init(...)</code> method on the processor)</li>
* <li>register the Processor instance via the <code>register(...)</code> method</li>
* <li>prepare the corresponding method with the request parameters via the
* <code>prepareFor()</code> method at the AsyncProcessor</li>
* <li>start the async processing via the <code>processAsync()</code> methods</li>
* </ul>
* A short code snippet is shown below:
* <p>
* <code>
* TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance();
* TechnicalEntityProcessor processor = new TechnicalEntityProcessor(dataProvider, serviceMetadata);
* processor.init(odata, serviceMetadata);
* AsyncProcessor<EntityProcessor> asyncProcessor = asyncService.register(processor, EntityProcessor.class);
* asyncProcessor.prepareFor().readEntity(request, response, uriInfo, requestedFormat);
* String location = asyncProcessor.processAsync();
* </code>
* </p>
*/
public class TechnicalAsyncService {
public static final String TEC_ASYNC_SLEEP = "tec.sleep";
public static final String STATUS_MONITOR_TOKEN = "status";
private static final Map<String, AsyncRunner> LOCATION_2_ASYNC_RUNNER =
Collections.synchronizedMap(new HashMap<String, AsyncRunner>());
private static final ExecutorService ASYNC_REQUEST_EXECUTOR = Executors.newFixedThreadPool(10);
private static final AtomicInteger ID_GENERATOR = new AtomicInteger();
public static final String STATUS_MONITOR_TOKEN = "status";
public <T extends Processor> AsyncProcessor<T> register(T processor, Class<T> processorInterface) {
return new AsyncProcessor<T>(processor, processorInterface, this);
@ -73,7 +99,6 @@ public class TechnicalAsyncService {
updateHeader(response, HttpStatusCode.ACCEPTED, location);
}
private static final class AsyncProcessorHolder {
private static final TechnicalAsyncService INSTANCE = new TechnicalAsyncService();
}
@ -116,12 +141,27 @@ public class TechnicalAsyncService {
} else {
response.setStatus(HttpStatusCode.ACCEPTED.getStatusCode());
response.setHeader(HttpHeader.LOCATION, location);
String content = "In progress for async location = " + location;
writeToResponse(response, content);
}
}
}
public void listQueue(HttpServletResponse response) {
StringBuilder sb = new StringBuilder();
sb.append("<html><header/><body><h1>Queued requests</h1><ul>");
for (Map.Entry<String, AsyncRunner> entry : LOCATION_2_ASYNC_RUNNER.entrySet()) {
AsyncProcessor asyncProcessor = entry.getValue().getDispatched();
sb.append("<li><b>ID: </b>").append(entry.getKey()).append("<br/>")
.append("<b>Location: </b>").append(asyncProcessor.getLocation()).append("<br/>")
.append("<b>Processor: </b>").append(asyncProcessor.getProcessorClass().getSimpleName()).append("<br/>")
.append("<b>Finished: </b>").append(entry.getValue().isFinished()).append("<br/>")
.append("</li>");
}
sb.append("</ul></body></html>");
writeToResponse(response, sb.toString());
}
private static void writeToResponse(HttpServletResponse response, InputStream input) throws IOException {
copy(input, response.getOutputStream());
}
@ -154,36 +194,47 @@ public class TechnicalAsyncService {
writeToResponse(response, odResponseStream);
}
static void writeHttpResponse(final ODataResponse odResponse, final HttpServletResponse response) throws IOException {
response.setStatus(odResponse.getStatusCode());
for (Map.Entry<String, String> entry : odResponse.getHeaders().entrySet()) {
response.setHeader(entry.getKey(), entry.getValue());
}
copy(odResponse.getContent(), response.getOutputStream());
}
// static void copy(final InputStream input, final OutputStream output) {
// if(output == null || input == null) {
// return;
// }
//
// try {
// byte[] buffer = new byte[1024];
// int n;
// while (-1 != (n = input.read(buffer))) {
// output.write(buffer, 0, n);
// }
// } catch (IOException e) {
// throw new ODataRuntimeException(e);
// } finally {
// closeStream(output);
// closeStream(input);
// }
// }
static void copy(final InputStream input, final OutputStream output) {
if(output == null || input == null) {
if (output == null || input == null) {
return;
}
try {
byte[] buffer = new byte[1024];
int n;
while (-1 != (n = input.read(buffer))) {
output.write(buffer, 0, n);
ByteBuffer inBuffer = ByteBuffer.allocate(8192);
ReadableByteChannel ic = Channels.newChannel(input);
WritableByteChannel oc = Channels.newChannel(output);
while (ic.read(inBuffer) > 0) {
inBuffer.flip();
oc.write(inBuffer);
inBuffer.rewind();
}
} catch (IOException e) {
throw new ODataRuntimeException(e);
throw new ODataRuntimeException("Error on reading request content");
} finally {
closeStream(output);
closeStream(input);
closeStream(output);
}
}
private static void closeStream(final Closeable closeable) {
if (closeable != null) {
try {
@ -194,7 +245,6 @@ public class TechnicalAsyncService {
}
}
private String createNewAsyncLocation(ODataRequest request) {
int pos = request.getRawBaseUri().lastIndexOf("/") + 1;
return request.getRawBaseUri().substring(0, pos) + STATUS_MONITOR_TOKEN + "/" + ID_GENERATOR.incrementAndGet();
@ -204,6 +254,9 @@ public class TechnicalAsyncService {
return request.getRequestURL().toString();
}
/**
* Runnable for the AsyncProcessor.
*/
private static class AsyncRunner implements Runnable {
private final AsyncProcessor dispatched;
private int defaultSleepTimeInSeconds = 0;

View File

@ -18,33 +18,14 @@
*/
package org.apache.olingo.server.tecsvc.async;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.apache.olingo.server.api.OData;
import org.apache.olingo.server.api.ODataHttpHandler;
import org.apache.olingo.server.api.ServiceMetadata;
import org.apache.olingo.server.api.edmx.EdmxReference;
import org.apache.olingo.server.api.edmx.EdmxReferenceInclude;
import org.apache.olingo.server.tecsvc.ETagSupport;
import org.apache.olingo.server.tecsvc.MetadataETagSupport;
import org.apache.olingo.server.tecsvc.data.DataProvider;
import org.apache.olingo.server.tecsvc.processor.TechnicalActionProcessor;
import org.apache.olingo.server.tecsvc.processor.TechnicalBatchProcessor;
import org.apache.olingo.server.tecsvc.processor.TechnicalEntityProcessor;
import org.apache.olingo.server.tecsvc.processor.TechnicalPrimitiveComplexProcessor;
import org.apache.olingo.server.tecsvc.provider.EdmTechProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class TechnicalStatusMonitorServlet extends HttpServlet {
@ -55,8 +36,10 @@ public class TechnicalStatusMonitorServlet extends HttpServlet {
protected void service(final HttpServletRequest request, final HttpServletResponse response)
throws ServletException, IOException {
try {
if(TechnicalAsyncService.getInstance().isStatusMonitorResource(request)) {
TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance();
TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance();
if("/list".equals(request.getPathInfo())) {
asyncService.listQueue(response);
} else if(asyncService.isStatusMonitorResource(request)) {
asyncService.handle(request, response);
}
} catch (final Exception e) {