From 5b5a95ea778bfbd42a6046ae4ca704c7fc3d27b9 Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 5 May 2010 10:24:36 +0300 Subject: [PATCH] better implemenation of InputStream by different StreamInput --- .../netty/ChannelBufferStreamInput.java | 10 +++ .../util/base/FinalizableReferenceQueue.java | 2 +- .../util/base/internal/Finalizer.java | 2 +- .../util/io/DataInputInputStream.java | 46 ----------- .../util/io/FastByteArrayInputStream.java | 4 + .../util/io/stream/BytesStreamInput.java | 24 ++++++ .../util/io/stream/DataInputStreamInput.java | 4 + .../util/io/stream/HandlesStreamInput.java | 12 +++ .../io/stream/InputStreamStreamInput.java | 12 +++ .../util/io/stream/StreamInput.java | 24 +++--- .../deps/jackson/JacksonLocationTests.java | 80 +++++++++++++++++++ 11 files changed, 160 insertions(+), 60 deletions(-) delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/io/DataInputInputStream.java create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/deps/jackson/JacksonLocationTests.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/ChannelBufferStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/ChannelBufferStreamInput.java index ff5c4eb7bd6..36e1f84dbeb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/ChannelBufferStreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/ChannelBufferStreamInput.java @@ -37,6 +37,16 @@ public class ChannelBufferStreamInput extends StreamInput { this.buffer = buffer; } + // Not really maps to InputStream, but good enough for us + @Override public int read() throws IOException { + return buffer.readByte() & 0xFF; + } + + @Override public int read(byte[] b, int off, int len) throws IOException { + readBytes(b, off, len); + return len; + } + @Override public byte readByte() throws IOException { return buffer.readByte(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/base/FinalizableReferenceQueue.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/base/FinalizableReferenceQueue.java index f5f02c60be2..d1750702317 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/base/FinalizableReferenceQueue.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/base/FinalizableReferenceQueue.java @@ -92,7 +92,7 @@ public class FinalizableReferenceQueue { = Logger.getLogger(FinalizableReferenceQueue.class.getName()); private static final String FINALIZER_CLASS_NAME - = "org.elasticsearch.util.gcommon.base.internal.Finalizer"; + = "org.elasticsearch.util.base.internal.Finalizer"; /** Reference to Finalizer.startFinalizer(). */ private static final Method startFinalizer; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/base/internal/Finalizer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/base/internal/Finalizer.java index 82b6aa6852b..e0136add7ce 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/base/internal/Finalizer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/base/internal/Finalizer.java @@ -53,7 +53,7 @@ public class Finalizer extends Thread { /** Name of FinalizableReference.class. */ private static final String FINALIZABLE_REFERENCE - = "org.elasticsearch.util.gcommon.base.FinalizableReference"; + = "org.elasticsearch.util.base.FinalizableReference"; /** * Starts the Finalizer thread. FinalizableReferenceQueue calls this method diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/DataInputInputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/DataInputInputStream.java deleted file mode 100644 index f1361939a72..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/DataInputInputStream.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.util.io; - -import java.io.DataInput; -import java.io.IOException; -import java.io.InputStream; - -/** - * A wrapper {@link java.io.InputStream} around {@link java.io.DataInput}. - * - * @author kimchy (Shay Banon) - */ -public class DataInputInputStream extends InputStream { - - private final DataInput dataInput; - - public DataInputInputStream(DataInput dataInput) { - this.dataInput = dataInput; - } - - @Override public int read() throws IOException { - return dataInput.readByte(); - } - - @Override public long skip(long n) throws IOException { - return dataInput.skipBytes((int) n); - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/FastByteArrayInputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/FastByteArrayInputStream.java index 637a254ac40..5d2dea566dd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/FastByteArrayInputStream.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/FastByteArrayInputStream.java @@ -214,6 +214,10 @@ public class FastByteArrayInputStream extends InputStream { return count - pos; } + public int position() { + return pos; + } + /** * Tests if this InputStream supports mark/reset. The * markSupported method of ByteArrayInputStream diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/BytesStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/BytesStreamInput.java index 2946c021355..b97dd838632 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/BytesStreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/BytesStreamInput.java @@ -45,6 +45,30 @@ public class BytesStreamInput extends StreamInput { this.count = count; } + @Override public int read() throws IOException { + return (pos < count) ? (buf[pos++] & 0xff) : -1; + } + + @Override public int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (pos >= count) { + return -1; + } + if (pos + len > count) { + len = count - pos; + } + if (len <= 0) { + return 0; + } + System.arraycopy(buf, pos, b, off, len); + pos += len; + return len; + } + @Override public byte readByte() throws IOException { if (pos >= count) { throw new EOFException(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/DataInputStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/DataInputStreamInput.java index ae49e87d392..4c79561e590 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/DataInputStreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/DataInputStreamInput.java @@ -34,6 +34,10 @@ public class DataInputStreamInput extends StreamInput { this.in = in; } + @Override public int read() throws IOException { + return in.readByte() & 0xFF; + } + @Override public byte readByte() throws IOException { return in.readByte(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/HandlesStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/HandlesStreamInput.java index 203aeea4eba..f5af82e68a9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/HandlesStreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/HandlesStreamInput.java @@ -84,6 +84,18 @@ public class HandlesStreamInput extends StreamInput { } } + @Override public int read() throws IOException { + return in.read(); + } + + @Override public int read(byte[] b) throws IOException { + return in.read(b); + } + + @Override public int read(byte[] b, int off, int len) throws IOException { + return in.read(b, off, len); + } + @Override public byte readByte() throws IOException { return in.readByte(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/InputStreamStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/InputStreamStreamInput.java index 76a21533e13..2c53958ac37 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/InputStreamStreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/InputStreamStreamInput.java @@ -34,6 +34,18 @@ public class InputStreamStreamInput extends StreamInput { this.is = is; } + @Override public int read() throws IOException { + return is.read(); + } + + @Override public int read(byte[] b) throws IOException { + return is.read(b); + } + + @Override public int read(byte[] b, int off, int len) throws IOException { + return is.read(b, off, len); + } + @Override public byte readByte() throws IOException { return (byte) is.read(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/StreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/StreamInput.java index da4b8d2c9aa..dcadb10bd33 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/StreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/StreamInput.java @@ -147,16 +147,16 @@ public abstract class StreamInput extends InputStream { */ public abstract void close() throws IOException; - // IS - - @Override public int read() throws IOException { - return readByte(); - } - - // Here, we assume that we always can read the full byte array - - @Override public int read(byte[] b, int off, int len) throws IOException { - readBytes(b, off, len); - return len; - } +// // IS +// +// @Override public int read() throws IOException { +// return readByte(); +// } +// +// // Here, we assume that we always can read the full byte array +// +// @Override public int read(byte[] b, int off, int len) throws IOException { +// readBytes(b, off, len); +// return len; +// } } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/deps/jackson/JacksonLocationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/deps/jackson/JacksonLocationTests.java new file mode 100644 index 00000000000..8ca3c85c9b8 --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/deps/jackson/JacksonLocationTests.java @@ -0,0 +1,80 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.deps.jackson; + +import org.codehaus.jackson.*; +import org.elasticsearch.util.io.FastByteArrayInputStream; +import org.elasticsearch.util.io.FastByteArrayOutputStream; +import org.testng.annotations.Test; + +import java.io.IOException; + +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + * @author kimchy (shay.banon) + */ +public class JacksonLocationTests { + + @Test public void testLocationExtraction() throws IOException { + // { + // "index" : "test", + // "source" : { + // value : "something" + // } + // } + FastByteArrayOutputStream os = new FastByteArrayOutputStream(); + JsonGenerator gen = new JsonFactory().createJsonGenerator(os, JsonEncoding.UTF8); + gen.writeStartObject(); + + gen.writeStringField("index", "test"); + + gen.writeFieldName("source"); + gen.writeStartObject(); + gen.writeStringField("value", "something"); + gen.writeEndObject(); + + gen.writeEndObject(); + + gen.close(); + + byte[] data = os.copiedByteArray(); + FastByteArrayInputStream is = new FastByteArrayInputStream(data); + JsonParser parser = new JsonFactory().createJsonParser(is); + + assertThat(parser.nextToken(), equalTo(JsonToken.START_OBJECT)); + assertThat(parser.nextToken(), equalTo(JsonToken.FIELD_NAME)); // "index" + assertThat(parser.nextToken(), equalTo(JsonToken.VALUE_STRING)); + assertThat(parser.nextToken(), equalTo(JsonToken.FIELD_NAME)); // "source" + assertThat(parser.nextToken(), equalTo(JsonToken.START_OBJECT)); +// int location1 = is.position(); +// parser.skipChildren(); +// int location2 = is.position(); +// byte[] sourceData = new byte[location2 - location1]; +// System.arraycopy(data, location1, sourceData, 0, sourceData.length); +// System.out.println(Unicode.fromBytes(sourceData)); +// JsonParser sourceParser = new JsonFactory().createJsonParser(new FastByteArrayInputStream(sourceData)); +// assertThat(sourceParser.nextToken(), equalTo(JsonToken.START_OBJECT)); +// assertThat(sourceParser.nextToken(), equalTo(JsonToken.FIELD_NAME)); // "value" +// assertThat(sourceParser.nextToken(), equalTo(JsonToken.VALUE_STRING)); +// assertThat(sourceParser.nextToken(), equalTo(JsonToken.END_OBJECT)); + } +}