[OLINGO-246, OLINGO-248] provided async batch support

This commit is contained in:
fmartelli 2014-04-28 13:10:40 +02:00
parent 15e7718a0c
commit 0b05798cd9
13 changed files with 415 additions and 54 deletions

View File

@ -51,12 +51,15 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.cxf.interceptor.InInterceptors; import org.apache.cxf.interceptor.InInterceptors;
import org.apache.cxf.jaxrs.ext.multipart.Attachment; import org.apache.cxf.jaxrs.ext.multipart.Attachment;
import org.apache.cxf.jaxrs.ext.multipart.Multipart;
import org.apache.cxf.jaxrs.ext.multipart.MultipartBody;
import org.apache.olingo.commons.api.data.CollectionValue; import org.apache.olingo.commons.api.data.CollectionValue;
import org.apache.olingo.commons.api.data.ResWrap; import org.apache.olingo.commons.api.data.ResWrap;
import org.apache.olingo.commons.api.data.Entry; import org.apache.olingo.commons.api.data.Entry;
import org.apache.olingo.commons.api.data.Feed; import org.apache.olingo.commons.api.data.Feed;
import org.apache.olingo.commons.api.data.Property; import org.apache.olingo.commons.api.data.Property;
import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion; import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion;
import org.apache.olingo.commons.api.format.ContentType;
import org.apache.olingo.commons.core.data.AtomEntryImpl; import org.apache.olingo.commons.core.data.AtomEntryImpl;
import org.apache.olingo.commons.core.data.AtomFeedImpl; import org.apache.olingo.commons.core.data.AtomFeedImpl;
import org.apache.olingo.commons.core.data.AtomPropertyImpl; import org.apache.olingo.commons.core.data.AtomPropertyImpl;
@ -152,6 +155,54 @@ public class V4Services extends AbstractServices {
} }
} }
@POST
@Path("/async/$batch")
public Response async(
@Context final UriInfo uriInfo,
@HeaderParam("Prefer") @DefaultValue(StringUtils.EMPTY) final String prefer,
final @Multipart MultipartBody attachment) {
try {
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
bos.write("HTTP/1.1 200 Ok".getBytes());
bos.write(CRLF);
bos.write("OData-Version: 4.0".getBytes());
bos.write(CRLF);
bos.write(("Content-Type: " + ContentType.APPLICATION_OCTET_STREAM + ";boundary=" + BOUNDARY).getBytes());
bos.write(CRLF);
bos.write(CRLF);
bos.write(("--" + BOUNDARY).getBytes());
bos.write(CRLF);
bos.write("Content-Type: application/http".getBytes());
bos.write(CRLF);
bos.write("Content-Transfer-Encoding: binary".getBytes());
bos.write(CRLF);
bos.write(CRLF);
bos.write("HTTP/1.1 202 Accepted".getBytes());
bos.write(CRLF);
bos.write("Location: http://service-root/async-monitor".getBytes());
bos.write(CRLF);
bos.write("Retry-After: 10".getBytes());
bos.write(CRLF);
bos.write(CRLF);
bos.write(("--" + BOUNDARY + "--").getBytes());
bos.write(CRLF);
final UUID uuid = UUID.randomUUID();
providedAsync.put(uuid.toString(), bos.toString(Constants.ENCODING.toString()));
bos.flush();
bos.close();
return xml.createAsyncResponse(
uriInfo.getRequestUri().toASCIIString().replace("async/$batch", "") + "monitor/" + uuid.toString());
} catch (Exception e) {
return xml.createFaultResponse(Accept.JSON.toString(), e);
}
}
@GET @GET
@Path("/async/{name}") @Path("/async/{name}")
public Response async( public Response async(

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.olingo.client.api.communication.request.v4;
import org.apache.olingo.client.api.communication.request.batch.ODataChangeset;
import org.apache.olingo.client.api.communication.request.batch.ODataRetrieve;
import org.apache.olingo.client.api.communication.request.batch.v4.ODataOutsideUpdate;
import org.apache.olingo.client.api.communication.response.ODataBatchResponse;
public interface AsyncBatchRequestWrapper extends AsyncRequestWrapper<ODataBatchResponse> {
/**
* Gets a changeset batch item instance. A changeset can be submitted embedded into a batch request only.
*
* @return ODataChangeset instance.
*/
ODataChangeset addChangeset();
/**
* Gets a retrieve batch item instance. A retrieve item can be submitted embedded into a batch request only.
*
* @return ODataRetrieve instance.
*/
ODataRetrieve addRetrieve();
/**
* Gets an outside change batch item instance. An outside item can be submitted embedded into a batch request only.
*
* @return ODataOutsideUpdate instance.
*/
ODataOutsideUpdate addOutsideUpdate();
}

View File

@ -19,10 +19,13 @@
package org.apache.olingo.client.api.communication.request.v4; package org.apache.olingo.client.api.communication.request.v4;
import org.apache.olingo.client.api.communication.request.ODataRequest; import org.apache.olingo.client.api.communication.request.ODataRequest;
import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest;
import org.apache.olingo.client.api.communication.response.ODataResponse; import org.apache.olingo.client.api.communication.response.ODataResponse;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public interface AsyncRequestFactory { public interface AsyncRequestFactory {
<R extends ODataResponse> AsyncRequestWrapper<R> getAsyncRequestWrapper(final ODataRequest odataRequest); <R extends ODataResponse> AsyncRequestWrapper<R> getAsyncRequestWrapper(final ODataRequest odataRequest);
AsyncBatchRequestWrapper getAsyncBatchRequestWrapper(final ODataBatchRequest odataRequest);
} }

View File

@ -18,6 +18,7 @@
*/ */
package org.apache.olingo.client.api.communication.response.v4; package org.apache.olingo.client.api.communication.response.v4;
import java.net.URI;
import org.apache.olingo.client.api.communication.response.ODataDeleteResponse; import org.apache.olingo.client.api.communication.response.ODataDeleteResponse;
import org.apache.olingo.client.api.communication.response.ODataResponse; import org.apache.olingo.client.api.communication.response.ODataResponse;
@ -49,6 +50,16 @@ public interface AsyncResponseWrapper<R extends ODataResponse> {
*/ */
R getODataResponse(); R getODataResponse();
/**
* Specifies the location for the next monitor check.
* <br />
* Overrides the location value retrieved among headers and nullifies the previous valid response (if exists).
*
* @param uri monitor location.
* @return the current async response wrapper.
*/
AsyncResponseWrapper<R> forceNextMonitorCheck(URI uri);
/** /**
* DeleteA DELETE request sent to the status monitor resource requests that the asynchronous processing be canceled. A * DeleteA DELETE request sent to the status monitor resource requests that the asynchronous processing be canceled. A
* 200 OK or to a 204 No Content response indicates that the asynchronous processing has been successfully canceled. * 200 OK or to a 204 No Content response indicates that the asynchronous processing has been successfully canceled.

View File

@ -25,6 +25,7 @@ import org.apache.olingo.client.api.ODataBatchConstants;
import org.apache.olingo.client.api.communication.response.ODataResponse; import org.apache.olingo.client.api.communication.response.ODataResponse;
import static org.apache.olingo.client.core.communication.request.batch.AbstractODataBatchResponseItem.LOG; import static org.apache.olingo.client.core.communication.request.batch.AbstractODataBatchResponseItem.LOG;
import org.apache.olingo.client.core.communication.response.batch.ODataBatchErrorResponse; import org.apache.olingo.client.core.communication.response.batch.ODataBatchErrorResponse;
import org.apache.olingo.client.core.communication.response.v4.AsyncResponseImpl;
/** /**
* Changeset wrapper for the corresponding batch item. * Changeset wrapper for the corresponding batch item.
@ -116,11 +117,15 @@ public class ODataChangesetResponseItem extends AbstractODataBatchResponseItem {
final Map.Entry<Integer, String> responseLine = ODataBatchUtilities.readResponseLine(batchLineIterator); final Map.Entry<Integer, String> responseLine = ODataBatchUtilities.readResponseLine(batchLineIterator);
LOG.debug("Retrieved item response {}", responseLine); LOG.debug("Retrieved item response {}", responseLine);
if (responseLine.getKey() >= 400) {
// generate error response
final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator); final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator);
LOG.debug("Retrieved item headers {}", headers); LOG.debug("Retrieved item headers {}", headers);
if (responseLine.getKey() == 202) {
// generate async response
current = new AsyncResponseImpl(responseLine, headers, batchLineIterator, boundary);
return current;
} else if (responseLine.getKey() >= 400) {
// generate error response
current = new ODataBatchErrorResponse(responseLine, headers, batchLineIterator, boundary); current = new ODataBatchErrorResponse(responseLine, headers, batchLineIterator, boundary);
return current; return current;
} }

View File

@ -24,6 +24,7 @@ import java.util.NoSuchElementException;
import org.apache.olingo.client.api.communication.response.ODataResponse; import org.apache.olingo.client.api.communication.response.ODataResponse;
import static org.apache.olingo.client.core.communication.request.batch.AbstractODataBatchResponseItem.LOG; import static org.apache.olingo.client.core.communication.request.batch.AbstractODataBatchResponseItem.LOG;
import org.apache.olingo.client.core.communication.response.batch.ODataBatchErrorResponse; import org.apache.olingo.client.core.communication.response.batch.ODataBatchErrorResponse;
import org.apache.olingo.client.core.communication.response.v4.AsyncResponseImpl;
/** /**
* Retrieve response wrapper for the corresponding batch item. * Retrieve response wrapper for the corresponding batch item.
@ -58,7 +59,11 @@ public class ODataRetrieveResponseItem extends AbstractODataBatchResponseItem {
final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator); final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator);
LOG.debug("Retrieved item headers {}", headers); LOG.debug("Retrieved item headers {}", headers);
if (responseLine.getKey() >= 400) { if (responseLine.getKey() == 202) {
// generate async response
current = new AsyncResponseImpl(responseLine, headers, batchLineIterator, boundary);
breakingitem = true;
} else if (responseLine.getKey() >= 400) {
// generate error response // generate error response
current = new ODataBatchErrorResponse(responseLine, headers, batchLineIterator, boundary); current = new ODataBatchErrorResponse(responseLine, headers, batchLineIterator, boundary);
breakingitem = true; breakingitem = true;

View File

@ -122,6 +122,9 @@ public class ODataBatchRequestImpl
*/ */
protected class ODataBatchResponseImpl extends AbstractODataResponse implements ODataBatchResponse { protected class ODataBatchResponseImpl extends AbstractODataResponse implements ODataBatchResponse {
private ODataBatchResponseImpl() {
}
/** /**
* Constructor. * Constructor.
* *

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.olingo.client.core.communication.request.v4;
import java.net.URI;
import java.util.Collection;
import org.apache.commons.io.IOUtils;
import org.apache.olingo.client.api.communication.header.HeaderName;
import org.apache.olingo.client.api.communication.header.ODataPreferences;
import org.apache.olingo.client.api.communication.request.batch.ODataChangeset;
import org.apache.olingo.client.api.communication.request.batch.ODataRetrieve;
import org.apache.olingo.client.api.communication.request.batch.v4.BatchStreamManager;
import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest;
import org.apache.olingo.client.api.communication.request.batch.v4.ODataOutsideUpdate;
import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper;
import org.apache.olingo.client.api.communication.response.ODataBatchResponse;
import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapper;
import org.apache.olingo.client.api.v4.ODataClient;
import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion;
public class AsyncBatchRequestWrapperImpl extends AsyncRequestWrapperImpl<ODataBatchResponse>
implements AsyncBatchRequestWrapper {
private BatchStreamManager batchStreamManager;
protected AsyncBatchRequestWrapperImpl(final ODataClient odataClient, final ODataBatchRequest odataRequest) {
super(odataClient, odataRequest);
batchStreamManager = odataRequest.execute();
}
/**
* {@inheritDoc}
*/
@Override
public ODataChangeset addChangeset() {
return batchStreamManager.addChangeset();
}
/**
* {@inheritDoc}
*/
@Override
public ODataRetrieve addRetrieve() {
return batchStreamManager.addRetrieve();
}
/**
* {@inheritDoc}
*/
@Override
public ODataOutsideUpdate addOutsideUpdate() {
return batchStreamManager.addOutsideUpdate();
}
@Override
public AsyncResponseWrapper<ODataBatchResponse> execute() {
return new AsyncResponseWrapperImpl(batchStreamManager.getResponse());
}
public class AsyncResponseWrapperImpl
extends AsyncRequestWrapperImpl<ODataBatchResponse>.AsyncResponseWrapperImpl {
/**
* Constructor.
*
* @param res OData batch response.
*/
public AsyncResponseWrapperImpl(final ODataBatchResponse res) {
super();
if (res.getStatusCode() == 202) {
retrieveMonitorDetails(res);
} else {
response = res;
}
}
private void retrieveMonitorDetails(final ODataBatchResponse res) {
Collection<String> headers = res.getHeader(HeaderName.location.toString());
if (headers == null || headers.isEmpty()) {
throw new AsyncRequestException("Invalid async request response. Monitor URL not found");
} else {
this.location = URI.create(headers.iterator().next());
}
headers = res.getHeader(HeaderName.retryAfter.toString());
if (headers != null && !headers.isEmpty()) {
this.retryAfter = Integer.parseInt(headers.iterator().next());
}
headers = res.getHeader(HeaderName.preferenceApplied.toString());
if (headers != null && !headers.isEmpty()) {
for (String header : headers) {
if (header.equalsIgnoreCase(new ODataPreferences(ODataServiceVersion.V40).respondAsync())) {
preferenceApplied = true;
}
}
}
IOUtils.closeQuietly(res.getRawResponse());
}
}
}

View File

@ -19,6 +19,8 @@
package org.apache.olingo.client.core.communication.request.v4; package org.apache.olingo.client.core.communication.request.v4;
import org.apache.olingo.client.api.communication.request.ODataRequest; import org.apache.olingo.client.api.communication.request.ODataRequest;
import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest;
import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper;
import org.apache.olingo.client.api.communication.request.v4.AsyncRequestFactory; import org.apache.olingo.client.api.communication.request.v4.AsyncRequestFactory;
import org.apache.olingo.client.api.communication.request.v4.AsyncRequestWrapper; import org.apache.olingo.client.api.communication.request.v4.AsyncRequestWrapper;
import org.apache.olingo.client.api.communication.response.ODataResponse; import org.apache.olingo.client.api.communication.response.ODataResponse;
@ -39,4 +41,9 @@ public class AsyncRequestFactoryImpl implements AsyncRequestFactory {
public <R extends ODataResponse> AsyncRequestWrapper<R> getAsyncRequestWrapper(final ODataRequest odataRequest) { public <R extends ODataResponse> AsyncRequestWrapper<R> getAsyncRequestWrapper(final ODataRequest odataRequest) {
return new AsyncRequestWrapperImpl<R>(client, odataRequest); return new AsyncRequestWrapperImpl<R>(client, odataRequest);
} }
@Override
public AsyncBatchRequestWrapper getAsyncBatchRequestWrapper(final ODataBatchRequest odataRequest) {
return new AsyncBatchRequestWrapperImpl(client, odataRequest);
}
} }

View File

@ -41,7 +41,6 @@ import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapp
import org.apache.olingo.client.api.http.HttpClientException; import org.apache.olingo.client.api.http.HttpClientException;
import org.apache.olingo.client.api.http.HttpMethod; import org.apache.olingo.client.api.http.HttpMethod;
import org.apache.olingo.client.api.v4.ODataClient; import org.apache.olingo.client.api.v4.ODataClient;
import org.apache.olingo.client.core.communication.header.ODataHeadersImpl;
import org.apache.olingo.client.core.communication.request.AbstractODataRequest; import org.apache.olingo.client.core.communication.request.AbstractODataRequest;
import org.apache.olingo.client.core.communication.request.AbstractRequest; import org.apache.olingo.client.core.communication.request.AbstractRequest;
import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion; import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion;
@ -49,34 +48,29 @@ import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion;
public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRequest public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRequest
implements AsyncRequestWrapper<R> { implements AsyncRequestWrapper<R> {
private static final int MAX_RETRY = 5; protected static final int MAX_RETRY = 5;
private final ODataClient odataClient; protected final ODataClient odataClient;
/** /**
* Request to be wrapped. * Request to be wrapped.
*/ */
private final ODataRequest odataRequest; protected final ODataRequest odataRequest;
/** /**
* HTTP client. * HTTP client.
*/ */
private final HttpClient httpClient; protected final HttpClient httpClient;
/** /**
* HTTP request. * HTTP request.
*/ */
private final HttpUriRequest request; protected final HttpUriRequest request;
/**
* OData request header.
*/
private final ODataHeadersImpl odataHeaders;
/** /**
* Target URI. * Target URI.
*/ */
private final URI uri; protected final URI uri;
protected AsyncRequestWrapperImpl(final ODataClient odataClient, final ODataRequest odataRequest) { protected AsyncRequestWrapperImpl(final ODataClient odataClient, final ODataRequest odataRequest) {
this.odataRequest = odataRequest; this.odataRequest = odataRequest;
@ -88,9 +82,6 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
this.odataClient = odataClient; this.odataClient = odataClient;
final HttpMethod method = odataRequest.getMethod(); final HttpMethod method = odataRequest.getMethod();
// initialize default headers
this.odataHeaders = (ODataHeadersImpl) odataClient.getVersionHeaders();
// target uri // target uri
this.uri = odataRequest.getURI(); this.uri = odataRequest.getURI();
@ -104,19 +95,19 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
} }
@Override @Override
public AsyncRequestWrapper<R> wait(final int waitInSeconds) { public final AsyncRequestWrapper<R> wait(final int waitInSeconds) {
extendHeader(HeaderName.prefer.toString(), new ODataPreferences(ODataServiceVersion.V40).wait(waitInSeconds)); extendHeader(HeaderName.prefer.toString(), new ODataPreferences(ODataServiceVersion.V40).wait(waitInSeconds));
return this; return this;
} }
@Override @Override
public AsyncRequestWrapper<R> callback(URI url) { public final AsyncRequestWrapper<R> callback(URI url) {
extendHeader(HeaderName.prefer.toString(), extendHeader(HeaderName.prefer.toString(),
new ODataPreferences(ODataServiceVersion.V40).callback(url.toASCIIString())); new ODataPreferences(ODataServiceVersion.V40).callback(url.toASCIIString()));
return this; return this;
} }
private void extendHeader(final String headerName, final String headerValue) { protected final void extendHeader(final String headerName, final String headerValue) {
final StringBuilder extended = new StringBuilder(); final StringBuilder extended = new StringBuilder();
if (this.odataRequest.getHeaderNames().contains(headerName)) { if (this.odataRequest.getHeaderNames().contains(headerName)) {
extended.append(this.odataRequest.getHeader(headerName)).append(", "); extended.append(this.odataRequest.getHeader(headerName)).append(", ");
@ -130,7 +121,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
return new AsyncResponseWrapperImpl(doExecute()); return new AsyncResponseWrapperImpl(doExecute());
} }
private HttpResponse doExecute() { protected HttpResponse doExecute() {
// Add all available headers // Add all available headers
for (String key : odataRequest.getHeaderNames()) { for (String key : odataRequest.getHeaderNames()) {
final String value = odataRequest.getHeader(key); final String value = odataRequest.getHeader(key);
@ -143,13 +134,16 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
public class AsyncResponseWrapperImpl implements AsyncResponseWrapper<R> { public class AsyncResponseWrapperImpl implements AsyncResponseWrapper<R> {
private URI location = null; protected URI location = null;
private R response = null; protected R response = null;
private int retryAfter = 5; protected int retryAfter = 5;
private boolean preferenceApplied = false; protected boolean preferenceApplied = false;
public AsyncResponseWrapperImpl() {
}
/** /**
* Constructor. * Constructor.
@ -159,7 +153,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public AsyncResponseWrapperImpl(final HttpResponse res) { public AsyncResponseWrapperImpl(final HttpResponse res) {
if (res.getStatusLine().getStatusCode() == 202) { if (res.getStatusLine().getStatusCode() == 202) {
retrieveMonitorDetails(res, true); retrieveMonitorDetails(res);
} else { } else {
response = (R) ((AbstractODataRequest<?>) odataRequest).getResponseTemplate().initFromHttpResponse(res); response = (R) ((AbstractODataRequest<?>) odataRequest).getResponseTemplate().initFromHttpResponse(res);
} }
@ -177,7 +171,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
final HttpResponse res = checkMonitor(location); final HttpResponse res = checkMonitor(location);
if (res.getStatusLine().getStatusCode() == 202) { if (res.getStatusLine().getStatusCode() == 202) {
retrieveMonitorDetails(res, false); retrieveMonitorDetails(res);
} else { } else {
response = instantiateResponse(res); response = instantiateResponse(res);
} }
@ -219,18 +213,34 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
return response; return response;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public ODataDeleteResponse delete() { public ODataDeleteResponse delete() {
final ODataDeleteRequest deleteRequest = odataClient.getCUDRequestFactory().getDeleteRequest(location); final ODataDeleteRequest deleteRequest = odataClient.getCUDRequestFactory().getDeleteRequest(location);
return deleteRequest.execute(); return deleteRequest.execute();
} }
/**
* {@inheritDoc}
*/
@Override @Override
public AsyncResponseWrapper<ODataDeleteResponse> asyncDelete() { public AsyncResponseWrapper<ODataDeleteResponse> asyncDelete() {
return odataClient.getAsyncRequestFactory().<ODataDeleteResponse>getAsyncRequestWrapper( return odataClient.getAsyncRequestFactory().<ODataDeleteResponse>getAsyncRequestWrapper(
odataClient.getCUDRequestFactory().getDeleteRequest(location)).execute(); odataClient.getCUDRequestFactory().getDeleteRequest(location)).execute();
} }
/**
* {@inheritDoc}
*/
@Override
public AsyncResponseWrapper<R> forceNextMonitorCheck(final URI uri) {
this.location = uri;
this.response = null;
return this;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private R instantiateResponse(final HttpResponse res) { private R instantiateResponse(final HttpResponse res) {
R odataResponse; R odataResponse;
@ -246,7 +256,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
return odataResponse; return odataResponse;
} }
private void retrieveMonitorDetails(final HttpResponse res, final boolean includePreferenceApplied) { private void retrieveMonitorDetails(final HttpResponse res) {
Header[] headers = res.getHeaders(HeaderName.location.toString()); Header[] headers = res.getHeaders(HeaderName.location.toString());
if (ArrayUtils.isNotEmpty(headers)) { if (ArrayUtils.isNotEmpty(headers)) {
this.location = URI.create(headers[0].getValue()); this.location = URI.create(headers[0].getValue());
@ -276,7 +286,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
} }
} }
private HttpResponse checkMonitor(final URI location) { protected final HttpResponse checkMonitor(final URI location) {
if (location == null) { if (location == null) {
throw new AsyncRequestException("Invalid async request response. Missing monitor URL"); throw new AsyncRequestException("Invalid async request response. Missing monitor URL");
} }
@ -287,10 +297,8 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
return executeHttpRequest(httpClient, monitor); return executeHttpRequest(httpClient, monitor);
} }
private HttpResponse executeHttpRequest(final HttpClient client, final HttpUriRequest req) { protected final HttpResponse executeHttpRequest(final HttpClient client, final HttpUriRequest req) {
checkRequest(odataClient, request); final HttpResponse response;
HttpResponse response;
try { try {
response = client.execute(req); response = client.execute(req);
} catch (IOException e) { } catch (IOException e) {

View File

@ -24,7 +24,6 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.PipedInputStream; import java.io.PipedInputStream;
import java.io.PipedOutputStream; import java.io.PipedOutputStream;
import java.nio.charset.Charset;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -35,6 +34,7 @@ import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.olingo.client.api.communication.header.HeaderName; import org.apache.olingo.client.api.communication.header.HeaderName;
import org.apache.olingo.client.api.communication.request.ODataStreamer;
import org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator; import org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator;
import org.apache.olingo.client.api.communication.response.ODataResponse; import org.apache.olingo.client.api.communication.response.ODataResponse;
import org.apache.olingo.client.api.http.NoContentException; import org.apache.olingo.client.api.http.NoContentException;
@ -257,10 +257,10 @@ public abstract class AbstractODataResponse implements ODataResponse {
this.headers.putAll(partHeaders); this.headers.putAll(partHeaders);
final ByteArrayOutputStream bos = new ByteArrayOutputStream(); final ByteArrayOutputStream bos = new ByteArrayOutputStream();
LOG.debug("Retrieved payload {}", bos.toString(Charset.forName("UTF-8").toString()));
while (batchLineIterator.hasNext()) { while (batchLineIterator.hasNext()) {
bos.write(batchLineIterator.nextLine().getBytes()); bos.write(batchLineIterator.nextLine().getBytes(Constants.UTF8));
bos.write(ODataStreamer.CRLF);
} }
try { try {

View File

@ -18,9 +18,13 @@
*/ */
package org.apache.olingo.client.core.communication.response.v4; package org.apache.olingo.client.core.communication.response.v4;
import java.util.Collection;
import java.util.Map;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator;
import org.apache.olingo.client.api.communication.response.v4.AsyncResponse; import org.apache.olingo.client.api.communication.response.v4.AsyncResponse;
import org.apache.olingo.client.core.communication.request.batch.ODataBatchController;
import org.apache.olingo.client.core.communication.response.AbstractODataResponse; import org.apache.olingo.client.core.communication.response.AbstractODataResponse;
/** /**
@ -46,4 +50,27 @@ public class AsyncResponseImpl extends AbstractODataResponse implements AsyncRes
public AsyncResponseImpl(final HttpClient client, final HttpResponse res) { public AsyncResponseImpl(final HttpClient client, final HttpResponse res) {
super(client, res); super(client, res);
} }
/**
* Constructor to be used inside a batch item.
*/
public AsyncResponseImpl(
final Map.Entry<Integer, String> responseLine,
final Map<String, Collection<String>> headers,
final ODataBatchLineIterator batchLineIterator,
final String boundary) {
super();
if (hasBeenInitialized) {
throw new IllegalStateException("Request already initialized");
}
this.hasBeenInitialized = true;
this.batchInfo = new ODataBatchController(batchLineIterator, boundary);
this.statusCode = responseLine.getKey();
this.statusMessage = responseLine.getValue();
this.headers.putAll(headers);
}
} }

View File

@ -26,12 +26,14 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Calendar; import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.olingo.client.api.ODataBatchConstants; import org.apache.olingo.client.api.ODataBatchConstants;
import org.apache.olingo.client.api.communication.header.HeaderName;
import org.apache.olingo.client.api.communication.request.ODataStreamManager; import org.apache.olingo.client.api.communication.request.ODataStreamManager;
import org.apache.olingo.client.api.communication.request.batch.ODataBatchResponseItem; import org.apache.olingo.client.api.communication.request.batch.ODataBatchResponseItem;
import org.apache.olingo.client.api.communication.request.batch.ODataChangeset; import org.apache.olingo.client.api.communication.request.batch.ODataChangeset;
@ -44,10 +46,13 @@ import org.apache.olingo.client.api.communication.request.cud.ODataEntityUpdateR
import org.apache.olingo.client.api.communication.request.cud.v4.UpdateType; import org.apache.olingo.client.api.communication.request.cud.v4.UpdateType;
import org.apache.olingo.client.api.communication.request.retrieve.ODataEntityRequest; import org.apache.olingo.client.api.communication.request.retrieve.ODataEntityRequest;
import org.apache.olingo.client.api.communication.request.retrieve.ODataEntitySetRequest; import org.apache.olingo.client.api.communication.request.retrieve.ODataEntitySetRequest;
import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper;
import org.apache.olingo.client.api.communication.response.ODataBatchResponse; import org.apache.olingo.client.api.communication.response.ODataBatchResponse;
import org.apache.olingo.client.api.communication.response.ODataEntityCreateResponse; import org.apache.olingo.client.api.communication.response.ODataEntityCreateResponse;
import org.apache.olingo.client.api.communication.response.ODataEntityUpdateResponse; import org.apache.olingo.client.api.communication.response.ODataEntityUpdateResponse;
import org.apache.olingo.client.api.communication.response.ODataResponse; import org.apache.olingo.client.api.communication.response.ODataResponse;
import org.apache.olingo.client.api.communication.response.v4.AsyncResponse;
import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapper;
import org.apache.olingo.client.api.uri.v4.URIBuilder; import org.apache.olingo.client.api.uri.v4.URIBuilder;
import org.apache.olingo.client.core.communication.request.AbstractODataStreamManager; import org.apache.olingo.client.core.communication.request.AbstractODataStreamManager;
import org.apache.olingo.client.core.communication.request.Wrapper; import org.apache.olingo.client.core.communication.request.Wrapper;
@ -397,7 +402,7 @@ public class BatchTestITCase extends AbstractTestITCase {
} }
@Test @Test
@SuppressWarnings("unchecked") @SuppressWarnings({"unchecked"})
public void batchRequest() throws EdmPrimitiveTypeException { public void batchRequest() throws EdmPrimitiveTypeException {
// create your request // create your request
final ODataBatchRequest request = client.getBatchRequestFactory().getBatchRequest(testStaticServiceRootURL); final ODataBatchRequest request = client.getBatchRequestFactory().getBatchRequest(testStaticServiceRootURL);
@ -412,8 +417,7 @@ public class BatchTestITCase extends AbstractTestITCase {
// prepare URI // prepare URI
URIBuilder targetURI = client.getURIBuilder(testStaticServiceRootURL); URIBuilder targetURI = client.getURIBuilder(testStaticServiceRootURL);
targetURI.appendEntitySetSegment("Customers").appendKeySegment(1);//. targetURI.appendEntitySetSegment("Customers").appendKeySegment(1);
// expand("Orders").select("PersonID,Orders/OrderID");
// create new request // create new request
ODataEntityRequest<ODataEntity> queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build()); ODataEntityRequest<ODataEntity> queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build());
@ -520,12 +524,82 @@ public class BatchTestITCase extends AbstractTestITCase {
entres = (ODataEntityRequestImpl.ODataEntityResponseImpl) res; entres = (ODataEntityRequestImpl.ODataEntityResponseImpl) res;
entity = entres.getBody(); entity = entres.getBody();
assertEquals("new last name", assertEquals("new last name", entity.getProperty("LastName").getPrimitiveValue().toCastValue(String.class));
entity.getProperty("LastName").getPrimitiveValue().toCastValue(String.class));
assertFalse(iter.hasNext()); assertFalse(iter.hasNext());
} }
@Test
public void async() {
// create your request
final ODataBatchRequest request = client.getBatchRequestFactory().getBatchRequest(
URI.create(testStaticServiceRootURL + "/async/").normalize().toASCIIString());
request.setAccept(ACCEPT);
final AsyncBatchRequestWrapper async = client.getAsyncRequestFactory().getAsyncBatchRequestWrapper(request);
// -------------------------------------------
// Add retrieve item
// -------------------------------------------
ODataRetrieve retrieve = async.addRetrieve();
// prepare URI
URIBuilder targetURI = client.getURIBuilder(testStaticServiceRootURL);
targetURI.appendEntitySetSegment("People").appendKeySegment(5);
// create new request
ODataEntityRequest<ODataEntity> queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build());
queryReq.setFormat(ODataPubFormat.JSON);
retrieve.setRequest(queryReq);
// -------------------------------------------
// -------------------------------------------
// Add retrieve item
// -------------------------------------------
retrieve = async.addRetrieve();
// prepare URI
targetURI = client.getURIBuilder(testStaticServiceRootURL).appendEntitySetSegment("Customers").appendKeySegment(1);
// create new request
queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build());
retrieve.setRequest(queryReq);
// -------------------------------------------
final AsyncResponseWrapper<ODataBatchResponse> responseWrapper = async.execute();
assertTrue(responseWrapper.isPreferenceApplied());
assertTrue(responseWrapper.isDone());
final ODataBatchResponse response = responseWrapper.getODataResponse();
assertEquals(200, response.getStatusCode());
assertEquals("Ok", response.getStatusMessage());
final Iterator<ODataBatchResponseItem> iter = response.getBody();
// retrieve the first item (ODataRetrieve)
ODataBatchResponseItem item = iter.next();
assertTrue(item instanceof ODataRetrieveResponseItem);
// The service return interim results to an asynchronously executing batch.
ODataRetrieveResponseItem retitem = (ODataRetrieveResponseItem) item;
ODataResponse res = retitem.next();
assertTrue(res instanceof AsyncResponse);
assertEquals(202, res.getStatusCode());
assertEquals("Accepted", res.getStatusMessage());
Collection<String> newMonitorLocation = res.getHeader(HeaderName.location);
if (newMonitorLocation != null && !newMonitorLocation.isEmpty()) {
responseWrapper.forceNextMonitorCheck(URI.create(newMonitorLocation.iterator().next()));
// .... now you can start again with isDone() and getODataResponse().
}
assertFalse(retitem.hasNext());
assertFalse(iter.hasNext());
}
private static class TestStreamManager extends AbstractODataStreamManager<ODataBatchResponse> { private static class TestStreamManager extends AbstractODataStreamManager<ODataBatchResponse> {
public TestStreamManager() { public TestStreamManager() {