HBASE-9230 Fix the server so it can take a pure pb request param and return a pure pb result

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1518346 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-08-28 19:59:09 +00:00
parent 89df414955
commit 02de8c40d9
27 changed files with 1257 additions and 941 deletions

View File

@ -48,17 +48,16 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
@ -73,26 +72,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.GetNamespaceDescriptorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos
.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos
.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos
.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos
.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos
.ListTableNamesByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos
.ListTableNamesByNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
@ -101,10 +80,14 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceReque
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest;
@ -117,17 +100,27 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalo
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.GetNamespaceDescriptorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListTableNamesByNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListTableNamesByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRequest;

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionLocation;
@ -42,12 +43,14 @@ import com.google.protobuf.ServiceException;
*/
class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
private final MultiAction<R> multi;
private final boolean cellBlock;
MultiServerCallable(final HConnection connection, final TableName tableName,
final HRegionLocation location, final MultiAction<R> multi) {
super(connection, tableName, null);
this.multi = multi;
setLocation(location);
this.cellBlock = isCellBlock();
}
MultiAction<R> getMulti() {
@ -67,16 +70,21 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
// Row Mutations are a set of Puts and/or Deletes all to be applied atomically
// on the one row. We do these a row at a time.
if (row instanceof RowMutations) {
RowMutations rms = (RowMutations)row;
List<CellScannable> cells = null;
MultiRequest multiRequest;
try {
RowMutations rms = (RowMutations)row;
// Stick all Cells for all RowMutations in here into 'cells'. Populated when we call
// buildNoDataMultiRequest in the below.
List<CellScannable> cells = new ArrayList<CellScannable>(rms.getMutations().size());
// Build a multi request absent its Cell payload (this is the 'nodata' in the below).
MultiRequest multiRequest =
RequestConverter.buildNoDataMultiRequest(regionName, rms, cells);
// Carry the cells over the proxy/pb Service interface using the payload carrying
// rpc controller.
if (this.cellBlock) {
// Stick all Cells for all RowMutations in here into 'cells'. Populated when we call
// buildNoDataMultiRequest in the below.
cells = new ArrayList<CellScannable>(rms.getMutations().size());
// Build a multi request absent its Cell payload (this is the 'nodata' in the below).
multiRequest = RequestConverter.buildNoDataMultiRequest(regionName, rms, cells);
} else {
multiRequest = RequestConverter.buildMultiRequest(regionName, rms);
}
// Carry the cells if any over the proxy/pb Service interface using the payload
// carrying rpc controller.
getStub().multi(new PayloadCarryingRpcController(cells), multiRequest);
// This multi call does not return results.
response.add(regionName, action.getOriginalIndex(), Result.EMPTY_RESULT);
@ -91,14 +99,17 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
if (actions.size() > rowMutations) {
Exception ex = null;
List<Object> results = null;
// Stick all Cells for the multiRequest in here into 'cells'. Gets filled in when we
// call buildNoDataMultiRequest
List<CellScannable> cells = new ArrayList<CellScannable>(actions.size() - rowMutations);
List<CellScannable> cells = null;
MultiRequest multiRequest;
try {
// The call to buildNoDataMultiRequest will skip RowMutations. They have
// already been handled above.
MultiRequest multiRequest =
RequestConverter.buildNoDataMultiRequest(regionName, actions, cells);
if (isCellBlock()) {
// Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
// They have already been handled above.
cells = new ArrayList<CellScannable>(actions.size() - rowMutations);
multiRequest = RequestConverter.buildNoDataMultiRequest(regionName, actions, cells);
} else {
multiRequest = RequestConverter.buildMultiRequest(regionName, actions);
}
// Controller optionally carries cell data over the proxy/service boundary and also
// optionally ferries cell response data back out again.
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
@ -116,9 +127,25 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
return response;
}
/**
* @return True if we should send data in cellblocks. This is an expensive call. Cache the
* result if you can rather than call each time.
*/
private boolean isCellBlock() {
// This is not exact -- the configuration could have changed on us after connection was set up
// but it will do for now.
HConnection connection = getConnection();
if (connection == null) return true; // Default is to do cellblocks.
Configuration configuration = connection.getConfiguration();
if (configuration == null) return true;
String codec = configuration.get("hbase.client.rpc.codec", "");
return codec != null && codec.length() > 0;
}
@Override
public void prepare(boolean reload) throws IOException {
// Use the location we were given in the constructor rather than go look it up.
setStub(getConnection().getClient(getLocation().getServerName()));
}
}
}

View File

@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.HeapSize;
@ -70,6 +71,11 @@ class IPCUtil {
ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
}
/**
* Thrown if a cellscanner but no codec to encode it with.
*/
public static class CellScannerButNoCodecException extends HBaseIOException {};
/**
* Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
* <code>compressor</code>.
@ -86,6 +92,7 @@ class IPCUtil {
final CellScanner cellScanner)
throws IOException {
if (cellScanner == null) return null;
if (codec == null) throw new CellScannerButNoCodecException();
int bufferSize = this.cellBlockBuildingInitialBufferSize;
if (cellScanner instanceof HeapSize) {
long longSize = ((HeapSize)cellScanner).heapSize();

View File

@ -55,7 +55,7 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl
}
public PayloadCarryingRpcController(final List<CellScannable> cellIterables) {
this.cellScanner = CellUtil.createCellScanner(cellIterables);
this.cellScanner = cellIterables == null? null: CellUtil.createCellScanner(cellIterables);
}
/**

View File

@ -89,6 +89,7 @@ import org.apache.hadoop.security.token.TokenSelector;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
@ -426,7 +427,9 @@ public class RpcClient {
if ((userInfoPB = getUserInfo(ticket)) != null) {
builder.setUserInfo(userInfoPB);
}
builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
if (this.codec != null) {
builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
}
if (this.compressor != null) {
builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
}
@ -1249,7 +1252,7 @@ public class RpcClient {
this.pingInterval = getPingInterval(conf);
this.ipcUtil = new IPCUtil(conf);
this.conf = conf;
this.codec = getCodec(conf);
this.codec = getCodec();
this.compressor = getCompressor(conf);
this.socketFactory = factory;
this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
@ -1291,18 +1294,28 @@ public class RpcClient {
/**
* Encapsulate the ugly casting and RuntimeException conversion in private method.
* @param conf
* @return Codec to use on this client.
*/
private static Codec getCodec(final Configuration conf) {
String className = conf.get("hbase.client.rpc.codec", KeyValueCodec.class.getCanonicalName());
Codec getCodec() {
// For NO CODEC, "hbase.client.rpc.codec" must be the empty string AND
// "hbase.client.default.rpc.codec" -- because default is to do cell block encoding.
String className = conf.get("hbase.client.rpc.codec", getDefaultCodec(this.conf));
if (className == null || className.length() == 0) return null;
try {
return (Codec)Class.forName(className).newInstance();
return (Codec)Class.forName(className).newInstance();
} catch (Exception e) {
throw new RuntimeException("Failed getting codec " + className, e);
}
}
@VisibleForTesting
public static String getDefaultCodec(final Configuration c) {
// If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
// Configuration will complain -- then no default codec (and we'll pb everything). Else
// default is KeyValueCodec
return c.get("hbase.client.default.rpc.codec", KeyValueCodec.class.getCanonicalName());
}
/**
* Encapsulate the ugly casting and RuntimeException conversion in private method.
* @param conf

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRespo
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse;
@ -277,36 +276,44 @@ public final class ResponseConverter {
*/
public static Result[] getResults(CellScanner cellScanner, ScanResponse response)
throws IOException {
if (response == null || cellScanner == null) return null;
ResultCellMeta resultCellMeta = response.getResultCellMeta();
if (resultCellMeta == null) return null;
int noOfResults = resultCellMeta.getCellsLengthCount();
if (response == null) return null;
// If cellscanner, then the number of Results to return is the count of elements in the
// cellsPerResult list. Otherwise, it is how many results are embedded inside the response.
int noOfResults = cellScanner != null?
response.getCellsPerResultCount(): response.getResultsCount();
Result[] results = new Result[noOfResults];
for (int i = 0; i < noOfResults; i++) {
int noOfCells = resultCellMeta.getCellsLength(i);
List<Cell> cells = new ArrayList<Cell>(noOfCells);
for (int j = 0; j < noOfCells; j++) {
try {
if (cellScanner.advance() == false) {
// We are not able to retrieve the exact number of cells which ResultCellMeta says us.
if (cellScanner != null) {
// Cells are out in cellblocks. Group them up again as Results. How many to read at a
// time will be found in getCellsLength -- length here is how many Cells in the i'th Result
int noOfCells = response.getCellsPerResult(i);
List<Cell> cells = new ArrayList<Cell>(noOfCells);
for (int j = 0; j < noOfCells; j++) {
try {
if (cellScanner.advance() == false) {
// We are not able to retrieve the exact number of cells which ResultCellMeta says us.
// We have to scan for the same results again. Throwing DNRIOE as a client retry on the
// same scanner will result in OutOfOrderScannerNextException
String msg = "Results sent from server=" + noOfResults + ". But only got " + i
+ " results completely at client. Resetting the scanner to scan again.";
LOG.error(msg);
throw new DoNotRetryIOException(msg);
}
} catch (IOException ioe) {
// We are getting IOE while retrieving the cells for Results.
// We have to scan for the same results again. Throwing DNRIOE as a client retry on the
// same scanner will result in OutOfOrderScannerNextException
String msg = "Results sent from server=" + noOfResults + ". But only got " + i
+ " results completely at client. Resetting the scanner to scan again.";
LOG.error(msg);
throw new DoNotRetryIOException(msg);
}
} catch (IOException ioe) {
// We are getting IOE while retrieving the cells for Results.
// We have to scan for the same results again. Throwing DNRIOE as a client retry on the
// same scanner will result in OutOfOrderScannerNextException
LOG.error("Exception while reading cells from result."
LOG.error("Exception while reading cells from result."
+ "Resetting the scanner to scan again.", ioe);
throw new DoNotRetryIOException("Resetting the scanner.", ioe);
throw new DoNotRetryIOException("Resetting the scanner.", ioe);
}
cells.add(cellScanner.current());
}
cells.add(cellScanner.current());
results[i] = new Result(cells);
} else {
// Result is pure pb.
results[i] = ProtobufUtil.toResult(response.getResults(i));
}
results[i] = new Result(cells);
}
return results;
}

View File

@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import org.junit.Ignore;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import com.google.protobuf.RpcController;
@ -49,6 +51,7 @@ import com.google.protobuf.ServiceException;
* Test client behavior w/o setting up a cluster.
* Mock up cluster emissions.
*/
@Category(SmallTests.class)
public class TestClientNoCluster {
private static final Log LOG = LogFactory.getLog(TestClientNoCluster.class);
private Configuration conf;

View File

@ -40,8 +40,8 @@ import org.apache.hadoop.hbase.Cell;
* Typical usage:
*
* <pre>
* while (scanner.next()) {
* Cell cell = scanner.get();
* while (scanner.advance()) {
* Cell cell = scanner.current();
* // do something
* }
* </pre>

View File

@ -172,14 +172,17 @@ public final class CellUtil {
* @return CellScanner interface over <code>cellIterable</code>
*/
public static CellScanner createCellScanner(final Iterable<Cell> cellIterable) {
if (cellIterable == null) return null;
return createCellScanner(cellIterable.iterator());
}
/**
* @param cells
* @return CellScanner interface over <code>cellIterable</code>
* @return CellScanner interface over <code>cellIterable</code> or null if <code>cells</code> is
* null
*/
public static CellScanner createCellScanner(final Iterator<Cell> cells) {
if (cells == null) return null;
return new CellScanner() {
private final Iterator<Cell> iterator = cells;
private Cell current = null;

View File

@ -730,31 +730,31 @@ public final class RPCProtos {
com.google.protobuf.ByteString
getServiceNameBytes();
// optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];
// optional string cell_block_codec_class = 3;
/**
* <code>optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];</code>
* <code>optional string cell_block_codec_class = 3;</code>
*
* <pre>
* Cell block codec we will use sending over optional cell blocks. Server throws exception
* if cannot deal.
* if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
* </pre>
*/
boolean hasCellBlockCodecClass();
/**
* <code>optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];</code>
* <code>optional string cell_block_codec_class = 3;</code>
*
* <pre>
* Cell block codec we will use sending over optional cell blocks. Server throws exception
* if cannot deal.
* if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
* </pre>
*/
java.lang.String getCellBlockCodecClass();
/**
* <code>optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];</code>
* <code>optional string cell_block_codec_class = 3;</code>
*
* <pre>
* Cell block codec we will use sending over optional cell blocks. Server throws exception
* if cannot deal.
* if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
* </pre>
*/
com.google.protobuf.ByteString
@ -766,7 +766,7 @@ public final class RPCProtos {
*
* <pre>
* Compressor we will use if cell block is compressed. Server will throw exception if not supported.
* Class must implement hadoop's CompressionCodec Interface
* Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
* </pre>
*/
boolean hasCellBlockCompressorClass();
@ -775,7 +775,7 @@ public final class RPCProtos {
*
* <pre>
* Compressor we will use if cell block is compressed. Server will throw exception if not supported.
* Class must implement hadoop's CompressionCodec Interface
* Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
* </pre>
*/
java.lang.String getCellBlockCompressorClass();
@ -784,7 +784,7 @@ public final class RPCProtos {
*
* <pre>
* Compressor we will use if cell block is compressed. Server will throw exception if not supported.
* Class must implement hadoop's CompressionCodec Interface
* Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
* </pre>
*/
com.google.protobuf.ByteString
@ -978,26 +978,26 @@ public final class RPCProtos {
}
}
// optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];
// optional string cell_block_codec_class = 3;
public static final int CELL_BLOCK_CODEC_CLASS_FIELD_NUMBER = 3;
private java.lang.Object cellBlockCodecClass_;
/**
* <code>optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];</code>
* <code>optional string cell_block_codec_class = 3;</code>
*
* <pre>
* Cell block codec we will use sending over optional cell blocks. Server throws exception
* if cannot deal.
* if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
* </pre>
*/
public boolean hasCellBlockCodecClass() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];</code>
* <code>optional string cell_block_codec_class = 3;</code>
*
* <pre>
* Cell block codec we will use sending over optional cell blocks. Server throws exception
* if cannot deal.
* if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
* </pre>
*/
public java.lang.String getCellBlockCodecClass() {
@ -1015,11 +1015,11 @@ public final class RPCProtos {
}
}
/**
* <code>optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];</code>
* <code>optional string cell_block_codec_class = 3;</code>
*
* <pre>
* Cell block codec we will use sending over optional cell blocks. Server throws exception
* if cannot deal.
* if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
* </pre>
*/
public com.google.protobuf.ByteString
@ -1044,7 +1044,7 @@ public final class RPCProtos {
*
* <pre>
* Compressor we will use if cell block is compressed. Server will throw exception if not supported.
* Class must implement hadoop's CompressionCodec Interface
* Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
* </pre>
*/
public boolean hasCellBlockCompressorClass() {
@ -1055,7 +1055,7 @@ public final class RPCProtos {
*
* <pre>
* Compressor we will use if cell block is compressed. Server will throw exception if not supported.
* Class must implement hadoop's CompressionCodec Interface
* Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
* </pre>
*/
public java.lang.String getCellBlockCompressorClass() {
@ -1077,7 +1077,7 @@ public final class RPCProtos {
*
* <pre>
* Compressor we will use if cell block is compressed. Server will throw exception if not supported.
* Class must implement hadoop's CompressionCodec Interface
* Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
* </pre>
*/
public com.google.protobuf.ByteString
@ -1097,7 +1097,7 @@ public final class RPCProtos {
private void initFields() {
userInfo_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation.getDefaultInstance();
serviceName_ = "";
cellBlockCodecClass_ = "org.apache.hadoop.hbase.codec.KeyValueCodec";
cellBlockCodecClass_ = "";
cellBlockCompressorClass_ = "";
}
private byte memoizedIsInitialized = -1;
@ -1349,7 +1349,7 @@ public final class RPCProtos {
bitField0_ = (bitField0_ & ~0x00000001);
serviceName_ = "";
bitField0_ = (bitField0_ & ~0x00000002);
cellBlockCodecClass_ = "org.apache.hadoop.hbase.codec.KeyValueCodec";
cellBlockCodecClass_ = "";
bitField0_ = (bitField0_ & ~0x00000004);
cellBlockCompressorClass_ = "";
bitField0_ = (bitField0_ & ~0x00000008);
@ -1659,25 +1659,25 @@ public final class RPCProtos {
return this;
}
// optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];
private java.lang.Object cellBlockCodecClass_ = "org.apache.hadoop.hbase.codec.KeyValueCodec";
// optional string cell_block_codec_class = 3;
private java.lang.Object cellBlockCodecClass_ = "";
/**
* <code>optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];</code>
* <code>optional string cell_block_codec_class = 3;</code>
*
* <pre>
* Cell block codec we will use sending over optional cell blocks. Server throws exception
* if cannot deal.
* if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
* </pre>
*/
public boolean hasCellBlockCodecClass() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];</code>
* <code>optional string cell_block_codec_class = 3;</code>
*
* <pre>
* Cell block codec we will use sending over optional cell blocks. Server throws exception
* if cannot deal.
* if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
* </pre>
*/
public java.lang.String getCellBlockCodecClass() {
@ -1692,11 +1692,11 @@ public final class RPCProtos {
}
}
/**
* <code>optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];</code>
* <code>optional string cell_block_codec_class = 3;</code>
*
* <pre>
* Cell block codec we will use sending over optional cell blocks. Server throws exception
* if cannot deal.
* if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
* </pre>
*/
public com.google.protobuf.ByteString
@ -1713,11 +1713,11 @@ public final class RPCProtos {
}
}
/**
* <code>optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];</code>
* <code>optional string cell_block_codec_class = 3;</code>
*
* <pre>
* Cell block codec we will use sending over optional cell blocks. Server throws exception
* if cannot deal.
* if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
* </pre>
*/
public Builder setCellBlockCodecClass(
@ -1731,11 +1731,11 @@ public final class RPCProtos {
return this;
}
/**
* <code>optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];</code>
* <code>optional string cell_block_codec_class = 3;</code>
*
* <pre>
* Cell block codec we will use sending over optional cell blocks. Server throws exception
* if cannot deal.
* if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
* </pre>
*/
public Builder clearCellBlockCodecClass() {
@ -1745,11 +1745,11 @@ public final class RPCProtos {
return this;
}
/**
* <code>optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];</code>
* <code>optional string cell_block_codec_class = 3;</code>
*
* <pre>
* Cell block codec we will use sending over optional cell blocks. Server throws exception
* if cannot deal.
* if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
* </pre>
*/
public Builder setCellBlockCodecClassBytes(
@ -1770,7 +1770,7 @@ public final class RPCProtos {
*
* <pre>
* Compressor we will use if cell block is compressed. Server will throw exception if not supported.
* Class must implement hadoop's CompressionCodec Interface
* Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
* </pre>
*/
public boolean hasCellBlockCompressorClass() {
@ -1781,7 +1781,7 @@ public final class RPCProtos {
*
* <pre>
* Compressor we will use if cell block is compressed. Server will throw exception if not supported.
* Class must implement hadoop's CompressionCodec Interface
* Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
* </pre>
*/
public java.lang.String getCellBlockCompressorClass() {
@ -1800,7 +1800,7 @@ public final class RPCProtos {
*
* <pre>
* Compressor we will use if cell block is compressed. Server will throw exception if not supported.
* Class must implement hadoop's CompressionCodec Interface
* Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
* </pre>
*/
public com.google.protobuf.ByteString
@ -1821,7 +1821,7 @@ public final class RPCProtos {
*
* <pre>
* Compressor we will use if cell block is compressed. Server will throw exception if not supported.
* Class must implement hadoop's CompressionCodec Interface
* Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
* </pre>
*/
public Builder setCellBlockCompressorClass(
@ -1839,7 +1839,7 @@ public final class RPCProtos {
*
* <pre>
* Compressor we will use if cell block is compressed. Server will throw exception if not supported.
* Class must implement hadoop's CompressionCodec Interface
* Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
* </pre>
*/
public Builder clearCellBlockCompressorClass() {
@ -1853,7 +1853,7 @@ public final class RPCProtos {
*
* <pre>
* Compressor we will use if cell block is compressed. Server will throw exception if not supported.
* Class must implement hadoop's CompressionCodec Interface
* Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
* </pre>
*/
public Builder setCellBlockCompressorClassBytes(
@ -6002,25 +6002,24 @@ public final class RPCProtos {
java.lang.String[] descriptorData = {
"\n\tRPC.proto\032\rTracing.proto\032\013hbase.proto\"" +
"<\n\017UserInformation\022\026\n\016effective_user\030\001 \002" +
"(\t\022\021\n\treal_user\030\002 \001(\t\"\277\001\n\020ConnectionHead" +
"(\t\022\021\n\treal_user\030\002 \001(\t\"\222\001\n\020ConnectionHead" +
"er\022#\n\tuser_info\030\001 \001(\0132\020.UserInformation\022" +
"\024\n\014service_name\030\002 \001(\t\022K\n\026cell_block_code" +
"c_class\030\003 \001(\t:+org.apache.hadoop.hbase.c" +
"odec.KeyValueCodec\022#\n\033cell_block_compres" +
"sor_class\030\004 \001(\t\"\037\n\rCellBlockMeta\022\016\n\006leng" +
"th\030\001 \001(\r\"|\n\021ExceptionResponse\022\034\n\024excepti" +
"on_class_name\030\001 \001(\t\022\023\n\013stack_trace\030\002 \001(\t",
"\022\020\n\010hostname\030\003 \001(\t\022\014\n\004port\030\004 \001(\005\022\024\n\014do_n" +
"ot_retry\030\005 \001(\010\"\254\001\n\rRequestHeader\022\017\n\007call" +
"_id\030\001 \001(\r\022\035\n\ntrace_info\030\002 \001(\0132\t.RPCTInfo" +
"\022\023\n\013method_name\030\003 \001(\t\022\025\n\rrequest_param\030\004" +
" \001(\010\022\'\n\017cell_block_meta\030\005 \001(\0132\016.CellBloc" +
"kMeta\022\026\n\016effective_user\030\006 \001(\t\"q\n\016Respons" +
"eHeader\022\017\n\007call_id\030\001 \001(\r\022%\n\texception\030\002 " +
"\001(\0132\022.ExceptionResponse\022\'\n\017cell_block_me" +
"ta\030\003 \001(\0132\016.CellBlockMetaB<\n*org.apache.h" +
"adoop.hbase.protobuf.generatedB\tRPCProto",
"sH\001\240\001\001"
"\024\n\014service_name\030\002 \001(\t\022\036\n\026cell_block_code" +
"c_class\030\003 \001(\t\022#\n\033cell_block_compressor_c" +
"lass\030\004 \001(\t\"\037\n\rCellBlockMeta\022\016\n\006length\030\001 " +
"\001(\r\"|\n\021ExceptionResponse\022\034\n\024exception_cl" +
"ass_name\030\001 \001(\t\022\023\n\013stack_trace\030\002 \001(\t\022\020\n\010h" +
"ostname\030\003 \001(\t\022\014\n\004port\030\004 \001(\005\022\024\n\014do_not_re",
"try\030\005 \001(\010\"\254\001\n\rRequestHeader\022\017\n\007call_id\030\001" +
" \001(\r\022\035\n\ntrace_info\030\002 \001(\0132\t.RPCTInfo\022\023\n\013m" +
"ethod_name\030\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\022" +
"\'\n\017cell_block_meta\030\005 \001(\0132\016.CellBlockMeta" +
"\022\026\n\016effective_user\030\006 \001(\t\"q\n\016ResponseHead" +
"er\022\017\n\007call_id\030\001 \001(\r\022%\n\texception\030\002 \001(\0132\022" +
".ExceptionResponse\022\'\n\017cell_block_meta\030\003 " +
"\001(\0132\016.CellBlockMetaB<\n*org.apache.hadoop" +
".hbase.protobuf.generatedB\tRPCProtosH\001\240\001" +
"\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

View File

@ -263,14 +263,21 @@ message ScanRequest {
* be false. If it is not specified, it means there are more.
*/
message ScanResponse {
optional ResultCellMeta result_cell_meta = 1;
// This field is filled in if we are doing cellblocks. A cellblock is made up
// of all Cells serialized out as one cellblock BUT responses from a server
// have their Cells grouped by Result. So we can reconstitute the
// Results on the client-side, this field is a list of counts of Cells
// in each Result that makes up the response. For example, if this field
// has 3, 3, 3 in it, then we know that on the client, we are to make
// three Results each of three Cells each.
repeated uint32 cells_per_result = 1;
optional uint64 scanner_id = 2;
optional bool more_results = 3;
optional uint32 ttl = 4;
}
message ResultCellMeta {
repeated uint32 cells_length = 1;
// If cells are not carried in an accompanying cellblock, then they are pb'd here.
// This field is mutually exclusive with cells_per_result (since the Cells will
// be inside the pb'd Result)
repeated Result results = 5;
}
/**

View File

@ -81,10 +81,10 @@ message ConnectionHeader {
optional UserInformation user_info = 1;
optional string service_name = 2;
// Cell block codec we will use sending over optional cell blocks. Server throws exception
// if cannot deal.
optional string cell_block_codec_class = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];
// if cannot deal. Null means no codec'ing going on so we are pb all the time (SLOW!!!)
optional string cell_block_codec_class = 3;
// Compressor we will use if cell block is compressed. Server will throw exception if not supported.
// Class must implement hadoop's CompressionCodec Interface
// Class must implement hadoop's CompressionCodec Interface. Can't compress if no codec.
optional string cell_block_compressor_class = 4;
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;

View File

@ -25,4 +25,14 @@ public interface RpcCallContext extends Delayable {
* @throws CallerDisconnectedException
*/
void throwExceptionIfCallerDisconnected(String regionName) throws CallerDisconnectedException;
}
/**
* If the client connected and specified a codec to use, then we will use this codec making
* cellblocks to return. If the client did not specify a codec, we assume it does not support
* cellblocks and will return all content protobuf'd (though it makes our serving slower).
* We need to ask this question per call because a server could be hosting both clients that
* support cellblocks while fielding requests from clients that do not.
* @return True if the client supports cellblocks, else return all content in pb
*/
boolean isClientCellBlockSupport();
}

View File

@ -59,18 +59,16 @@ import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
@ -113,12 +111,12 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.Counter;
import org.cloudera.htrace.Sampler;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceInfo;
import org.cloudera.htrace.TraceScope;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.BlockingService;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.Descriptors.MethodDescriptor;
@ -203,7 +201,6 @@ public class RpcServer implements RpcServerInterface {
protected final Configuration conf;
private int maxQueueLength;
private int maxQueueSize;
protected int socketSendBufferSize;
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
@ -443,6 +440,11 @@ public class RpcServer implements RpcServerInterface {
return this.delayReturnValue;
}
@Override
public boolean isClientCellBlockSupport() {
return this.connection != null && this.connection.codec != null;
}
@Override
public void throwExceptionIfCallerDisconnected(String regionName)
throws CallerDisconnectedException {
@ -1542,7 +1544,9 @@ public class RpcServer implements RpcServerInterface {
private void setupCellBlockCodecs(final ConnectionHeader header)
throws FatalConnectionException {
// TODO: Plug in other supported decoders.
if (!header.hasCellBlockCodecClass()) return;
String className = header.getCellBlockCodecClass();
if (className == null || className.length() == 0) return;
try {
this.codec = (Codec)Class.forName(className).newInstance();
} catch (Exception e) {
@ -2335,9 +2339,10 @@ public class RpcServer implements RpcServerInterface {
}
/**
* Needed for delayed calls. We need to be able to store the current call
* so that we can complete it later.
* @return Call the server is currently handling.
* Needed for features such as delayed calls. We need to be able to store the current call
* so that we can complete it later or ask questions of what is supported by the current ongoing
* call.
* @return An RpcCallConext backed by the currently ongoing call (gotten from a thread local)
*/
public static RpcCallContext getCurrentCall() {
return CurCall.get();

View File

@ -25,12 +25,10 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.authorize.PolicyProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
@ -71,4 +69,4 @@ public interface RpcServerInterface {
*/
@VisibleForTesting
void refreshAuthManager(PolicyProvider pp);
}
}

View File

@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@ -104,6 +103,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@ -164,7 +164,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
@ -2957,8 +2956,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
* @throws ServiceException
*/
@Override
public ScanResponse scan(final RpcController controller,
final ScanRequest request) throws ServiceException {
public ScanResponse scan(final RpcController controller, final ScanRequest request)
throws ServiceException {
Leases.Lease lease = null;
String scannerName = null;
try {
@ -3021,7 +3020,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
if (!isLoadingCfsOnDemandSet) {
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
}
byte[] hasMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
region.prepareScanner(scan);
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().preScannerOpen(scan);
@ -3128,16 +3127,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
moreResults = false;
results = null;
} else {
ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder();
List<CellScannable> cellScannables = new ArrayList<CellScannable>(results.size());
for (Result res : results) {
cellScannables.add(res);
rcmBuilder.addCellsLength(res.size());
}
builder.setResultCellMeta(rcmBuilder.build());
// TODO is this okey to assume the type and cast
((PayloadCarryingRpcController) controller).setCellScanner(CellUtil
.createCellScanner(cellScannables));
formatResults(builder, results, controller);
}
} finally {
// We're done. On way out re-add the above removed lease.
@ -3185,6 +3175,26 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
}
}
private void formatResults(final ScanResponse.Builder builder, final List<Result> results,
final RpcController controller) {
if (results == null || results.isEmpty()) return;
RpcCallContext context = RpcServer.getCurrentCall();
if (context != null && context.isClientCellBlockSupport()) {
List<CellScannable> cellScannables = new ArrayList<CellScannable>(results.size());
for (Result res : results) {
cellScannables.add(res);
builder.addCellsPerResult(res.size());
}
((PayloadCarryingRpcController)controller).
setCellScanner(CellUtil.createCellScanner(cellScannables));
} else {
for (Result res: results) {
ClientProtos.Result pbr = ProtobufUtil.toResult(res);
builder.addResults(pbr);
}
}
}
/**
* Atomically bulk load several HFiles into an open region
* @return true if successful, false is failed but recoverably (no action)

View File

@ -18,23 +18,12 @@
package org.apache.hadoop.hbase;
import static junit.framework.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.regex.Pattern;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runners.Suite;
/**

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.util.Bytes;
@ -167,11 +166,9 @@ public class TestMetaReaderEditorNoCluster {
final List<CellScannable> cellScannables = new ArrayList<CellScannable>(1);
cellScannables.add(new Result(kvs));
final ScanResponse.Builder builder = ScanResponse.newBuilder();
ResultCellMeta.Builder metaBuilder = ResultCellMeta.newBuilder();
for (CellScannable result : cellScannables) {
metaBuilder.addCellsLength(((Result)result).size());
builder.addCellsPerResult(((Result)result).size());
}
builder.setResultCellMeta(metaBuilder.build());
Mockito.when(implementation.scan((RpcController) Mockito.any(), (ScanRequest) Mockito.any()))
.thenThrow(new ServiceException("Server not running (1 of 3)"))
.thenThrow(new ServiceException("Server not running (2 of 3)"))

View File

@ -40,9 +40,6 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.ArrayUtils;
@ -51,7 +48,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -63,6 +59,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
@ -94,7 +91,6 @@ import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.log4j.Level;
import org.junit.After;

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Do some ops and prove that client and server can work w/o codecs; that we can pb all the time.
* Good for third-party clients or simple scripts that want to talk direct to hbase.
*/
@Category(MediumTests.class)
public class TestFromClientSideNoCodec {
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Turn off codec use
TEST_UTIL.getConfiguration().set("hbase.client.default.rpc.codec", "");
TEST_UTIL.startMiniCluster(1);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testBasics() throws IOException {
final byte [] t = Bytes.toBytes("testBasics");
final byte [][] fs = new byte[][] {Bytes.toBytes("cf1"), Bytes.toBytes("cf2"),
Bytes.toBytes("cf3") };
HTable ht = TEST_UTIL.createTable(t, fs);
// Check put and get.
final byte [] row = Bytes.toBytes("row");
Put p = new Put(row);
for (byte [] f: fs) p.add(f, f, f);
ht.put(p);
Result r = ht.get(new Get(row));
int i = 0;
for (CellScanner cellScanner = r.cellScanner(); cellScanner.advance();) {
Cell cell = cellScanner.current();
byte [] f = fs[i++];
assertTrue(Bytes.toString(f),
Bytes.equals(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
f, 0, f.length));
}
// Check getRowOrBefore
byte [] f = fs[0];
r = ht.getRowOrBefore(row, f);
assertTrue(r.toString(), r.containsColumn(f, f));
// Check scan.
ResultScanner scanner = ht.getScanner(new Scan());
int count = 0;
while ((r = scanner.next()) != null) {
assertTrue(r.list().size() == 3);
count++;
}
assertTrue(count == 1);
}
@Test
public void testNoCodec() {
Configuration c = new Configuration();
c.set("hbase.client.default.rpc.codec", "");
String codec = RpcClient.getDefaultCodec(c);
assertTrue(codec == null || codec.length() == 0);
}
}

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collection;
@ -41,7 +40,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@ -64,7 +62,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
@Category(MediumTests.class)
public class TestRegionObserverScannerOpenHook {
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
static final Path DIR = UTIL.getDataTestDir();
@ -212,7 +210,6 @@ public class TestRegionObserverScannerOpenHook {
* region
*/
@Test
@Category(MediumTests.class)
public void testRegionObserverCompactionTimeStacking() throws Exception {
// setup a mini cluster so we can do a real compaction on a region
Configuration conf = UTIL.getConfiguration();

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@ -113,13 +114,16 @@ public class TestIPC {
// an echo, just put them back on the controller creating a new block. Tests our block
// building.
CellScanner cellScanner = pcrc.cellScanner();
List<Cell> list = new ArrayList<Cell>();
try {
while(cellScanner.advance()) {
list.add(cellScanner.current());
List<Cell> list = null;
if (cellScanner != null) {
list = new ArrayList<Cell>();
try {
while(cellScanner.advance()) {
list.add(cellScanner.current());
}
} catch (IOException e) {
throw new ServiceException(e);
}
} catch (IOException e) {
throw new ServiceException(e);
}
cellScanner = CellUtil.createCellScanner(list);
((PayloadCarryingRpcController)controller).setCellScanner(cellScanner);
@ -148,6 +152,38 @@ public class TestIPC {
}
}
/**
* Ensure we do not HAVE TO HAVE a codec.
* @throws InterruptedException
* @throws IOException
*/
@Test
public void testNoCodec() throws InterruptedException, IOException {
Configuration conf = HBaseConfiguration.create();
RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT) {
@Override
Codec getCodec() {
return null;
}
};
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
final String message = "hello";
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
Pair<Message, CellScanner> r = client.call(md, param, null,
md.getOutputType().toProto(), User.getCurrent(), address, 0);
assertTrue(r.getSecond() == null);
// Silly assertion that the message is in the returned pb.
assertTrue(r.getFirst().toString().contains(message));
} finally {
client.stop();
rpcServer.stop();
}
}
/**
* It is hard to verify the compression is actually happening under the wraps. Hope that if
* unsupported, we'll get an exception out of some time (meantime, have to trace it manually
@ -160,11 +196,14 @@ public class TestIPC {
@Test
public void testCompressCellBlock()
throws IOException, InterruptedException, SecurityException, NoSuchMethodException {
// Currently, you set
Configuration conf = HBaseConfiguration.create();
Configuration conf = new Configuration(HBaseConfiguration.create());
conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
doSimpleTest(conf, new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT));
}
private void doSimpleTest(final Configuration conf, final RpcClient client)
throws InterruptedException, IOException {
TestRpcServer rpcServer = new TestRpcServer();
RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
List<Cell> cells = new ArrayList<Cell>();
int count = 3;
for (int i = 0; i < count; i++) cells.add(CELL);

View File

@ -84,7 +84,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
@ -382,9 +381,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
long scannerId = request.getScannerId();
Result result = next(scannerId);
if (result != null) {
ResultCellMeta.Builder metaBuilder = ResultCellMeta.newBuilder();
metaBuilder.addCellsLength(result.size());
builder.setResultCellMeta(metaBuilder.build());
builder.addCellsPerResult(result.size());
List<CellScannable> results = new ArrayList<CellScannable>(1);
results.add(result);
((PayloadCarryingRpcController) controller).setCellScanner(CellUtil

View File

@ -64,7 +64,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
@ -606,9 +605,7 @@ public class TestAssignmentManager {
final ScanResponse.Builder builder = ScanResponse.newBuilder();
builder.setMoreResults(true);
ResultCellMeta.Builder metaBuilder = ResultCellMeta.newBuilder();
metaBuilder.addCellsLength(r.size());
builder.setResultCellMeta(metaBuilder.build());
builder.addCellsPerResult(r.size());
final List<CellScannable> cellScannables = new ArrayList<CellScannable>(1);
cellScannables.add(r);
Mockito.when(implementation.scan(
@ -1077,9 +1074,7 @@ public class TestAssignmentManager {
Result r = MetaMockingUtil.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
final ScanResponse.Builder builder = ScanResponse.newBuilder();
builder.setMoreResults(true);
ResultCellMeta.Builder metaBuilder = ResultCellMeta.newBuilder();
metaBuilder.addCellsLength(r.size());
builder.setResultCellMeta(metaBuilder.build());
builder.addCellsPerResult(r.size());
final List<CellScannable> rows = new ArrayList<CellScannable>(1);
rows.add(r);
Answer<ScanResponse> ans = new Answer<ClientProtos.ScanResponse>() {

View File

@ -212,9 +212,18 @@
<section><title>CellBlock Codecs</title>
<para>To enable a codec other than the default <classname>KeyValueCodec</classname>,
set <varname>hbase.client.rpc.codec</varname>
to the name of the Codec to use. Codec must implement hbase's <classname>Codec</classname> Interface. After connection setup,
to the name of the Codec class to use. Codec must implement hbase's <classname>Codec</classname> Interface. After connection setup,
all passed cellblocks will be sent with this codec. The server will return cellblocks using this same codec as long
as the codec is on the servers' CLASSPATH (else you will get <classname>UnsupportedCellCodecException</classname>).</para>
<para>To change the default codec, set <varname>hbase.client.default.rpc.codec</varname>.
</para>
<para>To disable cellblocks completely and to go pure protobuf, set the default to the
empty String and do not specify a codec in your Configuration. So, set <varname>hbase.client.default.rpc.codec</varname>
to the empty string and do not set <varname>hbase.client.rpc.codec</varname>.
This will cause the client to connect to the server with no codec specified.
If a server sees no codec, it will return all responses in pure protobuf.
Running pure protobuf all the time will be slower than running with cellblocks.
</para>
</section>
<section><title>Compression</title>
<para>Uses hadoops compression codecs. To enable compressing of passed CellBlocks, set <varname>hbase.client.rpc.compressor</varname>