HBASE-9907 Rig to fake a cluster so can profile client behaviors

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1541703 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-11-13 20:49:59 +00:00
parent c5a3048fec
commit ffd91d7e90
10 changed files with 615 additions and 79 deletions

View File

@ -641,8 +641,7 @@ class AsyncProcess<CResult> {
private void logAndResubmit(List<Action<Row>> initialActions, HRegionLocation oldLocation,
List<Action<Row>> toReplay, int numAttempt, int failureCount,
Throwable throwable,
HConnectionManager.ServerErrorTracker errorsByServer){
HConnectionManager.ServerErrorTracker errorsByServer) {
if (toReplay.isEmpty()) {
// it's either a success or a last failure
if (failureCount != 0) {
@ -789,22 +788,22 @@ class AsyncProcess<CResult> {
StringBuilder sb = new StringBuilder();
sb.append("#").append(id).append(", table=").append(tableName).
append(", Attempt #").append(numAttempt).append("/").append(numTries).append(" ");
append(", attempt=").append(numAttempt).append("/").append(numTries).append(" ");
if (failureCount > 0 || error != null){
sb.append("failed ").append(failureCount).append(" ops").append(", last exception was: ").
append(error == null ? "null" : error.getMessage());
}else {
sb.append("failed ").append(failureCount).append(" ops").append(", last exception: ").
append(error == null ? "null" : error);
} else {
sb.append("SUCCEEDED");
}
sb.append(" on server ").append(sn);
sb.append(" on ").append(sn);
sb.append(", tracking started at ").append(startTime);
sb.append(", tracking started ").append(startTime);
if (willRetry) {
sb.append(" - retrying after sleeping for ").append(backOffTime).append(" ms").
append(", will replay ").append(replaySize).append(" ops.");
sb.append(", retrying after ").append(backOffTime).append(" ms").
append(", replay ").append(replaySize).append(" ops.");
} else if (failureCount > 0) {
sb.append(" - FAILED, NOT RETRYING ANYMORE");
}

View File

@ -41,10 +41,12 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
/**
* A cluster connection. Knows how to find the master, locate regions out on the cluster,
* keeps a cache of locations and then knows how to re-calibrate after they move.
* {@link HConnectionManager} manages instances of this class. This is NOT a connection to a
* particular server but to all servers in the cluster. Individual connections are managed at a
* lower level.
* keeps a cache of locations and then knows how to re-calibrate after they move. You need one
* of these to talk to your HBase cluster. {@link HConnectionManager} manages instances of this
* class. See it for how to get one of these.
*
* <p>This is NOT a connection to a particular server but to ALL servers in the cluster. Individual
* connections are managed at a lower level.
*
* <p>HConnections are used by {@link HTable} mostly but also by
* {@link HBaseAdmin}, and {@link CatalogTracker}. HConnection instances can be shared. Sharing

View File

@ -71,7 +71,10 @@ import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
/**
* <p>Used to communicate with a single HBase table.
* <p>Used to communicate with a single HBase table. An implementation of
* {@link HTableInterface}. Instances of this class can be constructed directly but it is
* encouraged that users get instances via {@link HConnection} and {@link HConnectionManager}.
* See {@link HConnectionManager} class comment for an example.
*
* <p>This class is not thread safe for reads nor write.
*
@ -336,7 +339,7 @@ public class HTable implements HTableInterface {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
ap = new AsyncProcess<Object>(connection, tableName, pool, null,
ap = new AsyncProcess<Object>(connection, tableName, pool, null,
configuration, rpcCallerFactory);
this.maxKeyValueSize = this.configuration.getInt(
@ -1070,7 +1073,7 @@ public class HTable implements HTableInterface {
throw new IOException(
"Invalid arguments to incrementColumnValue", npe);
}
RegionServerCallable<Long> callable =
new RegionServerCallable<Long>(connection, getName(), row) {
public Long call() throws IOException {
@ -1525,7 +1528,7 @@ public class HTable implements HTableInterface {
@Override
public String toString() {
return tableName + ", " + connection;
return tableName + ";" + connection;
}
/**
@ -1541,4 +1544,4 @@ public class HTable implements HTableInterface {
t.close();
}
}
}
}

View File

@ -37,6 +37,7 @@ import java.util.Map;
/**
* Used to communicate with a single HBase table.
* Obtain an instance from an {@ink HConnection}.
*
* @since 0.21.0
*/

View File

@ -31,10 +31,11 @@ import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ServiceException;
@ -65,20 +66,29 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
int countOfActions = this.multiAction.size();
if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
RegionAction.Builder regionActionBuilder = RegionAction.newBuilder();
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
List<CellScannable> cells = null;
// The multi object is a list of Actions by region. Iterate by region.
for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
final byte [] regionName = e.getKey();
final List<Action<R>> actions = e.getValue();
RegionAction.Builder regionActionBuilder;
regionActionBuilder.clear();
regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) );
if (this.cellBlock) {
// Presize. Presume at least a KV per Action. There are likely more.
if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
// Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
// They have already been handled above. Guess at count of cells
regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells);
regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
regionActionBuilder, actionBuilder, mutationBuilder);
} else {
regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions);
regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
regionActionBuilder, actionBuilder, mutationBuilder);
}
multiRequestBuilder.addRegionAction(regionActionBuilder.build());
}
@ -118,4 +128,4 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
// Use the location we were given in the constructor rather than go look it up.
setStub(getConnection().getClient(getLocation().getServerName()));
}
}
}

View File

@ -988,8 +988,8 @@ public final class ProtobufUtil {
* @param increment
* @return the converted mutate
*/
public static MutationProto toMutation(final Increment increment) {
MutationProto.Builder builder = MutationProto.newBuilder();
public static MutationProto toMutation(final Increment increment,
final MutationProto.Builder builder) {
builder.setRow(ZeroCopyLiteralByteString.wrap(increment.getRow()));
builder.setMutateType(MutationType.INCREMENT);
builder.setDurability(toDurability(increment.getDurability()));
@ -1045,12 +1045,18 @@ public final class ProtobufUtil {
*/
public static MutationProto toMutation(final MutationType type, final Mutation mutation)
throws IOException {
MutationProto.Builder builder = getMutationBuilderAndSetCommonFields(type, mutation);
return toMutation(type, mutation, MutationProto.newBuilder());
}
public static MutationProto toMutation(final MutationType type, final Mutation mutation,
MutationProto.Builder builder)
throws IOException {
builder = getMutationBuilderAndSetCommonFields(type, mutation, builder);
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[],List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
columnBuilder.clear();
columnBuilder.setFamily(ZeroCopyLiteralByteString.wrap(family.getKey()));
columnBuilder.clearQualifierValue();
for (Cell cell: family.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
valueBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(
@ -1080,9 +1086,10 @@ public final class ProtobufUtil {
* @return a protobuf'd Mutation
* @throws IOException
*/
public static MutationProto toMutationNoData(final MutationType type, final Mutation mutation)
public static MutationProto toMutationNoData(final MutationType type, final Mutation mutation,
final MutationProto.Builder builder)
throws IOException {
MutationProto.Builder builder = getMutationBuilderAndSetCommonFields(type, mutation);
getMutationBuilderAndSetCommonFields(type, mutation, builder);
builder.setAssociatedCellCount(mutation.size());
return builder.build();
}
@ -1095,8 +1102,7 @@ public final class ProtobufUtil {
* @return A partly-filled out protobuf'd Mutation.
*/
private static MutationProto.Builder getMutationBuilderAndSetCommonFields(final MutationType type,
final Mutation mutation) {
MutationProto.Builder builder = MutationProto.newBuilder();
final Mutation mutation, MutationProto.Builder builder) {
builder.setRow(ZeroCopyLiteralByteString.wrap(mutation.getRow()));
builder.setMutateType(type);
builder.setDurability(toDurability(mutation.getDurability()));
@ -2254,15 +2260,16 @@ public final class ProtobufUtil {
// Doing this is going to kill us if we do it for all data passed.
// St.Ack 20121205
CellProtos.Cell.Builder kvbuilder = CellProtos.Cell.newBuilder();
kvbuilder.setRow(ByteString.copyFrom(kv.getRowArray(), kv.getRowOffset(),
kvbuilder.setRow(ZeroCopyLiteralByteString.wrap(kv.getRowArray(), kv.getRowOffset(),
kv.getRowLength()));
kvbuilder.setFamily(ByteString.copyFrom(kv.getFamilyArray(),
kvbuilder.setFamily(ZeroCopyLiteralByteString.wrap(kv.getFamilyArray(),
kv.getFamilyOffset(), kv.getFamilyLength()));
kvbuilder.setQualifier(ByteString.copyFrom(kv.getQualifierArray(),
kvbuilder.setQualifier(ZeroCopyLiteralByteString.wrap(kv.getQualifierArray(),
kv.getQualifierOffset(), kv.getQualifierLength()));
kvbuilder.setCellType(CellProtos.CellType.valueOf(kv.getTypeByte()));
kvbuilder.setTimestamp(kv.getTimestamp());
kvbuilder.setValue(ByteString.copyFrom(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
kvbuilder.setValue(ZeroCopyLiteralByteString.wrap(kv.getValueArray(), kv.getValueOffset(),
kv.getValueLength()));
return kvbuilder.build();
}

View File

@ -218,7 +218,7 @@ public final class RequestConverter {
builder.setRegion(region);
Condition condition = buildCondition(
row, family, qualifier, comparator, compareType);
builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put));
builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder()));
builder.setCondition(condition);
return builder.build();
}
@ -246,7 +246,8 @@ public final class RequestConverter {
builder.setRegion(region);
Condition condition = buildCondition(
row, family, qualifier, comparator, compareType);
builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete));
builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete,
MutationProto.newBuilder()));
builder.setCondition(condition);
return builder.build();
}
@ -265,7 +266,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put));
builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder()));
return builder.build();
}
@ -283,7 +284,8 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, append));
builder.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, append,
MutationProto.newBuilder()));
return builder.build();
}
@ -300,7 +302,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setMutation(ProtobufUtil.toMutation(increment));
builder.setMutation(ProtobufUtil.toMutation(increment, MutationProto.newBuilder()));
return builder.build();
}
@ -318,7 +320,8 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete));
builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete,
MutationProto.newBuilder()));
return builder.build();
}
@ -334,7 +337,10 @@ public final class RequestConverter {
public static RegionAction.Builder buildRegionAction(final byte [] regionName,
final RowMutations rowMutations)
throws IOException {
RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
RegionAction.Builder builder =
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType = null;
if (mutation instanceof Put) {
@ -345,8 +351,11 @@ public final class RequestConverter {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation);
builder.addAction(ClientProtos.Action.newBuilder().setMutation(mp).build());
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
return builder;
}
@ -363,9 +372,11 @@ public final class RequestConverter {
* @throws IOException
*/
public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
final RowMutations rowMutations, final List<CellScannable> cells)
final RowMutations rowMutations, final List<CellScannable> cells,
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder)
throws IOException {
RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
for (Mutation mutation: rowMutations.getMutations()) {
MutationType type = null;
if (mutation instanceof Put) {
@ -376,18 +387,20 @@ public final class RequestConverter {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation);
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder);
cells.add(mutation);
builder.addAction(ClientProtos.Action.newBuilder().setMutation(mp).build());
actionBuilder.clear();
regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
}
return builder;
return regionActionBuilder;
}
private static RegionAction.Builder getRegionActionBuilderWithRegion(final byte [] regionName) {
RegionAction.Builder builder = RegionAction.newBuilder();
private static RegionAction.Builder getRegionActionBuilderWithRegion(
final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
return builder;
regionActionBuilder.setRegion(region);
return regionActionBuilder;
}
/**
@ -484,36 +497,37 @@ public final class RequestConverter {
* @throws IOException
*/
public static <R> RegionAction.Builder buildRegionAction(final byte[] regionName,
final List<Action<R>> actions)
final List<Action<R>> actions, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder)
throws IOException {
RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
for (Action<R> action: actions) {
Row row = action.getAction();
actionBuilder.clear();
actionBuilder.setIndex(action.getOriginalIndex());
mutationBuilder.clear();
if (row instanceof Get) {
Get g = (Get)row;
builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
regionActionBuilder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
} else if (row instanceof Put) {
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row)));
regionActionBuilder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row, mutationBuilder)));
} else if (row instanceof Delete) {
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row)));
regionActionBuilder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row, mutationBuilder)));
} else if (row instanceof Append) {
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row)));
regionActionBuilder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row, mutationBuilder)));
} else if (row instanceof Increment) {
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation((Increment)row)));
regionActionBuilder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation((Increment)row, mutationBuilder)));
} else if (row instanceof RowMutations) {
throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow");
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
}
return builder;
return regionActionBuilder;
}
/**
@ -533,14 +547,18 @@ public final class RequestConverter {
* @throws IOException
*/
public static <R> RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
final List<Action<R>> actions, final List<CellScannable> cells)
final List<Action<R>> actions, final List<CellScannable> cells,
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder)
throws IOException {
RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
RegionAction.Builder builder =
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
for (Action<R> action: actions) {
Row row = action.getAction();
actionBuilder.clear();
actionBuilder.setIndex(action.getOriginalIndex());
mutationBuilder.clear();
if (row instanceof Get) {
Get g = (Get)row;
builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
@ -548,7 +566,7 @@ public final class RequestConverter {
Put p = (Put)row;
cells.add(p);
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p)));
setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p, mutationBuilder)));
} else if (row instanceof Delete) {
Delete d = (Delete)row;
int size = d.size();
@ -560,21 +578,21 @@ public final class RequestConverter {
if (size > 0) {
cells.add(d);
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d)));
setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d, mutationBuilder)));
} else {
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d)));
setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d, mutationBuilder)));
}
} else if (row instanceof Append) {
Append a = (Append)row;
cells.add(a);
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a)));
setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a, mutationBuilder)));
} else if (row instanceof Increment) {
Increment i = (Increment)row;
cells.add(i);
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i)));
setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i, mutationBuilder)));
} else if (row instanceof RowMutations) {
continue; // ignore RowMutations
} else {

View File

@ -22,41 +22,79 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.google.protobuf.ZeroCopyLiteralByteString;
/**
* Test client behavior w/o setting up a cluster.
* Mock up cluster emissions.
*/
@Category(SmallTests.class)
public class TestClientNoCluster {
public class TestClientNoCluster extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(TestClientNoCluster.class);
private Configuration conf;
public static final ServerName META_SERVERNAME =
new ServerName("meta.example.org", 60010, 12345);
@Before
public void setUp() throws Exception {
@ -71,7 +109,7 @@ public class TestClientNoCluster {
* Simple cluster registry inserted in place of our usual zookeeper based one.
*/
static class SimpleRegistry implements Registry {
final ServerName META_HOST = new ServerName("10.10.10.10", 60010, 12345);
final ServerName META_HOST = META_SERVERNAME;
@Override
public void init(HConnection connection) {
@ -301,4 +339,456 @@ public class TestClientNoCluster {
return this.stub;
}
}
/**
* Fake many regionservers and many regions on a connection implementation.
*/
static class ManyServersManyRegionsConnection
extends HConnectionManager.HConnectionImplementation {
// All access should be synchronized
final Map<ServerName, ClientService.BlockingInterface> serversByClient;
/**
* Map of faked-up rows of a 'meta table'.
*/
final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
final AtomicLong sequenceids = new AtomicLong(0);
private final Configuration conf;
ManyServersManyRegionsConnection(Configuration conf, boolean managed,
ExecutorService pool, User user)
throws IOException {
super(conf, managed, pool, user);
int serverCount = conf.getInt("hbase.test.servers", 10);
this.serversByClient =
new HashMap<ServerName, ClientService.BlockingInterface>(serverCount);
this.meta = makeMeta(Bytes.toBytes(
conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))),
conf.getInt("hbase.test.regions", 100),
conf.getLong("hbase.test.namespace.span", 1000),
serverCount);
this.conf = conf;
}
@Override
public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
// if (!sn.toString().startsWith("meta")) LOG.info(sn);
ClientService.BlockingInterface stub = null;
synchronized (this.serversByClient) {
stub = this.serversByClient.get(sn);
if (stub == null) {
stub = new FakeServer(this.conf, meta, sequenceids);
this.serversByClient.put(sn, stub);
}
}
return stub;
}
}
static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
final AtomicLong sequenceids, final MultiRequest request) {
// Make a response to match the request. Act like there were no failures.
ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
// Per Region.
RegionActionResult.Builder regionActionResultBuilder =
RegionActionResult.newBuilder();
ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
for (RegionAction regionAction: request.getRegionActionList()) {
regionActionResultBuilder.clear();
// Per Action in a Region.
for (ClientProtos.Action action: regionAction.getActionList()) {
roeBuilder.clear();
// Return empty Result and proper index as result.
roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
roeBuilder.setIndex(action.getIndex());
regionActionResultBuilder.addResultOrException(roeBuilder.build());
}
builder.addRegionActionResult(regionActionResultBuilder.build());
}
return builder.build();
}
/**
* Fake 'server'.
* Implements the ClientService responding as though it were a 'server' (presumes a new
* ClientService.BlockingInterface made per server).
*/
static class FakeServer implements ClientService.BlockingInterface {
private AtomicInteger multiInvocationsCount = new AtomicInteger(0);
private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
private final AtomicLong sequenceids;
private final long multiPause;
private final int tooManyMultiRequests;
FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
final AtomicLong sequenceids) {
this.meta = meta;
this.sequenceids = sequenceids;
// Pause to simulate the server taking time applying the edits. This will drive up the
// number of threads used over in client.
this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0);
this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3);
}
@Override
public GetResponse get(RpcController controller, GetRequest request)
throws ServiceException {
boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(),
request.getRegion().getType());
if (!metaRegion) throw new UnsupportedOperationException();
return doMetaGetResponse(meta, request);
}
@Override
public MutateResponse mutate(RpcController controller,
MutateRequest request) throws ServiceException {
throw new NotImplementedException();
}
@Override
public ScanResponse scan(RpcController controller,
ScanRequest request) throws ServiceException {
// Presume it is a scan of meta for now. Not all scans provide a region spec expecting
// the server to keep reference by scannerid. TODO.
return doMetaScanResponse(meta, sequenceids, request);
}
@Override
public BulkLoadHFileResponse bulkLoadHFile(
RpcController controller, BulkLoadHFileRequest request)
throws ServiceException {
throw new NotImplementedException();
}
@Override
public CoprocessorServiceResponse execService(
RpcController controller, CoprocessorServiceRequest request)
throws ServiceException {
throw new NotImplementedException();
}
@Override
public MultiResponse multi(RpcController controller, MultiRequest request)
throws ServiceException {
int concurrentInvocations = this.multiInvocationsCount.incrementAndGet();
try {
if (concurrentInvocations >= tooManyMultiRequests) {
throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" +
concurrentInvocations));
}
Threads.sleep(multiPause);
return doMultiResponse(meta, sequenceids, request);
} finally {
this.multiInvocationsCount.decrementAndGet();
}
}
}
static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
final AtomicLong sequenceids, final ScanRequest request) {
ScanResponse.Builder builder = ScanResponse.newBuilder();
int max = request.getNumberOfRows();
int count = 0;
Map<byte [], Pair<HRegionInfo, ServerName>> tail =
request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta;
ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) {
// Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only.
if (max <= 0) break;
if (++count > max) break;
HRegionInfo hri = e.getValue().getFirst();
ByteString row = ZeroCopyLiteralByteString.wrap(hri.getRegionName());
resultBuilder.clear();
resultBuilder.addCell(getRegionInfo(row, hri));
resultBuilder.addCell(getServer(row, e.getValue().getSecond()));
resultBuilder.addCell(getStartCode(row));
builder.addResults(resultBuilder.build());
// Set more to false if we are on the last region in table.
if (hri.getEndKey().length <= 0) builder.setMoreResults(false);
else builder.setMoreResults(true);
}
// If no scannerid, set one.
builder.setScannerId(request.hasScannerId()?
request.getScannerId(): sequenceids.incrementAndGet());
return builder.build();
}
static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
final GetRequest request) {
ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
ByteString row = request.getGet().getRow();
Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray());
if (p == null) {
if (request.getGet().getClosestRowBefore()) {
byte [] bytes = row.toByteArray();
SortedMap<byte [], Pair<HRegionInfo, ServerName>> head =
bytes != null? meta.headMap(bytes): meta;
p = head == null? null: head.get(head.lastKey());
}
}
if (p != null) {
resultBuilder.addCell(getRegionInfo(row, p.getFirst()));
resultBuilder.addCell(getServer(row, p.getSecond()));
}
resultBuilder.addCell(getStartCode(row));
GetResponse.Builder builder = GetResponse.newBuilder();
builder.setResult(resultBuilder.build());
return builder.build();
}
/**
* @param name region name or encoded region name.
* @param type
* @return True if we are dealing with a hbase:meta region.
*/
static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) {
switch (type) {
case REGION_NAME:
return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name);
case ENCODED_REGION_NAME:
return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name);
default: throw new UnsupportedOperationException();
}
}
private final static ByteString CATALOG_FAMILY_BYTESTRING =
ZeroCopyLiteralByteString.wrap(HConstants.CATALOG_FAMILY);
private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING =
ZeroCopyLiteralByteString.wrap(HConstants.REGIONINFO_QUALIFIER);
private final static ByteString SERVER_QUALIFIER_BYTESTRING =
ZeroCopyLiteralByteString.wrap(HConstants.SERVER_QUALIFIER);
static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) {
CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder();
cellBuilder.setRow(row);
cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING);
cellBuilder.setTimestamp(System.currentTimeMillis());
return cellBuilder;
}
static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) {
CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING);
cellBuilder.setValue(ZeroCopyLiteralByteString.wrap(hri.toByteArray()));
return cellBuilder.build();
}
static CellProtos.Cell getServer(final ByteString row, final ServerName sn) {
CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING);
cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort()));
return cellBuilder.build();
}
static CellProtos.Cell getStartCode(final ByteString row) {
CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
cellBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(HConstants.STARTCODE_QUALIFIER));
// TODO:
cellBuilder.setValue(ZeroCopyLiteralByteString.wrap(Bytes.toBytes(META_SERVERNAME.getStartcode())));
return cellBuilder.build();
}
private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t");
/**
* Format passed integer. Zero-pad.
* Copied from hbase-server PE class and small amendment. Make them share.
* @param number
* @return Returns zero-prefixed 10-byte wide decimal version of passed
* number (Does absolute in case number is negative).
*/
private static byte [] format(final long number) {
byte [] b = new byte[10];
long d = number;
for (int i = b.length - 1; i >= 0; i--) {
b[i] = (byte)((d % 10) + '0');
d /= 10;
}
return b;
}
/**
* @param count
* @param namespaceSpan
* @return <code>count</code> regions
*/
private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count,
final long namespaceSpan) {
byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
long interval = namespaceSpan / count;
HRegionInfo [] hris = new HRegionInfo[count];
for (int i = 0; i < count; i++) {
if (i == 0) {
endKey = format(interval);
} else {
startKey = endKey;
if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY;
else endKey = format((i + 1) * interval);
}
hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey);
}
return hris;
}
/**
* @param count
* @return Return <code>count</code> servernames.
*/
private static ServerName [] makeServerNames(final int count) {
ServerName [] sns = new ServerName[count];
for (int i = 0; i < count; i++) {
sns[i] = new ServerName("" + i + ".example.org", 60010, i);
}
return sns;
}
/**
* Comparator for meta row keys.
*/
private static class MetaRowsComparator implements Comparator<byte []> {
private final KeyValue.KVComparator delegate = new KeyValue.MetaComparator();
@Override
public int compare(byte[] left, byte[] right) {
return delegate.compareRows(left, 0, left.length, right, 0, right.length);
}
}
/**
* Create up a map that is keyed by meta row name and whose value is the HRegionInfo and
* ServerName to return for this row.
* @param hris
* @param serverNames
* @return Map with faked hbase:meta content in it.
*/
static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName,
final int regionCount, final long namespaceSpan, final int serverCount) {
// I need a comparator for meta rows so we sort properly.
SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta =
new ConcurrentSkipListMap<byte[], Pair<HRegionInfo,ServerName>>(new MetaRowsComparator());
HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan);
ServerName [] serverNames = makeServerNames(serverCount);
int per = regionCount / serverCount;
int count = 0;
for (HRegionInfo hri: hris) {
Pair<HRegionInfo, ServerName> p =
new Pair<HRegionInfo, ServerName>(hri, serverNames[count++ / per]);
meta.put(hri.getRegionName(), p);
}
return meta;
}
/**
* Code for each 'client' to run.
* @param c
* @param sharedConnection
* @throws IOException
*/
static void cycle(final Configuration c, final HConnection sharedConnection) throws IOException {
HTableInterface table = sharedConnection.getTable(BIG_USER_TABLE);
table.setAutoFlushTo(false);
long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
long startTime = System.currentTimeMillis();
final int printInterval = 100000;
try {
Stopwatch stopWatch = new Stopwatch();
stopWatch.start();
for (int i = 0; i < namespaceSpan; i++) {
byte [] b = format(i);
Put p = new Put(b);
p.add(HConstants.CATALOG_FAMILY, b, b);
if (i % printInterval == 0) {
LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis());
stopWatch.reset();
stopWatch.start();
}
table.put(p);
}
LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
(System.currentTimeMillis() - startTime) + "ms");
} finally {
table.close();
}
}
@Override
public int run(String[] arg0) throws Exception {
int errCode = 0;
// TODO: Make command options.
// How many servers to fake.
final int servers = 1;
// How many regions to put on the faked servers.
final int regions = 100000;
// How many 'keys' in the faked regions.
final long namespaceSpan = 1000000;
// How long to take to pause after doing a put; make this long if you want to fake a struggling
// server.
final long multiPause = 0;
// Check args make basic sense.
if ((namespaceSpan < regions) || (regions < servers)) {
throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" +
regions + " which must be > servers=" + servers);
}
// Set my many servers and many regions faking connection in place.
getConf().set("hbase.client.connection.impl",
ManyServersManyRegionsConnection.class.getName());
// Use simple kv registry rather than zk
getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName());
// When to report fails. Default is we report the 10th. This means we'll see log everytime
// an exception is thrown -- usually RegionTooBusyException when we have more than
// hbase.test.multi.too.many requests outstanding at any time.
getConf().setInt("hbase.client.start.log.errors.counter", 0);
// Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class.
getConf().setInt("hbase.test.regions", regions);
getConf().setLong("hbase.test.namespace.span", namespaceSpan);
getConf().setLong("hbase.test.servers", servers);
getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE));
getConf().setLong("hbase.test.multi.pause.when.done", multiPause);
// Let there be ten outstanding requests at a time before we throw RegionBusyException.
getConf().setInt("hbase.test.multi.too.many", 10);
final int clients = 20;
// Have them all share the same connection so they all share the same instance of
// ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p"));
// Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
// Share a connection so I can keep counts in the 'server' on concurrency.
final HConnection sharedConnection = HConnectionManager.createConnection(getConf()/*, pool*/);
try {
Thread [] ts = new Thread[clients];
for (int j = 0; j < ts.length; j++) {
ts[j] = new Thread("" + j) {
final Configuration c = getConf();
@Override
public void run() {
try {
cycle(c, sharedConnection);
} catch (IOException e) {
e.printStackTrace();
}
}
};
ts[j].start();
}
for (int j = 0; j < ts.length; j++) {
ts[j].join();
}
} finally {
sharedConnection.close();
}
return errCode;
}
/**
* Run a client instance against a faked up server.
* @param args TODO
* @throws Exception
*/
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args));
}
}

View File

@ -59,6 +59,8 @@ import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@ -318,7 +320,10 @@ public class TestIPC {
List<CellScannable> cells = new ArrayList<CellScannable>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
ClientProtos.RegionAction.Builder builder = RequestConverter.buildNoDataRegionAction(
HConstants.EMPTY_BYTE_ARRAY, rm, cells);
HConstants.EMPTY_BYTE_ARRAY, rm, cells,
RegionAction.newBuilder(),
ClientProtos.Action.newBuilder(),
MutationProto.newBuilder());
CellScanner cellScanner = CellUtil.createCellScanner(cells);
if (i % 1000 == 0) {
LOG.info("" + i);

View File

@ -219,7 +219,8 @@ public class TestProtobufUtil {
mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT);
Increment increment = ProtobufUtil.toIncrement(proto, null);
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(increment));
assertEquals(mutateBuilder.build(),
ProtobufUtil.toMutation(increment, MutationProto.newBuilder()));
}
/**