From fb218babe30e79d8119f30ea25eed8e6af4dd74f Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sun, 25 Dec 2011 17:37:58 +0200 Subject: [PATCH] add adapter streams that wrap another adapter and delegate all method calls, have handles use them --- .../common/io/stream/AdapterStreamInput.java | 119 +++++++++++++++ .../common/io/stream/AdapterStreamOutput.java | 136 ++++++++++++++++++ .../common/io/stream/HandlesStreamInput.java | 55 ++----- .../common/io/stream/HandlesStreamOutput.java | 40 +----- .../netty/MessageChannelHandler.java | 14 +- 5 files changed, 279 insertions(+), 85 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java create mode 100644 src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java diff --git a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java new file mode 100644 index 00000000000..c5b61f33b6d --- /dev/null +++ b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java @@ -0,0 +1,119 @@ +package org.elasticsearch.common.io.stream; + +import java.io.IOException; + +/** + */ +public abstract class AdapterStreamInput extends StreamInput { + + protected StreamInput in; + + protected AdapterStreamInput() { + } + + public AdapterStreamInput(StreamInput in) { + this.in = in; + } + + public void reset(StreamInput in) { + this.in = in; + } + + @Override + public byte readByte() throws IOException { + return in.readByte(); + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + in.readBytes(b, offset, len); + } + + @Override + public void reset() throws IOException { + in.reset(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public int read() throws IOException { + return in.read(); + } + + // override ones to direct them + + + @Override + public void readFully(byte[] b) throws IOException { + in.readFully(b); + } + + @Override + public short readShort() throws IOException { + return in.readShort(); + } + + @Override + public int readInt() throws IOException { + return in.readInt(); + } + + @Override + public int readVInt() throws IOException { + return in.readVInt(); + } + + @Override + public long readLong() throws IOException { + return in.readLong(); + } + + @Override + public long readVLong() throws IOException { + return in.readVLong(); + } + + @Override + public String readUTF() throws IOException { + return in.readUTF(); + } + + @Override + public int read(byte[] b) throws IOException { + return in.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return in.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + return in.skip(n); + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public void mark(int readlimit) { + in.mark(readlimit); + } + + @Override + public boolean markSupported() { + return in.markSupported(); + } + + @Override + public String toString() { + return in.toString(); + } +} diff --git a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java new file mode 100644 index 00000000000..2010160407a --- /dev/null +++ b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java @@ -0,0 +1,136 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import java.io.IOException; + +/** + */ +public class AdapterStreamOutput extends StreamOutput { + + protected StreamOutput out; + + public AdapterStreamOutput(StreamOutput out) { + this.out = out; + } + + public void reset(StreamOutput out) throws IOException { + this.out = out; + } + + public StreamOutput wrappedOut() { + return this.out; + } + + @Override + public void writeByte(byte b) throws IOException { + out.writeByte(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + out.writeBytes(b, offset, length); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + out.close(); + } + + @Override + public void reset() throws IOException { + out.reset(); + } + + @Override + public void writeBytes(byte[] b) throws IOException { + out.writeBytes(b); + } + + @Override + public void writeBytes(byte[] b, int length) throws IOException { + out.writeBytes(b, length); + } + + @Override + public void writeInt(int i) throws IOException { + out.writeInt(i); + } + + @Override + public void writeVInt(int i) throws IOException { + out.writeVInt(i); + } + + @Override + public void writeLong(long i) throws IOException { + out.writeLong(i); + } + + @Override + public void writeVLong(long i) throws IOException { + out.writeVLong(i); + } + + @Override + public void writeUTF(String str) throws IOException { + out.writeUTF(str); + } + + @Override + public void writeFloat(float v) throws IOException { + out.writeFloat(v); + } + + @Override + public void writeDouble(double v) throws IOException { + out.writeDouble(v); + } + + @Override + public void writeBoolean(boolean b) throws IOException { + out.writeBoolean(b); + } + + @Override + public void write(int b) throws IOException { + out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } + + @Override + public void write(byte[] b) throws IOException { + out.write(b); + } + + @Override + public String toString() { + return out.toString(); + } +} diff --git a/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java index 35f63ec72e8..8551dedd70f 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamInput.java @@ -26,19 +26,18 @@ import java.io.IOException; /** * */ -public class HandlesStreamInput extends StreamInput { - - private StreamInput in; +public class HandlesStreamInput extends AdapterStreamInput { private final TIntObjectHashMap handles = new TIntObjectHashMap(); private final TIntObjectHashMap identityHandles = new TIntObjectHashMap(); HandlesStreamInput() { + super(); } public HandlesStreamInput(StreamInput in) { - this.in = in; + super(in); } @Override @@ -65,49 +64,21 @@ public class HandlesStreamInput extends StreamInput { } } - @Override - public int read() throws IOException { - return in.read(); - } - - @Override - public int read(byte[] b) throws IOException { - return in.read(b); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return in.read(b, off, len); - } - - @Override - public byte readByte() throws IOException { - return in.readByte(); - } - - @Override - public void readBytes(byte[] b, int offset, int len) throws IOException { - in.readBytes(b, offset, len); - } - - public void cleanHandles() { - handles.clear(); - } - @Override public void reset() throws IOException { - in.reset(); - handles.clear(); - } - - public void reset(StreamInput in) { - this.in = in; + super.reset(); handles.clear(); identityHandles.clear(); } - @Override - public void close() throws IOException { - in.close(); + public void reset(StreamInput in) { + super.reset(in); + handles.clear(); + identityHandles.clear(); + } + + public void cleanHandles() { + handles.clear(); + identityHandles.clear(); } } diff --git a/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java index fa1be3eac93..2ad85045109 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/HandlesStreamOutput.java @@ -28,15 +28,13 @@ import java.util.Arrays; /** * */ -public class HandlesStreamOutput extends StreamOutput { +public class HandlesStreamOutput extends AdapterStreamOutput { - private static final int DEFAULT_IDENTITY_THRESHOLD = 100; + private static final int DEFAULT_IDENTITY_THRESHOLD = 50; // a threshold above which strings will use identity check private final int identityThreshold; - private StreamOutput out; - private final TObjectIntHashMap handles = new TObjectIntHashMap(Constants.DEFAULT_CAPACITY, Constants.DEFAULT_LOAD_FACTOR, -1); private final HandleTable identityHandles = new HandleTable(10, (float) 3.00); @@ -46,7 +44,7 @@ public class HandlesStreamOutput extends StreamOutput { } public HandlesStreamOutput(StreamOutput out, int identityThreshold) { - this.out = out; + super(out); this.identityThreshold = identityThreshold; } @@ -78,21 +76,6 @@ public class HandlesStreamOutput extends StreamOutput { } } - @Override - public void writeByte(byte b) throws IOException { - out.writeByte(b); - } - - @Override - public void writeBytes(byte[] b, int offset, int length) throws IOException { - out.writeBytes(b, offset, length); - } - - public void cleanHandles() { - handles.clear(); - identityHandles.clear(); - } - @Override public void reset() throws IOException { handles.clear(); @@ -101,25 +84,10 @@ public class HandlesStreamOutput extends StreamOutput { } public void reset(StreamOutput out) throws IOException { - this.out = out; + super.reset(out); reset(); } - @Override - public void flush() throws IOException { - out.flush(); - } - - @Override - public void close() throws IOException { - out.close(); - } - - public StreamOutput wrappedOut() { - return this.out; - } - - /** * Lightweight identity hash table which maps objects to integer handles, * assigned in ascending order. diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 71ebf872a72..7b98998e8da 100644 --- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -186,15 +186,15 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { byte status = buffer.readByte(); boolean isRequest = TransportStreams.statusIsRequest(status); - HandlesStreamInput handlesStream; + HandlesStreamInput wrappedStream; if (TransportStreams.statusIsCompress(status)) { - handlesStream = CachedStreamInput.cachedHandlesLzf(streamIn); + wrappedStream = CachedStreamInput.cachedHandlesLzf(streamIn); } else { - handlesStream = CachedStreamInput.cachedHandles(streamIn); + wrappedStream = CachedStreamInput.cachedHandles(streamIn); } if (isRequest) { - String action = handleRequest(channel, handlesStream, requestId); + String action = handleRequest(channel, wrappedStream, requestId); if (buffer.readerIndex() != expectedIndexReader) { if (buffer.readerIndex() < expectedIndexReader) { logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action); @@ -208,9 +208,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { // ignore if its null, the adapter logs it if (handler != null) { if (TransportStreams.statusIsError(status)) { - handlerResponseError(handlesStream, handler); + handlerResponseError(wrappedStream, handler); } else { - handleResponse(handlesStream, handler); + handleResponse(wrappedStream, handler); } } else { // if its null, skip those bytes @@ -225,7 +225,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { buffer.readerIndex(expectedIndexReader); } } - handlesStream.cleanHandles(); + wrappedStream.cleanHandles(); } private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {