HBASE-15177 Reduce garbage created under high load
This commit is contained in:
parent
d5d26f0804
commit
a69272efe1
|
@ -191,6 +191,13 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
|
|||
if (Thread.interrupted()) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
|
||||
if (controller == null) {
|
||||
controller = controllerFactory.newController();
|
||||
controller.setPriority(getTableName());
|
||||
controller.setCallTimeout(callTimeout);
|
||||
}
|
||||
|
||||
if (closed) {
|
||||
if (scannerId != -1) {
|
||||
close();
|
||||
|
@ -209,9 +216,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
|
|||
RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
|
||||
this.scanMetrics != null, renew);
|
||||
ScanResponse response = null;
|
||||
controller = controllerFactory.newController();
|
||||
controller.setPriority(getTableName());
|
||||
controller.setCallTimeout(callTimeout);
|
||||
try {
|
||||
response = getStub().scan(controller, request);
|
||||
// Client and RS maintain a nextCallSeq number during the scan. Every next() call
|
||||
|
@ -371,7 +375,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
|
|||
ScanRequest request =
|
||||
RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
|
||||
try {
|
||||
getStub().scan(null, request);
|
||||
getStub().scan(controller, request);
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
|
@ -388,7 +392,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
|
|||
getLocation().getRegionInfo().getRegionName(),
|
||||
this.scan, 0, false);
|
||||
try {
|
||||
ScanResponse response = getStub().scan(null, request);
|
||||
ScanResponse response = getStub().scan(controller, request);
|
||||
long id = response.getScannerId();
|
||||
if (logScannerActivity) {
|
||||
LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
|
||||
|
|
|
@ -412,7 +412,7 @@ public class AsyncRpcChannel {
|
|||
requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
|
||||
}
|
||||
// Only pass priority if there one. Let zero be same as no priority.
|
||||
if (call.controller.getPriority() != 0) {
|
||||
if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
|
||||
requestHeaderBuilder.setPriority(call.controller.getPriority());
|
||||
}
|
||||
|
||||
|
@ -660,6 +660,7 @@ public class AsyncRpcChannel {
|
|||
private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
|
||||
final UserGroupInformation user) throws IOException, InterruptedException {
|
||||
user.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws IOException, InterruptedException {
|
||||
if (shouldAuthenticateOverKrb()) {
|
||||
if (currRetries < MAX_SASL_RETRIES) {
|
||||
|
@ -702,12 +703,12 @@ public class AsyncRpcChannel {
|
|||
public int getConnectionHashCode() {
|
||||
return ConnectionId.hashCode(ticket, serviceName, address);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getConnectionHashCode();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof AsyncRpcChannel) {
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInput;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
|||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
|
||||
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
|
||||
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -180,19 +180,18 @@ public class IPCUtil {
|
|||
public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
|
||||
final byte [] cellBlock)
|
||||
throws IOException {
|
||||
return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
|
||||
return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param codec
|
||||
* @param cellBlock
|
||||
* @param offset
|
||||
* @param length
|
||||
* @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
|
||||
* position()'ed at the start of the cell block and limit()'ed at the end.
|
||||
* @return CellScanner to work against the content of <code>cellBlock</code>
|
||||
* @throws IOException
|
||||
*/
|
||||
public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
|
||||
final byte [] cellBlock, final int offset, final int length)
|
||||
final ByteBuffer cellBlock)
|
||||
throws IOException {
|
||||
// If compressed, decompress it first before passing it on else we will leak compression
|
||||
// resources if the stream is not closed properly after we let it out.
|
||||
|
@ -202,18 +201,17 @@ public class IPCUtil {
|
|||
if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
|
||||
Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
|
||||
CompressionInputStream cis =
|
||||
compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
|
||||
poolDecompressor);
|
||||
compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor);
|
||||
ByteBufferOutputStream bbos = null;
|
||||
try {
|
||||
// TODO: This is ugly. The buffer will be resized on us if we guess wrong.
|
||||
// TODO: Reuse buffers.
|
||||
bbos = new ByteBufferOutputStream((length - offset) *
|
||||
bbos = new ByteBufferOutputStream(cellBlock.remaining() *
|
||||
this.cellBlockDecompressionMultiplier);
|
||||
IOUtils.copy(cis, bbos);
|
||||
bbos.close();
|
||||
ByteBuffer bb = bbos.getByteBuffer();
|
||||
is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
|
||||
is = new ByteBufferInputStream(bb);
|
||||
} finally {
|
||||
if (is != null) is.close();
|
||||
if (bbos != null) bbos.close();
|
||||
|
@ -221,7 +219,7 @@ public class IPCUtil {
|
|||
CodecPool.returnDecompressor(poolDecompressor);
|
||||
}
|
||||
} else {
|
||||
is = new ByteArrayInputStream(cellBlock, offset, length);
|
||||
is = new ByteBufferInputStream(cellBlock);
|
||||
}
|
||||
return codec.getDecoder(is);
|
||||
}
|
||||
|
|
|
@ -35,14 +35,14 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
@InterfaceAudience.Private
|
||||
public class PayloadCarryingRpcController
|
||||
extends TimeLimitedRpcController implements CellScannable {
|
||||
|
||||
public static final int PRIORITY_UNSET = -1;
|
||||
/**
|
||||
* Priority to set on this request. Set it here in controller so available composing the
|
||||
* request. This is the ordained way of setting priorities going forward. We will be
|
||||
* undoing the old annotation-based mechanism.
|
||||
*/
|
||||
// Currently only multi call makes use of this. Eventually this should be only way to set
|
||||
// priority.
|
||||
private int priority = HConstants.NORMAL_QOS;
|
||||
private int priority = PRIORITY_UNSET;
|
||||
|
||||
/**
|
||||
* They are optionally set on construction, cleared after we make the call, and then optionally
|
||||
|
@ -67,6 +67,7 @@ public class PayloadCarryingRpcController
|
|||
/**
|
||||
* @return One-shot cell scanner (you cannot back it up and restart)
|
||||
*/
|
||||
@Override
|
||||
public CellScanner cellScanner() {
|
||||
return cellScanner;
|
||||
}
|
||||
|
|
|
@ -899,8 +899,10 @@ public class RpcClientImpl extends AbstractRpcClient {
|
|||
cellBlockBuilder.setLength(cellBlock.limit());
|
||||
builder.setCellBlockMeta(cellBlockBuilder.build());
|
||||
}
|
||||
// Only pass priority if there one. Let zero be same as no priority.
|
||||
if (priority != 0) builder.setPriority(priority);
|
||||
// Only pass priority if there is one set.
|
||||
if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) {
|
||||
builder.setPriority(priority);
|
||||
}
|
||||
RequestHeader header = builder.build();
|
||||
|
||||
setupIOstreams();
|
||||
|
|
|
@ -2430,13 +2430,13 @@ public final class ProtobufUtil {
|
|||
*/
|
||||
public static String getRegionEncodedName(
|
||||
final RegionSpecifier regionSpecifier) throws DoNotRetryIOException {
|
||||
byte[] value = regionSpecifier.getValue().toByteArray();
|
||||
ByteString value = regionSpecifier.getValue();
|
||||
RegionSpecifierType type = regionSpecifier.getType();
|
||||
switch (type) {
|
||||
case REGION_NAME:
|
||||
return HRegionInfo.encodeRegionName(value);
|
||||
return HRegionInfo.encodeRegionName(value.toByteArray());
|
||||
case ENCODED_REGION_NAME:
|
||||
return Bytes.toString(value);
|
||||
return value.toStringUtf8();
|
||||
default:
|
||||
throw new DoNotRetryIOException(
|
||||
"Unsupported region specifier type: " + type);
|
||||
|
@ -3135,6 +3135,19 @@ public final class ProtobufUtil {
|
|||
codedInput.checkLastTagWas(0);
|
||||
}
|
||||
|
||||
public static void mergeFrom(Message.Builder builder, CodedInputStream codedInput, int length)
|
||||
throws IOException {
|
||||
codedInput.resetSizeCounter();
|
||||
int prevLimit = codedInput.setSizeLimit(length);
|
||||
|
||||
int limit = codedInput.pushLimit(length);
|
||||
builder.mergeFrom(codedInput);
|
||||
codedInput.popLimit(limit);
|
||||
|
||||
codedInput.checkLastTagWas(0);
|
||||
codedInput.setSizeLimit(prevLimit);
|
||||
}
|
||||
|
||||
public static ReplicationLoadSink toReplicationLoadSink(
|
||||
ClusterStatusProtos.ReplicationLoadSink cls) {
|
||||
return new ReplicationLoadSink(cls.getAgeOfLastAppliedOp(), cls.getTimeStampsOfLastAppliedOp());
|
||||
|
|
|
@ -522,7 +522,7 @@ public class TestClientScanner {
|
|||
anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null));
|
||||
|
||||
try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
|
||||
clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
|
||||
clusterConn, rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {
|
||||
Iterator<Result> iter = scanner.iterator();
|
||||
while (iter.hasNext()) {
|
||||
iter.next();
|
||||
|
|
|
@ -58,7 +58,7 @@ public class TestIPCUtil {
|
|||
public void before() {
|
||||
this.util = new IPCUtil(new Configuration());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testBuildCellBlock() throws IOException {
|
||||
doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null);
|
||||
|
@ -79,7 +79,7 @@ public class TestIPCUtil {
|
|||
CellScanner cellScanner = sized? getSizedCellScanner(cells):
|
||||
CellUtil.createCellScanner(Arrays.asList(cells).iterator());
|
||||
ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
|
||||
cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit());
|
||||
cellScanner = util.createCellScanner(codec, compressor, bb);
|
||||
int i = 0;
|
||||
while (cellScanner.advance()) {
|
||||
i++;
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
|
||||
/**
|
||||
* Not thread safe!
|
||||
* <p>
|
||||
* Please note that the reads will cause position movement on wrapped ByteBuffer.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ByteBufferInputStream extends InputStream {
|
||||
|
||||
private ByteBuffer buf;
|
||||
|
||||
public ByteBufferInputStream(ByteBuffer buf) {
|
||||
this.buf = buf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the next byte of data from this input stream. The value byte is returned as an
|
||||
* <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available
|
||||
* because the end of the stream has been reached, the value <code>-1</code> is returned.
|
||||
* @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
|
||||
*/
|
||||
@Override
|
||||
public int read() {
|
||||
if (this.buf.hasRemaining()) {
|
||||
return (this.buf.get() & 0xff);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads up to next <code>len</code> bytes of data from buffer into passed array(starting from
|
||||
* given offset).
|
||||
* @param b the array into which the data is read.
|
||||
* @param off the start offset in the destination array <code>b</code>
|
||||
* @param len the maximum number of bytes to read.
|
||||
* @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even
|
||||
* 1 byte can be read because the end of the stream has been reached.
|
||||
*/
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) {
|
||||
int avail = available();
|
||||
if (avail <= 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (len > avail) {
|
||||
len = avail;
|
||||
}
|
||||
if (len <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
ByteBufferUtils.copyFromBufferToArray(b, this.buf, this.buf.position(), off, len);
|
||||
this.buf.position(this.buf.position() + len); // we should advance the buffer position
|
||||
return len;
|
||||
}
|
||||
|
||||
/**
|
||||
* Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the
|
||||
* end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is
|
||||
* equal to the smaller of <code>n</code> and remaining bytes in the stream.
|
||||
* @param n the number of bytes to be skipped.
|
||||
* @return the actual number of bytes skipped.
|
||||
*/
|
||||
@Override
|
||||
public long skip(long n) {
|
||||
long k = Math.min(n, available());
|
||||
if (k < 0) {
|
||||
k = 0;
|
||||
}
|
||||
this.buf.position((int) (this.buf.position() + k));
|
||||
return k;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of remaining bytes that can be read (or skipped
|
||||
* over) from this input stream.
|
||||
*/
|
||||
@Override
|
||||
public int available() {
|
||||
return this.buf.remaining();
|
||||
}
|
||||
}
|
|
@ -45,7 +45,7 @@ public class Threads {
|
|||
private static final Log LOG = LogFactory.getLog(Threads.class);
|
||||
private static final AtomicInteger poolNumber = new AtomicInteger(1);
|
||||
|
||||
private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
|
||||
public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
|
||||
new UncaughtExceptionHandler() {
|
||||
@Override
|
||||
public void uncaughtException(Thread t, Throwable e) {
|
||||
|
|
|
@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.codec.Codec;
|
|||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||
import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
|
||||
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
|
||||
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
|
@ -110,6 +111,7 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Counter;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
@ -529,10 +531,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
return this.size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getResponseCellSize() {
|
||||
return responseCellSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementResponseCellSize(long cellSize) {
|
||||
responseCellSize += cellSize;
|
||||
}
|
||||
|
@ -621,7 +625,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
readPool = Executors.newFixedThreadPool(readThreads,
|
||||
new ThreadFactoryBuilder().setNameFormat(
|
||||
"RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
|
||||
",port=" + port).setDaemon(true).build());
|
||||
",port=" + port).setDaemon(true)
|
||||
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
|
||||
for (int i = 0; i < readThreads; ++i) {
|
||||
Reader reader = new Reader();
|
||||
readers[i] = reader;
|
||||
|
@ -898,7 +903,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
throw ieo;
|
||||
} catch (Exception e) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage());
|
||||
LOG.debug(getName() + ": Caught exception while reading:", e);
|
||||
}
|
||||
count = -1; //so that the (count < 0) block is executed
|
||||
}
|
||||
|
@ -944,6 +949,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
Responder() throws IOException {
|
||||
this.setName("RpcServer.responder");
|
||||
this.setDaemon(true);
|
||||
this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
|
||||
writeSelector = Selector.open(); // create a selector
|
||||
}
|
||||
|
||||
|
@ -1361,17 +1367,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
return authorizedUgi;
|
||||
}
|
||||
|
||||
private void saslReadAndProcess(byte[] saslToken) throws IOException,
|
||||
private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
|
||||
InterruptedException {
|
||||
if (saslContextEstablished) {
|
||||
if (LOG.isTraceEnabled())
|
||||
LOG.trace("Have read input token of size " + saslToken.length
|
||||
LOG.trace("Have read input token of size " + saslToken.limit()
|
||||
+ " for processing by saslServer.unwrap()");
|
||||
|
||||
if (!useWrap) {
|
||||
processOneRpc(saslToken);
|
||||
} else {
|
||||
byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
|
||||
byte[] b = saslToken.array();
|
||||
byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
|
||||
processUnwrappedData(plaintextData);
|
||||
}
|
||||
} else {
|
||||
|
@ -1420,10 +1427,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Have read input token of size " + saslToken.length
|
||||
LOG.debug("Have read input token of size " + saslToken.limit()
|
||||
+ " for processing by saslServer.evaluateResponse()");
|
||||
}
|
||||
replyToken = saslServer.evaluateResponse(saslToken);
|
||||
replyToken = saslServer.evaluateResponse(saslToken.array());
|
||||
} catch (IOException e) {
|
||||
IOException sendToClient = e;
|
||||
Throwable cause = e;
|
||||
|
@ -1619,6 +1626,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
throw new IllegalArgumentException("Unexpected data length "
|
||||
+ dataLength + "!! from " + getHostAddress());
|
||||
}
|
||||
|
||||
// TODO: check dataLength against some limit so that the client cannot OOM the server
|
||||
data = ByteBuffer.allocate(dataLength);
|
||||
|
||||
// Increment the rpc count. This counter will be decreased when we write
|
||||
|
@ -1648,9 +1657,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
}
|
||||
|
||||
if (useSasl) {
|
||||
saslReadAndProcess(data.array());
|
||||
saslReadAndProcess(data);
|
||||
} else {
|
||||
processOneRpc(data.array());
|
||||
processOneRpc(data);
|
||||
}
|
||||
|
||||
} finally {
|
||||
|
@ -1679,8 +1688,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
}
|
||||
|
||||
// Reads the connection header following version
|
||||
private void processConnectionHeader(byte[] buf) throws IOException {
|
||||
this.connectionHeader = ConnectionHeader.parseFrom(buf);
|
||||
private void processConnectionHeader(ByteBuffer buf) throws IOException {
|
||||
this.connectionHeader = ConnectionHeader.parseFrom(
|
||||
new ByteBufferInputStream(buf));
|
||||
String serviceName = connectionHeader.getServiceName();
|
||||
if (serviceName == null) throw new EmptyServiceNameException();
|
||||
this.service = getService(services, serviceName);
|
||||
|
@ -1794,13 +1804,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
if (unwrappedData.remaining() == 0) {
|
||||
unwrappedDataLengthBuffer.clear();
|
||||
unwrappedData.flip();
|
||||
processOneRpc(unwrappedData.array());
|
||||
processOneRpc(unwrappedData);
|
||||
unwrappedData = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
|
||||
|
||||
private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
|
||||
if (connectionHeaderRead) {
|
||||
processRequest(buf);
|
||||
} else {
|
||||
|
@ -1822,16 +1833,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
protected void processRequest(byte[] buf) throws IOException, InterruptedException {
|
||||
long totalRequestSize = buf.length;
|
||||
protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
|
||||
long totalRequestSize = buf.limit();
|
||||
int offset = 0;
|
||||
// Here we read in the header. We avoid having pb
|
||||
// do its default 4k allocation for CodedInputStream. We force it to use backing array.
|
||||
CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
|
||||
CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
|
||||
int headerSize = cis.readRawVarint32();
|
||||
offset = cis.getTotalBytesRead();
|
||||
Message.Builder builder = RequestHeader.newBuilder();
|
||||
ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
|
||||
ProtobufUtil.mergeFrom(builder, cis, headerSize);
|
||||
RequestHeader header = (RequestHeader) builder.build();
|
||||
offset += headerSize;
|
||||
int id = header.getCallId();
|
||||
|
@ -1862,19 +1873,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
|
||||
if (md == null) throw new UnsupportedOperationException(header.getMethodName());
|
||||
builder = this.service.getRequestPrototype(md).newBuilderForType();
|
||||
// To read the varint, I need an inputstream; might as well be a CIS.
|
||||
cis = CodedInputStream.newInstance(buf, offset, buf.length);
|
||||
cis.resetSizeCounter();
|
||||
int paramSize = cis.readRawVarint32();
|
||||
offset += cis.getTotalBytesRead();
|
||||
if (builder != null) {
|
||||
ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
|
||||
ProtobufUtil.mergeFrom(builder, cis, paramSize);
|
||||
param = builder.build();
|
||||
}
|
||||
offset += paramSize;
|
||||
}
|
||||
if (header.hasCellBlockMeta()) {
|
||||
cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
|
||||
buf, offset, buf.length);
|
||||
buf.position(offset);
|
||||
cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
InetSocketAddress address = getListenerAddress();
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoReque
|
|||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
|
||||
|
@ -217,10 +216,10 @@ public class AnnotationReadingPriorityFunction implements PriorityFunction {
|
|||
if (param == null) {
|
||||
return HConstants.NORMAL_QOS;
|
||||
}
|
||||
if (param instanceof MultiRequest) {
|
||||
// The multi call has its priority set in the header. All calls should work this way but
|
||||
// only this one has been converted so far. No priority == NORMAL_QOS.
|
||||
return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS;
|
||||
|
||||
// Trust the client-set priorities if set
|
||||
if (header.hasPriority()) {
|
||||
return header.getPriority();
|
||||
}
|
||||
|
||||
String cls = param.getClass().getName();
|
||||
|
|
|
@ -5177,6 +5177,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* @param readLock is the lock reader or writer. True indicates that a non-exlcusive
|
||||
* lock is requested
|
||||
*/
|
||||
@Override
|
||||
public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
|
||||
// Make sure the row is inside of this region before getting the lock for it.
|
||||
checkRow(row, "row lock");
|
||||
|
@ -5592,8 +5593,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
// Here we separate all scanners into two lists - scanner that provide data required
|
||||
// by the filter to operate (scanners list) and all others (joinedScanners list).
|
||||
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
|
||||
List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
|
||||
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
|
||||
List<KeyValueScanner> joinedScanners
|
||||
= new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
|
||||
if (additionalScanners != null) {
|
||||
scanners.addAll(additionalScanners);
|
||||
}
|
||||
|
|
|
@ -1147,8 +1147,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
*/
|
||||
Region getRegion(
|
||||
final RegionSpecifier regionSpecifier) throws IOException {
|
||||
return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
|
||||
ProtobufUtil.getRegionEncodedName(regionSpecifier));
|
||||
ByteString value = regionSpecifier.getValue();
|
||||
RegionSpecifierType type = regionSpecifier.getType();
|
||||
switch (type) {
|
||||
case REGION_NAME:
|
||||
byte[] regionName = value.toByteArray();
|
||||
String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
|
||||
return regionServer.getRegionByEncodedName(regionName, encodedRegionName);
|
||||
case ENCODED_REGION_NAME:
|
||||
return regionServer.getRegionByEncodedName(value.toStringUtf8());
|
||||
default:
|
||||
throw new DoNotRetryIOException(
|
||||
"Unsupported region specifier type: " + type);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
Loading…
Reference in New Issue