HBASE-3199 large response handling: some fixups and cleanups

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1033274 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ryan Rawson 2010-11-09 22:40:26 +00:00
parent 2640b277cc
commit 0cb9652dd1
6 changed files with 250 additions and 12 deletions

View File

@ -664,6 +664,7 @@ Release 0.90.0 - Unreleased
HBASE-3112 Enable and disable of table needs a bit of loving in new master HBASE-3112 Enable and disable of table needs a bit of loving in new master
HBASE-3207 If we get IOException when closing a region, we should still HBASE-3207 If we get IOException when closing a region, we should still
remove it from online regions and complete the close in ZK remove it from online regions and complete the close in ZK
HBASE-3199 large response handling: some fixups and cleanups
IMPROVEMENTS IMPROVEMENTS

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Ordering;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.SplitKeyValue; import org.apache.hadoop.hbase.KeyValue.SplitKeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
@ -65,7 +66,7 @@ import java.util.TreeMap;
* through {@link KeyValue#getRow()}, {@link KeyValue#getFamily()}, {@link KeyValue#getQualifier()}, * through {@link KeyValue#getRow()}, {@link KeyValue#getFamily()}, {@link KeyValue#getQualifier()},
* {@link KeyValue#getTimestamp()}, and {@link KeyValue#getValue()}. * {@link KeyValue#getTimestamp()}, and {@link KeyValue#getValue()}.
*/ */
public class Result implements Writable { public class Result implements Writable, WritableWithSize {
private static final byte RESULT_VERSION = (byte)1; private static final byte RESULT_VERSION = (byte)1;
private KeyValue [] kvs = null; private KeyValue [] kvs = null;
@ -523,6 +524,20 @@ public class Result implements Writable {
this.kvs = kvs.toArray(new KeyValue[kvs.size()]); this.kvs = kvs.toArray(new KeyValue[kvs.size()]);
} }
public long getWritableSize() {
if (isEmpty())
return Bytes.SIZEOF_INT; // int size = 0
long size = Bytes.SIZEOF_INT; // totalLen
for (KeyValue kv : kvs) {
size += kv.getLength();
size += Bytes.SIZEOF_INT; // kv.getLength
}
return size;
}
public void write(final DataOutput out) public void write(final DataOutput out)
throws IOException { throws IOException {
if(isEmpty()) { if(isEmpty()) {
@ -540,6 +555,29 @@ public class Result implements Writable {
} }
} }
public static long getWriteArraySize(Result [] results) {
long size = Bytes.SIZEOF_BYTE; // RESULT_VERSION
if (results == null || results.length == 0) {
size += Bytes.SIZEOF_INT;
return size;
}
size += Bytes.SIZEOF_INT; // results.length
size += Bytes.SIZEOF_INT; // bufLen
for (Result result : results) {
size += Bytes.SIZEOF_INT; // either 0 or result.size()
if (result == null || result.isEmpty())
continue;
for (KeyValue kv : result.raw()) {
size += Bytes.SIZEOF_INT; // kv.getLength();
size += kv.getLength();
}
}
return size;
}
public static void writeArray(final DataOutput out, Result [] results) public static void writeArray(final DataOutput out, Result [] results)
throws IOException { throws IOException {
// Write version when writing array form. // Write version when writing array form.

View File

@ -96,7 +96,7 @@ import org.apache.hadoop.io.WritableFactories;
* name and reflection to instantiate class was costing in excess of the cell * name and reflection to instantiate class was costing in excess of the cell
* handling). * handling).
*/ */
public class HbaseObjectWritable implements Writable, Configurable { public class HbaseObjectWritable implements Writable, WritableWithSize, Configurable {
protected final static Log LOG = LogFactory.getLog(HbaseObjectWritable.class); protected final static Log LOG = LogFactory.getLog(HbaseObjectWritable.class);
// Here we maintain two static maps of classes to code and vice versa. // Here we maintain two static maps of classes to code and vice versa.
@ -260,6 +260,10 @@ public class HbaseObjectWritable implements Writable, Configurable {
writeObject(out, instance, declaredClass, conf); writeObject(out, instance, declaredClass, conf);
} }
public long getWritableSize() {
return getWritableSize(instance, declaredClass, conf);
}
private static class NullInstance extends Configured implements Writable { private static class NullInstance extends Configured implements Writable {
Class<?> declaredClass; Class<?> declaredClass;
/** default constructor for writable */ /** default constructor for writable */
@ -314,6 +318,27 @@ public class HbaseObjectWritable implements Writable, Configurable {
out.writeByte(code); out.writeByte(code);
} }
public static long getWritableSize(Object instance, Class declaredClass,
Configuration conf) {
long size = Bytes.SIZEOF_BYTE; // code
if (instance == null) {
return 0L;
}
if (declaredClass.isArray()) {
if (declaredClass.equals(Result[].class)) {
return size + Result.getWriteArraySize((Result[])instance);
}
}
if (declaredClass.equals(Result.class)) {
Result r = (Result) instance;
// one extra class code for writable instance.
return r.getWritableSize() + size + Bytes.SIZEOF_BYTE;
}
return 0L; // no hint is the default.
}
/** /**
* Write a {@link Writable}, {@link String}, primitive type, or an array of * Write a {@link Writable}, {@link String}, primitive type, or an array of
* the preceding. * the preceding.

View File

@ -0,0 +1,36 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
/**
* An optional interface to 'size' writables.
*/
public interface WritableWithSize {
/**
* Provide a size hint to the caller. write() should ideally
* not go beyond this if at all possible.
*
* You can return 0 if there is no size hint.
*
* @return the size of the writable
*/
public long getWritableSize();
}

View File

@ -0,0 +1,107 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* Not thread safe!
*/
public class ByteBufferOutputStream extends OutputStream {
protected ByteBuffer buf;
public ByteBufferOutputStream(int capacity) {
this(capacity, false);
}
public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
if (useDirectByteBuffer) {
buf = ByteBuffer.allocateDirect(capacity);
} else {
buf = ByteBuffer.allocate(capacity);
}
}
public int size() {
return buf.position();
}
/**
* This flips the underlying BB so be sure to use it _last_!
* @return
*/
public ByteBuffer getByteBuffer() {
buf.flip();
return buf;
}
private void checkSizeAndGrow(int extra) {
if ( (buf.position() + extra) > buf.limit()) {
// size calculation is complex, because we could overflow negative,
// and/or not allocate enough space. this fixes that.
int newSize = (int)Math.min((((long)buf.capacity()) * 2),
(long)(Integer.MAX_VALUE));
newSize = Math.max(newSize, buf.position() + extra);
ByteBuffer newBuf = ByteBuffer.allocate(newSize);
buf.flip();
newBuf.put(buf);
buf = newBuf;
}
}
// OutputStream
@Override
public void write(int b) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_BYTE);
buf.put((byte)b);
}
@Override
public void write(byte[] b) throws IOException {
checkSizeAndGrow(b.length);
buf.put(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkSizeAndGrow(len);
buf.put(b, off, len);
}
@Override
public void flush() throws IOException {
// noop
}
@Override
public void close() throws IOException {
// noop again. heh
}
}

View File

@ -55,6 +55,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
@ -89,6 +91,14 @@ public abstract class HBaseServer {
*/ */
private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100; private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
private static final String WARN_RESPONSE_SIZE =
"hbase.ipc.warn.response.size";
/** Default value for above param */
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
private final int warnResponseSize;
public static final Log LOG = public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer"); LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
@ -989,6 +999,8 @@ public abstract class HBaseServer {
/** Handles queued calls . */ /** Handles queued calls . */
private class Handler extends Thread { private class Handler extends Thread {
private final BlockingQueue<Call> myCallQueue; private final BlockingQueue<Call> myCallQueue;
static final int BUFFER_INITIAL_SIZE = 1024;
public Handler(final BlockingQueue<Call> cq, int instanceNumber) { public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
this.myCallQueue = cq; this.myCallQueue = cq;
this.setDaemon(true); this.setDaemon(true);
@ -1005,8 +1017,6 @@ public abstract class HBaseServer {
public void run() { public void run() {
LOG.info(getName() + ": starting"); LOG.info(getName() + ": starting");
SERVER.set(HBaseServer.this); SERVER.set(HBaseServer.this);
final int buffersize = 16 * 1024;
ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize);
while (running) { while (running) {
try { try {
Call call = myCallQueue.take(); // pop the queue; maybe blocked here Call call = myCallQueue.take(); // pop the queue; maybe blocked here
@ -1031,14 +1041,24 @@ public abstract class HBaseServer {
} }
CurCall.set(null); CurCall.set(null);
if (buf.size() > buffersize) { int size = BUFFER_INITIAL_SIZE;
// Allocate a new BAOS as reset only moves size back to zero but if (value instanceof WritableWithSize) {
// keeps the buffer of whatever the largest write was -- see // get the size hint.
// hbase-900. WritableWithSize ohint = (WritableWithSize)value;
buf = new ByteArrayOutputStream(buffersize); long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
} else { if (hint > 0) {
buf.reset(); if ((hint) > Integer.MAX_VALUE) {
// oops, new problem.
IOException ioe =
new IOException("Result buffer size too large: " + hint);
errorClass = ioe.getClass().getName();
error = StringUtils.stringifyException(ioe);
} else {
size = (int)hint;
}
}
} }
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf); DataOutputStream out = new DataOutputStream(buf);
out.writeInt(call.id); // write call id out.writeInt(call.id); // write call id
out.writeBoolean(error != null); // write error flag out.writeBoolean(error != null); // write error flag
@ -1049,7 +1069,14 @@ public abstract class HBaseServer {
WritableUtils.writeString(out, errorClass); WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error); WritableUtils.writeString(out, error);
} }
call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
if (buf.size() > warnResponseSize) {
LOG.warn(getName()+", responseTooLarge for: "+call+": Size: "
+ StringUtils.humanReadableInt(buf.size()));
}
call.setResponse(buf.getByteBuffer());
responder.doRespond(call); responder.doRespond(call);
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (running) { // unexpected -- log it if (running) { // unexpected -- log it
@ -1140,6 +1167,10 @@ public abstract class HBaseServer {
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false); this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true); this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
DEFAULT_WARN_RESPONSE_SIZE);
// Create the responder here // Create the responder here
responder = new Responder(); responder = new Responder();
} }