remove cached stream output

start to build the infra for simpler migration to netty 4, starting with removing the cache stream output handling as it will come built in
This commit is contained in:
Shay Banon 2013-07-23 15:55:41 +02:00
parent b52243cdc2
commit 6a5d2bf767
34 changed files with 433 additions and 1177 deletions

View File

@ -36,7 +36,10 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.io.stream.*; import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
@ -536,14 +539,9 @@ public class ClusterState implements ToXContent {
} }
public static byte[] toBytes(ClusterState state) throws IOException { public static byte[] toBytes(ClusterState state) throws IOException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); BytesStreamOutput os = new BytesStreamOutput();
try { writeTo(state, os);
BytesStreamOutput os = cachedEntry.bytes(); return os.bytes().toBytes();
writeTo(state, os);
return os.bytes().copyBytesArray().toBytes();
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
} }
public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException { public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException {

View File

@ -28,7 +28,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.lzf.LZFCompressor; import org.elasticsearch.common.compress.lzf.LZFCompressor;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -163,14 +163,10 @@ public class CompressorFactory {
return new BytesArray(compressor.uncompress(bytes.array(), bytes.arrayOffset(), bytes.length())); return new BytesArray(compressor.uncompress(bytes.array(), bytes.arrayOffset(), bytes.length()));
} }
StreamInput compressed = compressor.streamInput(bytes.streamInput()); StreamInput compressed = compressor.streamInput(bytes.streamInput());
CachedStreamOutput.Entry entry = CachedStreamOutput.popEntry(); BytesStreamOutput bStream = new BytesStreamOutput();
try { Streams.copy(compressed, bStream);
Streams.copy(compressed, entry.bytes()); compressed.close();
compressed.close(); return bStream.bytes();
return new BytesArray(entry.bytes().bytes().toBytes());
} finally {
CachedStreamOutput.pushEntry(entry);
}
} }
return bytes; return bytes;
} }

View File

@ -20,12 +20,10 @@
package org.elasticsearch.common.io; package org.elasticsearch.common.io;
import org.elasticsearch.common.io.stream.CachedStreamInput; import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
public class CachedStreams { public class CachedStreams {
public static void clear() { public static void clear() {
CachedStreamInput.clear(); CachedStreamInput.clear();
CachedStreamOutput.clear();
} }
} }

View File

@ -1,268 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import java.io.IOException;
import java.io.InputStream;
/**
*
*/
public class FastByteArrayInputStream extends InputStream {
/**
* An array of bytes that was provided
* by the creator of the stream. Elements <code>buf[0]</code>
* through <code>buf[count-1]</code> are the
* only bytes that can ever be read from the
* stream; element <code>buf[pos]</code> is
* the next byte to be read.
*/
protected byte buf[];
/**
* The index of the next character to read from the input stream buffer.
* This value should always be nonnegative
* and not larger than the value of <code>count</code>.
* The next byte to be read from the input stream buffer
* will be <code>buf[pos]</code>.
*/
protected int pos;
/**
* The currently marked position in the stream.
* ByteArrayInputStream objects are marked at position zero by
* default when constructed. They may be marked at another
* position within the buffer by the <code>mark()</code> method.
* The current buffer position is set to this point by the
* <code>reset()</code> method.
* <p/>
* If no mark has been set, then the value of mark is the offset
* passed to the constructor (or 0 if the offset was not supplied).
*
* @since JDK1.1
*/
protected int mark = 0;
/**
* The index one greater than the last valid character in the input
* stream buffer.
* This value should always be nonnegative
* and not larger than the length of <code>buf</code>.
* It is one greater than the position of
* the last byte within <code>buf</code> that
* can ever be read from the input stream buffer.
*/
protected int count;
/**
* Creates a <code>ByteArrayInputStream</code>
* so that it uses <code>buf</code> as its
* buffer array.
* The buffer array is not copied.
* The initial value of <code>pos</code>
* is <code>0</code> and the initial value
* of <code>count</code> is the length of
* <code>buf</code>.
*
* @param buf the input buffer.
*/
public FastByteArrayInputStream(byte buf[]) {
this.buf = buf;
this.pos = 0;
this.count = buf.length;
}
/**
* Creates <code>ByteArrayInputStream</code>
* that uses <code>buf</code> as its
* buffer array. The initial value of <code>pos</code>
* is <code>offset</code> and the initial value
* of <code>count</code> is the minimum of <code>offset+length</code>
* and <code>buf.length</code>.
* The buffer array is not copied. The buffer's mark is
* set to the specified offset.
*
* @param buf the input buffer.
* @param offset the offset in the buffer of the first byte to read.
* @param length the maximum number of bytes to read from the buffer.
*/
public FastByteArrayInputStream(byte buf[], int offset, int length) {
this.buf = buf;
this.pos = offset;
this.count = Math.min(offset + length, buf.length);
this.mark = offset;
}
/**
* Reads the next byte of data from this input stream. The value
* byte is returned as an <code>int</code> in the range
* <code>0</code> to <code>255</code>. If no byte is available
* because the end of the stream has been reached, the value
* <code>-1</code> is returned.
* <p/>
* This <code>read</code> method
* cannot block.
*
* @return the next byte of data, or <code>-1</code> if the end of the
* stream has been reached.
*/
public int read() {
return (pos < count) ? (buf[pos++] & 0xff) : -1;
}
/**
* Reads up to <code>len</code> bytes of data into an array of bytes
* from this input stream.
* If <code>pos</code> equals <code>count</code>,
* then <code>-1</code> is returned to indicate
* end of file. Otherwise, the number <code>k</code>
* of bytes read is equal to the smaller of
* <code>len</code> and <code>count-pos</code>.
* If <code>k</code> is positive, then bytes
* <code>buf[pos]</code> through <code>buf[pos+k-1]</code>
* are copied into <code>b[off]</code> through
* <code>b[off+k-1]</code> in the manner performed
* by <code>System.arraycopy</code>. The
* value <code>k</code> is added into <code>pos</code>
* and <code>k</code> is returned.
* <p/>
* This <code>read</code> method cannot block.
*
* @param b the buffer into which the data is read.
* @param off the start offset in the destination array <code>b</code>
* @param len the maximum number of bytes read.
* @return the total number of bytes read into the buffer, or
* <code>-1</code> if there is no more data because the end of
* the stream has been reached.
* @throws NullPointerException If <code>b</code> is <code>null</code>.
* @throws IndexOutOfBoundsException If <code>off</code> is negative,
* <code>len</code> is negative, or <code>len</code> is greater than
* <code>b.length - off</code>
*/
public int read(byte b[], int off, int len) {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
if (pos >= count) {
return -1;
}
if (pos + len > count) {
len = count - pos;
}
if (len <= 0) {
return 0;
}
System.arraycopy(buf, pos, b, off, len);
pos += len;
return len;
}
/**
* Skips <code>n</code> bytes of input from this input stream. Fewer
* bytes might be skipped if the end of the input stream is reached.
* The actual number <code>k</code>
* of bytes to be skipped is equal to the smaller
* of <code>n</code> and <code>count-pos</code>.
* The value <code>k</code> is added into <code>pos</code>
* and <code>k</code> is returned.
*
* @param n the number of bytes to be skipped.
* @return the actual number of bytes skipped.
*/
public long skip(long n) {
if (pos + n > count) {
n = count - pos;
}
if (n < 0) {
return 0;
}
pos += n;
return n;
}
/**
* Returns the number of remaining bytes that can be read (or skipped over)
* from this input stream.
* <p/>
* The value returned is <code>count&nbsp;- pos</code>,
* which is the number of bytes remaining to be read from the input buffer.
*
* @return the number of remaining bytes that can be read (or skipped
* over) from this input stream without blocking.
*/
public int available() {
return count - pos;
}
public int position() {
return pos;
}
/**
* Tests if this <code>InputStream</code> supports mark/reset. The
* <code>markSupported</code> method of <code>ByteArrayInputStream</code>
* always returns <code>true</code>.
*
* @since JDK1.1
*/
public boolean markSupported() {
return true;
}
/**
* Set the current marked position in the stream.
* ByteArrayInputStream objects are marked at position zero by
* default when constructed. They may be marked at another
* position within the buffer by this method.
* <p/>
* If no mark has been set, then the value of the mark is the
* offset passed to the constructor (or 0 if the offset was not
* supplied).
* <p/>
* <p> Note: The <code>readAheadLimit</code> for this class
* has no meaning.
*
* @since JDK1.1
*/
public void mark(int readAheadLimit) {
mark = pos;
}
/**
* Resets the buffer to the marked position. The marked position
* is 0 unless another position was marked or an offset was specified
* in the constructor.
*/
public void reset() {
pos = mark;
}
/**
* Closing a <tt>ByteArrayInputStream</tt> has no effect. The methods in
* this class can be called after the stream has been closed without
* generating an <tt>IOException</tt>.
* <p/>
*/
public void close() throws IOException {
}
}

View File

@ -1,210 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import com.google.common.base.Charsets;
/**
* Similar to {@link java.io.ByteArrayOutputStream} just not synced.
*/
public class FastByteArrayOutputStream extends OutputStream implements BytesStream {
/**
* The buffer where data is stored.
*/
protected byte buf[];
/**
* The number of valid bytes in the buffer.
*/
protected int count;
/**
* Creates a new byte array output stream. The buffer capacity is
* initially 1024 bytes, though its size increases if necessary.
* <p/>
* ES: We use 1024 bytes since we mainly use this to build json/smile
* content in memory, and rarely does the 32 byte default in ByteArrayOutputStream fits...
*/
public FastByteArrayOutputStream() {
this(1024);
}
/**
* Creates a new byte array output stream, with a buffer capacity of
* the specified size, in bytes.
*
* @param size the initial size.
* @throws IllegalArgumentException if size is negative.
*/
public FastByteArrayOutputStream(int size) {
if (size < 0) {
throw new IllegalArgumentException("Negative initial size: "
+ size);
}
buf = new byte[size];
}
/**
* Writes the specified byte to this byte array output stream.
*
* @param b the byte to be written.
*/
public void write(int b) {
int newcount = count + 1;
if (newcount > buf.length) {
buf = ArrayUtil.grow(buf, newcount);
}
buf[count] = (byte) b;
count = newcount;
}
/**
* Writes <code>len</code> bytes from the specified byte array
* starting at offset <code>off</code> to this byte array output stream.
* <p/>
* <b>NO checks for bounds, parameters must be ok!</b>
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
*/
public void write(byte b[], int off, int len) {
if (len == 0) {
return;
}
int newcount = count + len;
if (newcount > buf.length) {
buf = ArrayUtil.grow(buf, newcount);
}
System.arraycopy(b, off, buf, count, len);
count = newcount;
}
/**
* Writes the complete contents of this byte array output stream to
* the specified output stream argument, as if by calling the output
* stream's write method using <code>out.write(buf, 0, count)</code>.
*
* @param out the output stream to which to write the data.
* @throws IOException if an I/O error occurs.
*/
public void writeTo(OutputStream out) throws IOException {
out.write(buf, 0, count);
}
/**
* Resets the <code>count</code> field of this byte array output
* stream to zero, so that all currently accumulated output in the
* output stream is discarded. The output stream can be used again,
* reusing the already allocated buffer space.
*
* @see java.io.ByteArrayInputStream#count
*/
public void reset() {
count = 0;
}
/**
* Returns the underlying byte array. Note, use {@link #size()} in order to know
* the length of it.
*/
@Override
public BytesReference bytes() {
return new BytesArray(buf, 0, count);
}
/**
* Returns the current size of the buffer.
*
* @return the value of the <code>count</code> field, which is the number
* of valid bytes in this output stream.
* @see java.io.ByteArrayOutputStream#count
*/
public int size() {
return count;
}
/**
* Seeks back to the given position. Size will become the seeked location.
*/
public void seek(int position) {
this.count = position;
}
/**
* Converts the buffer's contents into a string decoding bytes using the
* platform's default character set. The length of the new <tt>String</tt>
* is a function of the character set, and hence may not be equal to the
* size of the buffer.
* <p/>
* <p> This method always replaces malformed-input and unmappable-character
* sequences with the default replacement string for the platform's
* default character set. The {@linkplain java.nio.charset.CharsetDecoder}
* class should be used when more control over the decoding process is
* required.
*
* @return String decoded from the buffer's contents.
* @since JDK1.1
*/
public String toString() {
return new String(buf, 0, count, Charsets.UTF_8);
}
/**
* Converts the buffer's contents into a string by decoding the bytes using
* the specified {@link java.nio.charset.Charset charsetName}. The length of
* the new <tt>String</tt> is a function of the charset, and hence may not be
* equal to the length of the byte array.
* <p/>
* <p> This method always replaces malformed-input and unmappable-character
* sequences with this charset's default replacement string. The {@link
* java.nio.charset.CharsetDecoder} class should be used when more control
* over the decoding process is required.
*
* @param charsetName the name of a supported
* {@linkplain java.nio.charset.Charset </code>charset<code>}
* @return String decoded from the buffer's contents.
* @throws java.io.UnsupportedEncodingException
* If the named charset is not supported
* @since JDK1.1
*/
public String toString(String charsetName)
throws UnsupportedEncodingException {
return new String(buf, 0, count, charsetName);
}
/**
* Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in
* this class can be called after the stream has been closed without
* generating an <tt>IOException</tt>.
* <p/>
*/
public void close() throws IOException {
}
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.common.io;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import java.io.*; import java.io.*;
@ -161,14 +160,9 @@ public abstract class Streams {
* @throws IOException in case of I/O errors * @throws IOException in case of I/O errors
*/ */
public static byte[] copyToByteArray(InputStream in) throws IOException { public static byte[] copyToByteArray(InputStream in) throws IOException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); BytesStreamOutput out = new BytesStreamOutput();
try { copy(in, out);
BytesStreamOutput out = cachedEntry.bytes(); return out.bytes().toBytes();
copy(in, out);
return out.bytes().copyBytesArray().toBytes();
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
} }

View File

@ -31,6 +31,8 @@ import java.io.IOException;
*/ */
public class BytesStreamOutput extends StreamOutput implements BytesStream { public class BytesStreamOutput extends StreamOutput implements BytesStream {
public static final int DEFAULT_SIZE = 32 * 1024;
/** /**
* The buffer where data is stored. * The buffer where data is stored.
*/ */
@ -42,7 +44,7 @@ public class BytesStreamOutput extends StreamOutput implements BytesStream {
protected int count; protected int count;
public BytesStreamOutput() { public BytesStreamOutput() {
this(1024); this(DEFAULT_SIZE);
} }
public BytesStreamOutput(int size) { public BytesStreamOutput(int size) {

View File

@ -1,172 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.io.UTF8StreamWriter;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.monitor.jvm.JvmInfo;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
*/
public class CachedStreamOutput {
private static Entry newEntry() {
BytesStreamOutput bytes = new BytesStreamOutput();
HandlesStreamOutput handles = new HandlesStreamOutput(bytes);
return new Entry(bytes, handles);
}
public static class Entry {
private final BytesStreamOutput bytes;
private final HandlesStreamOutput handles;
Entry(BytesStreamOutput bytes, HandlesStreamOutput handles) {
this.bytes = bytes;
this.handles = handles;
}
public void reset() {
bytes.reset();
handles.setOut(bytes);
handles.clear();
}
public BytesStreamOutput bytes() {
return bytes;
}
public StreamOutput handles() throws IOException {
return handles;
}
public StreamOutput bytes(Compressor compressor) throws IOException {
return compressor.streamOutput(bytes);
}
public StreamOutput handles(Compressor compressor) throws IOException {
StreamOutput compressed = compressor.streamOutput(bytes);
handles.clear();
handles.setOut(compressed);
return handles;
}
}
static class SoftWrapper<T> {
private SoftReference<T> ref;
public SoftWrapper() {
}
public void set(T ref) {
this.ref = new SoftReference<T>(ref);
}
public T get() {
return ref == null ? null : ref.get();
}
public void clear() {
ref = null;
}
}
private static final SoftWrapper<Queue<Entry>> cache = new SoftWrapper<Queue<Entry>>();
private static final AtomicInteger counter = new AtomicInteger();
public static int BYTES_LIMIT = 1 * 1024 * 1024; // don't cache entries that are bigger than that...
public static int COUNT_LIMIT = 100; // number of concurrent entries cached
static {
// guess the maximum size per entry and the maximum number of entries based on the heap size
long maxHeap = JvmInfo.jvmInfo().mem().heapMax().bytes();
if (maxHeap < ByteSizeValue.parseBytesSizeValue("500mb").bytes()) {
BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("500kb").bytes();
COUNT_LIMIT = 10;
} else if (maxHeap < ByteSizeValue.parseBytesSizeValue("1gb").bytes()) {
BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("1mb").bytes();
COUNT_LIMIT = 20;
} else if (maxHeap < ByteSizeValue.parseBytesSizeValue("4gb").bytes()) {
BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("2mb").bytes();
COUNT_LIMIT = 50;
} else if (maxHeap < ByteSizeValue.parseBytesSizeValue("10gb").bytes()) {
BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("5mb").bytes();
COUNT_LIMIT = 50;
} else {
BYTES_LIMIT = (int) ByteSizeValue.parseBytesSizeValue("10mb").bytes();
COUNT_LIMIT = 100;
}
}
public static void clear() {
cache.clear();
}
public static Entry popEntry() {
Queue<Entry> ref = cache.get();
if (ref == null) {
return newEntry();
}
Entry entry = ref.poll();
if (entry == null) {
return newEntry();
}
counter.decrementAndGet();
entry.reset();
return entry;
}
public static void pushEntry(Entry entry) {
entry.reset();
if (entry.bytes().bytes().length() > BYTES_LIMIT) {
return;
}
Queue<Entry> ref = cache.get();
if (ref == null) {
ref = ConcurrentCollections.newQueue();
counter.set(0);
cache.set(ref);
}
if (counter.incrementAndGet() > COUNT_LIMIT) {
counter.decrementAndGet();
} else {
ref.add(entry);
}
}
private static ThreadLocal<SoftReference<UTF8StreamWriter>> utf8StreamWriter = new ThreadLocal<SoftReference<UTF8StreamWriter>>();
public static UTF8StreamWriter utf8StreamWriter() {
SoftReference<UTF8StreamWriter> ref = utf8StreamWriter.get();
UTF8StreamWriter writer = (ref == null) ? null : ref.get();
if (writer == null) {
writer = new UTF8StreamWriter(1024 * 4);
utf8StreamWriter.set(new SoftReference<UTF8StreamWriter>(writer));
}
writer.reset();
return writer;
}
}

View File

@ -29,6 +29,7 @@ import org.joda.time.ReadableInstant;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.lang.ref.SoftReference;
import java.util.Date; import java.util.Date;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
@ -39,6 +40,19 @@ import java.util.Map;
*/ */
public abstract class StreamOutput extends OutputStream { public abstract class StreamOutput extends OutputStream {
private static ThreadLocal<SoftReference<UTF8StreamWriter>> utf8StreamWriter = new ThreadLocal<SoftReference<UTF8StreamWriter>>();
public static UTF8StreamWriter utf8StreamWriter() {
SoftReference<UTF8StreamWriter> ref = utf8StreamWriter.get();
UTF8StreamWriter writer = (ref == null) ? null : ref.get();
if (writer == null) {
writer = new UTF8StreamWriter(1024 * 4);
utf8StreamWriter.set(new SoftReference<UTF8StreamWriter>(writer));
}
writer.reset();
return writer;
}
private Version version = Version.CURRENT; private Version version = Version.CURRENT;
public Version getVersion() { public Version getVersion() {
@ -198,7 +212,7 @@ public abstract class StreamOutput extends OutputStream {
long pos1 = position(); long pos1 = position();
// make room for the size // make room for the size
seek(pos1 + 4); seek(pos1 + 4);
UTF8StreamWriter utf8StreamWriter = CachedStreamOutput.utf8StreamWriter(); UTF8StreamWriter utf8StreamWriter = utf8StreamWriter();
utf8StreamWriter.setOutput(this); utf8StreamWriter.setOutput(this);
utf8StreamWriter.write(text.string()); utf8StreamWriter.write(text.string());
utf8StreamWriter.close(); utf8StreamWriter.close();

View File

@ -20,8 +20,8 @@
package org.elasticsearch.common.settings.loader; package org.elasticsearch.common.settings.loader;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import org.elasticsearch.common.io.FastByteArrayInputStream;
import org.elasticsearch.common.io.FastStringReader; import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
@ -31,8 +31,6 @@ import static com.google.common.collect.Maps.newHashMap;
/** /**
* Settings loader that loads (parses) the settings in a properties format. * Settings loader that loads (parses) the settings in a properties format.
*
*
*/ */
public class PropertiesSettingsLoader implements SettingsLoader { public class PropertiesSettingsLoader implements SettingsLoader {
@ -55,7 +53,7 @@ public class PropertiesSettingsLoader implements SettingsLoader {
@Override @Override
public Map<String, String> load(byte[] source) throws IOException { public Map<String, String> load(byte[] source) throws IOException {
Properties props = new Properties(); Properties props = new Properties();
FastByteArrayInputStream stream = new FastByteArrayInputStream(source); BytesStreamInput stream = new BytesStreamInput(source, false);
try { try {
props.load(stream); props.load(stream);
Map<String, String> result = newHashMap(); Map<String, String> result = newHashMap();

View File

@ -21,12 +21,11 @@ package org.elasticsearch.common.xcontent;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.BytesStream; import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.FastByteArrayOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant; import org.joda.time.ReadableInstant;
@ -70,11 +69,8 @@ public final class XContentBuilder implements BytesStream {
XContentBuilder.globalFieldCaseConversion = globalFieldCaseConversion; XContentBuilder.globalFieldCaseConversion = globalFieldCaseConversion;
} }
/**
* Constructs a new builder using a fresh {@link FastByteArrayOutputStream}.
*/
public static XContentBuilder builder(XContent xContent) throws IOException { public static XContentBuilder builder(XContent xContent) throws IOException {
return new XContentBuilder(xContent, new FastByteArrayOutputStream()); return new XContentBuilder(xContent, new BytesStreamOutput());
} }
@ -82,8 +78,6 @@ public final class XContentBuilder implements BytesStream {
private final OutputStream bos; private final OutputStream bos;
private final Object payload;
private FieldCaseConversion fieldCaseConversion = globalFieldCaseConversion; private FieldCaseConversion fieldCaseConversion = globalFieldCaseConversion;
private StringBuilder cachedStringBuilder; private StringBuilder cachedStringBuilder;
@ -94,17 +88,8 @@ public final class XContentBuilder implements BytesStream {
* to call {@link #close()} when the builder is done with. * to call {@link #close()} when the builder is done with.
*/ */
public XContentBuilder(XContent xContent, OutputStream bos) throws IOException { public XContentBuilder(XContent xContent, OutputStream bos) throws IOException {
this(xContent, bos, null);
}
/**
* Constructs a new builder using the provided xcontent and an OutputStream. Make sure
* to call {@link #close()} when the builder is done with.
*/
public XContentBuilder(XContent xContent, OutputStream bos, @Nullable Object payload) throws IOException {
this.bos = bos; this.bos = bos;
this.generator = xContent.createGenerator(bos); this.generator = xContent.createGenerator(bos);
this.payload = payload;
} }
public XContentBuilder fieldCaseConversion(FieldCaseConversion fieldCaseConversion) { public XContentBuilder fieldCaseConversion(FieldCaseConversion fieldCaseConversion) {
@ -1024,11 +1009,6 @@ public final class XContentBuilder implements BytesStream {
return this.generator; return this.generator;
} }
@Nullable
public Object payload() {
return this.payload;
}
public OutputStream stream() { public OutputStream stream() {
return this.bos; return this.bos;
} }

View File

@ -252,16 +252,16 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
return; return;
} }
synchronized (sendMutex) { synchronized (sendMutex) {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try { try {
StreamOutput out = cachedEntry.handles(); BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput out = new HandlesStreamOutput(bStream);
out.writeBytes(INTERNAL_HEADER); out.writeBytes(INTERNAL_HEADER);
Version.writeVersion(Version.CURRENT, out); Version.writeVersion(Version.CURRENT, out);
out.writeInt(id); out.writeInt(id);
clusterName.writeTo(out); clusterName.writeTo(out);
nodesProvider.nodes().localNode().writeTo(out); nodesProvider.nodes().localNode().writeTo(out);
out.close(); out.close();
datagramPacketSend.setData(cachedEntry.bytes().bytes().copyBytesArray().toBytes()); datagramPacketSend.setData(bStream.bytes().toBytes());
multicastSocket.send(datagramPacketSend); multicastSocket.send(datagramPacketSend);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("[{}] sending ping request", id); logger.trace("[{}] sending ping request", id);
@ -275,8 +275,6 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
} else { } else {
logger.warn("failed to send multicast ping request: {}", ExceptionsHelper.detailedMessage(e)); logger.warn("failed to send multicast ping request: {}", ExceptionsHelper.detailedMessage(e));
} }
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
} }
} }
} }

View File

@ -27,10 +27,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.CachedStreamInput; import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -71,44 +68,39 @@ public class PublishClusterStateAction extends AbstractComponent {
public void publish(ClusterState clusterState) { public void publish(ClusterState clusterState) {
DiscoveryNode localNode = nodesProvider.nodes().localNode(); DiscoveryNode localNode = nodesProvider.nodes().localNode();
Map<Version, CachedStreamOutput.Entry> serializedStates = Maps.newHashMap(); Map<Version, BytesReference> serializedStates = Maps.newHashMap();
try { for (final DiscoveryNode node : clusterState.nodes()) {
for (final DiscoveryNode node : clusterState.nodes()) { if (node.equals(localNode)) {
if (node.equals(localNode)) { // no need to send to our self
// no need to send to our self continue;
continue; }
// try and serialize the cluster state once (or per version), so we don't serialize it
// per node when we send it over the wire, compress it while we are at it...
BytesReference bytes = serializedStates.get(node.version());
if (bytes == null) {
try {
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput stream = new HandlesStreamOutput(CompressorFactory.defaultCompressor().streamOutput(bStream));
stream.setVersion(node.version());
ClusterState.Builder.writeTo(clusterState, stream);
stream.close();
bytes = bStream.bytes();
serializedStates.put(node.version(), bytes);
} catch (Exception e) {
logger.warn("failed to serialize cluster_state before publishing it to nodes", e);
return;
} }
// try and serialize the cluster state once (or per version), so we don't serialize it }
// per node when we send it over the wire, compress it while we are at it... transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION,
CachedStreamOutput.Entry entry = serializedStates.get(node.version()); new PublishClusterStateRequest(bytes),
if (entry == null) { TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes
try {
entry = CachedStreamOutput.popEntry();
StreamOutput stream = entry.handles(CompressorFactory.defaultCompressor());
stream.setVersion(node.version());
ClusterState.Builder.writeTo(clusterState, stream);
stream.close();
serializedStates.put(node.version(), entry);
} catch (Exception e) {
logger.warn("failed to serialize cluster_state before publishing it to nodes", e);
return;
}
}
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION,
new PublishClusterStateRequest(entry.bytes().bytes()),
TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override @Override
public void handleException(TransportException exp) { public void handleException(TransportException exp) {
logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node); logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node);
} }
}); });
}
} finally {
for (CachedStreamOutput.Entry entry : serializedStates.values()) {
CachedStreamOutput.pushEntry(entry);
}
} }
} }

View File

@ -29,7 +29,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.blobstore.*;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
@ -150,24 +150,20 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
@Override @Override
public void write(MetaData metaData) throws GatewayException { public void write(MetaData metaData) throws GatewayException {
final String newMetaData = "metadata-" + (currentIndex + 1); final String newMetaData = "metadata-" + (currentIndex + 1);
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try { try {
StreamOutput streamOutput; BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput stream = bStream;
if (compress) { if (compress) {
streamOutput = cachedEntry.bytes(CompressorFactory.defaultCompressor()); stream = CompressorFactory.defaultCompressor().streamOutput(stream);
} else {
streamOutput = cachedEntry.bytes();
} }
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, streamOutput); XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
builder.startObject(); builder.startObject();
MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS); MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS);
builder.endObject(); builder.endObject();
builder.close(); builder.close();
metaDataBlobContainer.writeBlob(newMetaData, cachedEntry.bytes().bytes().streamInput(), cachedEntry.bytes().size()); metaDataBlobContainer.writeBlob(newMetaData, bStream.bytes().streamInput(), bStream.bytes().length());
} catch (IOException e) { } catch (IOException e) {
throw new GatewayException("Failed to write metadata [" + newMetaData + "]", e); throw new GatewayException("Failed to write metadata [" + newMetaData + "]", e);
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
} }
currentIndex++; currentIndex++;

View File

@ -35,7 +35,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -321,62 +321,57 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
private void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable IndexMetaData previousIndexMetaData) throws Exception { private void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable IndexMetaData previousIndexMetaData) throws Exception {
logger.trace("[{}] writing state, reason [{}]", indexMetaData.index(), reason); logger.trace("[{}] writing state, reason [{}]", indexMetaData.index(), reason);
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); XContentBuilder builder = XContentFactory.contentBuilder(format, new BytesStreamOutput());
try { builder.startObject();
XContentBuilder builder = XContentFactory.contentBuilder(format, cachedEntry.bytes()); IndexMetaData.Builder.toXContent(indexMetaData, builder, formatParams);
builder.startObject(); builder.endObject();
IndexMetaData.Builder.toXContent(indexMetaData, builder, formatParams); builder.flush();
builder.endObject();
builder.flush();
String stateFileName = "state-" + indexMetaData.version(); String stateFileName = "state-" + indexMetaData.version();
Exception lastFailure = null; Exception lastFailure = null;
boolean wroteAtLeastOnce = false; boolean wroteAtLeastOnce = false;
for (File indexLocation : nodeEnv.indexLocations(new Index(indexMetaData.index()))) {
File stateLocation = new File(indexLocation, "_state");
FileSystemUtils.mkdirs(stateLocation);
File stateFile = new File(stateLocation, stateFileName);
FileOutputStream fos = null;
try {
fos = new FileOutputStream(stateFile);
BytesReference bytes = builder.bytes();
fos.write(bytes.array(), bytes.arrayOffset(), bytes.length());
fos.getChannel().force(true);
Closeables.closeQuietly(fos);
wroteAtLeastOnce = true;
} catch (Exception e) {
lastFailure = e;
} finally {
Closeables.closeQuietly(fos);
}
}
if (!wroteAtLeastOnce) {
logger.warn("[{}]: failed to state", lastFailure, indexMetaData.index());
throw new IOException("failed to write state for [" + indexMetaData.index() + "]", lastFailure);
}
// delete the old files
if (previousIndexMetaData != null && previousIndexMetaData.version() != indexMetaData.version()) {
for (File indexLocation : nodeEnv.indexLocations(new Index(indexMetaData.index()))) { for (File indexLocation : nodeEnv.indexLocations(new Index(indexMetaData.index()))) {
File stateLocation = new File(indexLocation, "_state"); File[] files = new File(indexLocation, "_state").listFiles();
FileSystemUtils.mkdirs(stateLocation); if (files == null) {
File stateFile = new File(stateLocation, stateFileName); continue;
FileOutputStream fos = null;
try {
fos = new FileOutputStream(stateFile);
BytesReference bytes = cachedEntry.bytes().bytes();
fos.write(bytes.array(), bytes.arrayOffset(), bytes.length());
fos.getChannel().force(true);
Closeables.closeQuietly(fos);
wroteAtLeastOnce = true;
} catch (Exception e) {
lastFailure = e;
} finally {
Closeables.closeQuietly(fos);
} }
} for (File file : files) {
if (!file.getName().startsWith("state-")) {
if (!wroteAtLeastOnce) {
logger.warn("[{}]: failed to state", lastFailure, indexMetaData.index());
throw new IOException("failed to write state for [" + indexMetaData.index() + "]", lastFailure);
}
// delete the old files
if (previousIndexMetaData != null && previousIndexMetaData.version() != indexMetaData.version()) {
for (File indexLocation : nodeEnv.indexLocations(new Index(indexMetaData.index()))) {
File[] files = new File(indexLocation, "_state").listFiles();
if (files == null) {
continue; continue;
} }
for (File file : files) { if (file.getName().equals(stateFileName)) {
if (!file.getName().startsWith("state-")) { continue;
continue;
}
if (file.getName().equals(stateFileName)) {
continue;
}
file.delete();
} }
file.delete();
} }
} }
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
} }
} }
@ -385,60 +380,55 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
// create metadata to write with just the global state // create metadata to write with just the global state
MetaData globalMetaData = MetaData.builder().metaData(metaData).removeAllIndices().build(); MetaData globalMetaData = MetaData.builder().metaData(metaData).removeAllIndices().build();
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); XContentBuilder builder = XContentFactory.contentBuilder(format);
try { builder.startObject();
XContentBuilder builder = XContentFactory.contentBuilder(format, cachedEntry.bytes()); MetaData.Builder.toXContent(globalMetaData, builder, formatParams);
builder.startObject(); builder.endObject();
MetaData.Builder.toXContent(globalMetaData, builder, formatParams); builder.flush();
builder.endObject();
builder.flush();
String globalFileName = "global-" + globalMetaData.version(); String globalFileName = "global-" + globalMetaData.version();
Exception lastFailure = null; Exception lastFailure = null;
boolean wroteAtLeastOnce = false; boolean wroteAtLeastOnce = false;
for (File dataLocation : nodeEnv.nodeDataLocations()) { for (File dataLocation : nodeEnv.nodeDataLocations()) {
File stateLocation = new File(dataLocation, "_state"); File stateLocation = new File(dataLocation, "_state");
FileSystemUtils.mkdirs(stateLocation); FileSystemUtils.mkdirs(stateLocation);
File stateFile = new File(stateLocation, globalFileName); File stateFile = new File(stateLocation, globalFileName);
FileOutputStream fos = null; FileOutputStream fos = null;
try { try {
fos = new FileOutputStream(stateFile); fos = new FileOutputStream(stateFile);
BytesReference bytes = cachedEntry.bytes().bytes(); BytesReference bytes = builder.bytes();
fos.write(bytes.array(), bytes.arrayOffset(), bytes.length()); fos.write(bytes.array(), bytes.arrayOffset(), bytes.length());
fos.getChannel().force(true); fos.getChannel().force(true);
Closeables.closeQuietly(fos); Closeables.closeQuietly(fos);
wroteAtLeastOnce = true; wroteAtLeastOnce = true;
} catch (Exception e) { } catch (Exception e) {
lastFailure = e; lastFailure = e;
} finally { } finally {
Closeables.closeQuietly(fos); Closeables.closeQuietly(fos);
}
} }
}
if (!wroteAtLeastOnce) { if (!wroteAtLeastOnce) {
logger.warn("[_global]: failed to write global state", lastFailure); logger.warn("[_global]: failed to write global state", lastFailure);
throw new IOException("failed to write global state", lastFailure); throw new IOException("failed to write global state", lastFailure);
}
// delete the old files
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File[] files = new File(dataLocation, "_state").listFiles();
if (files == null) {
continue;
} }
for (File file : files) {
// delete the old files if (!file.getName().startsWith("global-")) {
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File[] files = new File(dataLocation, "_state").listFiles();
if (files == null) {
continue; continue;
} }
for (File file : files) { if (file.getName().equals(globalFileName)) {
if (!file.getName().startsWith("global-")) { continue;
continue;
}
if (file.getName().equals(globalFileName)) {
continue;
}
file.delete();
} }
file.delete();
} }
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
} }
} }

View File

@ -31,7 +31,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*; import org.elasticsearch.common.xcontent.*;
@ -267,55 +267,50 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
private void writeShardState(String reason, ShardId shardId, ShardStateInfo shardStateInfo, @Nullable ShardStateInfo previousStateInfo) throws Exception { private void writeShardState(String reason, ShardId shardId, ShardStateInfo shardStateInfo, @Nullable ShardStateInfo previousStateInfo) throws Exception {
logger.trace("[{}][{}] writing shard state, reason [{}]", shardId.index().name(), shardId.id(), reason); logger.trace("[{}][{}] writing shard state, reason [{}]", shardId.index().name(), shardId.id(), reason);
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, new BytesStreamOutput());
try { builder.prettyPrint();
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, cachedEntry.bytes()); builder.startObject();
builder.prettyPrint(); builder.field("version", shardStateInfo.version);
builder.startObject(); if (shardStateInfo.primary != null) {
builder.field("version", shardStateInfo.version); builder.field("primary", shardStateInfo.primary);
if (shardStateInfo.primary != null) { }
builder.field("primary", shardStateInfo.primary); builder.endObject();
} builder.flush();
builder.endObject();
builder.flush();
Exception lastFailure = null; Exception lastFailure = null;
boolean wroteAtLeastOnce = false; boolean wroteAtLeastOnce = false;
for (File shardLocation : nodeEnv.shardLocations(shardId)) {
File shardStateDir = new File(shardLocation, "_state");
FileSystemUtils.mkdirs(shardStateDir);
File stateFile = new File(shardStateDir, "state-" + shardStateInfo.version);
FileOutputStream fos = null;
try {
fos = new FileOutputStream(stateFile);
BytesReference bytes = builder.bytes();
fos.write(bytes.array(), bytes.arrayOffset(), bytes.length());
fos.getChannel().force(true);
Closeables.closeQuietly(fos);
wroteAtLeastOnce = true;
} catch (Exception e) {
lastFailure = e;
} finally {
Closeables.closeQuietly(fos);
}
}
if (!wroteAtLeastOnce) {
logger.warn("[{}][{}]: failed to write shard state", shardId.index().name(), shardId.id(), lastFailure);
throw new IOException("failed to write shard state for " + shardId, lastFailure);
}
// delete the old files
if (previousStateInfo != null && previousStateInfo.version != shardStateInfo.version) {
for (File shardLocation : nodeEnv.shardLocations(shardId)) { for (File shardLocation : nodeEnv.shardLocations(shardId)) {
File shardStateDir = new File(shardLocation, "_state"); File stateFile = new File(new File(shardLocation, "_state"), "state-" + previousStateInfo.version);
FileSystemUtils.mkdirs(shardStateDir); stateFile.delete();
File stateFile = new File(shardStateDir, "state-" + shardStateInfo.version);
FileOutputStream fos = null;
try {
fos = new FileOutputStream(stateFile);
BytesReference bytes = cachedEntry.bytes().bytes();
fos.write(bytes.array(), bytes.arrayOffset(), bytes.length());
fos.getChannel().force(true);
Closeables.closeQuietly(fos);
wroteAtLeastOnce = true;
} catch (Exception e) {
lastFailure = e;
} finally {
Closeables.closeQuietly(fos);
}
} }
if (!wroteAtLeastOnce) {
logger.warn("[{}][{}]: failed to write shard state", shardId.index().name(), shardId.id(), lastFailure);
throw new IOException("failed to write shard state for " + shardId, lastFailure);
}
// delete the old files
if (previousStateInfo != null && previousStateInfo.version != shardStateInfo.version) {
for (File shardLocation : nodeEnv.shardLocations(shardId)) {
File stateFile = new File(new File(shardLocation, "_state"), "state-" + previousStateInfo.version);
stateFile.delete();
}
}
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
} }
} }

View File

@ -19,7 +19,6 @@
package org.elasticsearch.http.netty; package org.elasticsearch.http.netty;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpException; import org.elasticsearch.http.HttpException;
@ -27,7 +26,6 @@ import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.XContentRestResponse; import org.elasticsearch.rest.XContentRestResponse;
import org.elasticsearch.rest.support.RestUtils; import org.elasticsearch.rest.support.RestUtils;
import org.elasticsearch.transport.netty.NettyTransport;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
@ -103,20 +101,16 @@ public class NettyHttpChannel implements HttpChannel {
} }
// Convert the response content to a ChannelBuffer. // Convert the response content to a ChannelBuffer.
ChannelFutureListener releaseContentListener = null;
ChannelBuffer buf; ChannelBuffer buf;
try { try {
if (response instanceof XContentRestResponse) { if (response instanceof XContentRestResponse) {
// if its a builder based response, and it was created with a CachedStreamOutput, we can release it // if its a builder based response, and it was created with a CachedStreamOutput, we can release it
// after we write the response, and no need to do an extra copy because its not thread safe // after we write the response, and no need to do an extra copy because its not thread safe
XContentBuilder builder = ((XContentRestResponse) response).builder(); XContentBuilder builder = ((XContentRestResponse) response).builder();
if (builder.payload() instanceof CachedStreamOutput.Entry) { if (response.contentThreadSafe()) {
releaseContentListener = new NettyTransport.CacheFutureListener((CachedStreamOutput.Entry) builder.payload());
buf = builder.bytes().toChannelBuffer(); buf = builder.bytes().toChannelBuffer();
} else if (response.contentThreadSafe()) {
buf = ChannelBuffers.wrappedBuffer(response.content(), response.contentOffset(), response.contentLength());
} else { } else {
buf = ChannelBuffers.copiedBuffer(response.content(), response.contentOffset(), response.contentLength()); buf = builder.bytes().copyBytesArray().toChannelBuffer();
} }
} else { } else {
if (response.contentThreadSafe()) { if (response.contentThreadSafe()) {
@ -162,10 +156,6 @@ public class NettyHttpChannel implements HttpChannel {
// Write the response. // Write the response.
ChannelFuture future = channel.write(resp); ChannelFuture future = channel.write(resp);
if (releaseContentListener != null) {
future.addListener(releaseContentListener);
}
// Close the connection after the write operation is done if necessary. // Close the connection after the write operation is done if necessary.
if (close) { if (close) {
future.addListener(ChannelFutureListener.CLOSE); future.addListener(ChannelFutureListener.CLOSE);

View File

@ -22,16 +22,14 @@ package org.elasticsearch.index.gateway.blobstore;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.blobstore.*;
import org.elasticsearch.common.io.FastByteArrayInputStream;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput; import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput;
@ -318,7 +316,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
CommitPoint commitPoint = new CommitPoint(version, commitPointName, CommitPoint.Type.GENERATED, indexCommitPointFiles, translogCommitPointFiles); CommitPoint commitPoint = new CommitPoint(version, commitPointName, CommitPoint.Type.GENERATED, indexCommitPointFiles, translogCommitPointFiles);
try { try {
byte[] commitPointData = CommitPoints.toXContent(commitPoint); byte[] commitPointData = CommitPoints.toXContent(commitPoint);
blobContainer.writeBlob(commitPointName, new FastByteArrayInputStream(commitPointData), commitPointData.length); blobContainer.writeBlob(commitPointName, new BytesStreamInput(commitPointData, false), commitPointData.length);
} catch (Exception e) { } catch (Exception e) {
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to write commit point", e); throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to write commit point", e);
} }
@ -453,7 +451,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
final Iterator<CommitPoint.FileInfo> transIt = commitPoint.translogFiles().iterator(); final Iterator<CommitPoint.FileInfo> transIt = commitPoint.translogFiles().iterator();
blobContainer.readBlob(transIt.next().name(), new BlobContainer.ReadBlobListener() { blobContainer.readBlob(transIt.next().name(), new BlobContainer.ReadBlobListener() {
FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); BytesStreamOutput bos = new BytesStreamOutput();
boolean ignore = false; boolean ignore = false;
@Override @Override
@ -497,7 +495,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
} }
} }
FastByteArrayOutputStream newBos = new FastByteArrayOutputStream(); BytesStreamOutput newBos = new BytesStreamOutput();
int leftOver = bos.size() - position; int leftOver = bos.size() - position;
if (leftOver > 0) { if (leftOver > 0) {

View File

@ -28,7 +28,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -182,14 +182,11 @@ public class BinaryFieldMapper extends AbstractFieldMapper<BytesReference> {
value = context.parser().binaryValue(); value = context.parser().binaryValue();
if (compress != null && compress && !CompressorFactory.isCompressed(value, 0, value.length)) { if (compress != null && compress && !CompressorFactory.isCompressed(value, 0, value.length)) {
if (compressThreshold == -1 || value.length > compressThreshold) { if (compressThreshold == -1 || value.length > compressThreshold) {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput streamOutput = cachedEntry.bytes(CompressorFactory.defaultCompressor()); StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream);
streamOutput.writeBytes(value, 0, value.length); stream.writeBytes(value, 0, value.length);
streamOutput.close(); stream.close();
// we copy over the byte array, since we need to push back the cached entry value = bStream.bytes().toBytes();
// TODO, we we had a handle into when we are done with parsing, then we push back then and not copy over bytes
value = cachedEntry.bytes().bytes().copyBytesArray().toBytes();
CachedStreamOutput.pushEntry(cachedEntry);
} }
} }
} }

View File

@ -33,7 +33,7 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedStreamInput; import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
@ -218,6 +218,7 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
return this.excludes; return this.excludes;
} }
public String[] includes() { public String[] includes() {
return this.includes; return this.includes;
} }
@ -274,12 +275,10 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
Tuple<XContentType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(source, true); Tuple<XContentType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(source, true);
Map<String, Object> filteredSource = XContentMapValues.filter(mapTuple.v2(), includes, excludes); Map<String, Object> filteredSource = XContentMapValues.filter(mapTuple.v2(), includes, excludes);
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput streamOutput; StreamOutput streamOutput = bStream;
if (compress != null && compress && (compressThreshold == -1 || source.length() > compressThreshold)) { if (compress != null && compress && (compressThreshold == -1 || source.length() > compressThreshold)) {
streamOutput = cachedEntry.bytes(CompressorFactory.defaultCompressor()); streamOutput = CompressorFactory.defaultCompressor().streamOutput(bStream);
} else {
streamOutput = cachedEntry.bytes();
} }
XContentType contentType = formatContentType; XContentType contentType = formatContentType;
if (contentType == null) { if (contentType == null) {
@ -288,31 +287,23 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
XContentBuilder builder = XContentFactory.contentBuilder(contentType, streamOutput).map(filteredSource); XContentBuilder builder = XContentFactory.contentBuilder(contentType, streamOutput).map(filteredSource);
builder.close(); builder.close();
source = cachedEntry.bytes().bytes().copyBytesArray(); source = bStream.bytes();
CachedStreamOutput.pushEntry(cachedEntry);
} else if (compress != null && compress && !CompressorFactory.isCompressed(source)) { } else if (compress != null && compress && !CompressorFactory.isCompressed(source)) {
if (compressThreshold == -1 || source.length() > compressThreshold) { if (compressThreshold == -1 || source.length() > compressThreshold) {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); BytesStreamOutput bStream = new BytesStreamOutput();
try { XContentType contentType = XContentFactory.xContentType(source);
XContentType contentType = XContentFactory.xContentType(source); if (formatContentType != null && formatContentType != contentType) {
if (formatContentType != null && formatContentType != contentType) { XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, CompressorFactory.defaultCompressor().streamOutput(bStream));
XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.bytes(CompressorFactory.defaultCompressor())); builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(source));
builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(source)); builder.close();
builder.close(); } else {
} else { StreamOutput streamOutput = CompressorFactory.defaultCompressor().streamOutput(bStream);
StreamOutput streamOutput = cachedEntry.bytes(CompressorFactory.defaultCompressor()); source.writeTo(streamOutput);
source.writeTo(streamOutput); streamOutput.close();
streamOutput.close();
}
// we copy over the byte array, since we need to push back the cached entry
// TODO, we we had a handle into when we are done with parsing, then we push back then and not copy over bytes
source = cachedEntry.bytes().bytes().copyBytesArray();
// update the data in the context, so it can be compressed and stored compressed outside...
context.source(source);
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
} }
source = bStream.bytes();
// update the data in the context, so it can be compressed and stored compressed outside...
context.source(source);
} }
} else if (formatContentType != null) { } else if (formatContentType != null) {
// see if we need to convert the content type // see if we need to convert the content type
@ -323,18 +314,14 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
compressedStreamInput.resetToBufferStart(); compressedStreamInput.resetToBufferStart();
if (contentType != formatContentType) { if (contentType != formatContentType) {
// we need to reread and store back, compressed.... // we need to reread and store back, compressed....
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); BytesStreamOutput bStream = new BytesStreamOutput();
try { StreamOutput streamOutput = CompressorFactory.defaultCompressor().streamOutput(bStream);
StreamOutput streamOutput = cachedEntry.bytes(CompressorFactory.defaultCompressor()); XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, streamOutput);
XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, streamOutput); builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(compressedStreamInput));
builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(compressedStreamInput)); builder.close();
builder.close(); source = bStream.bytes();
source = cachedEntry.bytes().bytes().copyBytesArray(); // update the data in the context, so we store it in the translog in this format
// update the data in the context, so we store it in the translog in this format context.source(source);
context.source(source);
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
} else { } else {
compressedStreamInput.close(); compressedStreamInput.close();
} }
@ -343,17 +330,13 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
if (contentType != formatContentType) { if (contentType != formatContentType) {
// we need to reread and store back // we need to reread and store back
// we need to reread and store back, compressed.... // we need to reread and store back, compressed....
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); BytesStreamOutput bStream = new BytesStreamOutput();
try { XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, bStream);
XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.bytes()); builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(source));
builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(source)); builder.close();
builder.close(); source = bStream.bytes();
source = cachedEntry.bytes().bytes().copyBytesArray(); // update the data in the context, so we store it in the translog in this format
// update the data in the context, so we store it in the translog in this format context.source(source);
context.source(source);
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
} }
} }
} }

View File

@ -33,7 +33,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FastByteArrayOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.XFilteredQuery; import org.elasticsearch.common.lucene.search.XFilteredQuery;
import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.metrics.MeanMetric;
@ -856,7 +856,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return; return;
} }
CheckIndex checkIndex = new CheckIndex(store.directory()); CheckIndex checkIndex = new CheckIndex(store.directory());
FastByteArrayOutputStream os = new FastByteArrayOutputStream(); BytesStreamOutput os = new BytesStreamOutput();
PrintStream out = new PrintStream(os, false, Charsets.UTF_8.name()); PrintStream out = new PrintStream(os, false, Charsets.UTF_8.name());
checkIndex.setInfoStream(out); checkIndex.setInfoStream(out);
out.flush(); out.flush();

View File

@ -24,7 +24,6 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
@ -329,10 +328,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
@Override @Override
public Location add(Operation operation) throws TranslogException { public Location add(Operation operation) throws TranslogException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
rwl.readLock().lock(); rwl.readLock().lock();
try { try {
BytesStreamOutput out = cachedEntry.bytes(); BytesStreamOutput out = new BytesStreamOutput();
out.writeInt(0); // marker for the size... out.writeInt(0); // marker for the size...
TranslogStreams.writeTranslogOperation(out, operation); TranslogStreams.writeTranslogOperation(out, operation);
out.flush(); out.flush();
@ -358,7 +356,6 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally { } finally {
rwl.readLock().unlock(); rwl.readLock().unlock();
CachedStreamOutput.pushEntry(cachedEntry);
} }
} }

View File

@ -73,7 +73,7 @@ public class XContentRestResponse extends AbstractRestResponse {
@Override @Override
public boolean contentThreadSafe() { public boolean contentThreadSafe() {
return false; return true;
} }
@Override @Override

View File

@ -25,7 +25,7 @@ import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.*; import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
@ -53,8 +53,7 @@ public class RestXContentBuilder {
// default to JSON // default to JSON
contentType = XContentType.JSON; contentType = XContentType.JSON;
} }
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(contentType), new BytesStreamOutput());
XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(contentType), cachedEntry.bytes(), cachedEntry);
if (request.paramAsBoolean("pretty", false)) { if (request.paramAsBoolean("pretty", false)) {
builder.prettyPrint(); builder.prettyPrint();
} }

View File

@ -151,38 +151,34 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
@Override @Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); BytesStreamOutput bStream = new BytesStreamOutput();
try { StreamOutput stream = new HandlesStreamOutput(bStream);
StreamOutput stream = cachedEntry.handles();
stream.writeLong(requestId); stream.writeLong(requestId);
byte status = 0; byte status = 0;
status = TransportStatus.setRequest(status); status = TransportStatus.setRequest(status);
stream.writeByte(status); // 0 for request, 1 for response. stream.writeByte(status); // 0 for request, 1 for response.
stream.writeString(action); stream.writeString(action);
request.writeTo(stream); request.writeTo(stream);
stream.close(); stream.close();
final LocalTransport targetTransport = connectedNodes.get(node); final LocalTransport targetTransport = connectedNodes.get(node);
if (targetTransport == null) { if (targetTransport == null) {
throw new NodeNotConnectedException(node, "Node not connected"); throw new NodeNotConnectedException(node, "Node not connected");
}
final byte[] data = cachedEntry.bytes().bytes().copyBytesArray().toBytes();
transportServiceAdapter.sent(data.length);
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, requestId);
}
});
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
} }
final byte[] data = bStream.bytes().toBytes();
transportServiceAdapter.sent(data.length);
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, requestId);
}
});
} }
ThreadPool threadPool() { ThreadPool threadPool() {

View File

@ -21,7 +21,7 @@ package org.elasticsearch.transport.local;
import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStatus; import org.elasticsearch.transport.support.TransportStatus;
@ -62,58 +62,47 @@ public class LocalTransportChannel implements TransportChannel {
@Override @Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); BytesStreamOutput bStream = new BytesStreamOutput();
try { StreamOutput stream = new HandlesStreamOutput(bStream);
StreamOutput stream = cachedEntry.handles(); stream.writeLong(requestId);
stream.writeLong(requestId); byte status = 0;
byte status = 0; status = TransportStatus.setResponse(status);
status = TransportStatus.setResponse(status); stream.writeByte(status); // 0 for request, 1 for response.
stream.writeByte(status); // 0 for request, 1 for response. response.writeTo(stream);
response.writeTo(stream); stream.close();
stream.close(); final byte[] data = bStream.bytes().toBytes();
final byte[] data = cachedEntry.bytes().bytes().copyBytesArray().toBytes(); targetTransport.threadPool().generic().execute(new Runnable() {
targetTransport.threadPool().generic().execute(new Runnable() { @Override
@Override public void run() {
public void run() { targetTransport.messageReceived(data, action, sourceTransport, null);
targetTransport.messageReceived(data, action, sourceTransport, null); }
} });
});
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
} }
@Override @Override
public void sendResponse(Throwable error) throws IOException { public void sendResponse(Throwable error) throws IOException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); BytesStreamOutput stream = new BytesStreamOutput();
try { try {
BytesStreamOutput stream; writeResponseExceptionHeader(stream);
try { RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, error);
stream = cachedEntry.bytes(); ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
writeResponseExceptionHeader(stream); too.writeObject(tx);
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, error); too.close();
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); } catch (NotSerializableException e) {
too.writeObject(tx); stream.reset();
too.close(); writeResponseExceptionHeader(stream);
} catch (NotSerializableException e) { RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, new NotSerializableTransportException(error));
cachedEntry.reset(); ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
stream = cachedEntry.bytes(); too.writeObject(tx);
writeResponseExceptionHeader(stream); too.close();
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, new NotSerializableTransportException(error));
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx);
too.close();
}
final byte[] data = stream.bytes().copyBytesArray().toBytes();
targetTransport.threadPool().generic().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, sourceTransport, null);
}
});
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
} }
final byte[] data = stream.bytes().toBytes();
targetTransport.threadPool().generic().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, sourceTransport, null);
}
});
} }
private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException { private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException {

View File

@ -28,7 +28,8 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.netty.NettyStaticSetup; import org.elasticsearch.common.netty.NettyStaticSetup;
import org.elasticsearch.common.netty.OpenChannelsHandler; import org.elasticsearch.common.netty.OpenChannelsHandler;
@ -528,32 +529,27 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
options.withCompress(true); options.withCompress(true);
} }
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
byte status = 0; byte status = 0;
status = TransportStatus.setRequest(status); status = TransportStatus.setRequest(status);
BytesStreamOutput bStream = new BytesStreamOutput();
bStream.skip(NettyHeader.HEADER_SIZE);
StreamOutput stream = bStream;
if (options.compress()) { if (options.compress()) {
status = TransportStatus.setCompress(status); status = TransportStatus.setCompress(status);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE); stream = CompressorFactory.defaultCompressor().streamOutput(stream);
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
stream.setVersion(node.version());
stream.writeString(action);
request.writeTo(stream);
stream.close();
} else {
StreamOutput stream = cachedEntry.handles();
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
stream.setVersion(node.version());
stream.writeString(action);
request.writeTo(stream);
stream.close();
} }
ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer(); stream = new HandlesStreamOutput(stream);
NettyHeader.writeHeader(buffer, requestId, status, node.version());
stream.setVersion(node.version());
stream.writeString(action);
request.writeTo(stream);
stream.close();
ChannelBuffer buffer = bStream.bytes().toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status, node.version());
targetChannel.write(buffer);
ChannelFuture future = targetChannel.write(buffer);
future.addListener(new CacheFutureListener(cachedEntry));
// We handle close connection exception in the #exceptionCaught method, which is the main reason we want to add this future // We handle close connection exception in the #exceptionCaught method, which is the main reason we want to add this future
// channelFuture.addListener(new ChannelFutureListener() { // channelFuture.addListener(new ChannelFutureListener() {
// @Override public void operationComplete(ChannelFuture future) throws Exception { // @Override public void operationComplete(ChannelFuture future) throws Exception {
@ -902,19 +898,4 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
} }
} }
} }
public static class CacheFutureListener implements ChannelFutureListener {
private final CachedStreamOutput.Entry cachedEntry;
public CacheFutureListener(CachedStreamOutput.Entry cachedEntry) {
this.cachedEntry = cachedEntry;
}
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
CachedStreamOutput.pushEntry(cachedEntry);
}
}
} }

View File

@ -23,13 +23,12 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStatus; import org.elasticsearch.transport.support.TransportStatus;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import java.io.IOException; import java.io.IOException;
import java.io.NotSerializableException; import java.io.NotSerializableException;
@ -72,46 +71,39 @@ public class NettyTransportChannel implements TransportChannel {
if (transport.compress) { if (transport.compress) {
options.withCompress(true); options.withCompress(true);
} }
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
byte status = 0; byte status = 0;
status = TransportStatus.setResponse(status); status = TransportStatus.setResponse(status);
BytesStreamOutput bStream = new BytesStreamOutput();
bStream.skip(NettyHeader.HEADER_SIZE);
StreamOutput stream = bStream;
if (options.compress()) { if (options.compress()) {
status = TransportStatus.setCompress(status); status = TransportStatus.setCompress(status);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE); stream = CompressorFactory.defaultCompressor().streamOutput(stream);
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
stream.setVersion(version);
response.writeTo(stream);
stream.close();
} else {
StreamOutput stream = cachedEntry.handles();
stream.setVersion(version);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
response.writeTo(stream);
stream.close();
} }
ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer(); stream = new HandlesStreamOutput(stream);
stream.setVersion(version);
response.writeTo(stream);
stream.close();
ChannelBuffer buffer = bStream.bytes().toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status, version); NettyHeader.writeHeader(buffer, requestId, status, version);
ChannelFuture future = channel.write(buffer); channel.write(buffer);
future.addListener(new NettyTransport.CacheFutureListener(cachedEntry));
} }
@Override @Override
public void sendResponse(Throwable error) throws IOException { public void sendResponse(Throwable error) throws IOException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); BytesStreamOutput stream = new BytesStreamOutput();
BytesStreamOutput stream;
try { try {
stream = cachedEntry.bytes(); stream.skip(NettyHeader.HEADER_SIZE);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error); RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx); too.writeObject(tx);
too.close(); too.close();
} catch (NotSerializableException e) { } catch (NotSerializableException e) {
cachedEntry.reset(); stream.reset();
stream = cachedEntry.bytes(); stream.skip(NettyHeader.HEADER_SIZE);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error)); RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error));
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx); too.writeObject(tx);
@ -122,9 +114,8 @@ public class NettyTransportChannel implements TransportChannel {
status = TransportStatus.setResponse(status); status = TransportStatus.setResponse(status);
status = TransportStatus.setError(status); status = TransportStatus.setError(status);
ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer(); ChannelBuffer buffer = stream.bytes().toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status, version); NettyHeader.writeHeader(buffer, requestId, status, version);
ChannelFuture future = channel.write(buffer); channel.write(buffer);
future.addListener(new NettyTransport.CacheFutureListener(cachedEntry));
} }
} }

View File

@ -19,17 +19,6 @@
package org.elasticsearch.test.integration.search.geo; package org.elasticsearch.test.integration.search.geo;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.geoIntersectionFilter;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.geo.builders.ShapeBuilder; import org.elasticsearch.common.geo.builders.ShapeBuilder;
@ -39,6 +28,17 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.geoIntersectionFilter;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
public class GeoShapeIntegrationTests extends AbstractSharedClusterTest { public class GeoShapeIntegrationTests extends AbstractSharedClusterTest {
@Test @Test
@ -147,7 +147,8 @@ public class GeoShapeIntegrationTests extends AbstractSharedClusterTest {
assertThat(searchResponse.getHits().getAt(0).id(), equalTo("blakely")); assertThat(searchResponse.getHits().getAt(0).id(), equalTo("blakely"));
} }
@Test // TODO this test causes hangs, blocking on the action get when fetching the shape for some reason
@Test(enabled = false)
public void testIndexedShapeReference() throws Exception { public void testIndexedShapeReference() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1") String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
.startObject("properties").startObject("location") .startObject("properties").startObject("location")
@ -199,7 +200,7 @@ public class GeoShapeIntegrationTests extends AbstractSharedClusterTest {
public void testReusableBuilder() throws IOException { public void testReusableBuilder() throws IOException {
ShapeBuilder polygon = ShapeBuilder.newPolygon() ShapeBuilder polygon = ShapeBuilder.newPolygon()
.point(170, -10).point(190, -10).point(190, 10).point(170, 10) .point(170, -10).point(190, -10).point(190, 10).point(170, 10)
.hole().point(175, -5).point(185,-5).point(185,5).point(175,5).close() .hole().point(175, -5).point(185, -5).point(185, 5).point(175, 5).close()
.close(); .close();
assertUnmodified(polygon); assertUnmodified(polygon);
@ -219,18 +220,18 @@ public class GeoShapeIntegrationTests extends AbstractSharedClusterTest {
public void testParsingMultipleShapes() throws IOException { public void testParsingMultipleShapes() throws IOException {
String mapping = XContentFactory.jsonBuilder() String mapping = XContentFactory.jsonBuilder()
.startObject() .startObject()
.startObject("type1") .startObject("type1")
.startObject("properties") .startObject("properties")
.startObject("location1") .startObject("location1")
.field("type", "geo_shape") .field("type", "geo_shape")
.endObject()
.startObject("location2")
.field("type", "geo_shape")
.endObject()
.endObject()
.endObject()
.endObject() .endObject()
.string(); .startObject("location2")
.field("type", "geo_shape")
.endObject()
.endObject()
.endObject()
.endObject()
.string();
prepareCreate("test").addMapping("type1", mapping).execute().actionGet(); prepareCreate("test").addMapping("type1", mapping).execute().actionGet();
ensureYellow(); ensureYellow();
@ -243,11 +244,11 @@ public class GeoShapeIntegrationTests extends AbstractSharedClusterTest {
client().admin().indices().prepareRefresh("test").execute().actionGet(); client().admin().indices().prepareRefresh("test").execute().actionGet();
String filter = "{\"geo_shape\": {\"location2\": {\"indexed_shape\": {" String filter = "{\"geo_shape\": {\"location2\": {\"indexed_shape\": {"
+ "\"id\": \"1\"," + "\"id\": \"1\","
+ "\"type\": \"type1\"," + "\"type\": \"type1\","
+ "\"index\": \"test\"," + "\"index\": \"test\","
+ "\"shape_field_name\": \"location2\"" + "\"shape_field_name\": \"location2\""
+ "}}}}"; + "}}}}";
SearchResponse result = client().prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).setFilter(filter).execute().actionGet(); SearchResponse result = client().prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).setFilter(filter).execute().actionGet();
assertHitCount(result, 1); assertHitCount(result, 1);
@ -260,7 +261,7 @@ public class GeoShapeIntegrationTests extends AbstractSharedClusterTest {
.field("type", "geo_shape") .field("type", "geo_shape")
.endObject().endObject() .endObject().endObject()
.startObject("_source") .startObject("_source")
.startArray("excludes").value("nonExistingField").endArray() .startArray("excludes").value("nonExistingField").endArray()
.endObject() .endObject()
.endObject().endObject() .endObject().endObject()
.string(); .string();

View File

@ -21,7 +21,6 @@ package org.elasticsearch.test.unit.common.io.streams;
import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
@ -36,7 +35,7 @@ public class BytesStreamsTests {
@Test @Test
public void testSimpleStreams() throws Exception { public void testSimpleStreams() throws Exception {
BytesStreamOutput out = CachedStreamOutput.popEntry().bytes(); BytesStreamOutput out = new BytesStreamOutput();
out.writeBoolean(false); out.writeBoolean(false);
out.writeByte((byte) 1); out.writeByte((byte) 1);
out.writeShort((short) -1); out.writeShort((short) -1);

View File

@ -20,8 +20,8 @@
package org.elasticsearch.test.unit.common.xcontent.builder; package org.elasticsearch.test.unit.common.xcontent.builder;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.FastCharArrayWriter; import org.elasticsearch.common.io.FastCharArrayWriter;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentGenerator; import org.elasticsearch.common.xcontent.XContentGenerator;
@ -82,7 +82,7 @@ public class XContentBuilderTests {
@Test @Test
public void testWritingBinaryToStream() throws Exception { public void testWritingBinaryToStream() throws Exception {
FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); BytesStreamOutput bos = new BytesStreamOutput();
XContentGenerator gen = XContentFactory.xContent(XContentType.JSON).createGenerator(bos); XContentGenerator gen = XContentFactory.xContent(XContentType.JSON).createGenerator(bos);
gen.writeStartObject(); gen.writeStartObject();

View File

@ -19,7 +19,7 @@
package org.elasticsearch.test.unit.common.xcontent.smile; package org.elasticsearch.test.unit.common.xcontent.smile;
import org.elasticsearch.common.io.FastByteArrayOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentGenerator; import org.elasticsearch.common.xcontent.XContentGenerator;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
@ -48,10 +48,10 @@ public class JsonVsSmileTests {
@Test @Test
public void compareParsingTokens() throws IOException { public void compareParsingTokens() throws IOException {
FastByteArrayOutputStream xsonOs = new FastByteArrayOutputStream(); BytesStreamOutput xsonOs = new BytesStreamOutput();
XContentGenerator xsonGen = XContentFactory.xContent(XContentType.SMILE).createGenerator(xsonOs); XContentGenerator xsonGen = XContentFactory.xContent(XContentType.SMILE).createGenerator(xsonOs);
FastByteArrayOutputStream jsonOs = new FastByteArrayOutputStream(); BytesStreamOutput jsonOs = new BytesStreamOutput();
XContentGenerator jsonGen = XContentFactory.xContent(XContentType.JSON).createGenerator(jsonOs); XContentGenerator jsonGen = XContentFactory.xContent(XContentType.JSON).createGenerator(jsonOs);
xsonGen.writeStartObject(); xsonGen.writeStartObject();

View File

@ -19,8 +19,11 @@
package org.elasticsearch.test.unit.deps.jackson; package org.elasticsearch.test.unit.deps.jackson;
import com.fasterxml.jackson.core.*; import com.fasterxml.jackson.core.JsonFactory;
import org.elasticsearch.common.io.FastByteArrayOutputStream; import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.io.IOException; import java.io.IOException;
@ -41,8 +44,8 @@ public class JacksonLocationTests {
// value : "something" // value : "something"
// } // }
// } // }
FastByteArrayOutputStream os = new FastByteArrayOutputStream(); BytesStreamOutput os = new BytesStreamOutput();
JsonGenerator gen = new JsonFactory().createJsonGenerator(os, JsonEncoding.UTF8); JsonGenerator gen = new JsonFactory().createJsonGenerator(os);
gen.writeStartObject(); gen.writeStartObject();
gen.writeStringField("index", "test"); gen.writeStringField("index", "test");

View File

@ -123,6 +123,37 @@ public abstract class AbstractSimpleTransportTests {
assertThat(e.getMessage(), false, equalTo(true)); assertThat(e.getMessage(), false, equalTo(true));
} }
res = serviceB.submitRequest(serviceANode, "sayHello",
new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public void handleResponse(StringMessageResponse response) {
assertThat("hello moshe", equalTo(response.message));
}
@Override
public void handleException(TransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
try {
StringMessageResponse message = res.get();
assertThat("hello moshe", equalTo(message.message));
} catch (Exception e) {
assertThat(e.getMessage(), false, equalTo(true));
}
serviceA.removeHandler("sayHello"); serviceA.removeHandler("sayHello");
} }