better implemenation of InputStream by different StreamInput

This commit is contained in:
kimchy 2010-05-05 10:24:36 +03:00
parent 5b231c68f8
commit 5b5a95ea77
11 changed files with 160 additions and 60 deletions

View File

@ -37,6 +37,16 @@ public class ChannelBufferStreamInput extends StreamInput {
this.buffer = buffer;
}
// Not really maps to InputStream, but good enough for us
@Override public int read() throws IOException {
return buffer.readByte() & 0xFF;
}
@Override public int read(byte[] b, int off, int len) throws IOException {
readBytes(b, off, len);
return len;
}
@Override public byte readByte() throws IOException {
return buffer.readByte();
}

View File

@ -92,7 +92,7 @@ public class FinalizableReferenceQueue {
= Logger.getLogger(FinalizableReferenceQueue.class.getName());
private static final String FINALIZER_CLASS_NAME
= "org.elasticsearch.util.gcommon.base.internal.Finalizer";
= "org.elasticsearch.util.base.internal.Finalizer";
/** Reference to Finalizer.startFinalizer(). */
private static final Method startFinalizer;

View File

@ -53,7 +53,7 @@ public class Finalizer extends Thread {
/** Name of FinalizableReference.class. */
private static final String FINALIZABLE_REFERENCE
= "org.elasticsearch.util.gcommon.base.FinalizableReference";
= "org.elasticsearch.util.base.FinalizableReference";
/**
* Starts the Finalizer thread. FinalizableReferenceQueue calls this method

View File

@ -1,46 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.io;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
/**
* A wrapper {@link java.io.InputStream} around {@link java.io.DataInput}.
*
* @author kimchy (Shay Banon)
*/
public class DataInputInputStream extends InputStream {
private final DataInput dataInput;
public DataInputInputStream(DataInput dataInput) {
this.dataInput = dataInput;
}
@Override public int read() throws IOException {
return dataInput.readByte();
}
@Override public long skip(long n) throws IOException {
return dataInput.skipBytes((int) n);
}
}

View File

@ -214,6 +214,10 @@ public class FastByteArrayInputStream extends InputStream {
return count - pos;
}
public int position() {
return pos;
}
/**
* Tests if this <code>InputStream</code> supports mark/reset. The
* <code>markSupported</code> method of <code>ByteArrayInputStream</code>

View File

@ -45,6 +45,30 @@ public class BytesStreamInput extends StreamInput {
this.count = count;
}
@Override public int read() throws IOException {
return (pos < count) ? (buf[pos++] & 0xff) : -1;
}
@Override public int read(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
if (pos >= count) {
return -1;
}
if (pos + len > count) {
len = count - pos;
}
if (len <= 0) {
return 0;
}
System.arraycopy(buf, pos, b, off, len);
pos += len;
return len;
}
@Override public byte readByte() throws IOException {
if (pos >= count) {
throw new EOFException();

View File

@ -34,6 +34,10 @@ public class DataInputStreamInput extends StreamInput {
this.in = in;
}
@Override public int read() throws IOException {
return in.readByte() & 0xFF;
}
@Override public byte readByte() throws IOException {
return in.readByte();
}

View File

@ -84,6 +84,18 @@ public class HandlesStreamInput extends StreamInput {
}
}
@Override public int read() throws IOException {
return in.read();
}
@Override public int read(byte[] b) throws IOException {
return in.read(b);
}
@Override public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}
@Override public byte readByte() throws IOException {
return in.readByte();
}

View File

@ -34,6 +34,18 @@ public class InputStreamStreamInput extends StreamInput {
this.is = is;
}
@Override public int read() throws IOException {
return is.read();
}
@Override public int read(byte[] b) throws IOException {
return is.read(b);
}
@Override public int read(byte[] b, int off, int len) throws IOException {
return is.read(b, off, len);
}
@Override public byte readByte() throws IOException {
return (byte) is.read();
}

View File

@ -147,16 +147,16 @@ public abstract class StreamInput extends InputStream {
*/
public abstract void close() throws IOException;
// IS
@Override public int read() throws IOException {
return readByte();
}
// Here, we assume that we always can read the full byte array
@Override public int read(byte[] b, int off, int len) throws IOException {
readBytes(b, off, len);
return len;
}
// // IS
//
// @Override public int read() throws IOException {
// return readByte();
// }
//
// // Here, we assume that we always can read the full byte array
//
// @Override public int read(byte[] b, int off, int len) throws IOException {
// readBytes(b, off, len);
// return len;
// }
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.deps.jackson;
import org.codehaus.jackson.*;
import org.elasticsearch.util.io.FastByteArrayInputStream;
import org.elasticsearch.util.io.FastByteArrayOutputStream;
import org.testng.annotations.Test;
import java.io.IOException;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class JacksonLocationTests {
@Test public void testLocationExtraction() throws IOException {
// {
// "index" : "test",
// "source" : {
// value : "something"
// }
// }
FastByteArrayOutputStream os = new FastByteArrayOutputStream();
JsonGenerator gen = new JsonFactory().createJsonGenerator(os, JsonEncoding.UTF8);
gen.writeStartObject();
gen.writeStringField("index", "test");
gen.writeFieldName("source");
gen.writeStartObject();
gen.writeStringField("value", "something");
gen.writeEndObject();
gen.writeEndObject();
gen.close();
byte[] data = os.copiedByteArray();
FastByteArrayInputStream is = new FastByteArrayInputStream(data);
JsonParser parser = new JsonFactory().createJsonParser(is);
assertThat(parser.nextToken(), equalTo(JsonToken.START_OBJECT));
assertThat(parser.nextToken(), equalTo(JsonToken.FIELD_NAME)); // "index"
assertThat(parser.nextToken(), equalTo(JsonToken.VALUE_STRING));
assertThat(parser.nextToken(), equalTo(JsonToken.FIELD_NAME)); // "source"
assertThat(parser.nextToken(), equalTo(JsonToken.START_OBJECT));
// int location1 = is.position();
// parser.skipChildren();
// int location2 = is.position();
// byte[] sourceData = new byte[location2 - location1];
// System.arraycopy(data, location1, sourceData, 0, sourceData.length);
// System.out.println(Unicode.fromBytes(sourceData));
// JsonParser sourceParser = new JsonFactory().createJsonParser(new FastByteArrayInputStream(sourceData));
// assertThat(sourceParser.nextToken(), equalTo(JsonToken.START_OBJECT));
// assertThat(sourceParser.nextToken(), equalTo(JsonToken.FIELD_NAME)); // "value"
// assertThat(sourceParser.nextToken(), equalTo(JsonToken.VALUE_STRING));
// assertThat(sourceParser.nextToken(), equalTo(JsonToken.END_OBJECT));
}
}