[OLINGO-832] Added additional methods for Java NIO Channels

This commit is contained in:
Michael Bolz 2016-01-08 09:45:09 +01:00
parent f4ad8892ad
commit 885ec27ef2
9 changed files with 285 additions and 5 deletions

View File

@ -19,6 +19,7 @@
package org.apache.olingo.server.api; package org.apache.olingo.server.api;
import java.io.InputStream; import java.io.InputStream;
import java.nio.channels.ReadableByteChannel;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -32,6 +33,7 @@ public class ODataResponse {
private int statusCode = HttpStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(); private int statusCode = HttpStatusCode.INTERNAL_SERVER_ERROR.getStatusCode();
private final HttpHeaders headers = new HttpHeaders(); private final HttpHeaders headers = new HttpHeaders();
private InputStream content; private InputStream content;
private ReadableByteChannel channel;
/** /**
* Sets the status code. * Sets the status code.
@ -132,4 +134,15 @@ public class ODataResponse {
return content; return content;
} }
public void setChannel(final ReadableByteChannel channel) {
this.channel = channel;
}
public ReadableByteChannel getChannel() {
return channel;
}
public boolean isChannelAvailable() {
return channel != null;
}
} }

View File

@ -19,6 +19,7 @@
package org.apache.olingo.server.api.serializer; package org.apache.olingo.server.api.serializer;
import java.io.InputStream; import java.io.InputStream;
import java.nio.channels.ReadableByteChannel;
/** /**
* Result type for {@link ODataSerializer} methods * Result type for {@link ODataSerializer} methods
@ -29,4 +30,8 @@ public interface SerializerResult {
* @return serialized content * @return serialized content
*/ */
InputStream getContent(); InputStream getContent();
ReadableByteChannel getChannel();
boolean isNioSupported();
} }

View File

@ -149,7 +149,7 @@ public class ODataHttpHandlerImpl implements ODataHttpHandler {
} }
} }
if (odResponse.getContent() != null) { if (odResponse.getContent() != null || odResponse.isChannelAvailable()) {
copyContent(odResponse, response); copyContent(odResponse, response);
} }
} }
@ -160,7 +160,11 @@ public class ODataHttpHandlerImpl implements ODataHttpHandler {
try { try {
ByteBuffer inBuffer = ByteBuffer.allocate(COPY_BUFFER_SIZE); ByteBuffer inBuffer = ByteBuffer.allocate(COPY_BUFFER_SIZE);
output = Channels.newChannel(servletResponse.getOutputStream()); output = Channels.newChannel(servletResponse.getOutputStream());
input = Channels.newChannel(odataResponse.getContent()); if(odataResponse.isChannelAvailable()) {
input = odataResponse.getChannel();
} else {
input = Channels.newChannel(odataResponse.getContent());
}
while (input.read(inBuffer) > 0) { while (input.read(inBuffer) > 0) {
inBuffer.flip(); inBuffer.flip();
output.write(inBuffer); output.write(inBuffer);

View File

@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.olingo.server.core.serializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import org.apache.olingo.commons.api.data.Entity;
import org.apache.olingo.commons.api.data.EntityStreamCollection;
import org.apache.olingo.commons.api.edm.EdmEntityType;
import org.apache.olingo.server.api.ServiceMetadata;
import org.apache.olingo.server.api.serializer.EntitySerializerOptions;
import org.apache.olingo.server.api.serializer.SerializerException;
import org.apache.olingo.server.api.serializer.SerializerResult;
import org.apache.olingo.server.core.serializer.json.ODataJsonStreamSerializer;
import org.apache.olingo.server.core.serializer.utils.CircleStreamBuffer;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
public class ChannelSerializerResult implements SerializerResult {
private ReadableByteChannel channel;
private static class StreamChannel implements ReadableByteChannel {
private static final Charset DEFAULT = Charset.forName("UTF-8");
private ByteBuffer head;
private ByteBuffer tail;
private ODataJsonStreamSerializer jsonSerializer;
private EntityStreamCollection coll;
private ServiceMetadata metadata;
private EdmEntityType entityType;
private EntitySerializerOptions options;
public StreamChannel(EntityStreamCollection coll, EdmEntityType entityType, String head,
ODataJsonStreamSerializer jsonSerializer, ServiceMetadata metadata,
EntitySerializerOptions options, String tail) {
this.coll = coll;
this.entityType = entityType;
this.head = ByteBuffer.wrap(head.getBytes(DEFAULT));
this.jsonSerializer = jsonSerializer;
this.metadata = metadata;
this.options = options;
this.tail = ByteBuffer.wrap(tail.getBytes(DEFAULT));
}
@Override
public int read(ByteBuffer dest) throws IOException {
ByteBuffer buffer = getCurrentBuffer();
if (buffer != null && buffer.hasRemaining()) {
int r = buffer.remaining();
if(r <= dest.remaining()) {
dest.put(buffer);
} else {
byte[] buf = new byte[dest.remaining()];
buffer.get(buf);
dest.put(buf);
}
return r;
}
return -1;
}
ByteBuffer currentBuffer;
private ByteBuffer getCurrentBuffer() {
if(currentBuffer == null) {
currentBuffer = head;
} if(!currentBuffer.hasRemaining()) {
if (coll.hasNext()) {
try {
// FIXME: mibo_160108: Inefficient buffer handling, replace
currentBuffer = serEntity(coll.nextEntity());
if(coll.hasNext()) {
ByteBuffer b = ByteBuffer.allocate(currentBuffer.position() + 1);
currentBuffer.flip();
b.put(currentBuffer).put(",".getBytes(DEFAULT));
currentBuffer = b;
}
currentBuffer.flip();
} catch (SerializerException e) {
return getCurrentBuffer();
}
} else if(tail.hasRemaining()) {
currentBuffer = tail;
} else {
return null;
}
}
return currentBuffer;
}
private ByteBuffer serEntity(Entity entity) throws SerializerException {
try {
CircleStreamBuffer buffer = new CircleStreamBuffer();
OutputStream outputStream = buffer.getOutputStream();
JsonGenerator json = new JsonFactory().createGenerator(outputStream);
jsonSerializer.writeEntity(metadata, entityType, entity, null,
options == null ? null : options.getExpand(),
options == null ? null : options.getSelect(),
options != null && options.getWriteOnlyReferences(),
json);
json.close();
outputStream.close();
return buffer.getBuffer();
} catch (final IOException e) {
return ByteBuffer.wrap(("ERROR" + e.getMessage()).getBytes());
}
}
@Override
public boolean isOpen() {
return false;
}
@Override
public void close() throws IOException {
}
}
@Override
public InputStream getContent() {
return Channels.newInputStream(this.channel);
}
@Override
public ReadableByteChannel getChannel() {
return this.channel;
}
@Override
public boolean isNioSupported() {
return true;
}
private ChannelSerializerResult(ReadableByteChannel channel) {
this.channel = channel;
}
public static SerializerResultBuilder with(EntityStreamCollection coll, EdmEntityType entityType,
ODataJsonStreamSerializer jsonSerializer, ServiceMetadata metadata, EntitySerializerOptions options) {
return new SerializerResultBuilder(coll, entityType, jsonSerializer, metadata, options);
}
public static class SerializerResultBuilder {
private ODataJsonStreamSerializer jsonSerializer;
private EntityStreamCollection coll;
private ServiceMetadata metadata;
private EdmEntityType entityType;
private EntitySerializerOptions options;
private String head;
private String tail;
public SerializerResultBuilder(EntityStreamCollection coll, EdmEntityType entityType,
ODataJsonStreamSerializer jsonSerializer, ServiceMetadata metadata, EntitySerializerOptions options) {
this.coll = coll;
this.entityType = entityType;
this.jsonSerializer = jsonSerializer;
this.metadata = metadata;
this.options = options;
}
public SerializerResultBuilder addHead(String head) {
this.head = head;
return this;
}
public SerializerResultBuilder addTail(String tail) {
this.tail = tail;
return this;
}
public SerializerResult build() {
ReadableByteChannel input = new StreamChannel(coll, entityType, head, jsonSerializer, metadata, options, tail);
return new ChannelSerializerResult(input);
}
}
}

View File

@ -19,6 +19,9 @@
package org.apache.olingo.server.core.serializer; package org.apache.olingo.server.core.serializer;
import java.io.InputStream; import java.io.InputStream;
import java.nio.channels.Channel;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.apache.olingo.server.api.serializer.SerializerResult; import org.apache.olingo.server.api.serializer.SerializerResult;
@ -30,6 +33,16 @@ public class SerializerResultImpl implements SerializerResult {
return content; return content;
} }
@Override
public ReadableByteChannel getChannel() {
return Channels.newChannel(getContent());
}
@Override
public boolean isNioSupported() {
return false;
}
public static SerializerResultBuilder with() { public static SerializerResultBuilder with() {
return new SerializerResultBuilder(); return new SerializerResultBuilder();
} }

View File

@ -34,6 +34,10 @@ import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.channels.Channel;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
public class StreamSerializerResult implements SerializerResult { public class StreamSerializerResult implements SerializerResult {
private InputStream content; private InputStream content;
@ -121,6 +125,16 @@ public class StreamSerializerResult implements SerializerResult {
return content; return content;
} }
@Override
public ReadableByteChannel getChannel() {
return Channels.newChannel(getContent());
}
@Override
public boolean isNioSupported() {
return true;
}
private StreamSerializerResult(InputStream content) { private StreamSerializerResult(InputStream content) {
this.content = content; this.content = content;
} }

View File

@ -58,6 +58,7 @@ import org.apache.olingo.server.api.uri.queryoption.ExpandItem;
import org.apache.olingo.server.api.uri.queryoption.ExpandOption; import org.apache.olingo.server.api.uri.queryoption.ExpandOption;
import org.apache.olingo.server.api.uri.queryoption.SelectOption; import org.apache.olingo.server.api.uri.queryoption.SelectOption;
import org.apache.olingo.server.core.serializer.AbstractODataSerializer; import org.apache.olingo.server.core.serializer.AbstractODataSerializer;
import org.apache.olingo.server.core.serializer.ChannelSerializerResult;
import org.apache.olingo.server.core.serializer.SerializerResultImpl; import org.apache.olingo.server.core.serializer.SerializerResultImpl;
import org.apache.olingo.server.core.serializer.StreamSerializerResult; import org.apache.olingo.server.core.serializer.StreamSerializerResult;
import org.apache.olingo.server.core.serializer.utils.CircleStreamBuffer; import org.apache.olingo.server.core.serializer.utils.CircleStreamBuffer;
@ -134,7 +135,9 @@ public class ODataJsonStreamSerializer extends ODataJsonSerializer {
opt.expand(options.getExpand()).select(options opt.expand(options.getExpand()).select(options
.getSelect()).writeOnlyReferences(options.getWriteOnlyReferences()); .getSelect()).writeOnlyReferences(options.getWriteOnlyReferences());
} }
return StreamSerializerResult.with(coll, entityType, this, metadata, opt.build()) // return StreamSerializerResult.with(coll, entityType, this, metadata, opt.build())
// .addHead(head).addTail(tail).build();
return ChannelSerializerResult.with(coll, entityType, this, metadata, opt.build())
.addHead(head).addTail(tail).build(); .addHead(head).addTail(tail).build();
} catch (final IOException e) { } catch (final IOException e) {
cachedException = cachedException =

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -193,6 +194,25 @@ public class CircleStreamBuffer {
return readBuffer.get(); return readBuffer.get();
} }
public ByteBuffer getBuffer() throws IOException {
if (readClosed) {
throw new IOException("Tried to read from closed stream.");
}
writeMode = false;
// FIXME: mibo_160108: This is not efficient and only for test/poc reasons
int reqSize = 0;
for (ByteBuffer byteBuffer : bufferQueue) {
reqSize += byteBuffer.position();
}
ByteBuffer tmp = ByteBuffer.allocateDirect(reqSize);
for (ByteBuffer byteBuffer : bufferQueue) {
byteBuffer.flip();
tmp.put(byteBuffer);
}
return tmp;
}
// ############################################# // #############################################
// # // #
// # Writing parts // # Writing parts

View File

@ -536,8 +536,11 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
serializeEntityStreamCollectionFixed(request, serializeEntityStreamCollectionFixed(request,
entitySetSerialization, edmEntitySet, edmEntityType, requestedContentType, entitySetSerialization, edmEntitySet, edmEntityType, requestedContentType,
expand, select, countOption, id); expand, select, countOption, id);
response.setContent(serializerResult.getContent()); if(serializerResult.isNioSupported()) {
response.setChannel(serializerResult.getChannel());
} else {
response.setContent(serializerResult.getContent());
}
response.setStatusCode(HttpStatusCode.OK.getStatusCode()); response.setStatusCode(HttpStatusCode.OK.getStatusCode());
response.setHeader(HttpHeader.CONTENT_TYPE, requestedContentType.toContentTypeString()); response.setHeader(HttpHeader.CONTENT_TYPE, requestedContentType.toContentTypeString());
if (pageSize != null) { if (pageSize != null) {
@ -631,6 +634,9 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
@Override @Override
public Entity nextEntity() { public Entity nextEntity() {
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) { }
return test.next(); return test.next();
} }
}; };
@ -647,6 +653,7 @@ public class TechnicalEntityProcessor extends TechnicalProcessor
.build()); .build());
} }
private SerializerResult serializeEntityCollection(final ODataRequest request, final EntityCollection private SerializerResult serializeEntityCollection(final ODataRequest request, final EntityCollection
entityCollection, final EdmEntitySet edmEntitySet, final EdmEntityType edmEntityType, entityCollection, final EdmEntitySet edmEntitySet, final EdmEntityType edmEntityType,
final ContentType requestedFormat, final ExpandOption expand, final SelectOption select, final ContentType requestedFormat, final ExpandOption expand, final SelectOption select,