From 0b05798cd943529bf0099cb05760f2baf1c6cf9b Mon Sep 17 00:00:00 2001 From: fmartelli Date: Mon, 28 Apr 2014 13:10:40 +0200 Subject: [PATCH] [OLINGO-246, OLINGO-248] provided async batch support --- .../org/apache/olingo/fit/V4Services.java | 71 +++++++++-- .../request/v4/AsyncBatchRequestWrapper.java | 48 +++++++ .../request/v4/AsyncRequestFactory.java | 3 + .../response/v4/AsyncResponseWrapper.java | 11 ++ .../batch/ODataChangesetResponseItem.java | 13 +- .../batch/ODataRetrieveResponseItem.java | 7 +- .../batch/v4/ODataBatchRequestImpl.java | 3 + .../v4/AsyncBatchRequestWrapperImpl.java | 119 ++++++++++++++++++ .../request/v4/AsyncRequestFactoryImpl.java | 7 ++ .../request/v4/AsyncRequestWrapperImpl.java | 70 ++++++----- .../response/AbstractODataResponse.java | 6 +- .../response/v4/AsyncResponseImpl.java | 27 ++++ .../client/core/it/v4/BatchTestITCase.java | 84 ++++++++++++- 13 files changed, 415 insertions(+), 54 deletions(-) create mode 100644 lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java create mode 100644 lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java diff --git a/fit/src/main/java/org/apache/olingo/fit/V4Services.java b/fit/src/main/java/org/apache/olingo/fit/V4Services.java index 25eac8948..38d869698 100644 --- a/fit/src/main/java/org/apache/olingo/fit/V4Services.java +++ b/fit/src/main/java/org/apache/olingo/fit/V4Services.java @@ -51,12 +51,15 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.cxf.interceptor.InInterceptors; import org.apache.cxf.jaxrs.ext.multipart.Attachment; +import org.apache.cxf.jaxrs.ext.multipart.Multipart; +import org.apache.cxf.jaxrs.ext.multipart.MultipartBody; import org.apache.olingo.commons.api.data.CollectionValue; import org.apache.olingo.commons.api.data.ResWrap; import org.apache.olingo.commons.api.data.Entry; import org.apache.olingo.commons.api.data.Feed; import org.apache.olingo.commons.api.data.Property; import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion; +import org.apache.olingo.commons.api.format.ContentType; import org.apache.olingo.commons.core.data.AtomEntryImpl; import org.apache.olingo.commons.core.data.AtomFeedImpl; import org.apache.olingo.commons.core.data.AtomPropertyImpl; @@ -152,6 +155,54 @@ public class V4Services extends AbstractServices { } } + @POST + @Path("/async/$batch") + public Response async( + @Context final UriInfo uriInfo, + @HeaderParam("Prefer") @DefaultValue(StringUtils.EMPTY) final String prefer, + final @Multipart MultipartBody attachment) { + + try { + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + bos.write("HTTP/1.1 200 Ok".getBytes()); + bos.write(CRLF); + bos.write("OData-Version: 4.0".getBytes()); + bos.write(CRLF); + bos.write(("Content-Type: " + ContentType.APPLICATION_OCTET_STREAM + ";boundary=" + BOUNDARY).getBytes()); + bos.write(CRLF); + bos.write(CRLF); + + bos.write(("--" + BOUNDARY).getBytes()); + bos.write(CRLF); + bos.write("Content-Type: application/http".getBytes()); + bos.write(CRLF); + bos.write("Content-Transfer-Encoding: binary".getBytes()); + bos.write(CRLF); + bos.write(CRLF); + + bos.write("HTTP/1.1 202 Accepted".getBytes()); + bos.write(CRLF); + bos.write("Location: http://service-root/async-monitor".getBytes()); + bos.write(CRLF); + bos.write("Retry-After: 10".getBytes()); + bos.write(CRLF); + bos.write(CRLF); + bos.write(("--" + BOUNDARY + "--").getBytes()); + bos.write(CRLF); + + final UUID uuid = UUID.randomUUID(); + providedAsync.put(uuid.toString(), bos.toString(Constants.ENCODING.toString())); + + bos.flush(); + bos.close(); + + return xml.createAsyncResponse( + uriInfo.getRequestUri().toASCIIString().replace("async/$batch", "") + "monitor/" + uuid.toString()); + } catch (Exception e) { + return xml.createFaultResponse(Accept.JSON.toString(), e); + } + } + @GET @Path("/async/{name}") public Response async( @@ -568,7 +619,7 @@ public class V4Services extends AbstractServices { return utils.getValue().createResponse( FSManager.instance(version).readFile(Constants.get(version, ConstantKey.REF) - + File.separatorChar + filename, utils.getKey()), + + File.separatorChar + filename, utils.getKey()), null, utils.getKey()); } catch (Exception e) { @@ -590,7 +641,7 @@ public class V4Services extends AbstractServices { final Response response = getEntityInternal(uriInfo.getRequestUri().toASCIIString(), - accept, entitySetName, entityId, accept, StringUtils.EMPTY, StringUtils.EMPTY, false); + accept, entitySetName, entityId, accept, StringUtils.EMPTY, StringUtils.EMPTY, false); return response.getStatus() >= 400 ? postNewEntity(uriInfo, accept, contentType, prefer, entitySetName, changes) : super.patchEntity(uriInfo, accept, contentType, prefer, ifMatch, entitySetName, entityId, changes); @@ -688,8 +739,8 @@ public class V4Services extends AbstractServices { } else { final ResWrap jcontainer = mapper.readValue(IOUtils.toInputStream(entity, Constants.ENCODING), - new TypeReference() { - }); + new TypeReference() { + }); entry = dataBinder.toAtomEntry(jcontainer.getPayload()); @@ -787,7 +838,7 @@ public class V4Services extends AbstractServices { final ResWrap jsonContainer = mapper.readValue( IOUtils.toInputStream(changes, Constants.ENCODING), new TypeReference() { - }); + }); jsonContainer.getPayload().setType(typeInfo.getFullQualifiedName().toString()); entryChanges = dataBinder.toAtomEntry(jsonContainer.getPayload()); } @@ -820,7 +871,7 @@ public class V4Services extends AbstractServices { // 1. Fetch the contained entity to be removed final InputStream entry = FSManager.instance(version). readFile(containedPath(entityId, containedEntitySetName). - append('(').append(containedEntityId).append(')').toString(), Accept.ATOM); + append('(').append(containedEntityId).append(')').toString(), Accept.ATOM); final ResWrap container = atomDeserializer.read(entry, AtomEntryImpl.class); // 2. Remove the contained entity @@ -1049,8 +1100,8 @@ public class V4Services extends AbstractServices { } else { final ResWrap paramContainer = mapper.readValue(IOUtils.toInputStream(param, Constants.ENCODING), - new TypeReference() { - }); + new TypeReference() { + }); property = paramContainer.getPayload(); } @@ -1091,8 +1142,8 @@ public class V4Services extends AbstractServices { } else { final ResWrap paramContainer = mapper.readValue(IOUtils.toInputStream(param, Constants.ENCODING), - new TypeReference() { - }); + new TypeReference() { + }); property = paramContainer.getPayload(); } diff --git a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java new file mode 100644 index 000000000..9efa0fd48 --- /dev/null +++ b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.olingo.client.api.communication.request.v4; + +import org.apache.olingo.client.api.communication.request.batch.ODataChangeset; +import org.apache.olingo.client.api.communication.request.batch.ODataRetrieve; +import org.apache.olingo.client.api.communication.request.batch.v4.ODataOutsideUpdate; +import org.apache.olingo.client.api.communication.response.ODataBatchResponse; + +public interface AsyncBatchRequestWrapper extends AsyncRequestWrapper { + + /** + * Gets a changeset batch item instance. A changeset can be submitted embedded into a batch request only. + * + * @return ODataChangeset instance. + */ + ODataChangeset addChangeset(); + + /** + * Gets a retrieve batch item instance. A retrieve item can be submitted embedded into a batch request only. + * + * @return ODataRetrieve instance. + */ + ODataRetrieve addRetrieve(); + + /** + * Gets an outside change batch item instance. An outside item can be submitted embedded into a batch request only. + * + * @return ODataOutsideUpdate instance. + */ + ODataOutsideUpdate addOutsideUpdate(); +} diff --git a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java index 9fd06dc53..17b510f9e 100644 --- a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java +++ b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java @@ -19,10 +19,13 @@ package org.apache.olingo.client.api.communication.request.v4; import org.apache.olingo.client.api.communication.request.ODataRequest; +import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest; import org.apache.olingo.client.api.communication.response.ODataResponse; @SuppressWarnings("unchecked") public interface AsyncRequestFactory { AsyncRequestWrapper getAsyncRequestWrapper(final ODataRequest odataRequest); + + AsyncBatchRequestWrapper getAsyncBatchRequestWrapper(final ODataBatchRequest odataRequest); } diff --git a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java index 87b6904b7..04d368818 100644 --- a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java +++ b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java @@ -18,6 +18,7 @@ */ package org.apache.olingo.client.api.communication.response.v4; +import java.net.URI; import org.apache.olingo.client.api.communication.response.ODataDeleteResponse; import org.apache.olingo.client.api.communication.response.ODataResponse; @@ -49,6 +50,16 @@ public interface AsyncResponseWrapper { */ R getODataResponse(); + /** + * Specifies the location for the next monitor check. + *
+ * Overrides the location value retrieved among headers and nullifies the previous valid response (if exists). + * + * @param uri monitor location. + * @return the current async response wrapper. + */ + AsyncResponseWrapper forceNextMonitorCheck(URI uri); + /** * DeleteA DELETE request sent to the status monitor resource requests that the asynchronous processing be canceled. A * 200 OK or to a 204 No Content response indicates that the asynchronous processing has been successfully canceled. diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java index 53eeb16d2..51f3a48a3 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java @@ -25,6 +25,7 @@ import org.apache.olingo.client.api.ODataBatchConstants; import org.apache.olingo.client.api.communication.response.ODataResponse; import static org.apache.olingo.client.core.communication.request.batch.AbstractODataBatchResponseItem.LOG; import org.apache.olingo.client.core.communication.response.batch.ODataBatchErrorResponse; +import org.apache.olingo.client.core.communication.response.v4.AsyncResponseImpl; /** * Changeset wrapper for the corresponding batch item. @@ -116,11 +117,15 @@ public class ODataChangesetResponseItem extends AbstractODataBatchResponseItem { final Map.Entry responseLine = ODataBatchUtilities.readResponseLine(batchLineIterator); LOG.debug("Retrieved item response {}", responseLine); - if (responseLine.getKey() >= 400) { - // generate error response - final Map> headers = ODataBatchUtilities.readHeaders(batchLineIterator); - LOG.debug("Retrieved item headers {}", headers); + final Map> headers = ODataBatchUtilities.readHeaders(batchLineIterator); + LOG.debug("Retrieved item headers {}", headers); + if (responseLine.getKey() == 202) { + // generate async response + current = new AsyncResponseImpl(responseLine, headers, batchLineIterator, boundary); + return current; + } else if (responseLine.getKey() >= 400) { + // generate error response current = new ODataBatchErrorResponse(responseLine, headers, batchLineIterator, boundary); return current; } diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java index 8919ffebf..a180b379a 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java @@ -24,6 +24,7 @@ import java.util.NoSuchElementException; import org.apache.olingo.client.api.communication.response.ODataResponse; import static org.apache.olingo.client.core.communication.request.batch.AbstractODataBatchResponseItem.LOG; import org.apache.olingo.client.core.communication.response.batch.ODataBatchErrorResponse; +import org.apache.olingo.client.core.communication.response.v4.AsyncResponseImpl; /** * Retrieve response wrapper for the corresponding batch item. @@ -58,7 +59,11 @@ public class ODataRetrieveResponseItem extends AbstractODataBatchResponseItem { final Map> headers = ODataBatchUtilities.readHeaders(batchLineIterator); LOG.debug("Retrieved item headers {}", headers); - if (responseLine.getKey() >= 400) { + if (responseLine.getKey() == 202) { + // generate async response + current = new AsyncResponseImpl(responseLine, headers, batchLineIterator, boundary); + breakingitem = true; + } else if (responseLine.getKey() >= 400) { // generate error response current = new ODataBatchErrorResponse(responseLine, headers, batchLineIterator, boundary); breakingitem = true; diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java index 648cb3532..a445013b7 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java @@ -122,6 +122,9 @@ public class ODataBatchRequestImpl */ protected class ODataBatchResponseImpl extends AbstractODataResponse implements ODataBatchResponse { + private ODataBatchResponseImpl() { + } + /** * Constructor. * diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java new file mode 100644 index 000000000..8a3d903a6 --- /dev/null +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.olingo.client.core.communication.request.v4; + +import java.net.URI; +import java.util.Collection; +import org.apache.commons.io.IOUtils; +import org.apache.olingo.client.api.communication.header.HeaderName; +import org.apache.olingo.client.api.communication.header.ODataPreferences; +import org.apache.olingo.client.api.communication.request.batch.ODataChangeset; +import org.apache.olingo.client.api.communication.request.batch.ODataRetrieve; +import org.apache.olingo.client.api.communication.request.batch.v4.BatchStreamManager; +import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest; +import org.apache.olingo.client.api.communication.request.batch.v4.ODataOutsideUpdate; +import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper; +import org.apache.olingo.client.api.communication.response.ODataBatchResponse; +import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapper; +import org.apache.olingo.client.api.v4.ODataClient; +import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion; + +public class AsyncBatchRequestWrapperImpl extends AsyncRequestWrapperImpl + implements AsyncBatchRequestWrapper { + + private BatchStreamManager batchStreamManager; + + protected AsyncBatchRequestWrapperImpl(final ODataClient odataClient, final ODataBatchRequest odataRequest) { + super(odataClient, odataRequest); + batchStreamManager = odataRequest.execute(); + } + + /** + * {@inheritDoc} + */ + @Override + public ODataChangeset addChangeset() { + return batchStreamManager.addChangeset(); + } + + /** + * {@inheritDoc} + */ + @Override + public ODataRetrieve addRetrieve() { + return batchStreamManager.addRetrieve(); + } + + /** + * {@inheritDoc} + */ + @Override + public ODataOutsideUpdate addOutsideUpdate() { + return batchStreamManager.addOutsideUpdate(); + } + + @Override + public AsyncResponseWrapper execute() { + return new AsyncResponseWrapperImpl(batchStreamManager.getResponse()); + } + + public class AsyncResponseWrapperImpl + extends AsyncRequestWrapperImpl.AsyncResponseWrapperImpl { + + /** + * Constructor. + * + * @param res OData batch response. + */ + public AsyncResponseWrapperImpl(final ODataBatchResponse res) { + super(); + + if (res.getStatusCode() == 202) { + retrieveMonitorDetails(res); + } else { + response = res; + } + } + + private void retrieveMonitorDetails(final ODataBatchResponse res) { + Collection headers = res.getHeader(HeaderName.location.toString()); + if (headers == null || headers.isEmpty()) { + throw new AsyncRequestException("Invalid async request response. Monitor URL not found"); + } else { + this.location = URI.create(headers.iterator().next()); + } + + headers = res.getHeader(HeaderName.retryAfter.toString()); + if (headers != null && !headers.isEmpty()) { + this.retryAfter = Integer.parseInt(headers.iterator().next()); + } + + headers = res.getHeader(HeaderName.preferenceApplied.toString()); + if (headers != null && !headers.isEmpty()) { + for (String header : headers) { + if (header.equalsIgnoreCase(new ODataPreferences(ODataServiceVersion.V40).respondAsync())) { + preferenceApplied = true; + } + } + } + + IOUtils.closeQuietly(res.getRawResponse()); + } + } +} diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java index d6905dcfb..d5331bcd5 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java @@ -19,6 +19,8 @@ package org.apache.olingo.client.core.communication.request.v4; import org.apache.olingo.client.api.communication.request.ODataRequest; +import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest; +import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper; import org.apache.olingo.client.api.communication.request.v4.AsyncRequestFactory; import org.apache.olingo.client.api.communication.request.v4.AsyncRequestWrapper; import org.apache.olingo.client.api.communication.response.ODataResponse; @@ -39,4 +41,9 @@ public class AsyncRequestFactoryImpl implements AsyncRequestFactory { public AsyncRequestWrapper getAsyncRequestWrapper(final ODataRequest odataRequest) { return new AsyncRequestWrapperImpl(client, odataRequest); } + + @Override + public AsyncBatchRequestWrapper getAsyncBatchRequestWrapper(final ODataBatchRequest odataRequest) { + return new AsyncBatchRequestWrapperImpl(client, odataRequest); + } } diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java index 57305cd7a..7aef8a9e0 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java @@ -41,7 +41,6 @@ import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapp import org.apache.olingo.client.api.http.HttpClientException; import org.apache.olingo.client.api.http.HttpMethod; import org.apache.olingo.client.api.v4.ODataClient; -import org.apache.olingo.client.core.communication.header.ODataHeadersImpl; import org.apache.olingo.client.core.communication.request.AbstractODataRequest; import org.apache.olingo.client.core.communication.request.AbstractRequest; import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion; @@ -49,34 +48,29 @@ import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion; public class AsyncRequestWrapperImpl extends AbstractRequest implements AsyncRequestWrapper { - private static final int MAX_RETRY = 5; + protected static final int MAX_RETRY = 5; - private final ODataClient odataClient; + protected final ODataClient odataClient; /** * Request to be wrapped. */ - private final ODataRequest odataRequest; + protected final ODataRequest odataRequest; /** * HTTP client. */ - private final HttpClient httpClient; + protected final HttpClient httpClient; /** * HTTP request. */ - private final HttpUriRequest request; - - /** - * OData request header. - */ - private final ODataHeadersImpl odataHeaders; + protected final HttpUriRequest request; /** * Target URI. */ - private final URI uri; + protected final URI uri; protected AsyncRequestWrapperImpl(final ODataClient odataClient, final ODataRequest odataRequest) { this.odataRequest = odataRequest; @@ -88,9 +82,6 @@ public class AsyncRequestWrapperImpl extends AbstractRe this.odataClient = odataClient; final HttpMethod method = odataRequest.getMethod(); - // initialize default headers - this.odataHeaders = (ODataHeadersImpl) odataClient.getVersionHeaders(); - // target uri this.uri = odataRequest.getURI(); @@ -104,19 +95,19 @@ public class AsyncRequestWrapperImpl extends AbstractRe } @Override - public AsyncRequestWrapper wait(final int waitInSeconds) { + public final AsyncRequestWrapper wait(final int waitInSeconds) { extendHeader(HeaderName.prefer.toString(), new ODataPreferences(ODataServiceVersion.V40).wait(waitInSeconds)); return this; } @Override - public AsyncRequestWrapper callback(URI url) { + public final AsyncRequestWrapper callback(URI url) { extendHeader(HeaderName.prefer.toString(), new ODataPreferences(ODataServiceVersion.V40).callback(url.toASCIIString())); return this; } - private void extendHeader(final String headerName, final String headerValue) { + protected final void extendHeader(final String headerName, final String headerValue) { final StringBuilder extended = new StringBuilder(); if (this.odataRequest.getHeaderNames().contains(headerName)) { extended.append(this.odataRequest.getHeader(headerName)).append(", "); @@ -130,7 +121,7 @@ public class AsyncRequestWrapperImpl extends AbstractRe return new AsyncResponseWrapperImpl(doExecute()); } - private HttpResponse doExecute() { + protected HttpResponse doExecute() { // Add all available headers for (String key : odataRequest.getHeaderNames()) { final String value = odataRequest.getHeader(key); @@ -143,13 +134,16 @@ public class AsyncRequestWrapperImpl extends AbstractRe public class AsyncResponseWrapperImpl implements AsyncResponseWrapper { - private URI location = null; + protected URI location = null; - private R response = null; + protected R response = null; - private int retryAfter = 5; + protected int retryAfter = 5; - private boolean preferenceApplied = false; + protected boolean preferenceApplied = false; + + public AsyncResponseWrapperImpl() { + } /** * Constructor. @@ -159,7 +153,7 @@ public class AsyncRequestWrapperImpl extends AbstractRe @SuppressWarnings("unchecked") public AsyncResponseWrapperImpl(final HttpResponse res) { if (res.getStatusLine().getStatusCode() == 202) { - retrieveMonitorDetails(res, true); + retrieveMonitorDetails(res); } else { response = (R) ((AbstractODataRequest) odataRequest).getResponseTemplate().initFromHttpResponse(res); } @@ -177,7 +171,7 @@ public class AsyncRequestWrapperImpl extends AbstractRe final HttpResponse res = checkMonitor(location); if (res.getStatusLine().getStatusCode() == 202) { - retrieveMonitorDetails(res, false); + retrieveMonitorDetails(res); } else { response = instantiateResponse(res); } @@ -219,18 +213,34 @@ public class AsyncRequestWrapperImpl extends AbstractRe return response; } + /** + * {@inheritDoc} + */ @Override public ODataDeleteResponse delete() { final ODataDeleteRequest deleteRequest = odataClient.getCUDRequestFactory().getDeleteRequest(location); return deleteRequest.execute(); } + /** + * {@inheritDoc} + */ @Override public AsyncResponseWrapper asyncDelete() { return odataClient.getAsyncRequestFactory().getAsyncRequestWrapper( odataClient.getCUDRequestFactory().getDeleteRequest(location)).execute(); } + /** + * {@inheritDoc} + */ + @Override + public AsyncResponseWrapper forceNextMonitorCheck(final URI uri) { + this.location = uri; + this.response = null; + return this; + } + @SuppressWarnings("unchecked") private R instantiateResponse(final HttpResponse res) { R odataResponse; @@ -246,7 +256,7 @@ public class AsyncRequestWrapperImpl extends AbstractRe return odataResponse; } - private void retrieveMonitorDetails(final HttpResponse res, final boolean includePreferenceApplied) { + private void retrieveMonitorDetails(final HttpResponse res) { Header[] headers = res.getHeaders(HeaderName.location.toString()); if (ArrayUtils.isNotEmpty(headers)) { this.location = URI.create(headers[0].getValue()); @@ -276,7 +286,7 @@ public class AsyncRequestWrapperImpl extends AbstractRe } } - private HttpResponse checkMonitor(final URI location) { + protected final HttpResponse checkMonitor(final URI location) { if (location == null) { throw new AsyncRequestException("Invalid async request response. Missing monitor URL"); } @@ -287,10 +297,8 @@ public class AsyncRequestWrapperImpl extends AbstractRe return executeHttpRequest(httpClient, monitor); } - private HttpResponse executeHttpRequest(final HttpClient client, final HttpUriRequest req) { - checkRequest(odataClient, request); - - HttpResponse response; + protected final HttpResponse executeHttpRequest(final HttpClient client, final HttpUriRequest req) { + final HttpResponse response; try { response = client.execute(req); } catch (IOException e) { diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java index d66fa2045..fe8fa526b 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; -import java.nio.charset.Charset; import java.util.Collection; import java.util.HashSet; import java.util.Map; @@ -35,6 +34,7 @@ import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; import org.apache.olingo.client.api.communication.header.HeaderName; +import org.apache.olingo.client.api.communication.request.ODataStreamer; import org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator; import org.apache.olingo.client.api.communication.response.ODataResponse; import org.apache.olingo.client.api.http.NoContentException; @@ -257,10 +257,10 @@ public abstract class AbstractODataResponse implements ODataResponse { this.headers.putAll(partHeaders); final ByteArrayOutputStream bos = new ByteArrayOutputStream(); - LOG.debug("Retrieved payload {}", bos.toString(Charset.forName("UTF-8").toString())); while (batchLineIterator.hasNext()) { - bos.write(batchLineIterator.nextLine().getBytes()); + bos.write(batchLineIterator.nextLine().getBytes(Constants.UTF8)); + bos.write(ODataStreamer.CRLF); } try { diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java index cf7da1389..a800037cb 100644 --- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java +++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java @@ -18,9 +18,13 @@ */ package org.apache.olingo.client.core.communication.response.v4; +import java.util.Collection; +import java.util.Map; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; +import org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator; import org.apache.olingo.client.api.communication.response.v4.AsyncResponse; +import org.apache.olingo.client.core.communication.request.batch.ODataBatchController; import org.apache.olingo.client.core.communication.response.AbstractODataResponse; /** @@ -46,4 +50,27 @@ public class AsyncResponseImpl extends AbstractODataResponse implements AsyncRes public AsyncResponseImpl(final HttpClient client, final HttpResponse res) { super(client, res); } + + /** + * Constructor to be used inside a batch item. + */ + public AsyncResponseImpl( + final Map.Entry responseLine, + final Map> headers, + final ODataBatchLineIterator batchLineIterator, + final String boundary) { + super(); + + if (hasBeenInitialized) { + throw new IllegalStateException("Request already initialized"); + } + + this.hasBeenInitialized = true; + + this.batchInfo = new ODataBatchController(batchLineIterator, boundary); + + this.statusCode = responseLine.getKey(); + this.statusMessage = responseLine.getValue(); + this.headers.putAll(headers); + } } diff --git a/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java b/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java index 8a798dd06..6c2478980 100644 --- a/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java +++ b/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java @@ -26,12 +26,14 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.URI; import java.util.Calendar; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.http.HttpResponse; import org.apache.olingo.client.api.ODataBatchConstants; +import org.apache.olingo.client.api.communication.header.HeaderName; import org.apache.olingo.client.api.communication.request.ODataStreamManager; import org.apache.olingo.client.api.communication.request.batch.ODataBatchResponseItem; import org.apache.olingo.client.api.communication.request.batch.ODataChangeset; @@ -44,10 +46,13 @@ import org.apache.olingo.client.api.communication.request.cud.ODataEntityUpdateR import org.apache.olingo.client.api.communication.request.cud.v4.UpdateType; import org.apache.olingo.client.api.communication.request.retrieve.ODataEntityRequest; import org.apache.olingo.client.api.communication.request.retrieve.ODataEntitySetRequest; +import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper; import org.apache.olingo.client.api.communication.response.ODataBatchResponse; import org.apache.olingo.client.api.communication.response.ODataEntityCreateResponse; import org.apache.olingo.client.api.communication.response.ODataEntityUpdateResponse; import org.apache.olingo.client.api.communication.response.ODataResponse; +import org.apache.olingo.client.api.communication.response.v4.AsyncResponse; +import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapper; import org.apache.olingo.client.api.uri.v4.URIBuilder; import org.apache.olingo.client.core.communication.request.AbstractODataStreamManager; import org.apache.olingo.client.core.communication.request.Wrapper; @@ -397,7 +402,7 @@ public class BatchTestITCase extends AbstractTestITCase { } @Test - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked"}) public void batchRequest() throws EdmPrimitiveTypeException { // create your request final ODataBatchRequest request = client.getBatchRequestFactory().getBatchRequest(testStaticServiceRootURL); @@ -412,8 +417,7 @@ public class BatchTestITCase extends AbstractTestITCase { // prepare URI URIBuilder targetURI = client.getURIBuilder(testStaticServiceRootURL); - targetURI.appendEntitySetSegment("Customers").appendKeySegment(1);//. -// expand("Orders").select("PersonID,Orders/OrderID"); + targetURI.appendEntitySetSegment("Customers").appendKeySegment(1); // create new request ODataEntityRequest queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build()); @@ -520,12 +524,82 @@ public class BatchTestITCase extends AbstractTestITCase { entres = (ODataEntityRequestImpl.ODataEntityResponseImpl) res; entity = entres.getBody(); - assertEquals("new last name", - entity.getProperty("LastName").getPrimitiveValue().toCastValue(String.class)); + assertEquals("new last name", entity.getProperty("LastName").getPrimitiveValue().toCastValue(String.class)); assertFalse(iter.hasNext()); } + @Test + public void async() { + // create your request + final ODataBatchRequest request = client.getBatchRequestFactory().getBatchRequest( + URI.create(testStaticServiceRootURL + "/async/").normalize().toASCIIString()); + request.setAccept(ACCEPT); + + final AsyncBatchRequestWrapper async = client.getAsyncRequestFactory().getAsyncBatchRequestWrapper(request); + + // ------------------------------------------- + // Add retrieve item + // ------------------------------------------- + ODataRetrieve retrieve = async.addRetrieve(); + + // prepare URI + URIBuilder targetURI = client.getURIBuilder(testStaticServiceRootURL); + targetURI.appendEntitySetSegment("People").appendKeySegment(5); + + // create new request + ODataEntityRequest queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build()); + queryReq.setFormat(ODataPubFormat.JSON); + + retrieve.setRequest(queryReq); + // ------------------------------------------- + + // ------------------------------------------- + // Add retrieve item + // ------------------------------------------- + retrieve = async.addRetrieve(); + + // prepare URI + targetURI = client.getURIBuilder(testStaticServiceRootURL).appendEntitySetSegment("Customers").appendKeySegment(1); + + // create new request + queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build()); + + retrieve.setRequest(queryReq); + // ------------------------------------------- + + final AsyncResponseWrapper responseWrapper = async.execute(); + + assertTrue(responseWrapper.isPreferenceApplied()); + assertTrue(responseWrapper.isDone()); + + final ODataBatchResponse response = responseWrapper.getODataResponse(); + + assertEquals(200, response.getStatusCode()); + assertEquals("Ok", response.getStatusMessage()); + final Iterator iter = response.getBody(); + + // retrieve the first item (ODataRetrieve) + ODataBatchResponseItem item = iter.next(); + assertTrue(item instanceof ODataRetrieveResponseItem); + + // The service return interim results to an asynchronously executing batch. + ODataRetrieveResponseItem retitem = (ODataRetrieveResponseItem) item; + ODataResponse res = retitem.next(); + assertTrue(res instanceof AsyncResponse); + assertEquals(202, res.getStatusCode()); + assertEquals("Accepted", res.getStatusMessage()); + + Collection newMonitorLocation = res.getHeader(HeaderName.location); + if (newMonitorLocation != null && !newMonitorLocation.isEmpty()) { + responseWrapper.forceNextMonitorCheck(URI.create(newMonitorLocation.iterator().next())); + // .... now you can start again with isDone() and getODataResponse(). + } + + assertFalse(retitem.hasNext()); + assertFalse(iter.hasNext()); + } + private static class TestStreamManager extends AbstractODataStreamManager { public TestStreamManager() {