[OLINGO-832] ODataJsonSerializer changes for streaming

This commit is contained in:
Michael Bolz 2016-02-19 15:11:40 +01:00
parent 396a39baec
commit 4aa1277a12
5 changed files with 71 additions and 203 deletions

View File

@ -27,7 +27,4 @@ public abstract class AbstractEntityCollection extends AbstractODataObject imple
public abstract URI getNext();
public abstract URI getDeltaLink();
//
// @Override
// Iterator<Entity> iterator();
}

View File

@ -65,51 +65,58 @@ public class ODataWritableContent implements ODataContent {
EntityCollectionSerializerOptions options, String tail) {
this.coll = coll;
this.entityType = entityType;
this.head = ByteBuffer.wrap(head.getBytes(DEFAULT));
this.head = head == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(head.getBytes(DEFAULT));
this.jsonSerializer = jsonSerializer;
this.metadata = metadata;
this.options = options;
this.tail = ByteBuffer.wrap(tail.getBytes(DEFAULT));
this.tail = tail == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(tail.getBytes(DEFAULT));
}
public boolean write(OutputStream out) throws IOException {
if(head.hasRemaining()) {
out.write(head.array());
head.flip();
return true;
}
if (coll.hasNext()) {
try {
writeEntity(coll.next(), out);
if(coll.hasNext()) {
out.write(",".getBytes(DEFAULT));
}
return true;
} catch (SerializerException e) {
final WriteContentErrorCallback errorCallback = options.getWriteContentErrorCallback();
if(errorCallback != null) {
final ErrorContext errorContext = new ErrorContext(e).setParameter("Sample", "Some exception happened.");
errorCallback.handleError(errorContext, Channels.newChannel(out));
}
}
} else if(tail.hasRemaining()) {
out.write(tail.array());
tail.flip();
return true;
}
return false;
}
// public boolean write(OutputStream out) throws IOException {
// if(head.hasRemaining()) {
// out.write(head.array());
// head.flip();
// return true;
// }
// if (coll.hasNext()) {
// try {
// writeEntity(coll.next(), out);
// if(coll.hasNext()) {
// out.write(",".getBytes(DEFAULT));
// }
// return true;
// } catch (SerializerException e) {
// final WriteContentErrorCallback errorCallback = options.getWriteContentErrorCallback();
// if(errorCallback != null) {
// final ErrorContext errorContext = new ErrorContext(e).setParameter("Sample", "Some exception happened.");
// errorCallback.handleError(errorContext, Channels.newChannel(out));
// }
// }
// } else if(tail.hasRemaining()) {
// out.write(tail.array());
// tail.flip();
// return true;
// }
// return false;
// }
private void writeEntity(Entity entity, OutputStream outputStream) throws SerializerException {
public void write(OutputStream out) {
try {
JsonGenerator json = new JsonFactory().createGenerator(outputStream);
jsonSerializer.writeEntity(metadata, entityType, entity, null,
options == null ? null : options.getExpand(),
options == null ? null : options.getSelect(),
options != null && options.getWriteOnlyReferences(),
json);
json.flush();
writeEntity(coll, out);
} catch (SerializerException e) {
final WriteContentErrorCallback errorCallback = options.getWriteContentErrorCallback();
if(errorCallback != null) {
final ErrorContext errorContext = new ErrorContext(e).setParameter("Sample", "Some exception happened.");
errorCallback.handleError(errorContext, Channels.newChannel(out));
}
}
}
private void writeEntity(EntityIterator entity, OutputStream outputStream) throws SerializerException {
try {
jsonSerializer.entityCollectionIntoStream(metadata, entityType, entity, options, outputStream);
outputStream.flush();
} catch (final IOException e) {
throw new ODataRuntimeException("Failed entity serialization");
}
@ -204,14 +211,7 @@ public class ODataWritableContent implements ODataContent {
@Override
public void write(WritableByteChannel writeChannel) {
try {
boolean contentAvailable = true;
while(contentAvailable) {
contentAvailable = this.channel.write(Channels.newOutputStream(writeChannel));
}
} catch (IOException e) {
e.printStackTrace();
}
this.channel.write(Channels.newOutputStream(writeChannel));
}
@Override

View File

@ -18,10 +18,8 @@
*/
package org.apache.olingo.server.core.serializer.json;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -184,8 +182,16 @@ public class ODataJsonSerializer extends AbstractODataSerializer {
public SerializerStreamResult entityCollectionStreamed(ServiceMetadata metadata, EdmEntityType entityType,
EntityIterator entities, EntityCollectionSerializerOptions options) throws SerializerException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
SerializerException cachedException = null;
return ODataWritableContent.with(entities, entityType, this, metadata, options).build();
}
public void entityCollectionIntoStream(final ServiceMetadata metadata,
final EdmEntityType entityType, final EntityIterator entitySet,
final EntityCollectionSerializerOptions options, final OutputStream outputStream)
throws SerializerException {
SerializerException cachedException;
try {
JsonGenerator json = new JsonFactory().createGenerator(outputStream);
json.writeStartObject();
@ -196,29 +202,23 @@ public class ODataJsonSerializer extends AbstractODataSerializer {
writeMetadataETag(metadata, json);
if (options != null && options.getCount() != null && options.getCount().getValue()) {
writeCount(entities, json);
writeCount(entitySet, json);
}
json.writeFieldName(Constants.VALUE);
json.writeStartArray();
if (options == null) {
writeEntitySet(metadata, entityType, entitySet, null, null, false, json);
} else {
writeEntitySet(metadata, entityType, entitySet,
options.getExpand(), options.getSelect(), options.getWriteOnlyReferences(), json);
}
// next link not supported by default for streaming results
// writeNextLink(entitySet, json);
json.close();
outputStream.close();
String temp = new String(outputStream.toByteArray(), Charset.forName("UTF-8"));
String head = temp.substring(0, temp.length()-2);
outputStream = new ByteArrayOutputStream();
outputStream.write(']');
outputStream.write('}');
outputStream.close();
String tail = new String(outputStream.toByteArray(), Charset.forName("UTF-8"));
return ODataWritableContent.with(entities, entityType, this, metadata, options)
.addHead(head).addTail(tail).build();
} catch (final IOException e) {
cachedException =
new SerializerException(IO_EXCEPTION_TEXT, e, SerializerException.MessageKeys.IO_EXCEPTION);
throw cachedException;
} finally {
closeCircleStreamBufferOutput(outputStream, cachedException);
}
}

View File

@ -18,16 +18,8 @@
*/
package org.apache.olingo.server.tecsvc.processor;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.olingo.commons.api.data.ContextURL;
import org.apache.olingo.commons.api.data.ContextURL.Builder;
@ -35,8 +27,6 @@ import org.apache.olingo.commons.api.data.ContextURL.Suffix;
import org.apache.olingo.commons.api.data.Entity;
import org.apache.olingo.commons.api.data.EntityCollection;
import org.apache.olingo.commons.api.data.EntityIterator;
import org.apache.olingo.commons.api.data.Property;
import org.apache.olingo.commons.api.data.ValueType;
import org.apache.olingo.commons.api.edm.EdmEntitySet;
import org.apache.olingo.commons.api.edm.EdmEntityType;
import org.apache.olingo.commons.api.format.ContentType;
@ -44,12 +34,10 @@ import org.apache.olingo.commons.api.http.HttpHeader;
import org.apache.olingo.commons.api.http.HttpMethod;
import org.apache.olingo.commons.api.http.HttpStatusCode;
import org.apache.olingo.server.api.ODataApplicationException;
import org.apache.olingo.server.api.ODataContent;
import org.apache.olingo.server.api.ODataLibraryException;
import org.apache.olingo.server.api.ODataRequest;
import org.apache.olingo.server.api.ODataResponse;
import org.apache.olingo.server.api.ServiceMetadata;
import org.apache.olingo.server.api.WriteContentErrorCallback;
import org.apache.olingo.server.api.deserializer.DeserializerResult;
import org.apache.olingo.server.api.deserializer.ODataDeserializer;
import org.apache.olingo.server.api.prefer.Preferences.Return;
@ -540,7 +528,7 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
response.setContent(serializerResult.getContent());
} else {
final SerializerStreamResult serializerResult =
serializeEntityStreamCollectionFixed(request,
serializeEntityCollectionStreamed(request,
entitySetSerialization, edmEntitySet, edmEntityType, requestedContentType,
expand, select, countOption, id);
@ -556,8 +544,8 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
}
}
// just for demonstration
private SerializerStreamResult serializeEntityStreamCollectionFixed(final ODataRequest request,
// serialise as streamed collection
private SerializerStreamResult serializeEntityCollectionStreamed(final ODataRequest request,
final EntityCollection entityCollection, final EdmEntitySet edmEntitySet,
final EdmEntityType edmEntityType,
final ContentType requestedFormat, final ExpandOption expand, final SelectOption select,
@ -573,66 +561,8 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
@Override
public Entity next() {
Entity next = entityIterator.next();
// replacePrimitiveProperty(next, "PropertyString", generateData(28192));
replacePrimitiveProperty(next, "PropertyString", generateData(request));
// next.addProperty(new Property(null, "PropertyString", ValueType.PRIMITIVE, generateData(28192)));
sleep(request, 2500);
return next;
return entityIterator.next();
}
// @Override
// public List<Entity> getEntities() {
// return entityCollection.getEntities();
// }
private void replacePrimitiveProperty(Entity entity, String name, Object data) {
List<Property> properties = entity.getProperties();
int pos = 0;
for (Property property : properties) {
if(name.equals(property.getName())) {
properties.remove(pos);
entity.addProperty(new Property(null, name, ValueType.PRIMITIVE, data));
break;
}
pos++;
}
}
private void sleep(ODataRequest request, int defaultTimeMs) {
String sleepTimeMs = request.getHeader("StreamSleep");
if(sleepTimeMs != null) {
try {
defaultTimeMs = Integer.parseInt(sleepTimeMs);
} catch (NumberFormatException e) { }
}
try {
TimeUnit.MILLISECONDS.sleep(defaultTimeMs);
} catch (InterruptedException e) { }
}
private String generateData(ODataRequest request) {
String streamHeader = request.getHeader("StreamData");
if(streamHeader != null) {
try {
return generateData(Integer.parseInt(streamHeader));
} catch (NumberFormatException e) { }
}
return generateData(28192);
}
private String generateData(final int len) {
Random random = new Random();
StringBuilder b = new StringBuilder(len);
for (int j = 0; j < len; j++) {
final char c = (char) ('A' + random.nextInt('Z' - 'A' + 1));
b.append(c);
}
return b.toString();
}
};
return odata.createSerializer(requestedFormat).entityCollectionStreamed(
@ -648,59 +578,6 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
.build());
}
private SerializerResult serializeEntityStreamCollection(final ODataRequest request,
final EntityCollection entityCollection, final EdmEntitySet edmEntitySet,
final EdmEntityType edmEntityType,
final ContentType requestedFormat, final ExpandOption expand, final SelectOption select,
final CountOption countOption, final String id) throws ODataLibraryException {
EntityIterator streamCollection = new EntityIterator() {
Iterator<Entity> test = entityCollection.getEntities().iterator();
@Override
public boolean hasNext() {
return test.hasNext();
}
@Override
public Entity next() {
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) { }
return test.next();
}
};
return odata.createSerializer(requestedFormat).entityCollection(
serviceMetadata,
edmEntityType,
streamCollection,
EntityCollectionSerializerOptions.with()
.contextURL(isODataMetadataNone(requestedFormat) ? null :
getContextUrl(request.getRawODataPath(), edmEntitySet, edmEntityType, false, expand, select))
.count(countOption)
.expand(expand).select(select)
.id(id)
.build());
}
private SerializerResult serializeEntityCollection(final ODataRequest request, final EntityCollection
entityCollection, final EdmEntitySet edmEntitySet, final EdmEntityType edmEntityType,
final ContentType requestedFormat, final ExpandOption expand, final SelectOption select,
final CountOption countOption, String id) throws ODataLibraryException {
return odata.createSerializer(requestedFormat).entityCollection(
serviceMetadata,
edmEntityType,
entityCollection,
EntityCollectionSerializerOptions.with()
.contextURL(isODataMetadataNone(requestedFormat) ? null :
getContextUrl(request.getRawODataPath(), edmEntitySet, edmEntityType, false, expand, select))
.count(countOption)
.expand(expand).select(select)
.id(id)
.build());
}
private SerializerResult serializeReferenceCollection(final EntityCollection entityCollection,
final EdmEntitySet edmEntitySet, final ContentType requestedFormat, final CountOption countOption)
throws ODataLibraryException {

View File

@ -297,13 +297,7 @@ public class ODataJsonSerializerTest {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
result.write(bout);
final String resultString = new String(bout.toByteArray(), "UTF-8");
Assert.assertThat(resultString, CoreMatchers.startsWith("{"
+ "\"@odata.context\":\"$metadata#ESAllPrim\","
+ "\"@odata.metadataEtag\":\"W/\\\"metadataETag\\\"\","
+ "\"value\":"));
Assert.assertThat(resultString, CoreMatchers.endsWith(
"[ERROR: MISSING_PROPERTY"));
Assert.assertEquals(resultString, "ERROR: MISSING_PROPERTY");
}