diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index eee7dd63fb3..4b015037054 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -641,8 +641,7 @@ class AsyncProcess { private void logAndResubmit(List> initialActions, HRegionLocation oldLocation, List> toReplay, int numAttempt, int failureCount, Throwable throwable, - HConnectionManager.ServerErrorTracker errorsByServer){ - + HConnectionManager.ServerErrorTracker errorsByServer) { if (toReplay.isEmpty()) { // it's either a success or a last failure if (failureCount != 0) { @@ -789,22 +788,22 @@ class AsyncProcess { StringBuilder sb = new StringBuilder(); sb.append("#").append(id).append(", table=").append(tableName). - append(", Attempt #").append(numAttempt).append("/").append(numTries).append(" "); + append(", attempt=").append(numAttempt).append("/").append(numTries).append(" "); if (failureCount > 0 || error != null){ - sb.append("failed ").append(failureCount).append(" ops").append(", last exception was: "). - append(error == null ? "null" : error.getMessage()); - }else { + sb.append("failed ").append(failureCount).append(" ops").append(", last exception: "). + append(error == null ? "null" : error); + } else { sb.append("SUCCEEDED"); } - sb.append(" on server ").append(sn); + sb.append(" on ").append(sn); - sb.append(", tracking started at ").append(startTime); + sb.append(", tracking started ").append(startTime); if (willRetry) { - sb.append(" - retrying after sleeping for ").append(backOffTime).append(" ms"). - append(", will replay ").append(replaySize).append(" ops."); + sb.append(", retrying after ").append(backOffTime).append(" ms"). + append(", replay ").append(replaySize).append(" ops."); } else if (failureCount > 0) { sb.append(" - FAILED, NOT RETRYING ANYMORE"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java index e9c5977a7d8..060a79a02a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java @@ -41,10 +41,12 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; /** * A cluster connection. Knows how to find the master, locate regions out on the cluster, - * keeps a cache of locations and then knows how to re-calibrate after they move. - * {@link HConnectionManager} manages instances of this class. This is NOT a connection to a - * particular server but to all servers in the cluster. Individual connections are managed at a - * lower level. + * keeps a cache of locations and then knows how to re-calibrate after they move. You need one + * of these to talk to your HBase cluster. {@link HConnectionManager} manages instances of this + * class. See it for how to get one of these. + * + *

This is NOT a connection to a particular server but to ALL servers in the cluster. Individual + * connections are managed at a lower level. * *

HConnections are used by {@link HTable} mostly but also by * {@link HBaseAdmin}, and {@link CatalogTracker}. HConnection instances can be shared. Sharing diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 93305e5cf09..477eb88fc45 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -71,7 +71,10 @@ import com.google.protobuf.Service; import com.google.protobuf.ServiceException; /** - *

Used to communicate with a single HBase table. + *

Used to communicate with a single HBase table. An implementation of + * {@link HTableInterface}. Instances of this class can be constructed directly but it is + * encouraged that users get instances via {@link HConnection} and {@link HConnectionManager}. + * See {@link HConnectionManager} class comment for an example. * *

This class is not thread safe for reads nor write. * @@ -336,7 +339,7 @@ public class HTable implements HTableInterface { HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration); - ap = new AsyncProcess(connection, tableName, pool, null, + ap = new AsyncProcess(connection, tableName, pool, null, configuration, rpcCallerFactory); this.maxKeyValueSize = this.configuration.getInt( @@ -1070,7 +1073,7 @@ public class HTable implements HTableInterface { throw new IOException( "Invalid arguments to incrementColumnValue", npe); } - + RegionServerCallable callable = new RegionServerCallable(connection, getName(), row) { public Long call() throws IOException { @@ -1525,7 +1528,7 @@ public class HTable implements HTableInterface { @Override public String toString() { - return tableName + ", " + connection; + return tableName + ";" + connection; } /** @@ -1541,4 +1544,4 @@ public class HTable implements HTableInterface { t.close(); } } -} \ No newline at end of file +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java index 8ee80845617..6b49a0e6753 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java @@ -37,6 +37,7 @@ import java.util.Map; /** * Used to communicate with a single HBase table. + * Obtain an instance from an {@ink HConnection}. * * @since 0.21.0 */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index b1bc310b0da..7d9b7d42609 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -31,10 +31,11 @@ import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; -import org.apache.hadoop.hbase.util.Pair; import com.google.protobuf.ServiceException; @@ -65,20 +66,29 @@ class MultiServerCallable extends RegionServerCallable { int countOfActions = this.multiAction.size(); if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions"); MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder(); + RegionAction.Builder regionActionBuilder = RegionAction.newBuilder(); + ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); + MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); List cells = null; // The multi object is a list of Actions by region. Iterate by region. for (Map.Entry>> e: this.multiAction.actions.entrySet()) { final byte [] regionName = e.getKey(); final List> actions = e.getValue(); - RegionAction.Builder regionActionBuilder; + regionActionBuilder.clear(); + regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier( + HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) ); + + if (this.cellBlock) { // Presize. Presume at least a KV per Action. There are likely more. if (cells == null) cells = new ArrayList(countOfActions); // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations. // They have already been handled above. Guess at count of cells - regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells); + regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells, + regionActionBuilder, actionBuilder, mutationBuilder); } else { - regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions); + regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions, + regionActionBuilder, actionBuilder, mutationBuilder); } multiRequestBuilder.addRegionAction(regionActionBuilder.build()); } @@ -118,4 +128,4 @@ class MultiServerCallable extends RegionServerCallable { // Use the location we were given in the constructor rather than go look it up. setStub(getConnection().getClient(getLocation().getServerName())); } -} \ No newline at end of file +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index ac364018895..9b3c6fef68a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -988,8 +988,8 @@ public final class ProtobufUtil { * @param increment * @return the converted mutate */ - public static MutationProto toMutation(final Increment increment) { - MutationProto.Builder builder = MutationProto.newBuilder(); + public static MutationProto toMutation(final Increment increment, + final MutationProto.Builder builder) { builder.setRow(ZeroCopyLiteralByteString.wrap(increment.getRow())); builder.setMutateType(MutationType.INCREMENT); builder.setDurability(toDurability(increment.getDurability())); @@ -1045,12 +1045,18 @@ public final class ProtobufUtil { */ public static MutationProto toMutation(final MutationType type, final Mutation mutation) throws IOException { - MutationProto.Builder builder = getMutationBuilderAndSetCommonFields(type, mutation); + return toMutation(type, mutation, MutationProto.newBuilder()); + } + + public static MutationProto toMutation(final MutationType type, final Mutation mutation, + MutationProto.Builder builder) + throws IOException { + builder = getMutationBuilderAndSetCommonFields(type, mutation, builder); ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); for (Map.Entry> family: mutation.getFamilyCellMap().entrySet()) { + columnBuilder.clear(); columnBuilder.setFamily(ZeroCopyLiteralByteString.wrap(family.getKey())); - columnBuilder.clearQualifierValue(); for (Cell cell: family.getValue()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); valueBuilder.setQualifier(ZeroCopyLiteralByteString.wrap( @@ -1080,9 +1086,10 @@ public final class ProtobufUtil { * @return a protobuf'd Mutation * @throws IOException */ - public static MutationProto toMutationNoData(final MutationType type, final Mutation mutation) + public static MutationProto toMutationNoData(final MutationType type, final Mutation mutation, + final MutationProto.Builder builder) throws IOException { - MutationProto.Builder builder = getMutationBuilderAndSetCommonFields(type, mutation); + getMutationBuilderAndSetCommonFields(type, mutation, builder); builder.setAssociatedCellCount(mutation.size()); return builder.build(); } @@ -1095,8 +1102,7 @@ public final class ProtobufUtil { * @return A partly-filled out protobuf'd Mutation. */ private static MutationProto.Builder getMutationBuilderAndSetCommonFields(final MutationType type, - final Mutation mutation) { - MutationProto.Builder builder = MutationProto.newBuilder(); + final Mutation mutation, MutationProto.Builder builder) { builder.setRow(ZeroCopyLiteralByteString.wrap(mutation.getRow())); builder.setMutateType(type); builder.setDurability(toDurability(mutation.getDurability())); @@ -2254,15 +2260,16 @@ public final class ProtobufUtil { // Doing this is going to kill us if we do it for all data passed. // St.Ack 20121205 CellProtos.Cell.Builder kvbuilder = CellProtos.Cell.newBuilder(); - kvbuilder.setRow(ByteString.copyFrom(kv.getRowArray(), kv.getRowOffset(), + kvbuilder.setRow(ZeroCopyLiteralByteString.wrap(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); - kvbuilder.setFamily(ByteString.copyFrom(kv.getFamilyArray(), + kvbuilder.setFamily(ZeroCopyLiteralByteString.wrap(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength())); - kvbuilder.setQualifier(ByteString.copyFrom(kv.getQualifierArray(), + kvbuilder.setQualifier(ZeroCopyLiteralByteString.wrap(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())); kvbuilder.setCellType(CellProtos.CellType.valueOf(kv.getTypeByte())); kvbuilder.setTimestamp(kv.getTimestamp()); - kvbuilder.setValue(ByteString.copyFrom(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + kvbuilder.setValue(ZeroCopyLiteralByteString.wrap(kv.getValueArray(), kv.getValueOffset(), + kv.getValueLength())); return kvbuilder.build(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 77cd5ee2eef..5f8f96e3946 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -218,7 +218,7 @@ public final class RequestConverter { builder.setRegion(region); Condition condition = buildCondition( row, family, qualifier, comparator, compareType); - builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put)); + builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder())); builder.setCondition(condition); return builder.build(); } @@ -246,7 +246,8 @@ public final class RequestConverter { builder.setRegion(region); Condition condition = buildCondition( row, family, qualifier, comparator, compareType); - builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete)); + builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete, + MutationProto.newBuilder())); builder.setCondition(condition); return builder.build(); } @@ -265,7 +266,7 @@ public final class RequestConverter { RegionSpecifier region = buildRegionSpecifier( RegionSpecifierType.REGION_NAME, regionName); builder.setRegion(region); - builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put)); + builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder())); return builder.build(); } @@ -283,7 +284,8 @@ public final class RequestConverter { RegionSpecifier region = buildRegionSpecifier( RegionSpecifierType.REGION_NAME, regionName); builder.setRegion(region); - builder.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, append)); + builder.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, append, + MutationProto.newBuilder())); return builder.build(); } @@ -300,7 +302,7 @@ public final class RequestConverter { RegionSpecifier region = buildRegionSpecifier( RegionSpecifierType.REGION_NAME, regionName); builder.setRegion(region); - builder.setMutation(ProtobufUtil.toMutation(increment)); + builder.setMutation(ProtobufUtil.toMutation(increment, MutationProto.newBuilder())); return builder.build(); } @@ -318,7 +320,8 @@ public final class RequestConverter { RegionSpecifier region = buildRegionSpecifier( RegionSpecifierType.REGION_NAME, regionName); builder.setRegion(region); - builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete)); + builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete, + MutationProto.newBuilder())); return builder.build(); } @@ -334,7 +337,10 @@ public final class RequestConverter { public static RegionAction.Builder buildRegionAction(final byte [] regionName, final RowMutations rowMutations) throws IOException { - RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName); + RegionAction.Builder builder = + getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName); + ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); + MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); for (Mutation mutation: rowMutations.getMutations()) { MutationType mutateType = null; if (mutation instanceof Put) { @@ -345,8 +351,11 @@ public final class RequestConverter { throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + mutation.getClass().getName()); } - MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation); - builder.addAction(ClientProtos.Action.newBuilder().setMutation(mp).build()); + mutationBuilder.clear(); + MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder); + actionBuilder.clear(); + actionBuilder.setMutation(mp); + builder.addAction(actionBuilder.build()); } return builder; } @@ -363,9 +372,11 @@ public final class RequestConverter { * @throws IOException */ public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName, - final RowMutations rowMutations, final List cells) + final RowMutations rowMutations, final List cells, + final RegionAction.Builder regionActionBuilder, + final ClientProtos.Action.Builder actionBuilder, + final MutationProto.Builder mutationBuilder) throws IOException { - RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName); for (Mutation mutation: rowMutations.getMutations()) { MutationType type = null; if (mutation instanceof Put) { @@ -376,18 +387,20 @@ public final class RequestConverter { throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + mutation.getClass().getName()); } - MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation); + mutationBuilder.clear(); + MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder); cells.add(mutation); - builder.addAction(ClientProtos.Action.newBuilder().setMutation(mp).build()); + actionBuilder.clear(); + regionActionBuilder.addAction(actionBuilder.setMutation(mp).build()); } - return builder; + return regionActionBuilder; } - private static RegionAction.Builder getRegionActionBuilderWithRegion(final byte [] regionName) { - RegionAction.Builder builder = RegionAction.newBuilder(); + private static RegionAction.Builder getRegionActionBuilderWithRegion( + final RegionAction.Builder regionActionBuilder, final byte [] regionName) { RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); - builder.setRegion(region); - return builder; + regionActionBuilder.setRegion(region); + return regionActionBuilder; } /** @@ -484,36 +497,37 @@ public final class RequestConverter { * @throws IOException */ public static RegionAction.Builder buildRegionAction(final byte[] regionName, - final List> actions) + final List> actions, final RegionAction.Builder regionActionBuilder, + final ClientProtos.Action.Builder actionBuilder, + final MutationProto.Builder mutationBuilder) throws IOException { - RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName); - ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); for (Action action: actions) { Row row = action.getAction(); actionBuilder.clear(); actionBuilder.setIndex(action.getOriginalIndex()); + mutationBuilder.clear(); if (row instanceof Get) { Get g = (Get)row; - builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g))); + regionActionBuilder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g))); } else if (row instanceof Put) { - builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row))); + regionActionBuilder.addAction(actionBuilder. + setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row, mutationBuilder))); } else if (row instanceof Delete) { - builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row))); + regionActionBuilder.addAction(actionBuilder. + setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row, mutationBuilder))); } else if (row instanceof Append) { - builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row))); + regionActionBuilder.addAction(actionBuilder. + setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row, mutationBuilder))); } else if (row instanceof Increment) { - builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutation((Increment)row))); + regionActionBuilder.addAction(actionBuilder. + setMutation(ProtobufUtil.toMutation((Increment)row, mutationBuilder))); } else if (row instanceof RowMutations) { throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow"); } else { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } } - return builder; + return regionActionBuilder; } /** @@ -533,14 +547,18 @@ public final class RequestConverter { * @throws IOException */ public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName, - final List> actions, final List cells) + final List> actions, final List cells, + final RegionAction.Builder regionActionBuilder, + final ClientProtos.Action.Builder actionBuilder, + final MutationProto.Builder mutationBuilder) throws IOException { - RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName); - ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); + RegionAction.Builder builder = + getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName); for (Action action: actions) { Row row = action.getAction(); actionBuilder.clear(); actionBuilder.setIndex(action.getOriginalIndex()); + mutationBuilder.clear(); if (row instanceof Get) { Get g = (Get)row; builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g))); @@ -548,7 +566,7 @@ public final class RequestConverter { Put p = (Put)row; cells.add(p); builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p))); + setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p, mutationBuilder))); } else if (row instanceof Delete) { Delete d = (Delete)row; int size = d.size(); @@ -560,21 +578,21 @@ public final class RequestConverter { if (size > 0) { cells.add(d); builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d))); + setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d, mutationBuilder))); } else { builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d))); + setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d, mutationBuilder))); } } else if (row instanceof Append) { Append a = (Append)row; cells.add(a); builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a))); + setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a, mutationBuilder))); } else if (row instanceof Increment) { Increment i = (Increment)row; cells.add(i); builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i))); + setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i, mutationBuilder))); } else if (row instanceof RowMutations) { continue; // ignore RowMutations } else { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java index d19e3914689..021a1b0957d 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java @@ -22,41 +22,79 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.net.SocketTimeoutException; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.protobuf.generated.CellProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; +import com.google.common.base.Stopwatch; +import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import com.google.protobuf.ZeroCopyLiteralByteString; /** * Test client behavior w/o setting up a cluster. * Mock up cluster emissions. */ @Category(SmallTests.class) -public class TestClientNoCluster { +public class TestClientNoCluster extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(TestClientNoCluster.class); private Configuration conf; + public static final ServerName META_SERVERNAME = + new ServerName("meta.example.org", 60010, 12345); @Before public void setUp() throws Exception { @@ -71,7 +109,7 @@ public class TestClientNoCluster { * Simple cluster registry inserted in place of our usual zookeeper based one. */ static class SimpleRegistry implements Registry { - final ServerName META_HOST = new ServerName("10.10.10.10", 60010, 12345); + final ServerName META_HOST = META_SERVERNAME; @Override public void init(HConnection connection) { @@ -301,4 +339,456 @@ public class TestClientNoCluster { return this.stub; } } + + /** + * Fake many regionservers and many regions on a connection implementation. + */ + static class ManyServersManyRegionsConnection + extends HConnectionManager.HConnectionImplementation { + // All access should be synchronized + final Map serversByClient; + + /** + * Map of faked-up rows of a 'meta table'. + */ + final SortedMap> meta; + final AtomicLong sequenceids = new AtomicLong(0); + private final Configuration conf; + + ManyServersManyRegionsConnection(Configuration conf, boolean managed, + ExecutorService pool, User user) + throws IOException { + super(conf, managed, pool, user); + int serverCount = conf.getInt("hbase.test.servers", 10); + this.serversByClient = + new HashMap(serverCount); + this.meta = makeMeta(Bytes.toBytes( + conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))), + conf.getInt("hbase.test.regions", 100), + conf.getLong("hbase.test.namespace.span", 1000), + serverCount); + this.conf = conf; + } + + @Override + public ClientService.BlockingInterface getClient(ServerName sn) throws IOException { + // if (!sn.toString().startsWith("meta")) LOG.info(sn); + ClientService.BlockingInterface stub = null; + synchronized (this.serversByClient) { + stub = this.serversByClient.get(sn); + if (stub == null) { + stub = new FakeServer(this.conf, meta, sequenceids); + this.serversByClient.put(sn, stub); + } + } + return stub; + } + } + + static MultiResponse doMultiResponse(final SortedMap> meta, + final AtomicLong sequenceids, final MultiRequest request) { + // Make a response to match the request. Act like there were no failures. + ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder(); + // Per Region. + RegionActionResult.Builder regionActionResultBuilder = + RegionActionResult.newBuilder(); + ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); + for (RegionAction regionAction: request.getRegionActionList()) { + regionActionResultBuilder.clear(); + // Per Action in a Region. + for (ClientProtos.Action action: regionAction.getActionList()) { + roeBuilder.clear(); + // Return empty Result and proper index as result. + roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); + roeBuilder.setIndex(action.getIndex()); + regionActionResultBuilder.addResultOrException(roeBuilder.build()); + } + builder.addRegionActionResult(regionActionResultBuilder.build()); + } + return builder.build(); + } + + /** + * Fake 'server'. + * Implements the ClientService responding as though it were a 'server' (presumes a new + * ClientService.BlockingInterface made per server). + */ + static class FakeServer implements ClientService.BlockingInterface { + private AtomicInteger multiInvocationsCount = new AtomicInteger(0); + private final SortedMap> meta; + private final AtomicLong sequenceids; + private final long multiPause; + private final int tooManyMultiRequests; + + FakeServer(final Configuration c, final SortedMap> meta, + final AtomicLong sequenceids) { + this.meta = meta; + this.sequenceids = sequenceids; + + // Pause to simulate the server taking time applying the edits. This will drive up the + // number of threads used over in client. + this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0); + this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3); + } + + @Override + public GetResponse get(RpcController controller, GetRequest request) + throws ServiceException { + boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(), + request.getRegion().getType()); + if (!metaRegion) throw new UnsupportedOperationException(); + return doMetaGetResponse(meta, request); + } + + @Override + public MutateResponse mutate(RpcController controller, + MutateRequest request) throws ServiceException { + throw new NotImplementedException(); + } + + @Override + public ScanResponse scan(RpcController controller, + ScanRequest request) throws ServiceException { + // Presume it is a scan of meta for now. Not all scans provide a region spec expecting + // the server to keep reference by scannerid. TODO. + return doMetaScanResponse(meta, sequenceids, request); + } + + @Override + public BulkLoadHFileResponse bulkLoadHFile( + RpcController controller, BulkLoadHFileRequest request) + throws ServiceException { + throw new NotImplementedException(); + } + + @Override + public CoprocessorServiceResponse execService( + RpcController controller, CoprocessorServiceRequest request) + throws ServiceException { + throw new NotImplementedException(); + } + + @Override + public MultiResponse multi(RpcController controller, MultiRequest request) + throws ServiceException { + int concurrentInvocations = this.multiInvocationsCount.incrementAndGet(); + try { + if (concurrentInvocations >= tooManyMultiRequests) { + throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" + + concurrentInvocations)); + } + Threads.sleep(multiPause); + return doMultiResponse(meta, sequenceids, request); + } finally { + this.multiInvocationsCount.decrementAndGet(); + } + } + } + + static ScanResponse doMetaScanResponse(final SortedMap> meta, + final AtomicLong sequenceids, final ScanRequest request) { + ScanResponse.Builder builder = ScanResponse.newBuilder(); + int max = request.getNumberOfRows(); + int count = 0; + Map> tail = + request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta; + ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); + for (Map.Entry> e: tail.entrySet()) { + // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only. + if (max <= 0) break; + if (++count > max) break; + HRegionInfo hri = e.getValue().getFirst(); + ByteString row = ZeroCopyLiteralByteString.wrap(hri.getRegionName()); + resultBuilder.clear(); + resultBuilder.addCell(getRegionInfo(row, hri)); + resultBuilder.addCell(getServer(row, e.getValue().getSecond())); + resultBuilder.addCell(getStartCode(row)); + builder.addResults(resultBuilder.build()); + // Set more to false if we are on the last region in table. + if (hri.getEndKey().length <= 0) builder.setMoreResults(false); + else builder.setMoreResults(true); + } + // If no scannerid, set one. + builder.setScannerId(request.hasScannerId()? + request.getScannerId(): sequenceids.incrementAndGet()); + return builder.build(); + } + + static GetResponse doMetaGetResponse(final SortedMap> meta, + final GetRequest request) { + ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); + ByteString row = request.getGet().getRow(); + Pair p = meta.get(row.toByteArray()); + if (p == null) { + if (request.getGet().getClosestRowBefore()) { + byte [] bytes = row.toByteArray(); + SortedMap> head = + bytes != null? meta.headMap(bytes): meta; + p = head == null? null: head.get(head.lastKey()); + } + } + if (p != null) { + resultBuilder.addCell(getRegionInfo(row, p.getFirst())); + resultBuilder.addCell(getServer(row, p.getSecond())); + } + resultBuilder.addCell(getStartCode(row)); + GetResponse.Builder builder = GetResponse.newBuilder(); + builder.setResult(resultBuilder.build()); + return builder.build(); + } + + /** + * @param name region name or encoded region name. + * @param type + * @return True if we are dealing with a hbase:meta region. + */ + static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) { + switch (type) { + case REGION_NAME: + return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name); + case ENCODED_REGION_NAME: + return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name); + default: throw new UnsupportedOperationException(); + } + } + + private final static ByteString CATALOG_FAMILY_BYTESTRING = + ZeroCopyLiteralByteString.wrap(HConstants.CATALOG_FAMILY); + private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING = + ZeroCopyLiteralByteString.wrap(HConstants.REGIONINFO_QUALIFIER); + private final static ByteString SERVER_QUALIFIER_BYTESTRING = + ZeroCopyLiteralByteString.wrap(HConstants.SERVER_QUALIFIER); + + static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) { + CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder(); + cellBuilder.setRow(row); + cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING); + cellBuilder.setTimestamp(System.currentTimeMillis()); + return cellBuilder; + } + + static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) { + CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); + cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING); + cellBuilder.setValue(ZeroCopyLiteralByteString.wrap(hri.toByteArray())); + return cellBuilder.build(); + } + + static CellProtos.Cell getServer(final ByteString row, final ServerName sn) { + CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); + cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING); + cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort())); + return cellBuilder.build(); + } + + static CellProtos.Cell getStartCode(final ByteString row) { + CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); + cellBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(HConstants.STARTCODE_QUALIFIER)); + // TODO: + cellBuilder.setValue(ZeroCopyLiteralByteString.wrap(Bytes.toBytes(META_SERVERNAME.getStartcode()))); + return cellBuilder.build(); + } + + private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t"); + + /** + * Format passed integer. Zero-pad. + * Copied from hbase-server PE class and small amendment. Make them share. + * @param number + * @return Returns zero-prefixed 10-byte wide decimal version of passed + * number (Does absolute in case number is negative). + */ + private static byte [] format(final long number) { + byte [] b = new byte[10]; + long d = number; + for (int i = b.length - 1; i >= 0; i--) { + b[i] = (byte)((d % 10) + '0'); + d /= 10; + } + return b; + } + + /** + * @param count + * @param namespaceSpan + * @return count regions + */ + private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count, + final long namespaceSpan) { + byte [] startKey = HConstants.EMPTY_BYTE_ARRAY; + byte [] endKey = HConstants.EMPTY_BYTE_ARRAY; + long interval = namespaceSpan / count; + HRegionInfo [] hris = new HRegionInfo[count]; + for (int i = 0; i < count; i++) { + if (i == 0) { + endKey = format(interval); + } else { + startKey = endKey; + if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY; + else endKey = format((i + 1) * interval); + } + hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey); + } + return hris; + } + + /** + * @param count + * @return Return count servernames. + */ + private static ServerName [] makeServerNames(final int count) { + ServerName [] sns = new ServerName[count]; + for (int i = 0; i < count; i++) { + sns[i] = new ServerName("" + i + ".example.org", 60010, i); + } + return sns; + } + + /** + * Comparator for meta row keys. + */ + private static class MetaRowsComparator implements Comparator { + private final KeyValue.KVComparator delegate = new KeyValue.MetaComparator(); + @Override + public int compare(byte[] left, byte[] right) { + return delegate.compareRows(left, 0, left.length, right, 0, right.length); + } + } + + /** + * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and + * ServerName to return for this row. + * @param hris + * @param serverNames + * @return Map with faked hbase:meta content in it. + */ + static SortedMap> makeMeta(final byte [] tableName, + final int regionCount, final long namespaceSpan, final int serverCount) { + // I need a comparator for meta rows so we sort properly. + SortedMap> meta = + new ConcurrentSkipListMap>(new MetaRowsComparator()); + HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan); + ServerName [] serverNames = makeServerNames(serverCount); + int per = regionCount / serverCount; + int count = 0; + for (HRegionInfo hri: hris) { + Pair p = + new Pair(hri, serverNames[count++ / per]); + meta.put(hri.getRegionName(), p); + } + return meta; + } + + /** + * Code for each 'client' to run. + * @param c + * @param sharedConnection + * @throws IOException + */ + static void cycle(final Configuration c, final HConnection sharedConnection) throws IOException { + HTableInterface table = sharedConnection.getTable(BIG_USER_TABLE); + table.setAutoFlushTo(false); + long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000); + long startTime = System.currentTimeMillis(); + final int printInterval = 100000; + try { + Stopwatch stopWatch = new Stopwatch(); + stopWatch.start(); + for (int i = 0; i < namespaceSpan; i++) { + byte [] b = format(i); + Put p = new Put(b); + p.add(HConstants.CATALOG_FAMILY, b, b); + if (i % printInterval == 0) { + LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis()); + stopWatch.reset(); + stopWatch.start(); + } + table.put(p); + } + LOG.info("Finished a cycle putting " + namespaceSpan + " in " + + (System.currentTimeMillis() - startTime) + "ms"); + } finally { + table.close(); + } + } + + @Override + public int run(String[] arg0) throws Exception { + int errCode = 0; + // TODO: Make command options. + // How many servers to fake. + final int servers = 1; + // How many regions to put on the faked servers. + final int regions = 100000; + // How many 'keys' in the faked regions. + final long namespaceSpan = 1000000; + // How long to take to pause after doing a put; make this long if you want to fake a struggling + // server. + final long multiPause = 0; + // Check args make basic sense. + if ((namespaceSpan < regions) || (regions < servers)) { + throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" + + regions + " which must be > servers=" + servers); + } + + // Set my many servers and many regions faking connection in place. + getConf().set("hbase.client.connection.impl", + ManyServersManyRegionsConnection.class.getName()); + // Use simple kv registry rather than zk + getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName()); + // When to report fails. Default is we report the 10th. This means we'll see log everytime + // an exception is thrown -- usually RegionTooBusyException when we have more than + // hbase.test.multi.too.many requests outstanding at any time. + getConf().setInt("hbase.client.start.log.errors.counter", 0); + + // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class. + getConf().setInt("hbase.test.regions", regions); + getConf().setLong("hbase.test.namespace.span", namespaceSpan); + getConf().setLong("hbase.test.servers", servers); + getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE)); + getConf().setLong("hbase.test.multi.pause.when.done", multiPause); + // Let there be ten outstanding requests at a time before we throw RegionBusyException. + getConf().setInt("hbase.test.multi.too.many", 10); + final int clients = 20; + + // Have them all share the same connection so they all share the same instance of + // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server. + final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p")); + // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p")); + // Share a connection so I can keep counts in the 'server' on concurrency. + final HConnection sharedConnection = HConnectionManager.createConnection(getConf()/*, pool*/); + try { + Thread [] ts = new Thread[clients]; + for (int j = 0; j < ts.length; j++) { + ts[j] = new Thread("" + j) { + final Configuration c = getConf(); + + @Override + public void run() { + try { + cycle(c, sharedConnection); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + ts[j].start(); + } + for (int j = 0; j < ts.length; j++) { + ts[j].join(); + } + } finally { + sharedConnection.close(); + } + return errCode; + } + + /** + * Run a client instance against a faked up server. + * @param args TODO + * @throws Exception + */ + public static void main(String[] args) throws Exception { + System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args)); + } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 4748915dbab..c1cbd7bb7e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -59,6 +59,8 @@ import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -318,7 +320,10 @@ public class TestIPC { List cells = new ArrayList(); // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); ClientProtos.RegionAction.Builder builder = RequestConverter.buildNoDataRegionAction( - HConstants.EMPTY_BYTE_ARRAY, rm, cells); + HConstants.EMPTY_BYTE_ARRAY, rm, cells, + RegionAction.newBuilder(), + ClientProtos.Action.newBuilder(), + MutationProto.newBuilder()); CellScanner cellScanner = CellUtil.createCellScanner(cells); if (i % 1000 == 0) { LOG.info("" + i); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java index 77710d22b1c..51fceb5f6c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java @@ -219,7 +219,8 @@ public class TestProtobufUtil { mutateBuilder.setDurability(MutationProto.Durability.USE_DEFAULT); Increment increment = ProtobufUtil.toIncrement(proto, null); - assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(increment)); + assertEquals(mutateBuilder.build(), + ProtobufUtil.toMutation(increment, MutationProto.newBuilder())); } /**