HBASE-5945 Reduce buffer copies in IPC server response path

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1543458 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2013-11-19 15:39:35 +00:00
parent dc8ecd9a9a
commit 74f6e25c8f
13 changed files with 518 additions and 244 deletions
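In rough terms, the patch stops copying the whole RPC response into one freshly allocated buffer: the size preamble, response header, result, and cell block stay in their own ByteBuffers (the new BufferChain below) and are handed to a gathering channel write. A minimal sketch of that idea with illustrative names only -- this is not the HBase code, which follows in the BufferChain and RpcServer diffs:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

final class GatherSketch {
  // Sketch: the already-serialized pieces are never copied into one big array;
  // the channel gathers them directly.
  static long sendResponse(GatheringByteChannel channel, ByteBuffer header, ByteBuffer body)
      throws IOException {
    ByteBuffer preamble = ByteBuffer.allocate(4);
    preamble.putInt(header.remaining() + body.remaining());
    preamble.flip();
    return channel.write(new ByteBuffer[] { preamble, header, body });
  }
}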

View File

@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -44,7 +43,6 @@ import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import com.google.common.base.Preconditions;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
@@ -117,10 +115,15 @@ class IPCUtil {
os = compressor.createOutputStream(os, poolCompressor);
}
Codec.Encoder encoder = codec.getEncoder(os);
int count = 0;
while (cellScanner.advance()) {
encoder.write(cellScanner.current());
count++;
}
encoder.flush();
// If no cells, don't mess around. Just return null (could be a bunch of existence checking
// gets or something -- stuff that does not return a cell).
if (count == 0) return null;
} finally {
os.close();
if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
@@ -187,24 +190,23 @@ class IPCUtil {
}
/**
* Write out header, param, and cell block, if present, to a {@link ByteBufferOutputStream} sized
* to hold these elements.
* @param header
* @param param
* @param cellBlock
* @return A {@link ByteBufferOutputStream} filled with the content of the passed in
* <code>header</code>, <code>param</code>, and <code>cellBlock</code>.
* @param m Message to serialize delimited; i.e. with a vint of its size preceding its
* serialization.
* @return The passed in Message serialized with delimiter. Return null if <code>m</code> is null
* @throws IOException
*/
static ByteBufferOutputStream write(final Message header, final Message param,
final ByteBuffer cellBlock)
throws IOException {
int totalSize = getTotalSizeWhenWrittenDelimited(header, param);
if (cellBlock != null) totalSize += cellBlock.limit();
ByteBufferOutputStream bbos = new ByteBufferOutputStream(totalSize);
write(bbos, header, param, cellBlock, totalSize);
bbos.close();
return bbos;
static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
if (m == null) return null;
int serializedSize = m.getSerializedSize();
int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
byte [] buffer = new byte[serializedSize + vintSize];
// Passing in a byte array saves COS creating a buffer which it does when using streams.
CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
// This will write out the vint preamble and the message serialized.
cos.writeMessageNoTag(m);
cos.flush();
cos.checkNoSpaceLeft();
return ByteBuffer.wrap(buffer);
}
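A hedged usage sketch of the new helper: the returned buffer is exactly a varint length followed by the serialized message, so a generated message's stock delimited parser reads it back. SomeProto and its field are hypothetical stand-ins for any generated protobuf type such as ResponseHeader:

// Illustrative fragment; SomeProto is not a real HBase class.
SomeProto msg = SomeProto.newBuilder().setId(1).build();
ByteBuffer bb = IPCUtil.getDelimitedMessageAsByteBuffer(msg);
// bb.array() holds vint(serializedSize) + the message bytes, nothing more.
SomeProto copy = SomeProto.parseDelimitedFrom(new java.io.ByteArrayInputStream(bb.array()));
assert msg.equals(copy);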
/**
@@ -230,8 +232,9 @@ class IPCUtil {
private static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock, final int totalSize)
throws IOException {
// I confirmed toBytes does same as say DataOutputStream#writeInt.
// I confirmed toBytes does same as DataOutputStream#writeInt.
dos.write(Bytes.toBytes(totalSize));
// This allocates a buffer that is the size of the message internally.
header.writeDelimitedTo(dos);
if (param != null) param.writeDelimitedTo(dos);
if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
@@ -239,20 +242,6 @@ class IPCUtil {
return totalSize;
}
/**
* @param in Stream cued up just before a delimited message
* @return Bytes that hold the bytes that make up the message read from <code>in</code>
* @throws IOException
*/
static byte [] getDelimitedMessageBytes(final DataInputStream in) throws IOException {
byte b = in.readByte();
int size = CodedInputStream.readRawVarint32(b, in);
// Allocate right-sized buffer rather than let pb allocate its default minimum 4k.
byte [] bytes = new byte[size];
IOUtils.readFully(in, bytes);
return bytes;
}
/**
* Read in chunks of 8K (HBASE-7239)
* @param in

View File

@@ -175,10 +175,13 @@ public final class ProtobufUtil {
ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
builder.setExists(true);
builder.setAssociatedCellCount(0);
EMPTY_RESULT_PB_EXISTS_TRUE = builder.build();
builder.clear();
builder.setExists(false);
builder.setAssociatedCellCount(0);
EMPTY_RESULT_PB_EXISTS_FALSE = builder.build();
builder.clear();
@@ -1192,19 +1195,11 @@ public final class ProtobufUtil {
* @return the converted protocol buffer Result
*/
public static ClientProtos.Result toResultNoData(final Result result) {
if (result.getExists() != null){
return result.getExists() ? EMPTY_RESULT_PB_EXISTS_TRUE : EMPTY_RESULT_PB_EXISTS_FALSE;
}
if (result.getExists() != null) return toResult(result.getExists());
int size = result.size();
if (size == 0){
return EMPTY_RESULT_PB;
}
if (size == 0) return EMPTY_RESULT_PB;
ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
builder.setAssociatedCellCount(size);
return builder.build();
}
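The upshot, sketched with the classes from this diff: empty and exists-only Results map to the shared pre-built protobuf singletons above, and a Result that does carry cells yields a pb recording only the count, since the cells themselves travel in the cell block:

// Sketch only; 'result' is an org.apache.hadoop.hbase.client.Result on the server side.
ClientProtos.Result pb = ProtobufUtil.toResultNoData(result);
int cellsToReadFromCellBlock = pb.getAssociatedCellCount();  // 0 for the shared empty singletons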

View File

@@ -62,7 +62,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
void authenticationFailure();
void sentBytes(int count);
void sentBytes(long count);
void receivedBytes(int count);

View File

@@ -86,7 +86,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
}
@Override
public void sentBytes(int count) {
public void sentBytes(long count) {
this.sentBytes.incr(count);
}

View File

@@ -89,7 +89,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
}
@Override
public void sentBytes(int count) {
public void sentBytes(long count) {
this.sentBytes.incr(count);
}

View File

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Chain of ByteBuffers.
* Used for writing out an array of byte buffers. Writes in chunks.
*/
@InterfaceAudience.Private
class BufferChain {
private static final ByteBuffer [] FOR_TOARRAY_TYPE = new ByteBuffer[0];
private final ByteBuffer[] buffers;
private int remaining = 0;
private int bufferOffset = 0;
BufferChain(ByteBuffer ... buffers) {
// Some of the incoming buffers can be null
List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(buffers.length);
for (ByteBuffer b : buffers) {
if (b == null) continue;
bbs.add(b);
this.remaining += b.remaining();
}
this.buffers = bbs.toArray(FOR_TOARRAY_TYPE);
}
/**
* Expensive. Makes a new byte array to hold a copy of what is in the contained ByteBuffers. This
* call drains this instance; it cannot be used subsequent to the call.
* @return A new byte array with the content of all contained ByteBuffers.
*/
byte [] getBytes() {
if (!hasRemaining()) throw new IllegalAccessError();
byte [] bytes = new byte [this.remaining];
int offset = 0;
for (ByteBuffer bb: this.buffers) {
System.arraycopy(bb.array(), bb.arrayOffset(), bytes, offset, bb.limit());
offset += bb.capacity();
}
return bytes;
}
boolean hasRemaining() {
return remaining > 0;
}
/**
* Write out our chain of buffers in chunks
* @param channel Where to write
* @param chunkSize Size of chunks to write.
* @return Amount written.
* @throws IOException
*/
long write(GatheringByteChannel channel, int chunkSize) throws IOException {
int chunkRemaining = chunkSize;
ByteBuffer lastBuffer = null;
int bufCount = 0;
int restoreLimit = -1;
while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
lastBuffer = buffers[bufferOffset + bufCount];
if (!lastBuffer.hasRemaining()) {
bufferOffset++;
continue;
}
bufCount++;
if (lastBuffer.remaining() > chunkRemaining) {
restoreLimit = lastBuffer.limit();
lastBuffer.limit(lastBuffer.position() + chunkRemaining);
chunkRemaining = 0;
break;
} else {
chunkRemaining -= lastBuffer.remaining();
}
}
assert lastBuffer != null;
if (chunkRemaining == chunkSize) {
assert !hasRemaining();
// no data left to write
return 0;
}
try {
long ret = channel.write(buffers, bufferOffset, bufCount);
if (ret > 0) {
remaining -= ret;
}
return ret;
} finally {
if (restoreLimit >= 0) {
lastBuffer.limit(restoreLimit);
}
}
}
}
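A hedged usage sketch of the class above. It is package-private, so this would live in org.apache.hadoop.hbase.ipc; the file path, the FileChannel (standing in for the socket's GatheringByteChannel), and the 16K chunk size are illustrative, the server itself uses NIO_BUFFER_LIMIT:

import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import org.apache.hadoop.hbase.util.Bytes;

// Null pieces are skipped by the constructor, mirroring how RpcServer passes an
// optional result and cell block.
byte [] body = Bytes.toBytes("hello world");
ByteBuffer preamble = ByteBuffer.wrap(Bytes.toBytes(body.length));  // 4-byte length prefix
BufferChain chain = new BufferChain(preamble, null, ByteBuffer.wrap(body));
FileOutputStream fos = new FileOutputStream("/tmp/chain.bin");
GatheringByteChannel ch = fos.getChannel();
try {
  while (chain.hasRemaining()) {
    chain.write(ch, 16 * 1024);  // never hands the channel more than 16K per call
  }
} finally {
  fos.close();
}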

View File

@@ -47,7 +47,7 @@ public class MetricsHBaseServer {
source.authenticationSuccess();
}
void sentBytes(int count) {
void sentBytes(long count) {
source.sentBytes(count);
}

View File

@@ -35,6 +35,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -84,12 +85,11 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -276,7 +276,10 @@ public class RpcServer implements RpcServerInterface {
protected Connection connection; // connection to client
protected long timestamp; // the time received when response is null
// the time served when response is not null
protected ByteBuffer response; // the response for this call
/**
* Chain of buffers to send as response.
*/
protected BufferChain response;
protected boolean delayResponse;
protected Responder responder;
protected boolean delayReturnValue; // if the return value should be
@@ -341,14 +344,14 @@ public class RpcServer implements RpcServerInterface {
}
protected synchronized void setSaslTokenResponse(ByteBuffer response) {
this.response = response;
this.response = new BufferChain(response);
}
protected synchronized void setResponse(Object m, final CellScanner cells,
Throwable t, String errorMsg) {
if (this.isError) return;
if (t != null) this.isError = true;
ByteBufferOutputStream bbos = null;
BufferChain bc = null;
try {
ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
// Presume it a pb Message. Could be null.
@@ -380,42 +383,44 @@ public class RpcServer implements RpcServerInterface {
headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
Message header = headerBuilder.build();
bbos = IPCUtil.write(header, result, cellBlock);
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
(cellBlock == null? 0: cellBlock.limit());
ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
bc = new BufferChain(bbTotalSize, bbHeader, bbResult, cellBlock);
if (connection.useWrap) {
wrapWithSasl(bbos);
bc = wrapWithSasl(bc);
}
} catch (IOException e) {
LOG.warn("Exception while creating response " + e);
}
ByteBuffer bb = null;
if (bbos != null) {
// TODO: If SASL, maybe buffer already been flipped and written?
bb = bbos.getByteBuffer();
bb.position(0);
}
this.response = bb;
this.response = bc;
}
private void wrapWithSasl(ByteBufferOutputStream response)
throws IOException {
if (connection.useSasl) {
// getByteBuffer calls flip()
ByteBuffer buf = response.getByteBuffer();
byte[] token;
// synchronization may be needed since there can be multiple Handler
// threads using saslServer to wrap responses.
synchronized (connection.saslServer) {
token = connection.saslServer.wrap(buf.array(),
buf.arrayOffset(), buf.remaining());
}
if (LOG.isDebugEnabled())
LOG.debug("Adding saslServer wrapped token of size " + token.length
+ " as call response.");
buf.clear();
DataOutputStream saslOut = new DataOutputStream(response);
saslOut.writeInt(token.length);
saslOut.write(token, 0, token.length);
private BufferChain wrapWithSasl(BufferChain bc)
throws IOException {
if (bc == null) return bc;
if (!this.connection.useSasl) return bc;
// Looks like no way around this; saslserver wants a byte array. I have to make it one.
// THIS IS A BIG UGLY COPY.
byte [] responseBytes = bc.getBytes();
byte [] token;
// synchronization may be needed since there can be multiple Handler
// threads using saslServer to wrap responses.
synchronized (connection.saslServer) {
token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
}
if (LOG.isDebugEnabled())
LOG.debug("Adding saslServer wrapped token of size " + token.length
+ " as call response.");
ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
return new BufferChain(bbTokenLength, bbTokenBytes);
}
@Override
@@ -994,7 +999,7 @@ public class RpcServer implements RpcServerInterface {
//
// Send as much data as we can in the non-blocking fashion
//
int numBytes = channelWrite(channel, call.response);
long numBytes = channelWrite(channel, call.response);
if (numBytes < 0) {
return true;
}
@@ -1347,6 +1352,7 @@ public class RpcServer implements RpcServerInterface {
}
}
}
/**
* No protobuf encoding of raw sasl messages
*/
@@ -2169,19 +2175,15 @@ public class RpcServer implements RpcServerInterface {
* buffer.
*
* @param channel writable byte channel to write to
* @param buffer buffer to write
* @param bufferChain Chain of buffers to write
* @return number of bytes written
* @throws java.io.IOException e
* @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
*/
protected int channelWrite(WritableByteChannel channel,
ByteBuffer buffer) throws IOException {
int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.write(buffer) : channelIO(null, channel, buffer);
if (count > 0) {
metrics.sentBytes(count);
}
protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
throws IOException {
long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
if (count > 0) this.metrics.sentBytes(count);
return count;
}

View File

@@ -2834,7 +2834,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
if (existence != null){
ClientProtos.Result pbr = ProtobufUtil.toResult(existence);
builder.setResult(pbr);
}else if (r != null) {
} else if (r != null) {
ClientProtos.Result pbr = ProtobufUtil.toResult(r);
builder.setResult(pbr);
}
@@ -3294,6 +3294,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
@Override
public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
throws ServiceException {
// rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
// It is also the conduit via which we pass back data.
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;

View File

@@ -64,7 +64,6 @@ public class TestFromClientSide3 {
private final static byte[] VAL_BYTES = Bytes.toBytes("v1");
private final static byte[] ROW_BYTES = Bytes.toBytes("r1");
/**
* @throws java.lang.Exception
*/
@@ -345,6 +344,7 @@ public class TestFromClientSide3 {
gets.add(new Get(ROW));
gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 })));
LOG.info("Calling exists");
Boolean[] results = table.exists(gets);
assertEquals(results[0], false);
assertEquals(results[1], false);

View File

@@ -82,8 +82,7 @@ import org.junit.experimental.categories.Category;
public class TestRegionObserverInterface {
static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class);
public static final TableName TEST_TABLE =
TableName.valueOf("TestTable");
public static final TableName TEST_TABLE = TableName.valueOf("TestTable");
public final static byte[] A = Bytes.toBytes("a");
public final static byte[] B = Bytes.toBytes("b");
public final static byte[] C = Bytes.toBytes("c");
@@ -111,126 +110,132 @@ public class TestRegionObserverInterface {
@Test
public void testRegionObserver() throws IOException {
TableName tableName = TEST_TABLE;
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRegionObserver");
// recreate table every time in order to reset the status of the
// coprocessor.
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDelete"},
TEST_TABLE,
new Boolean[] {false, false, false, false, false});
try {
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDelete"},
tableName,
new Boolean[] {false, false, false, false, false});
Put put = new Put(ROW);
put.add(A, A, A);
put.add(B, B, B);
put.add(C, C, C);
table.put(put);
Put put = new Put(ROW);
put.add(A, A, A);
put.add(B, B, B);
put.add(C, C, C);
table.put(put);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
TEST_TABLE,
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
tableName,
new Boolean[] {false, false, true, true, true, true, false}
);
);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"},
TEST_TABLE,
new Integer[] {1, 1, 0, 0});
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"},
tableName,
new Integer[] {1, 1, 0, 0});
Get get = new Get(ROW);
get.addColumn(A, A);
get.addColumn(B, B);
get.addColumn(C, C);
table.get(get);
Get get = new Get(ROW);
get.addColumn(A, A);
get.addColumn(B, B);
get.addColumn(C, C);
table.get(get);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDelete"},
TEST_TABLE,
new Boolean[] {true, true, true, true, false}
);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDelete"},
tableName,
new Boolean[] {true, true, true, true, false}
);
Delete delete = new Delete(ROW);
delete.deleteColumn(A, A);
delete.deleteColumn(B, B);
delete.deleteColumn(C, C);
table.delete(delete);
Delete delete = new Delete(ROW);
delete.deleteColumn(A, A);
delete.deleteColumn(B, B);
delete.deleteColumn(C, C);
table.delete(delete);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
TEST_TABLE,
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
tableName,
new Boolean[] {true, true, true, true, true, true, true}
);
util.deleteTable(tableName);
table.close();
);
} finally {
util.deleteTable(tableName);
table.close();
}
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"},
TEST_TABLE,
tableName,
new Integer[] {1, 1, 1, 1});
}
@Test
public void testRowMutation() throws IOException {
TableName tableName = TEST_TABLE;
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRowMutation");
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
verifyMethodResult(SimpleRegionObserver.class,
try {
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDeleted"},
TEST_TABLE,
tableName,
new Boolean[] {false, false, false, false, false});
Put put = new Put(ROW);
put.add(A, A, A);
put.add(B, B, B);
put.add(C, C, C);
Put put = new Put(ROW);
put.add(A, A, A);
put.add(B, B, B);
put.add(C, C, C);
Delete delete = new Delete(ROW);
delete.deleteColumn(A, A);
delete.deleteColumn(B, B);
delete.deleteColumn(C, C);
Delete delete = new Delete(ROW);
delete.deleteColumn(A, A);
delete.deleteColumn(B, B);
delete.deleteColumn(C, C);
RowMutations arm = new RowMutations(ROW);
arm.add(put);
arm.add(delete);
table.mutateRow(arm);
RowMutations arm = new RowMutations(ROW);
arm.add(put);
arm.add(delete);
table.mutateRow(arm);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDeleted"},
TEST_TABLE,
new Boolean[] {false, false, true, true, true}
);
util.deleteTable(tableName);
table.close();
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDeleted"},
tableName,
new Boolean[] {false, false, true, true, true}
);
} finally {
util.deleteTable(tableName);
table.close();
}
}
@Test
public void testIncrementHook() throws IOException {
TableName tableName = TEST_TABLE;
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testIncrementHook");
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
Increment inc = new Increment(Bytes.toBytes(0));
inc.addColumn(A, A, 1);
try {
Increment inc = new Increment(Bytes.toBytes(0));
inc.addColumn(A, A, 1);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreIncrement", "hadPostIncrement"},
tableName,
new Boolean[] {false, false}
);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreIncrement", "hadPostIncrement"},
tableName,
new Boolean[] {false, false}
);
table.increment(inc);
table.increment(inc);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreIncrement", "hadPostIncrement"},
tableName,
new Boolean[] {true, true}
);
util.deleteTable(tableName);
table.close();
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreIncrement", "hadPostIncrement"},
tableName,
new Boolean[] {true, true}
);
} finally {
util.deleteTable(tableName);
table.close();
}
}
@Test
@@ -464,84 +469,82 @@ public class TestRegionObserverInterface {
@Test
public void bulkLoadHFileTest() throws Exception {
String testName = TestRegionObserverInterface.class.getName()+".bulkLoadHFileTest";
TableName tableName = TEST_TABLE;
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".bulkLoadHFileTest");
Configuration conf = util.getConfiguration();
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
try {
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
tableName,
new Boolean[] {false, false}
);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
tableName,
new Boolean[] {false, false}
);
FileSystem fs = util.getTestFileSystem();
final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(A));
FileSystem fs = util.getTestFileSystem();
final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(A));
createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A);
createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A);
//Bulk load
new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName));
//Bulk load
new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName));
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
tableName,
new Boolean[] {true, true}
);
util.deleteTable(tableName);
table.close();
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
tableName,
new Boolean[] {true, true}
);
} finally {
util.deleteTable(tableName);
table.close();
}
}
@Test
public void testRecovery() throws Exception {
LOG.info(TestRegionObserverInterface.class.getName()+".testRecovery");
TableName tableName = TEST_TABLE;
LOG.info(TestRegionObserverInterface.class.getName() +".testRecovery");
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRecovery");
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
try {
JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
ServerName sn2 = rs1.getRegionServer().getServerName();
String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName();
JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
ServerName sn2 = rs1.getRegionServer().getServerName();
String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName();
util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes());
while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){
Thread.sleep(100);
}
util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes());
while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){
Thread.sleep(100);
}
Put put = new Put(ROW);
put.add(A, A, A);
put.add(B, B, B);
put.add(C, C, C);
table.put(put);
Put put = new Put(ROW);
put.add(A, A, A);
put.add(B, B, B);
put.add(C, C, C);
table.put(put);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
TEST_TABLE,
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
tableName,
new Boolean[] {false, false, true, true, true, true, false}
);
);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"},
TEST_TABLE,
new Integer[] {0, 0, 1, 1});
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"},
tableName,
new Integer[] {0, 0, 1, 1});
cluster.killRegionServer(rs1.getRegionServer().getServerName());
Threads.sleep(20000); // just to be sure that the kill has fully started.
util.waitUntilAllRegionsAssigned(tableName);
cluster.killRegionServer(rs1.getRegionServer().getServerName());
Threads.sleep(1000); // Let the kill soak in.
util.waitUntilAllRegionsAssigned(tableName);
LOG.info("All regions assigned");
verifyMethodResult(SimpleRegionObserver.class,
new String[]{"getCtPreWALRestore", "getCtPostWALRestore"},
TEST_TABLE,
new Integer[]{1, 1});
verifyMethodResult(SimpleRegionObserver.class,
new String[]{"getCtPrePut", "getCtPostPut"},
TEST_TABLE,
new Integer[]{0, 0});
util.deleteTable(tableName);
table.close();
verifyMethodResult(SimpleRegionObserver.class,
new String[]{"getCtPrePut", "getCtPostPut"},
tableName,
new Integer[]{0, 0});
} finally {
util.deleteTable(tableName);
table.close();
}
}
@Test

View File

@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.*;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
@Category(SmallTests.class)
public class TestBufferChain {
private File tmpFile;
private static final byte[][] HELLO_WORLD_CHUNKS = new byte[][] {
"hello".getBytes(Charsets.UTF_8),
" ".getBytes(Charsets.UTF_8),
"world".getBytes(Charsets.UTF_8)
};
@Before
public void setup() throws IOException {
tmpFile = File.createTempFile("TestBufferChain", "txt");
}
@After
public void teardown() {
tmpFile.delete();
}
@Test
public void testGetBackBytesWePutIn() {
ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
BufferChain chain = new BufferChain(bufs);
assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes()));
}
@Test
public void testChainChunkBiggerThanWholeArray() throws IOException {
ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
BufferChain chain = new BufferChain(bufs);
writeAndVerify(chain, "hello world", 8192);
assertNoRemaining(bufs);
}
@Test
public void testChainChunkBiggerThanSomeArrays() throws IOException {
ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
BufferChain chain = new BufferChain(bufs);
writeAndVerify(chain, "hello world", 3);
assertNoRemaining(bufs);
}
@Test
public void testLimitOffset() throws IOException {
ByteBuffer[] bufs = new ByteBuffer[] {
stringBuf("XXXhelloYYY", 3, 5),
stringBuf(" ", 0, 1),
stringBuf("XXXXworldY", 4, 5) };
BufferChain chain = new BufferChain(bufs);
writeAndVerify(chain , "hello world", 3);
assertNoRemaining(bufs);
}
@Test
public void testWithSpy() throws IOException {
ByteBuffer[] bufs = new ByteBuffer[] {
stringBuf("XXXhelloYYY", 3, 5),
stringBuf(" ", 0, 1),
stringBuf("XXXXworldY", 4, 5) };
BufferChain chain = new BufferChain(bufs);
FileOutputStream fos = new FileOutputStream(tmpFile);
FileChannel ch = Mockito.spy(fos.getChannel());
try {
chain.write(ch, 2);
assertEquals("he", Files.toString(tmpFile, Charsets.UTF_8));
chain.write(ch, 2);
assertEquals("hell", Files.toString(tmpFile, Charsets.UTF_8));
chain.write(ch, 3);
assertEquals("hello w", Files.toString(tmpFile, Charsets.UTF_8));
chain.write(ch, 8);
assertEquals("hello world", Files.toString(tmpFile, Charsets.UTF_8));
} finally {
ch.close();
fos.close();
}
}
private ByteBuffer stringBuf(String string, int position, int length) {
ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8));
buf.position(position);
buf.limit(position + length);
assertTrue(buf.hasRemaining());
return buf;
}
private void assertNoRemaining(ByteBuffer[] bufs) {
for (ByteBuffer buf : bufs) {
assertFalse(buf.hasRemaining());
}
}
private ByteBuffer[] wrapArrays(byte[][] arrays) {
ByteBuffer[] ret = new ByteBuffer[arrays.length];
for (int i = 0; i < arrays.length; i++) {
ret[i] = ByteBuffer.wrap(arrays[i]);
}
return ret;
}
private void writeAndVerify(BufferChain chain, String string, int chunkSize)
throws IOException {
FileOutputStream fos = new FileOutputStream(tmpFile);
FileChannel ch = fos.getChannel();
try {
long remaining = string.length();
while (chain.hasRemaining()) {
long n = chain.write(ch, chunkSize);
assertTrue(n == chunkSize || n == remaining);
remaining -= n;
}
assertEquals(0, remaining);
} finally {
fos.close();
}
assertFalse(chain.hasRemaining());
assertEquals(string, Files.toString(tmpFile, Charsets.UTF_8));
}
}

View File

@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SmallTests;
@@ -61,6 +62,8 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -76,6 +79,7 @@ import org.mockito.stubbing.Answer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
@@ -89,6 +93,8 @@ public class TestIPC {
public static final Log LOG = LogFactory.getLog(TestIPC.class);
static byte [] CELL_BYTES = Bytes.toBytes("xyz");
static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
static byte [] BIG_CELL_BYTES = new byte [10 * 1024];
static Cell BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
private final static Configuration CONF = HBaseConfiguration.create();
// We are using the test TestRpcServiceProtos generated classes and Service because they are
// available and basic with methods like 'echo', and ping. Below we make a blocking service
@@ -303,8 +309,10 @@ public class TestIPC {
int cellcount = Integer.parseInt(args[1]);
Configuration conf = HBaseConfiguration.create();
TestRpcServer rpcServer = new TestRpcServer();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
KeyValue kv = KeyValueUtil.ensureKeyValue(CELL);
KeyValue kv = KeyValueUtil.ensureKeyValue(BIG_CELL);
Put p = new Put(kv.getRow());
for (int i = 0; i < cellcount; i++) {
p.add(kv);
@@ -324,15 +332,17 @@ public class TestIPC {
RegionAction.newBuilder(),
ClientProtos.Action.newBuilder(),
MutationProto.newBuilder());
CellScanner cellScanner = CellUtil.createCellScanner(cells);
if (i % 1000 == 0) {
builder.setRegion(RegionSpecifier.newBuilder().setType(RegionSpecifierType.REGION_NAME).
setValue(ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
if (i % 100000 == 0) {
LOG.info("" + i);
// Uncomment this for a thread dump every so often.
// ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
// "Thread dump " + Thread.currentThread().getName());
}
CellScanner cellScanner = CellUtil.createCellScanner(cells);
Pair<Message, CellScanner> response =
client.call(null, builder.build(), cellScanner, null, user, address, 0);
client.call(md, builder.build(), cellScanner, param, user, address, 0);
/*
int count = 0;
while (p.getSecond().advance()) {