From 885ec27ef2411697c9e8b82e12c77e54e4c2710d Mon Sep 17 00:00:00 2001 From: Michael Bolz Date: Fri, 8 Jan 2016 09:45:09 +0100 Subject: [PATCH] [OLINGO-832] Added additional methods for Java NIO Channels --- .../olingo/server/api/ODataResponse.java | 13 ++ .../api/serializer/SerializerResult.java | 5 + .../server/core/ODataHttpHandlerImpl.java | 8 +- .../serializer/ChannelSerializerResult.java | 201 ++++++++++++++++++ .../core/serializer/SerializerResultImpl.java | 13 ++ .../serializer/StreamSerializerResult.java | 14 ++ .../json/ODataJsonStreamSerializer.java | 5 +- .../serializer/utils/CircleStreamBuffer.java | 20 ++ .../processor/TechnicalEntityProcessor.java | 11 +- 9 files changed, 285 insertions(+), 5 deletions(-) create mode 100644 lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/ChannelSerializerResult.java diff --git a/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java b/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java index 3b63af70f..a4dc7e0c2 100644 --- a/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java +++ b/lib/server-api/src/main/java/org/apache/olingo/server/api/ODataResponse.java @@ -19,6 +19,7 @@ package org.apache.olingo.server.api; import java.io.InputStream; +import java.nio.channels.ReadableByteChannel; import java.util.List; import java.util.Map; @@ -32,6 +33,7 @@ public class ODataResponse { private int statusCode = HttpStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(); private final HttpHeaders headers = new HttpHeaders(); private InputStream content; + private ReadableByteChannel channel; /** * Sets the status code. @@ -132,4 +134,15 @@ public class ODataResponse { return content; } + public void setChannel(final ReadableByteChannel channel) { + this.channel = channel; + } + + public ReadableByteChannel getChannel() { + return channel; + } + + public boolean isChannelAvailable() { + return channel != null; + } } diff --git a/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java b/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java index edf3ac89b..fe2350205 100644 --- a/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java +++ b/lib/server-api/src/main/java/org/apache/olingo/server/api/serializer/SerializerResult.java @@ -19,6 +19,7 @@ package org.apache.olingo.server.api.serializer; import java.io.InputStream; +import java.nio.channels.ReadableByteChannel; /** * Result type for {@link ODataSerializer} methods @@ -29,4 +30,8 @@ public interface SerializerResult { * @return serialized content */ InputStream getContent(); + + ReadableByteChannel getChannel(); + + boolean isNioSupported(); } diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java index 162494304..59d79724b 100644 --- a/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java +++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/ODataHttpHandlerImpl.java @@ -149,7 +149,7 @@ public class ODataHttpHandlerImpl implements ODataHttpHandler { } } - if (odResponse.getContent() != null) { + if (odResponse.getContent() != null || odResponse.isChannelAvailable()) { copyContent(odResponse, response); } } @@ -160,7 +160,11 @@ public class ODataHttpHandlerImpl implements ODataHttpHandler { try { ByteBuffer inBuffer = ByteBuffer.allocate(COPY_BUFFER_SIZE); output = Channels.newChannel(servletResponse.getOutputStream()); - input = Channels.newChannel(odataResponse.getContent()); + if(odataResponse.isChannelAvailable()) { + input = odataResponse.getChannel(); + } else { + input = Channels.newChannel(odataResponse.getContent()); + } while (input.read(inBuffer) > 0) { inBuffer.flip(); output.write(inBuffer); diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/ChannelSerializerResult.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/ChannelSerializerResult.java new file mode 100644 index 000000000..1d4c32ff1 --- /dev/null +++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/ChannelSerializerResult.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.olingo.server.core.serializer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.charset.Charset; + +import org.apache.olingo.commons.api.data.Entity; +import org.apache.olingo.commons.api.data.EntityStreamCollection; +import org.apache.olingo.commons.api.edm.EdmEntityType; +import org.apache.olingo.server.api.ServiceMetadata; +import org.apache.olingo.server.api.serializer.EntitySerializerOptions; +import org.apache.olingo.server.api.serializer.SerializerException; +import org.apache.olingo.server.api.serializer.SerializerResult; +import org.apache.olingo.server.core.serializer.json.ODataJsonStreamSerializer; +import org.apache.olingo.server.core.serializer.utils.CircleStreamBuffer; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; + +public class ChannelSerializerResult implements SerializerResult { + private ReadableByteChannel channel; + + private static class StreamChannel implements ReadableByteChannel { + private static final Charset DEFAULT = Charset.forName("UTF-8"); + private ByteBuffer head; + private ByteBuffer tail; + private ODataJsonStreamSerializer jsonSerializer; + private EntityStreamCollection coll; + private ServiceMetadata metadata; + private EdmEntityType entityType; + private EntitySerializerOptions options; + + public StreamChannel(EntityStreamCollection coll, EdmEntityType entityType, String head, + ODataJsonStreamSerializer jsonSerializer, ServiceMetadata metadata, + EntitySerializerOptions options, String tail) { + this.coll = coll; + this.entityType = entityType; + this.head = ByteBuffer.wrap(head.getBytes(DEFAULT)); + this.jsonSerializer = jsonSerializer; + this.metadata = metadata; + this.options = options; + this.tail = ByteBuffer.wrap(tail.getBytes(DEFAULT)); + } + + @Override + public int read(ByteBuffer dest) throws IOException { + ByteBuffer buffer = getCurrentBuffer(); + if (buffer != null && buffer.hasRemaining()) { + int r = buffer.remaining(); + if(r <= dest.remaining()) { + dest.put(buffer); + } else { + byte[] buf = new byte[dest.remaining()]; + buffer.get(buf); + dest.put(buf); + } + return r; + } + return -1; + } + + ByteBuffer currentBuffer; + + private ByteBuffer getCurrentBuffer() { + if(currentBuffer == null) { + currentBuffer = head; + } if(!currentBuffer.hasRemaining()) { + if (coll.hasNext()) { + try { + // FIXME: mibo_160108: Inefficient buffer handling, replace + currentBuffer = serEntity(coll.nextEntity()); + if(coll.hasNext()) { + ByteBuffer b = ByteBuffer.allocate(currentBuffer.position() + 1); + currentBuffer.flip(); + b.put(currentBuffer).put(",".getBytes(DEFAULT)); + currentBuffer = b; + } + currentBuffer.flip(); + } catch (SerializerException e) { + return getCurrentBuffer(); + } + } else if(tail.hasRemaining()) { + currentBuffer = tail; + } else { + return null; + } + } + return currentBuffer; + } + + private ByteBuffer serEntity(Entity entity) throws SerializerException { + try { + CircleStreamBuffer buffer = new CircleStreamBuffer(); + OutputStream outputStream = buffer.getOutputStream(); + JsonGenerator json = new JsonFactory().createGenerator(outputStream); + jsonSerializer.writeEntity(metadata, entityType, entity, null, + options == null ? null : options.getExpand(), + options == null ? null : options.getSelect(), + options != null && options.getWriteOnlyReferences(), + json); + + json.close(); + outputStream.close(); + return buffer.getBuffer(); + } catch (final IOException e) { + return ByteBuffer.wrap(("ERROR" + e.getMessage()).getBytes()); + } + } + + + @Override + public boolean isOpen() { + return false; + } + + @Override + public void close() throws IOException { + + } + } + + @Override + public InputStream getContent() { + return Channels.newInputStream(this.channel); + } + + @Override + public ReadableByteChannel getChannel() { + return this.channel; + } + + @Override + public boolean isNioSupported() { + return true; + } + + private ChannelSerializerResult(ReadableByteChannel channel) { + this.channel = channel; + } + + public static SerializerResultBuilder with(EntityStreamCollection coll, EdmEntityType entityType, + ODataJsonStreamSerializer jsonSerializer, ServiceMetadata metadata, EntitySerializerOptions options) { + return new SerializerResultBuilder(coll, entityType, jsonSerializer, metadata, options); + } + + public static class SerializerResultBuilder { + private ODataJsonStreamSerializer jsonSerializer; + private EntityStreamCollection coll; + private ServiceMetadata metadata; + private EdmEntityType entityType; + private EntitySerializerOptions options; + private String head; + private String tail; + + public SerializerResultBuilder(EntityStreamCollection coll, EdmEntityType entityType, + ODataJsonStreamSerializer jsonSerializer, ServiceMetadata metadata, EntitySerializerOptions options) { + this.coll = coll; + this.entityType = entityType; + this.jsonSerializer = jsonSerializer; + this.metadata = metadata; + this.options = options; + } + + public SerializerResultBuilder addHead(String head) { + this.head = head; + return this; + } + + public SerializerResultBuilder addTail(String tail) { + this.tail = tail; + return this; + } + + public SerializerResult build() { + ReadableByteChannel input = new StreamChannel(coll, entityType, head, jsonSerializer, metadata, options, tail); + return new ChannelSerializerResult(input); + } + } +} diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java index 53dca1999..5a5364ad4 100644 --- a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java +++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/SerializerResultImpl.java @@ -19,6 +19,9 @@ package org.apache.olingo.server.core.serializer; import java.io.InputStream; +import java.nio.channels.Channel; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; import org.apache.olingo.server.api.serializer.SerializerResult; @@ -30,6 +33,16 @@ public class SerializerResultImpl implements SerializerResult { return content; } + @Override + public ReadableByteChannel getChannel() { + return Channels.newChannel(getContent()); + } + + @Override + public boolean isNioSupported() { + return false; + } + public static SerializerResultBuilder with() { return new SerializerResultBuilder(); } diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java index d45c59466..e4c8051ed 100644 --- a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java +++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/StreamSerializerResult.java @@ -34,6 +34,10 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.channels.Channel; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.channels.ReadableByteChannel; public class StreamSerializerResult implements SerializerResult { private InputStream content; @@ -121,6 +125,16 @@ public class StreamSerializerResult implements SerializerResult { return content; } + @Override + public ReadableByteChannel getChannel() { + return Channels.newChannel(getContent()); + } + + @Override + public boolean isNioSupported() { + return true; + } + private StreamSerializerResult(InputStream content) { this.content = content; } diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java index 08a30c648..110d4161b 100644 --- a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java +++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/json/ODataJsonStreamSerializer.java @@ -58,6 +58,7 @@ import org.apache.olingo.server.api.uri.queryoption.ExpandItem; import org.apache.olingo.server.api.uri.queryoption.ExpandOption; import org.apache.olingo.server.api.uri.queryoption.SelectOption; import org.apache.olingo.server.core.serializer.AbstractODataSerializer; +import org.apache.olingo.server.core.serializer.ChannelSerializerResult; import org.apache.olingo.server.core.serializer.SerializerResultImpl; import org.apache.olingo.server.core.serializer.StreamSerializerResult; import org.apache.olingo.server.core.serializer.utils.CircleStreamBuffer; @@ -134,7 +135,9 @@ public class ODataJsonStreamSerializer extends ODataJsonSerializer { opt.expand(options.getExpand()).select(options .getSelect()).writeOnlyReferences(options.getWriteOnlyReferences()); } - return StreamSerializerResult.with(coll, entityType, this, metadata, opt.build()) +// return StreamSerializerResult.with(coll, entityType, this, metadata, opt.build()) +// .addHead(head).addTail(tail).build(); + return ChannelSerializerResult.with(coll, entityType, this, metadata, opt.build()) .addHead(head).addTail(tail).build(); } catch (final IOException e) { cachedException = diff --git a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java index 20d9ca539..b7ba2f2ae 100644 --- a/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java +++ b/lib/server-core/src/main/java/org/apache/olingo/server/core/serializer/utils/CircleStreamBuffer.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; @@ -193,6 +194,25 @@ public class CircleStreamBuffer { return readBuffer.get(); } + public ByteBuffer getBuffer() throws IOException { + if (readClosed) { + throw new IOException("Tried to read from closed stream."); + } + writeMode = false; + + // FIXME: mibo_160108: This is not efficient and only for test/poc reasons + int reqSize = 0; + for (ByteBuffer byteBuffer : bufferQueue) { + reqSize += byteBuffer.position(); + } + ByteBuffer tmp = ByteBuffer.allocateDirect(reqSize); + for (ByteBuffer byteBuffer : bufferQueue) { + byteBuffer.flip(); + tmp.put(byteBuffer); + } + return tmp; + } + // ############################################# // # // # Writing parts diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java index 6644f1ecc..f8fa7c801 100644 --- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java +++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/processor/TechnicalEntityProcessor.java @@ -536,8 +536,11 @@ public class TechnicalEntityProcessor extends TechnicalProcessor serializeEntityStreamCollectionFixed(request, entitySetSerialization, edmEntitySet, edmEntityType, requestedContentType, expand, select, countOption, id); - response.setContent(serializerResult.getContent()); - + if(serializerResult.isNioSupported()) { + response.setChannel(serializerResult.getChannel()); + } else { + response.setContent(serializerResult.getContent()); + } response.setStatusCode(HttpStatusCode.OK.getStatusCode()); response.setHeader(HttpHeader.CONTENT_TYPE, requestedContentType.toContentTypeString()); if (pageSize != null) { @@ -631,6 +634,9 @@ public class TechnicalEntityProcessor extends TechnicalProcessor @Override public Entity nextEntity() { + try { + TimeUnit.MILLISECONDS.sleep(1000); + } catch (InterruptedException e) { } return test.next(); } }; @@ -647,6 +653,7 @@ public class TechnicalEntityProcessor extends TechnicalProcessor .build()); } + private SerializerResult serializeEntityCollection(final ODataRequest request, final EntityCollection entityCollection, final EdmEntitySet edmEntitySet, final EdmEntityType edmEntityType, final ContentType requestedFormat, final ExpandOption expand, final SelectOption select,