HBASE-8471 Server-side, remove convertion from pb type to client type before we call method
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1485396 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2d4b220114
commit
91148bcc2a
|
@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.exceptions.UnknownSnapshotException;
|
|||
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||
|
@ -572,10 +573,12 @@ public class HBaseAdmin implements Abortable, Closeable {
|
|||
firstMetaServer.getRegionInfo().getRegionName(), scan, 1, true);
|
||||
Result[] values = null;
|
||||
// Get a batch at a time.
|
||||
ClientService.BlockingInterface server = connection.getClient(firstMetaServer.getServerName());
|
||||
ClientService.BlockingInterface server = connection.getClient(firstMetaServer
|
||||
.getServerName());
|
||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
||||
try {
|
||||
ScanResponse response = server.scan(null, request);
|
||||
values = ResponseConverter.getResults(response);
|
||||
ScanResponse response = server.scan(controller, request);
|
||||
values = ResponseConverter.getResults(controller.cellScanner(), response);
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
|
|
|
@ -26,14 +26,17 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
|
||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||
|
@ -142,8 +145,9 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
|||
incRPCcallsMetrics();
|
||||
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
|
||||
ScanResponse response = null;
|
||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
||||
try {
|
||||
response = stub.scan(null, request);
|
||||
response = stub.scan(controller, request);
|
||||
// Client and RS maintain a nextCallSeq number during the scan. Every next() call
|
||||
// from client to server will increment this number in both sides. Client passes this
|
||||
// number along with the request and at RS side both the incoming nextCallSeq and its
|
||||
|
@ -155,7 +159,9 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
|||
// See HBASE-5974
|
||||
nextCallSeq++;
|
||||
long timestamp = System.currentTimeMillis();
|
||||
rrs = ResponseConverter.getResults(response);
|
||||
// Results are returned via controller
|
||||
CellScanner cellScanner = controller.cellScanner();
|
||||
rrs = ResponseConverter.getResults(cellScanner, response);
|
||||
if (logScannerActivity) {
|
||||
long now = System.currentTimeMillis();
|
||||
if (now - timestamp > logCutOffLatency) {
|
||||
|
@ -173,7 +179,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
|||
} catch (ServiceException se) {
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
updateResultsMetrics(response);
|
||||
updateResultsMetrics(rrs);
|
||||
} catch (IOException e) {
|
||||
if (logScannerActivity) {
|
||||
LOG.info("Got exception making request " + TextFormat.shortDebugString(request), e);
|
||||
|
@ -232,14 +238,19 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
|||
}
|
||||
}
|
||||
|
||||
private void updateResultsMetrics(ScanResponse response) {
|
||||
if (this.scanMetrics == null || !response.hasResultSizeBytes()) {
|
||||
private void updateResultsMetrics(Result[] rrs) {
|
||||
if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
|
||||
return;
|
||||
}
|
||||
long value = response.getResultSizeBytes();
|
||||
this.scanMetrics.countOfBytesInResults.addAndGet(value);
|
||||
long resultSize = 0;
|
||||
for (Result rr : rrs) {
|
||||
for (KeyValue kv : rr.raw()) {
|
||||
resultSize += kv.getLength();
|
||||
}
|
||||
}
|
||||
this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
|
||||
if (isRegionServerRemote) {
|
||||
this.scanMetrics.countOfBytesInRemoteResults.addAndGet(value);
|
||||
this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,11 +21,15 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.UserPermissionsResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
|
||||
|
@ -36,6 +40,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRespo
|
|||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse;
|
||||
|
@ -54,28 +59,13 @@ import com.google.protobuf.RpcController;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class ResponseConverter {
|
||||
public static final Log LOG = LogFactory.getLog(ResponseConverter.class);
|
||||
|
||||
private ResponseConverter() {
|
||||
}
|
||||
|
||||
// Start utilities for Client
|
||||
|
||||
/**
|
||||
* Get the client Results from a protocol buffer ScanResponse
|
||||
*
|
||||
* @param response the protocol buffer ScanResponse
|
||||
* @return the client Results in the response
|
||||
*/
|
||||
public static Result[] getResults(final ScanResponse response) {
|
||||
if (response == null) return null;
|
||||
int count = response.getResultCount();
|
||||
Result[] results = new Result[count];
|
||||
for (int i = 0; i < count; i++) {
|
||||
results[i] = ProtobufUtil.toResult(response.getResult(i));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the results from a protocol buffer MultiResponse
|
||||
*
|
||||
|
@ -278,4 +268,46 @@ public final class ResponseConverter {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Results from the cells using the cells meta data.
|
||||
* @param cellScanner
|
||||
* @param response
|
||||
* @return results
|
||||
*/
|
||||
public static Result[] getResults(CellScanner cellScanner, ScanResponse response)
|
||||
throws IOException {
|
||||
if (response == null || cellScanner == null) return null;
|
||||
ResultCellMeta resultCellMeta = response.getResultCellMeta();
|
||||
if (resultCellMeta == null) return null;
|
||||
int noOfResults = resultCellMeta.getCellsLengthCount();
|
||||
Result[] results = new Result[noOfResults];
|
||||
for (int i = 0; i < noOfResults; i++) {
|
||||
int noOfCells = resultCellMeta.getCellsLength(i);
|
||||
List<Cell> cells = new ArrayList<Cell>(noOfCells);
|
||||
for (int j = 0; j < noOfCells; j++) {
|
||||
try {
|
||||
if (cellScanner.advance() == false) {
|
||||
// We are not able to retrieve the exact number of cells which ResultCellMeta says us.
|
||||
// We have to scan for the same results again. Throwing DNRIOE as a client retry on the
|
||||
// same scanner will result in OutOfOrderScannerNextException
|
||||
String msg = "Results sent from server=" + noOfResults + ". But only got " + i
|
||||
+ " results completely at client. Resetting the scanner to scan again.";
|
||||
LOG.error(msg);
|
||||
throw new DoNotRetryIOException(msg);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
// We are getting IOE while retrieving the cells for Results.
|
||||
// We have to scan for the same results again. Throwing DNRIOE as a client retry on the
|
||||
// same scanner will result in OutOfOrderScannerNextException
|
||||
LOG.error("Exception while reading cells from result."
|
||||
+ "Resetting the scanner to scan again.", ioe);
|
||||
throw new DoNotRetryIOException("Resetting the scanner.", ioe);
|
||||
}
|
||||
cells.add(cellScanner.current());
|
||||
}
|
||||
results[i] = new Result(cells);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -260,11 +260,14 @@ message ScanRequest {
|
|||
* be false. If it is not specified, it means there are more.
|
||||
*/
|
||||
message ScanResponse {
|
||||
repeated Result result = 1;
|
||||
optional ResultCellMeta resultCellMeta = 1;
|
||||
optional uint64 scannerId = 2;
|
||||
optional bool moreResults = 3;
|
||||
optional uint32 ttl = 4;
|
||||
optional uint64 resultSizeBytes = 5;
|
||||
}
|
||||
|
||||
message ResultCellMeta {
|
||||
repeated uint32 cellsLength = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -92,7 +92,6 @@ import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
|
|||
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionServerRunningException;
|
||||
|
@ -165,6 +164,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
|
||||
|
@ -2947,7 +2947,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
RegionScannerHolder rsh = null;
|
||||
boolean moreResults = true;
|
||||
boolean closeScanner = false;
|
||||
Long resultsWireSize = null;
|
||||
ScanResponse.Builder builder = ScanResponse.newBuilder();
|
||||
if (request.hasCloseScanner()) {
|
||||
closeScanner = request.getCloseScanner();
|
||||
|
@ -2974,7 +2973,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
|
||||
}
|
||||
byte[] hasMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
|
||||
resultsWireSize = (hasMetrics != null && Bytes.toBoolean(hasMetrics)) ? 0L : null;
|
||||
region.prepareScanner(scan);
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
scanner = region.getCoprocessorHost().preScannerOpen(scan);
|
||||
|
@ -3081,18 +3079,16 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
|||
moreResults = false;
|
||||
results = null;
|
||||
} else {
|
||||
for (Result result: results) {
|
||||
if (result != null) {
|
||||
ClientProtos.Result pbResult = ProtobufUtil.toResult(result);
|
||||
if (resultsWireSize != null) {
|
||||
resultsWireSize += pbResult.getSerializedSize();
|
||||
}
|
||||
builder.addResult(pbResult);
|
||||
}
|
||||
}
|
||||
if (resultsWireSize != null) {
|
||||
builder.setResultSizeBytes(resultsWireSize.longValue());
|
||||
ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder();
|
||||
List<CellScannable> cellScannables = new ArrayList<CellScannable>(results.size());
|
||||
for (Result res : results) {
|
||||
cellScannables.add(res);
|
||||
rcmBuilder.addCellsLength(res.size());
|
||||
}
|
||||
builder.setResultCellMeta(rcmBuilder.build());
|
||||
// TODO is this okey to assume the type and cast
|
||||
((PayloadCarryingRpcController) controller).setCellScanner(CellUtil
|
||||
.createCellScanner(cellScannables));
|
||||
}
|
||||
} finally {
|
||||
// We're done. On way out re-add the above removed lease.
|
||||
|
|
|
@ -30,6 +30,8 @@ import java.util.NavigableMap;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.CellScannable;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
|
@ -41,8 +43,10 @@ import org.apache.hadoop.hbase.client.HConnection;
|
|||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -52,6 +56,8 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -158,20 +164,26 @@ public class TestMetaReaderEditorNoCluster {
|
|||
kvs.add(new KeyValue(rowToVerify,
|
||||
HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
|
||||
Bytes.toBytes(sn.getStartcode())));
|
||||
final Result [] results = new Result [] {new Result(kvs)};
|
||||
ScanResponse.Builder builder = ScanResponse.newBuilder();
|
||||
for (Result result: results) {
|
||||
builder.addResult(ProtobufUtil.toResult(result));
|
||||
final List<CellScannable> cellScannables = new ArrayList<CellScannable>(1);
|
||||
cellScannables.add(new Result(kvs));
|
||||
final ScanResponse.Builder builder = ScanResponse.newBuilder();
|
||||
ResultCellMeta.Builder metaBuilder = ResultCellMeta.newBuilder();
|
||||
for (CellScannable result : cellScannables) {
|
||||
metaBuilder.addCellsLength(((Result)result).size());
|
||||
}
|
||||
Mockito.when(implementation.scan(
|
||||
(RpcController)Mockito.any(), (ScanRequest)Mockito.any())).
|
||||
thenThrow(new ServiceException("Server not running (1 of 3)")).
|
||||
thenThrow(new ServiceException("Server not running (2 of 3)")).
|
||||
thenThrow(new ServiceException("Server not running (3 of 3)")).
|
||||
thenReturn(ScanResponse.newBuilder().setScannerId(1234567890L).build())
|
||||
.thenReturn(builder.build()).thenReturn(
|
||||
ScanResponse.newBuilder().setMoreResults(false).build());
|
||||
|
||||
builder.setResultCellMeta(metaBuilder.build());
|
||||
Mockito.when(implementation.scan((RpcController) Mockito.any(), (ScanRequest) Mockito.any()))
|
||||
.thenThrow(new ServiceException("Server not running (1 of 3)"))
|
||||
.thenThrow(new ServiceException("Server not running (2 of 3)"))
|
||||
.thenThrow(new ServiceException("Server not running (3 of 3)"))
|
||||
.thenReturn(ScanResponse.newBuilder().setScannerId(1234567890L).build())
|
||||
.thenAnswer(new Answer<ScanResponse>() {
|
||||
public ScanResponse answer(InvocationOnMock invocation) throws Throwable {
|
||||
((PayloadCarryingRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil
|
||||
.createCellScanner(cellScannables));
|
||||
return builder.build();
|
||||
}
|
||||
}).thenReturn(ScanResponse.newBuilder().setMoreResults(false).build());
|
||||
// Associate a spied-upon HConnection with UTIL.getConfiguration. Need
|
||||
// to shove this in here first so it gets picked up all over; e.g. by
|
||||
// HTable.
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -27,6 +28,8 @@ import java.util.concurrent.ConcurrentSkipListMap;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.CellScannable;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
|
@ -35,6 +38,7 @@ import org.apache.hadoop.hbase.client.Result;
|
|||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
|
@ -76,6 +80,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
|
||||
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
|
||||
|
@ -373,7 +378,13 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
|||
long scannerId = request.getScannerId();
|
||||
Result result = next(scannerId);
|
||||
if (result != null) {
|
||||
builder.addResult(ProtobufUtil.toResult(result));
|
||||
ResultCellMeta.Builder metaBuilder = ResultCellMeta.newBuilder();
|
||||
metaBuilder.addCellsLength(result.size());
|
||||
builder.setResultCellMeta(metaBuilder.build());
|
||||
List<CellScannable> results = new ArrayList<CellScannable>(1);
|
||||
results.add(result);
|
||||
((PayloadCarryingRpcController) controller).setCellScanner(CellUtil
|
||||
.createCellScanner(results));
|
||||
builder.setMoreResults(true);
|
||||
}
|
||||
else {
|
||||
|
|
|
@ -30,6 +30,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.hbase.CellScannable;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -50,6 +52,7 @@ import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
|
|||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorType;
|
||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
|
||||
import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
|
||||
|
@ -60,6 +63,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
|
||||
|
@ -81,6 +85,8 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -594,12 +600,25 @@ public class TestAssignmentManager {
|
|||
r = MetaMockingUtil.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
|
||||
}
|
||||
|
||||
ScanResponse.Builder builder = ScanResponse.newBuilder();
|
||||
final ScanResponse.Builder builder = ScanResponse.newBuilder();
|
||||
builder.setMoreResults(true);
|
||||
builder.addResult(ProtobufUtil.toResult(r));
|
||||
ResultCellMeta.Builder metaBuilder = ResultCellMeta.newBuilder();
|
||||
metaBuilder.addCellsLength(r.size());
|
||||
builder.setResultCellMeta(metaBuilder.build());
|
||||
final List<CellScannable> cellScannables = new ArrayList<CellScannable>(1);
|
||||
cellScannables.add(r);
|
||||
Mockito.when(implementation.scan(
|
||||
(RpcController)Mockito.any(), (ScanRequest)Mockito.any())).
|
||||
thenReturn(builder.build());
|
||||
thenAnswer(new Answer<ScanResponse>() {
|
||||
public ScanResponse answer(InvocationOnMock invocation) throws Throwable {
|
||||
PayloadCarryingRpcController controller = (PayloadCarryingRpcController) invocation
|
||||
.getArguments()[0];
|
||||
if (controller != null) {
|
||||
controller.setCellScanner(CellUtil.createCellScanner(cellScannables));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
});
|
||||
|
||||
// Get a connection w/ mocked up common methods.
|
||||
HConnection connection =
|
||||
|
@ -1052,17 +1071,30 @@ public class TestAssignmentManager {
|
|||
Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
|
||||
// Get a meta row result that has region up on SERVERNAME_A for REGIONINFO
|
||||
Result r = MetaMockingUtil.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
|
||||
ScanResponse.Builder builder = ScanResponse.newBuilder();
|
||||
final ScanResponse.Builder builder = ScanResponse.newBuilder();
|
||||
builder.setMoreResults(true);
|
||||
builder.addResult(ProtobufUtil.toResult(r));
|
||||
ResultCellMeta.Builder metaBuilder = ResultCellMeta.newBuilder();
|
||||
metaBuilder.addCellsLength(r.size());
|
||||
builder.setResultCellMeta(metaBuilder.build());
|
||||
final List<CellScannable> rows = new ArrayList<CellScannable>(1);
|
||||
rows.add(r);
|
||||
Answer<ScanResponse> ans = new Answer<ClientProtos.ScanResponse>() {
|
||||
public ScanResponse answer(InvocationOnMock invocation) throws Throwable {
|
||||
PayloadCarryingRpcController controller = (PayloadCarryingRpcController) invocation
|
||||
.getArguments()[0];
|
||||
if (controller != null) {
|
||||
controller.setCellScanner(CellUtil.createCellScanner(rows));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
};
|
||||
if (enabling) {
|
||||
Mockito.when(ri.scan((RpcController) Mockito.any(), (ScanRequest) Mockito.any()))
|
||||
.thenReturn(builder.build()).thenReturn(builder.build()).thenReturn(builder.build())
|
||||
.thenReturn(builder.build()).thenReturn(builder.build())
|
||||
.thenAnswer(ans).thenAnswer(ans).thenAnswer(ans).thenAnswer(ans).thenAnswer(ans)
|
||||
.thenReturn(ScanResponse.newBuilder().setMoreResults(false).build());
|
||||
} else {
|
||||
Mockito.when(ri.scan((RpcController) Mockito.any(), (ScanRequest) Mockito.any())).thenReturn(
|
||||
builder.build());
|
||||
Mockito.when(ri.scan((RpcController) Mockito.any(), (ScanRequest) Mockito.any())).thenAnswer(
|
||||
ans);
|
||||
}
|
||||
// If a get, return the above result too for REGIONINFO
|
||||
GetResponse.Builder getBuilder = GetResponse.newBuilder();
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.LargeTests;
|
||||
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
|
@ -156,7 +157,7 @@ public class TestEndToEndSplitTransaction {
|
|||
ScanRequest scanRequest = RequestConverter.buildScanRequest(
|
||||
regionName, new Scan(row), 1, true);
|
||||
try {
|
||||
server.scan(null, scanRequest);
|
||||
server.scan(new PayloadCarryingRpcController(), scanRequest);
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue