improve writing text as utf8 when having a string if we can write directly the utf8 bytes

This commit is contained in:
Shay Banon 2012-07-28 20:21:20 +02:00
parent 6e20056619
commit a41477e0fa
10 changed files with 447 additions and 16 deletions

View File

@ -0,0 +1,333 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import java.io.CharConversionException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
/**
*/
public final class UTF8StreamWriter extends Writer {
/**
* Holds the current output stream or <code>null</code> if closed.
*/
private OutputStream _outputStream;
/**
* Holds the bytes' buffer.
*/
private final byte[] _bytes;
/**
* Holds the bytes buffer index.
*/
private int _index;
/**
* Creates a UTF-8 writer having a byte buffer of moderate capacity (2048).
*/
public UTF8StreamWriter() {
_bytes = new byte[2048];
}
/**
* Creates a UTF-8 writer having a byte buffer of specified capacity.
*
* @param capacity the capacity of the byte buffer.
*/
public UTF8StreamWriter(int capacity) {
_bytes = new byte[capacity];
}
/**
* Sets the output stream to use for writing until this writer is closed.
* For example:[code]
* Writer writer = new UTF8StreamWriter().setOutputStream(out);
* [/code] is equivalent but writes faster than [code]
* Writer writer = new java.io.OutputStreamWriter(out, "UTF-8");
* [/code]
*
* @param out the output stream.
* @return this UTF-8 writer.
* @throws IllegalStateException if this writer is being reused and
* it has not been {@link #close closed} or {@link #reset reset}.
*/
public UTF8StreamWriter setOutput(OutputStream out) {
if (_outputStream != null)
throw new IllegalStateException("Writer not closed or reset");
_outputStream = out;
return this;
}
/**
* Writes a single character. This method supports 16-bits
* character surrogates.
*
* @param c <code>char</code> the character to be written (possibly
* a surrogate).
* @throws IOException if an I/O error occurs.
*/
public void write(char c) throws IOException {
if ((c < 0xd800) || (c > 0xdfff)) {
write((int) c);
} else if (c < 0xdc00) { // High surrogate.
_highSurrogate = c;
} else { // Low surrogate.
int code = ((_highSurrogate - 0xd800) << 10) + (c - 0xdc00)
+ 0x10000;
write(code);
}
}
private char _highSurrogate;
/**
* Writes a character given its 31-bits Unicode.
*
* @param code the 31 bits Unicode of the character to be written.
* @throws IOException if an I/O error occurs.
*/
public void write(int code) throws IOException {
if ((code & 0xffffff80) == 0) {
_bytes[_index] = (byte) code;
if (++_index >= _bytes.length) {
flushBuffer();
}
} else { // Writes more than one byte.
write2(code);
}
}
private void write2(int c) throws IOException {
if ((c & 0xfffff800) == 0) { // 2 bytes.
_bytes[_index] = (byte) (0xc0 | (c >> 6));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | (c & 0x3f));
if (++_index >= _bytes.length) {
flushBuffer();
}
} else if ((c & 0xffff0000) == 0) { // 3 bytes.
_bytes[_index] = (byte) (0xe0 | (c >> 12));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | ((c >> 6) & 0x3f));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | (c & 0x3f));
if (++_index >= _bytes.length) {
flushBuffer();
}
} else if ((c & 0xff200000) == 0) { // 4 bytes.
_bytes[_index] = (byte) (0xf0 | (c >> 18));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | ((c >> 12) & 0x3f));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | ((c >> 6) & 0x3f));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | (c & 0x3f));
if (++_index >= _bytes.length) {
flushBuffer();
}
} else if ((c & 0xf4000000) == 0) { // 5 bytes.
_bytes[_index] = (byte) (0xf8 | (c >> 24));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | ((c >> 18) & 0x3f));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | ((c >> 12) & 0x3f));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | ((c >> 6) & 0x3f));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | (c & 0x3f));
if (++_index >= _bytes.length) {
flushBuffer();
}
} else if ((c & 0x80000000) == 0) { // 6 bytes.
_bytes[_index] = (byte) (0xfc | (c >> 30));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | ((c >> 24) & 0x3f));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | ((c >> 18) & 0x3f));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | ((c >> 12) & 0x3F));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | ((c >> 6) & 0x3F));
if (++_index >= _bytes.length) {
flushBuffer();
}
_bytes[_index] = (byte) (0x80 | (c & 0x3F));
if (++_index >= _bytes.length) {
flushBuffer();
}
} else {
throw new CharConversionException("Illegal character U+"
+ Integer.toHexString(c));
}
}
/**
* Writes a portion of an array of characters.
*
* @param cbuf the array of characters.
* @param off the offset from which to start writing characters.
* @param len the number of characters to write.
* @throws IOException if an I/O error occurs.
*/
public void write(char cbuf[], int off, int len) throws IOException {
final int off_plus_len = off + len;
for (int i = off; i < off_plus_len; ) {
char c = cbuf[i++];
if (c < 0x80) {
_bytes[_index] = (byte) c;
if (++_index >= _bytes.length) {
flushBuffer();
}
} else {
write(c);
}
}
}
/**
* Writes a portion of a string.
*
* @param str a String.
* @param off the offset from which to start writing characters.
* @param len the number of characters to write.
* @throws IOException if an I/O error occurs
*/
public void write(String str, int off, int len) throws IOException {
final int off_plus_len = off + len;
for (int i = off; i < off_plus_len; ) {
char c = str.charAt(i++);
if (c < 0x80) {
_bytes[_index] = (byte) c;
if (++_index >= _bytes.length) {
flushBuffer();
}
} else {
write(c);
}
}
}
/**
* Writes the specified character sequence.
*
* @param csq the character sequence.
* @throws IOException if an I/O error occurs
*/
public void write(CharSequence csq) throws IOException {
final int length = csq.length();
for (int i = 0; i < length; ) {
char c = csq.charAt(i++);
if (c < 0x80) {
_bytes[_index] = (byte) c;
if (++_index >= _bytes.length) {
flushBuffer();
}
} else {
write(c);
}
}
}
/**
* Flushes the stream. If the stream has saved any characters from the
* various write() methods in a buffer, write them immediately to their
* intended destination. Then, if that destination is another character or
* byte stream, flush it. Thus one flush() invocation will flush all the
* buffers in a chain of Writers and OutputStreams.
*
* @throws IOException if an I/O error occurs.
*/
public void flush() throws IOException {
flushBuffer();
_outputStream.flush();
}
/**
* Closes and {@link #reset resets} this writer for reuse.
*
* @throws IOException if an I/O error occurs
*/
public void close() throws IOException {
if (_outputStream != null) {
flushBuffer();
_outputStream.close();
reset();
}
}
/**
* Flushes the internal bytes buffer.
*
* @throws IOException if an I/O error occurs
*/
private void flushBuffer() throws IOException {
if (_outputStream == null)
throw new IOException("Stream closed");
_outputStream.write(_bytes, 0, _index);
_index = 0;
}
// Implements Reusable.
public void reset() {
_highSurrogate = 0;
_index = 0;
_outputStream = null;
}
/**
* @deprecated Replaced by {@link #setOutput(OutputStream)}
*/
public UTF8StreamWriter setOutputStream(OutputStream out) {
return this.setOutput(out);
}
}

View File

@ -37,6 +37,11 @@ public abstract class AdapterStreamInput extends StreamInput {
return in.readBytesReference(); return in.readBytesReference();
} }
@Override
public BytesReference readBytesReference(int length) throws IOException {
return in.readBytesReference(length);
}
@Override @Override
public void reset() throws IOException { public void reset() throws IOException {
in.reset(); in.reset();

View File

@ -43,6 +43,21 @@ public class AdapterStreamOutput extends StreamOutput {
return this.out; return this.out;
} }
@Override
public boolean seekPositionSupported() {
return out.seekPositionSupported();
}
@Override
public long position() throws IOException {
return out.position();
}
@Override
public void seek(long position) throws IOException {
out.seek(position);
}
@Override @Override
public void writeByte(byte b) throws IOException { public void writeByte(byte b) throws IOException {
out.writeByte(b); out.writeByte(b);

View File

@ -60,13 +60,9 @@ public class BytesStreamInput extends StreamInput {
} }
@Override @Override
public BytesReference readBytesReference() throws IOException { public BytesReference readBytesReference(int length) throws IOException {
if (unsafe) { if (unsafe) {
return super.readBytesReference(); return super.readBytesReference(length);
}
int length = readVInt();
if (length == 0) {
return BytesArray.EMPTY;
} }
BytesArray bytes = new BytesArray(buf, pos, length); BytesArray bytes = new BytesArray(buf, pos, length);
pos += length; pos += length;

View File

@ -50,6 +50,24 @@ public class BytesStreamOutput extends StreamOutput implements BytesStream {
this.buf = new byte[size]; this.buf = new byte[size];
} }
@Override
public boolean seekPositionSupported() {
return true;
}
@Override
public long position() throws IOException {
return count;
}
@Override
public void seek(long position) throws IOException {
if (position > Integer.MAX_VALUE) {
throw new UnsupportedOperationException();
}
count = (int) position;
}
@Override @Override
public void writeByte(byte b) throws IOException { public void writeByte(byte b) throws IOException {
int newcount = count + 1; int newcount = count + 1;

View File

@ -21,6 +21,7 @@ package org.elasticsearch.common.io.stream;
import jsr166y.LinkedTransferQueue; import jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.io.UTF8StreamWriter;
import java.io.IOException; import java.io.IOException;
import java.lang.ref.SoftReference; import java.lang.ref.SoftReference;
@ -132,4 +133,17 @@ public class CachedStreamOutput {
ref.add(entry); ref.add(entry);
} }
} }
private static ThreadLocal<SoftReference<UTF8StreamWriter>> utf8StreamWriter = new ThreadLocal<SoftReference<UTF8StreamWriter>>();
public static UTF8StreamWriter utf8StreamWriter() {
SoftReference<UTF8StreamWriter> ref = utf8StreamWriter.get();
UTF8StreamWriter writer = (ref == null) ? null : ref.get();
if (writer == null) {
writer = new UTF8StreamWriter(1024 * 4);
utf8StreamWriter.set(new SoftReference<UTF8StreamWriter>(writer));
}
writer.reset();
return writer;
}
} }

View File

@ -56,6 +56,14 @@ public abstract class StreamInput extends InputStream {
*/ */
public BytesReference readBytesReference() throws IOException { public BytesReference readBytesReference() throws IOException {
int length = readVInt(); int length = readVInt();
return readBytesReference(length);
}
/**
* Reads a bytes reference from this stream, might hold an actual reference to the underlying
* bytes of the stream.
*/
public BytesReference readBytesReference(int length) throws IOException {
if (length == 0) { if (length == 0) {
return BytesArray.EMPTY; return BytesArray.EMPTY;
} }
@ -159,7 +167,8 @@ public abstract class StreamInput extends InputStream {
public Text readText() throws IOException { public Text readText() throws IOException {
// use StringAndBytes so we can cache the string if its ever converted to it // use StringAndBytes so we can cache the string if its ever converted to it
return new StringAndBytesText(readBytesReference()); int length = readInt();
return new StringAndBytesText(readBytesReference(length));
} }
@Nullable @Nullable

View File

@ -21,6 +21,7 @@ package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.UTF8StreamWriter;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
import org.joda.time.ReadableInstant; import org.joda.time.ReadableInstant;
@ -36,6 +37,18 @@ import java.util.Map;
*/ */
public abstract class StreamOutput extends OutputStream { public abstract class StreamOutput extends OutputStream {
public boolean seekPositionSupported() {
return false;
}
public long position() throws IOException {
throw new UnsupportedOperationException();
}
public void seek(long position) throws IOException {
throw new UnsupportedOperationException();
}
/** /**
* Writes a single byte. * Writes a single byte.
*/ */
@ -150,9 +163,23 @@ public abstract class StreamOutput extends OutputStream {
} }
public void writeText(Text text) throws IOException { public void writeText(Text text) throws IOException {
// always write the bytes... if (!text.hasBytes() && seekPositionSupported()) {
// TODO: TextBytesOptimization we could potentially optimize this, and write the bytes directly to the output stream converting to UTF8 in case its a string long pos1 = position();
writeBytesReference(text.bytes()); // make room for the size
seek(pos1 + 4);
UTF8StreamWriter utf8StreamWriter = CachedStreamOutput.utf8StreamWriter();
utf8StreamWriter.setOutput(this);
utf8StreamWriter.write(text.string());
utf8StreamWriter.close();
long pos2 = position();
seek(pos1);
writeInt((int) (pos2 - pos1 - 4));
seek(pos2);
} else {
BytesReference bytes = text.bytes();
writeInt(bytes.length());
bytes.writeTo(this);
}
} }
public void writeString(String str) throws IOException { public void writeString(String str) throws IOException {

View File

@ -19,6 +19,9 @@
package org.elasticsearch.common.xcontent.support; package org.elasticsearch.common.xcontent.support;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentGenerator; import org.elasticsearch.common.xcontent.XContentGenerator;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
@ -175,6 +178,22 @@ public class XContentMapConverter {
gen.writeBinary((byte[]) value); gen.writeBinary((byte[]) value);
} else if (value instanceof Date) { } else if (value instanceof Date) {
gen.writeString(XContentBuilder.defaultDatePrinter.print(((Date) value).getTime())); gen.writeString(XContentBuilder.defaultDatePrinter.print(((Date) value).getTime()));
} else if (value instanceof BytesReference) {
BytesReference bytes = (BytesReference) value;
if (!bytes.hasArray()) {
bytes = bytes.toBytesArray();
}
gen.writeBinary(bytes.array(), bytes.arrayOffset(), bytes.length());
} else if (value instanceof Text) {
Text text = (Text) value;
if (text.hasBytes() && text.bytes().hasArray()) {
gen.writeUTF8String(text.bytes().array(), text.bytes().arrayOffset(), text.bytes().length());
} else if (text.hasString()) {
gen.writeString(text.string());
} else {
BytesArray bytesArray = text.bytes().toBytesArray();
gen.writeUTF8String(bytesArray.array(), bytesArray.arrayOffset(), bytesArray.length());
}
} else { } else {
gen.writeString(value.toString()); gen.writeString(value.toString());
} }

View File

@ -19,7 +19,6 @@
package org.elasticsearch.transport.netty; package org.elasticsearch.transport.netty;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ChannelBufferBytesReference; import org.elasticsearch.common.bytes.ChannelBufferBytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -52,11 +51,7 @@ public class ChannelBufferStreamInput extends StreamInput {
} }
@Override @Override
public BytesReference readBytesReference() throws IOException { public BytesReference readBytesReference(int length) throws IOException {
int length = readVInt();
if (length == 0) {
return BytesArray.EMPTY;
}
ChannelBufferBytesReference ref = new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex(), length)); ChannelBufferBytesReference ref = new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex(), length));
buffer.skipBytes(length); buffer.skipBytes(length);
return ref; return ref;