add adapter streams that wrap another adapter and delegate all method calls, have handles use them

This commit is contained in:
Shay Banon 2011-12-25 17:37:58 +02:00
parent aa078788f9
commit fb218babe3
5 changed files with 279 additions and 85 deletions

View File

@ -0,0 +1,119 @@
package org.elasticsearch.common.io.stream;
import java.io.IOException;
/**
*/
public abstract class AdapterStreamInput extends StreamInput {
protected StreamInput in;
protected AdapterStreamInput() {
}
public AdapterStreamInput(StreamInput in) {
this.in = in;
}
public void reset(StreamInput in) {
this.in = in;
}
@Override
public byte readByte() throws IOException {
return in.readByte();
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
in.readBytes(b, offset, len);
}
@Override
public void reset() throws IOException {
in.reset();
}
@Override
public void close() throws IOException {
in.close();
}
@Override
public int read() throws IOException {
return in.read();
}
// override ones to direct them
@Override
public void readFully(byte[] b) throws IOException {
in.readFully(b);
}
@Override
public short readShort() throws IOException {
return in.readShort();
}
@Override
public int readInt() throws IOException {
return in.readInt();
}
@Override
public int readVInt() throws IOException {
return in.readVInt();
}
@Override
public long readLong() throws IOException {
return in.readLong();
}
@Override
public long readVLong() throws IOException {
return in.readVLong();
}
@Override
public String readUTF() throws IOException {
return in.readUTF();
}
@Override
public int read(byte[] b) throws IOException {
return in.read(b);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}
@Override
public long skip(long n) throws IOException {
return in.skip(n);
}
@Override
public int available() throws IOException {
return in.available();
}
@Override
public void mark(int readlimit) {
in.mark(readlimit);
}
@Override
public boolean markSupported() {
return in.markSupported();
}
@Override
public String toString() {
return in.toString();
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import java.io.IOException;
/**
*/
public class AdapterStreamOutput extends StreamOutput {
protected StreamOutput out;
public AdapterStreamOutput(StreamOutput out) {
this.out = out;
}
public void reset(StreamOutput out) throws IOException {
this.out = out;
}
public StreamOutput wrappedOut() {
return this.out;
}
@Override
public void writeByte(byte b) throws IOException {
out.writeByte(b);
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
out.writeBytes(b, offset, length);
}
@Override
public void flush() throws IOException {
out.flush();
}
@Override
public void close() throws IOException {
out.close();
}
@Override
public void reset() throws IOException {
out.reset();
}
@Override
public void writeBytes(byte[] b) throws IOException {
out.writeBytes(b);
}
@Override
public void writeBytes(byte[] b, int length) throws IOException {
out.writeBytes(b, length);
}
@Override
public void writeInt(int i) throws IOException {
out.writeInt(i);
}
@Override
public void writeVInt(int i) throws IOException {
out.writeVInt(i);
}
@Override
public void writeLong(long i) throws IOException {
out.writeLong(i);
}
@Override
public void writeVLong(long i) throws IOException {
out.writeVLong(i);
}
@Override
public void writeUTF(String str) throws IOException {
out.writeUTF(str);
}
@Override
public void writeFloat(float v) throws IOException {
out.writeFloat(v);
}
@Override
public void writeDouble(double v) throws IOException {
out.writeDouble(v);
}
@Override
public void writeBoolean(boolean b) throws IOException {
out.writeBoolean(b);
}
@Override
public void write(int b) throws IOException {
out.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
}
@Override
public void write(byte[] b) throws IOException {
out.write(b);
}
@Override
public String toString() {
return out.toString();
}
}

View File

@ -26,19 +26,18 @@ import java.io.IOException;
/** /**
* *
*/ */
public class HandlesStreamInput extends StreamInput { public class HandlesStreamInput extends AdapterStreamInput {
private StreamInput in;
private final TIntObjectHashMap<String> handles = new TIntObjectHashMap<String>(); private final TIntObjectHashMap<String> handles = new TIntObjectHashMap<String>();
private final TIntObjectHashMap<String> identityHandles = new TIntObjectHashMap<String>(); private final TIntObjectHashMap<String> identityHandles = new TIntObjectHashMap<String>();
HandlesStreamInput() { HandlesStreamInput() {
super();
} }
public HandlesStreamInput(StreamInput in) { public HandlesStreamInput(StreamInput in) {
this.in = in; super(in);
} }
@Override @Override
@ -65,49 +64,21 @@ public class HandlesStreamInput extends StreamInput {
} }
} }
@Override
public int read() throws IOException {
return in.read();
}
@Override
public int read(byte[] b) throws IOException {
return in.read(b);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}
@Override
public byte readByte() throws IOException {
return in.readByte();
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
in.readBytes(b, offset, len);
}
public void cleanHandles() {
handles.clear();
}
@Override @Override
public void reset() throws IOException { public void reset() throws IOException {
in.reset(); super.reset();
handles.clear();
}
public void reset(StreamInput in) {
this.in = in;
handles.clear(); handles.clear();
identityHandles.clear(); identityHandles.clear();
} }
@Override public void reset(StreamInput in) {
public void close() throws IOException { super.reset(in);
in.close(); handles.clear();
identityHandles.clear();
}
public void cleanHandles() {
handles.clear();
identityHandles.clear();
} }
} }

View File

@ -28,15 +28,13 @@ import java.util.Arrays;
/** /**
* *
*/ */
public class HandlesStreamOutput extends StreamOutput { public class HandlesStreamOutput extends AdapterStreamOutput {
private static final int DEFAULT_IDENTITY_THRESHOLD = 100; private static final int DEFAULT_IDENTITY_THRESHOLD = 50;
// a threshold above which strings will use identity check // a threshold above which strings will use identity check
private final int identityThreshold; private final int identityThreshold;
private StreamOutput out;
private final TObjectIntHashMap<String> handles = new TObjectIntHashMap<String>(Constants.DEFAULT_CAPACITY, Constants.DEFAULT_LOAD_FACTOR, -1); private final TObjectIntHashMap<String> handles = new TObjectIntHashMap<String>(Constants.DEFAULT_CAPACITY, Constants.DEFAULT_LOAD_FACTOR, -1);
private final HandleTable identityHandles = new HandleTable(10, (float) 3.00); private final HandleTable identityHandles = new HandleTable(10, (float) 3.00);
@ -46,7 +44,7 @@ public class HandlesStreamOutput extends StreamOutput {
} }
public HandlesStreamOutput(StreamOutput out, int identityThreshold) { public HandlesStreamOutput(StreamOutput out, int identityThreshold) {
this.out = out; super(out);
this.identityThreshold = identityThreshold; this.identityThreshold = identityThreshold;
} }
@ -78,21 +76,6 @@ public class HandlesStreamOutput extends StreamOutput {
} }
} }
@Override
public void writeByte(byte b) throws IOException {
out.writeByte(b);
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
out.writeBytes(b, offset, length);
}
public void cleanHandles() {
handles.clear();
identityHandles.clear();
}
@Override @Override
public void reset() throws IOException { public void reset() throws IOException {
handles.clear(); handles.clear();
@ -101,25 +84,10 @@ public class HandlesStreamOutput extends StreamOutput {
} }
public void reset(StreamOutput out) throws IOException { public void reset(StreamOutput out) throws IOException {
this.out = out; super.reset(out);
reset(); reset();
} }
@Override
public void flush() throws IOException {
out.flush();
}
@Override
public void close() throws IOException {
out.close();
}
public StreamOutput wrappedOut() {
return this.out;
}
/** /**
* Lightweight identity hash table which maps objects to integer handles, * Lightweight identity hash table which maps objects to integer handles,
* assigned in ascending order. * assigned in ascending order.

View File

@ -186,15 +186,15 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
byte status = buffer.readByte(); byte status = buffer.readByte();
boolean isRequest = TransportStreams.statusIsRequest(status); boolean isRequest = TransportStreams.statusIsRequest(status);
HandlesStreamInput handlesStream; HandlesStreamInput wrappedStream;
if (TransportStreams.statusIsCompress(status)) { if (TransportStreams.statusIsCompress(status)) {
handlesStream = CachedStreamInput.cachedHandlesLzf(streamIn); wrappedStream = CachedStreamInput.cachedHandlesLzf(streamIn);
} else { } else {
handlesStream = CachedStreamInput.cachedHandles(streamIn); wrappedStream = CachedStreamInput.cachedHandles(streamIn);
} }
if (isRequest) { if (isRequest) {
String action = handleRequest(channel, handlesStream, requestId); String action = handleRequest(channel, wrappedStream, requestId);
if (buffer.readerIndex() != expectedIndexReader) { if (buffer.readerIndex() != expectedIndexReader) {
if (buffer.readerIndex() < expectedIndexReader) { if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action); logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
@ -208,9 +208,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
// ignore if its null, the adapter logs it // ignore if its null, the adapter logs it
if (handler != null) { if (handler != null) {
if (TransportStreams.statusIsError(status)) { if (TransportStreams.statusIsError(status)) {
handlerResponseError(handlesStream, handler); handlerResponseError(wrappedStream, handler);
} else { } else {
handleResponse(handlesStream, handler); handleResponse(wrappedStream, handler);
} }
} else { } else {
// if its null, skip those bytes // if its null, skip those bytes
@ -225,7 +225,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
buffer.readerIndex(expectedIndexReader); buffer.readerIndex(expectedIndexReader);
} }
} }
handlesStream.cleanHandles(); wrappedStream.cleanHandles();
} }
private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) { private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {