[OLINGO-708] Enabled async support for batch case in TecSvc

This commit is contained in:
Michael Bolz 2015-08-03 19:21:38 +02:00
parent 5e481b23ec
commit 955823383d
4 changed files with 392 additions and 0 deletions

View File

@ -19,9 +19,15 @@
package org.apache.olingo.fit.tecsvc.client; package org.apache.olingo.fit.tecsvc.client;
import org.apache.olingo.client.api.ODataClient; import org.apache.olingo.client.api.ODataClient;
import org.apache.olingo.client.api.communication.ODataClientErrorException;
import org.apache.olingo.client.api.communication.request.AsyncBatchRequestWrapper;
import org.apache.olingo.client.api.communication.request.ODataBatchableRequest;
import org.apache.olingo.client.api.communication.request.ODataRequest; import org.apache.olingo.client.api.communication.request.ODataRequest;
import org.apache.olingo.client.api.communication.request.batch.ODataBatchRequest;
import org.apache.olingo.client.api.communication.request.cud.ODataEntityCreateRequest; import org.apache.olingo.client.api.communication.request.cud.ODataEntityCreateRequest;
import org.apache.olingo.client.api.communication.request.retrieve.ODataEntityRequest;
import org.apache.olingo.client.api.communication.response.AsyncResponseWrapper; import org.apache.olingo.client.api.communication.response.AsyncResponseWrapper;
import org.apache.olingo.client.api.communication.response.ODataBatchResponse;
import org.apache.olingo.client.api.communication.response.ODataEntityCreateResponse; import org.apache.olingo.client.api.communication.response.ODataEntityCreateResponse;
import org.apache.olingo.client.api.communication.response.ODataResponse; import org.apache.olingo.client.api.communication.response.ODataResponse;
import org.apache.olingo.client.api.communication.response.ODataRetrieveResponse; import org.apache.olingo.client.api.communication.response.ODataRetrieveResponse;
@ -29,6 +35,7 @@ import org.apache.olingo.client.api.domain.ClientEntity;
import org.apache.olingo.client.api.domain.ClientEntitySet; import org.apache.olingo.client.api.domain.ClientEntitySet;
import org.apache.olingo.client.api.domain.ClientObjectFactory; import org.apache.olingo.client.api.domain.ClientObjectFactory;
import org.apache.olingo.client.api.domain.ClientProperty; import org.apache.olingo.client.api.domain.ClientProperty;
import org.apache.olingo.client.api.uri.URIBuilder;
import org.apache.olingo.client.core.ODataClientFactory; import org.apache.olingo.client.core.ODataClientFactory;
import org.apache.olingo.commons.api.data.Entity; import org.apache.olingo.commons.api.data.Entity;
import org.apache.olingo.commons.api.data.EntityCollection; import org.apache.olingo.commons.api.data.EntityCollection;
@ -40,9 +47,11 @@ import org.apache.olingo.commons.api.http.HttpStatusCode;
import org.apache.olingo.fit.AbstractBaseTestITCase; import org.apache.olingo.fit.AbstractBaseTestITCase;
import org.apache.olingo.fit.tecsvc.TecSvcConst; import org.apache.olingo.fit.tecsvc.TecSvcConst;
import org.apache.olingo.server.tecsvc.async.TechnicalAsyncService; import org.apache.olingo.server.tecsvc.async.TechnicalAsyncService;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -198,6 +207,101 @@ public final class AsyncSupportITCase extends AbstractBaseTestITCase {
assertNull(property2.getPrimitiveValue()); assertNull(property2.getPrimitiveValue());
} }
@Test
@Ignore("mibo: Does currently not work as expected -> issue in ODataClient?")
public void getBatchRequest() throws Exception {
ODataClient client = getClient();
final ODataBatchRequest request = client.getBatchRequestFactory().getBatchRequest(SERVICE_URI);
// final BatchManager payload = request.payloadManager();
// create new request
// ODataEntityRequest<ClientEntity> getRequest = appendGetRequest(client, payload, "ESAllPrim", 32767, false);
// payload.addRequest(getRequest);
//
request.addCustomHeader(HttpHeader.PREFER,
"respond-async; " + TechnicalAsyncService.TEC_ASYNC_SLEEP + "=1");
ODataBatchableRequest getRequest = appendGetRequest(client, "ESAllPrim", 32767, false);
AsyncBatchRequestWrapper asyncRequest =
client.getAsyncRequestFactory().getAsyncBatchRequestWrapper(request);
asyncRequest.addRetrieve(getRequest);
AsyncResponseWrapper<ODataBatchResponse> asyncResponse = asyncRequest.execute();
// Future<ODataBatchResponse> test = payload.getAsyncResponse();
// ODataBatchResponse res = payload.getResponse();
//
// while(!test.isDone()) {
// System.out.println("Wait...");
// TimeUnit.SECONDS.sleep(1);
// }
// // Fetch result
// final ODataBatchResponse response = asyncResponse.getODataResponse();
waitTillDone(asyncResponse, 3);
// assertEquals(HttpStatusCode.ACCEPTED.getStatusCode(), response.getStatusCode());
// assertEquals("Accepted", response.getStatusMessage());
ODataResponse firstResponse = asyncResponse.getODataResponse();
assertEquals(200, firstResponse.getStatusCode());
assertEquals(2, firstResponse.getHeaderNames().size());
assertEquals("4.0", firstResponse.getHeader("OData-Version").iterator().next());
ResWrap<Entity> firWrap = getClient().getDeserializer(ContentType.APPLICATION_JSON)
.toEntity(firstResponse.getRawResponse());
Entity entity = firWrap.getPayload();
assertEquals(32767, entity.getProperty("PropertyInt16").asPrimitive());
assertEquals("First Resource - positive values", entity.getProperty("PropertyString").asPrimitive());
}
/**
* Test delete with async prefer header but without async support from TecSvc.
*/
@Test
public void deleteEntity() throws Exception {
ODataClient client = getClient();
URI uri = client.newURIBuilder(SERVICE_URI)
.appendEntitySetSegment(ES_ALL_PRIM)
.appendKeySegment(32767).build();
// asyncDeleteRequest async request
ODataRequest deleteRequest = getClient().getCUDRequestFactory().getDeleteRequest(uri)
.addCustomHeader(HttpHeader.PREFER, "respond-async; " + TechnicalAsyncService.TEC_ASYNC_SLEEP + "=5");
AsyncResponseWrapper<ODataResponse> asyncDeleteRequest =
client.getAsyncRequestFactory().getAsyncRequestWrapper(deleteRequest).execute();
waitTillDone(asyncDeleteRequest, 5);
ODataResponse response = asyncDeleteRequest.getODataResponse();
assertEquals(HttpStatusCode.NO_CONTENT.getStatusCode(), response.getStatusCode());
// Check that the deleted entity is really gone.
// This check has to be in the same session in order to access the same data provider.
ODataEntityRequest<ClientEntity> entityRequest = client.getRetrieveRequestFactory().getEntityRequest(uri);
entityRequest.addCustomHeader(HttpHeader.COOKIE, response.getHeader(HttpHeader.SET_COOKIE).iterator().next());
try {
entityRequest.execute();
fail("Expected exception not thrown!");
} catch (final ODataClientErrorException e) {
assertEquals(HttpStatusCode.NOT_FOUND.getStatusCode(), e.getStatusLine().getStatusCode());
}
}
private ODataEntityRequest<ClientEntity> appendGetRequest(final ODataClient client, final String segment,
final Object key, final boolean isRelative)
throws URISyntaxException {
final URIBuilder targetURI = client.newURIBuilder(SERVICE_URI);
targetURI.appendEntitySetSegment(segment).appendKeySegment(key);
final URI uri = (isRelative) ? new URI(SERVICE_URI).relativize(targetURI.build()) : targetURI.build();
ODataEntityRequest<ClientEntity> queryReq = client.getRetrieveRequestFactory().getEntityRequest(uri);
queryReq.setFormat(ContentType.JSON);
return queryReq;
}
private void checkEntityAvailableWith(ClientEntitySet entitySet, String property, Object value) { private void checkEntityAvailableWith(ClientEntitySet entitySet, String property, Object value) {
List<ClientEntity> entities = entitySet.getEntities(); List<ClientEntity> entities = entitySet.getEntities();
for (ClientEntity entity : entities) { for (ClientEntity entity : entities) {

View File

@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.olingo.fit.tecsvc.http;
import org.apache.olingo.client.api.ODataClient;
import org.apache.olingo.commons.api.format.ContentType;
import org.apache.olingo.commons.api.http.HttpHeader;
import org.apache.olingo.commons.api.http.HttpMethod;
import org.apache.olingo.fit.AbstractBaseTestITCase;
import org.apache.olingo.fit.tecsvc.TecSvcConst;
import org.apache.olingo.fit.util.StringHelper;
import org.apache.olingo.server.tecsvc.async.TechnicalAsyncService;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Test support of asynchronous batch within the TecSvc without using the OData client library (only
* use java.net.* components for plain http communication).
*/
public class BasicAsyncITCase extends AbstractBaseTestITCase {
private static final String SERVICE_URI = TecSvcConst.BASE_URI + "/";
private static final String HEADER_CONTENT_TRANSFER_ENCODING_BINARY = "Content-Transfer-Encoding: binary";
private static final String HEADER_CONTENT_TYPE_HTTP =
HttpHeader.CONTENT_TYPE + ": " + ContentType.APPLICATION_HTTP.toContentTypeString();
private static final String DEFAULT_BATCH_BOUNDARY = "batch_123";
private static final String ACCEPT_HEADER_VALUE = ContentType.APPLICATION_JSON.toContentTypeString();
private static final String CRLF = "\r\n";
private static final String DEFAULT_ENCODING = "utf-8";
/**
* Works
*/
@Test
public void batchAsync() throws Exception {
final String content = getDefaultRequest("ESAllPrim(32767)");
final HttpURLConnection connection = postBatch(StringHelper.encapsulate(content), DEFAULT_BATCH_BOUNDARY, 1);
StringHelper.Stream response = StringHelper.toStream(connection.getInputStream());
assertEquals(0, response.byteLength());
Map<String, List<String>> headerFields = connection.getHeaderFields();
assertEquals("HTTP/1.1 202 Accepted", headerFields.get(null).get(0));
assertTrue(Pattern.matches("http:\\/\\/localhost:9080\\/odata-server-tecsvc\\/status\\/\\d*",
headerFields.get("Location").get(0)));
assertEquals("respond-async", headerFields.get("Preference-Applied").get(0));
// get async response (still pending)
String respondUri = headerFields.get("Location").get(0);
HttpURLConnection statusRequest = getRequest(new URL(respondUri), Collections.<String, String>emptyMap());
StringHelper.Stream statusBody = StringHelper.toStream(statusRequest.getInputStream());
Map<String, List<String>> statusHeaderFields = statusRequest.getHeaderFields();
assertEquals("HTTP/1.1 202 Accepted", statusHeaderFields.get(null).get(0));
assertEquals(0, statusBody.byteLength());
// get async response (now finished)
TimeUnit.SECONDS.sleep(2);
HttpURLConnection result = getRequest(new URL(respondUri), Collections.<String, String>emptyMap());
StringHelper.Stream resultBody = StringHelper.toStream(result.getInputStream());
Map<String, List<String>> resultHeaderFields = result.getHeaderFields();
String resBody = resultBody.asString();
assertEquals("HTTP/1.1 200 OK", resultHeaderFields.get(null).get(0));
assertEquals(1013, resultBody.byteLength());
contains(resBody,
"HTTP/1.1 200 OK",
"OData-Version: 4.0",
"Content-Length: 605",
"\"@odata.context\":\"$metadata#ESAllPrim/$entity\"",
"\"PropertyInt16\":32767",
"\"PropertyGuid\":\"01234567-89ab-cdef-0123-456789abcdef\",",
"--batch_", "--");
}
/**
* Test with changeset
*/
@Test
public void asyncChangesetViaPost() throws Exception {
InputStream content = Thread.currentThread().getContextClassLoader().getResourceAsStream("basicBatchPost.batch");
final HttpURLConnection connection = postBatch(content, "batch_8194-cf13-1f56", 1);
StringHelper.Stream response = StringHelper.toStream(connection.getInputStream());
assertEquals(0, response.byteLength());
Map<String, List<String>> headerFields = connection.getHeaderFields();
assertEquals("HTTP/1.1 202 Accepted", headerFields.get(null).get(0));
// because of generated status id it is only checked that the location starts correct and contains a number
assertTrue(Pattern.matches("http:\\/\\/localhost:9080\\/odata-server-tecsvc\\/status\\/\\d*",
headerFields.get("Location").get(0)));
assertEquals("respond-async", headerFields.get("Preference-Applied").get(0));
// get async response (still pending)
String respondUri = headerFields.get("Location").get(0);
HttpURLConnection statusRequest = getRequest(new URL(respondUri), Collections.<String, String>emptyMap());
StringHelper.Stream statusBody = StringHelper.toStream(statusRequest.getInputStream());
Map<String, List<String>> statusHeaderFields = statusRequest.getHeaderFields();
assertEquals("HTTP/1.1 202 Accepted", statusHeaderFields.get(null).get(0));
assertEquals(0, statusBody.byteLength());
// get async response (now finished)
TimeUnit.SECONDS.sleep(2);
HttpURLConnection result = getRequest(new URL(respondUri), Collections.<String, String>emptyMap());
StringHelper.Stream resultBody = StringHelper.toStream(result.getInputStream());
Map<String, List<String>> resultHeaderFields = result.getHeaderFields();
String resBody = resultBody.asString();
assertEquals("HTTP/1.1 200 OK", resultHeaderFields.get(null).get(0));
assertEquals(2324, resultBody.byteLength());
contains(resBody,
"HTTP/1.1 200 OK",
"OData-Version: 4.0",
"Content-Length: 605",
"\"@odata.context\":\"$metadata#ESAllPrim/$entity\"",
"\"PropertyInt16\":32767",
"\"PropertyGuid\":\"01234567-89ab-cdef-0123-456789abcdef\",",
"--batch_", "--");
}
/**
* Validates that the content contains all given values in same order as the parameters are given.
* If the content does not contain a value or not in the given order <code>Assert.fail()</code> is called.
*
* @param content content which is checked
* @param values values which must be in content (and in correct order)
*/
private void contains(String content, String... values) {
int index = 0;
for (String value : values) {
int currentIndex = content.indexOf(value, index);
if(currentIndex == -1) {
if(content.contains(value)) {
int foundIndex = content.indexOf(value);
fail("Expected value '" + value + "' was found but not were expected " +
"(started to search from position '" + index + "' but found first occurrence at index '" +
foundIndex + "').");
} else {
fail("Expected value '" + value + "' was not found");
}
}
index = currentIndex;
}
}
private String getDefaultRequest(final String uri) {
return "--" + DEFAULT_BATCH_BOUNDARY + CRLF
+ HEADER_CONTENT_TRANSFER_ENCODING_BINARY + CRLF
+ HEADER_CONTENT_TYPE_HTTP + CRLF
+ CRLF
+ "GET " + uri + " HTTP/1.1" + CRLF
+ CRLF
+ CRLF
+ "--" + DEFAULT_BATCH_BOUNDARY + "--";
}
private HttpURLConnection postRequest(final URL url, final String content, final Map<String, String> headers)
throws IOException {
final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod(HttpMethod.POST.toString());
//
for (Map.Entry<String, String> header : headers.entrySet()) {
connection.setRequestProperty(header.getKey(), header.getValue());
}
//
connection.setDoOutput(true);
final OutputStreamWriter writer = new OutputStreamWriter(connection.getOutputStream());
writer.append(content);
writer.close();
connection.connect();
return connection;
}
private HttpURLConnection getRequest(URL url, Map<String, String> headers) throws IOException {
final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod(HttpMethod.GET.toString());
//
for (Map.Entry<String, String> header : headers.entrySet()) {
connection.setRequestProperty(header.getKey(), header.getValue());
}
//
connection.connect();
return connection;
}
private HttpURLConnection postBatch(final InputStream content, String batchBoundary, int sleepTime)
throws IOException {
Map<String, String> headers = new HashMap<String, String>();
String contentTypeValue = ContentType.create(
ContentType.MULTIPART_MIXED, "boundary", batchBoundary).toContentTypeString();
headers.put(HttpHeader.CONTENT_TYPE, contentTypeValue);
headers.put(HttpHeader.ACCEPT, ACCEPT_HEADER_VALUE);
if(sleepTime >= 0) {
headers.put(HttpHeader.PREFER, "respond-async; " +
TechnicalAsyncService.TEC_ASYNC_SLEEP + "=" + String.valueOf(sleepTime));
}
StringHelper.Stream s = StringHelper.toStream(content);
final URL url = new URL(SERVICE_URI + "$batch");
return postRequest(url, s.asString(DEFAULT_ENCODING), headers);
}
@Override
protected ODataClient getClient() {
return null;
}
}

View File

@ -0,0 +1,33 @@
--batch_8194-cf13-1f56
Content-Type: application/http
Content-Transfer-Encoding: binary
GET ESAllPrim(32767) HTTP/1.1
Accept: application/json
--batch_8194-cf13-1f56
Content-Type: multipart/mixed; boundary=changeset_f980-1cb6-94dd
--changeset_f980-1cb6-94dd
Content-Type: application/http
Content-Transfer-Encoding: binary
Content-ID: changeRequest1
PUT ESAllPrim(32767) HTTP/1.1
Accept: application/json
Content-Type: application/json
{"PropertyString":"MODIFIED"}
--changeset_f980-1cb6-94dd--
--batch_8194-cf13-1f56
Content-Type: application/http
Content-Transfer-Encoding: binary
GET ESAllPrim(32767)/PropertyString HTTP/1.1
Accept: application/json
--batch_8194-cf13-1f56--

View File

@ -36,6 +36,8 @@ import org.apache.olingo.server.api.deserializer.batch.BatchRequestPart;
import org.apache.olingo.server.api.deserializer.batch.ODataResponsePart; import org.apache.olingo.server.api.deserializer.batch.ODataResponsePart;
import org.apache.olingo.server.api.prefer.PreferencesApplied; import org.apache.olingo.server.api.prefer.PreferencesApplied;
import org.apache.olingo.server.api.processor.BatchProcessor; import org.apache.olingo.server.api.processor.BatchProcessor;
import org.apache.olingo.server.tecsvc.async.AsyncProcessor;
import org.apache.olingo.server.tecsvc.async.TechnicalAsyncService;
import org.apache.olingo.server.tecsvc.data.DataProvider; import org.apache.olingo.server.tecsvc.data.DataProvider;
public class TechnicalBatchProcessor extends TechnicalProcessor implements BatchProcessor { public class TechnicalBatchProcessor extends TechnicalProcessor implements BatchProcessor {
@ -47,6 +49,21 @@ public class TechnicalBatchProcessor extends TechnicalProcessor implements Batch
@Override @Override
public void processBatch(final BatchFacade facade, final ODataRequest request, final ODataResponse response) public void processBatch(final BatchFacade facade, final ODataRequest request, final ODataResponse response)
throws ODataApplicationException, ODataLibraryException { throws ODataApplicationException, ODataLibraryException {
// only the first batch call (process batch) must be handled in a separate way for async support
// because a changeset has to be wrapped within a process batch call
if(odata.createPreferences(request.getHeaders(HttpHeader.PREFER)).hasRespondAsync()) {
TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance();
BatchProcessor processor = new TechnicalBatchProcessor(dataProvider);
processor.init(odata, serviceMetadata);
AsyncProcessor<BatchProcessor> asyncProcessor = asyncService.register(processor, BatchProcessor.class);
asyncProcessor.prepareFor().processBatch(facade, request, response);
String location = asyncProcessor.processAsync();
TechnicalAsyncService.acceptedResponse(response, location);
//
return;
}
final boolean continueOnError = final boolean continueOnError =
odata.createPreferences(request.getHeaders(HttpHeader.PREFER)).hasContinueOnError(); odata.createPreferences(request.getHeaders(HttpHeader.PREFER)).hasContinueOnError();