HBASE-5785 Adding unit tests for protbuf utils introduced for HRegionInterface pb conversion

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1332824 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-05-01 21:02:21 +00:00
parent 69ffbf247f
commit 2f3d22f5b7
3 changed files with 607 additions and 273 deletions

View File

@ -28,8 +28,12 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
@ -48,6 +52,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
@ -58,6 +63,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
@ -554,6 +560,68 @@ public final class ProtobufUtil {
return increment;
}
/**
* Convert a client Scan to a protocol buffer Scan
*
* @param scan the client Scan to convert
* @return the converted protocol buffer Scan
* @throws IOException
*/
public static ClientProtos.Scan toScan(
final Scan scan) throws IOException {
ClientProtos.Scan.Builder scanBuilder =
ClientProtos.Scan.newBuilder();
scanBuilder.setCacheBlocks(scan.getCacheBlocks());
if (scan.getBatch() > 0) {
scanBuilder.setBatchSize(scan.getBatch());
}
scanBuilder.setMaxVersions(scan.getMaxVersions());
TimeRange timeRange = scan.getTimeRange();
if (!timeRange.isAllTime()) {
HBaseProtos.TimeRange.Builder timeRangeBuilder =
HBaseProtos.TimeRange.newBuilder();
timeRangeBuilder.setFrom(timeRange.getMin());
timeRangeBuilder.setTo(timeRange.getMax());
scanBuilder.setTimeRange(timeRangeBuilder.build());
}
Map<String, byte[]> attributes = scan.getAttributesMap();
if (!attributes.isEmpty()) {
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
attributeBuilder.setName(attribute.getKey());
attributeBuilder.setValue(ByteString.copyFrom(attribute.getValue()));
scanBuilder.addAttribute(attributeBuilder.build());
}
}
byte[] startRow = scan.getStartRow();
if (startRow != null && startRow.length > 0) {
scanBuilder.setStartRow(ByteString.copyFrom(startRow));
}
byte[] stopRow = scan.getStopRow();
if (stopRow != null && stopRow.length > 0) {
scanBuilder.setStopRow(ByteString.copyFrom(stopRow));
}
if (scan.hasFilter()) {
scanBuilder.setFilter(ProtobufUtil.toParameter(scan.getFilter()));
}
Column.Builder columnBuilder = Column.newBuilder();
for (Map.Entry<byte[],NavigableSet<byte []>>
family: scan.getFamilyMap().entrySet()) {
columnBuilder.setFamily(ByteString.copyFrom(family.getKey()));
NavigableSet<byte []> columns = family.getValue();
columnBuilder.clearQualifier();
if (columns != null && columns.size() > 0) {
for (byte [] qualifier: family.getValue()) {
if (qualifier != null) {
columnBuilder.addQualifier(ByteString.copyFrom(qualifier));
}
}
}
scanBuilder.addColumn(columnBuilder.build());
}
return scanBuilder.build();
}
/**
* Convert a protocol buffer Scan to a client Scan
*
@ -615,6 +683,190 @@ public final class ProtobufUtil {
return scan;
}
/**
* Create a new protocol buffer Exec based on a client Exec
*
* @param exec
* @return
* @throws IOException
*/
public static ClientProtos.Exec toExec(
final Exec exec) throws IOException {
ClientProtos.Exec.Builder
builder = ClientProtos.Exec.newBuilder();
Configuration conf = exec.getConf();
if (conf != null) {
NameStringPair.Builder propertyBuilder = NameStringPair.newBuilder();
Iterator<Entry<String, String>> iterator = conf.iterator();
while (iterator.hasNext()) {
Entry<String, String> entry = iterator.next();
propertyBuilder.setName(entry.getKey());
propertyBuilder.setValue(entry.getValue());
builder.addProperty(propertyBuilder.build());
}
}
builder.setProtocolName(exec.getProtocolName());
builder.setMethodName(exec.getMethodName());
builder.setRow(ByteString.copyFrom(exec.getRow()));
Object[] parameters = exec.getParameters();
if (parameters != null && parameters.length > 0) {
Class<?>[] declaredClasses = exec.getParameterClasses();
for (int i = 0, n = parameters.length; i < n; i++) {
builder.addParameter(
ProtobufUtil.toParameter(declaredClasses[i], parameters[i]));
}
}
return builder.build();
}
/**
* Create a protocol buffer Get based on a client Get.
*
* @param get the client Get
* @return a protocol buffer Get
* @throws IOException
*/
public static ClientProtos.Get toGet(
final Get get) throws IOException {
ClientProtos.Get.Builder builder =
ClientProtos.Get.newBuilder();
builder.setRow(ByteString.copyFrom(get.getRow()));
builder.setCacheBlocks(get.getCacheBlocks());
builder.setMaxVersions(get.getMaxVersions());
if (get.getLockId() >= 0) {
builder.setLockId(get.getLockId());
}
if (get.getFilter() != null) {
builder.setFilter(ProtobufUtil.toParameter(get.getFilter()));
}
TimeRange timeRange = get.getTimeRange();
if (!timeRange.isAllTime()) {
HBaseProtos.TimeRange.Builder timeRangeBuilder =
HBaseProtos.TimeRange.newBuilder();
timeRangeBuilder.setFrom(timeRange.getMin());
timeRangeBuilder.setTo(timeRange.getMax());
builder.setTimeRange(timeRangeBuilder.build());
}
Map<String, byte[]> attributes = get.getAttributesMap();
if (!attributes.isEmpty()) {
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
attributeBuilder.setName(attribute.getKey());
attributeBuilder.setValue(ByteString.copyFrom(attribute.getValue()));
builder.addAttribute(attributeBuilder.build());
}
}
if (get.hasFamilies()) {
Column.Builder columnBuilder = Column.newBuilder();
Map<byte[], NavigableSet<byte[]>> families = get.getFamilyMap();
for (Map.Entry<byte[], NavigableSet<byte[]>> family: families.entrySet()) {
NavigableSet<byte[]> qualifiers = family.getValue();
columnBuilder.setFamily(ByteString.copyFrom(family.getKey()));
columnBuilder.clearQualifier();
if (qualifiers != null && qualifiers.size() > 0) {
for (byte[] qualifier: qualifiers) {
if (qualifier != null) {
columnBuilder.addQualifier(ByteString.copyFrom(qualifier));
}
}
}
builder.addColumn(columnBuilder.build());
}
}
return builder.build();
}
/**
* Convert a client Increment to a protobuf Mutate.
*
* @param increment
* @return the converted mutate
*/
public static Mutate toMutate(final Increment increment) {
Mutate.Builder builder = Mutate.newBuilder();
builder.setRow(ByteString.copyFrom(increment.getRow()));
builder.setMutateType(MutateType.INCREMENT);
builder.setWriteToWAL(increment.getWriteToWAL());
if (increment.getLockId() >= 0) {
builder.setLockId(increment.getLockId());
}
TimeRange timeRange = increment.getTimeRange();
if (!timeRange.isAllTime()) {
HBaseProtos.TimeRange.Builder timeRangeBuilder =
HBaseProtos.TimeRange.newBuilder();
timeRangeBuilder.setFrom(timeRange.getMin());
timeRangeBuilder.setTo(timeRange.getMax());
builder.setTimeRange(timeRangeBuilder.build());
}
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[],NavigableMap<byte[], Long>>
family: increment.getFamilyMap().entrySet()) {
columnBuilder.setFamily(ByteString.copyFrom(family.getKey()));
columnBuilder.clearQualifierValue();
NavigableMap<byte[], Long> values = family.getValue();
if (values != null && values.size() > 0) {
for (Map.Entry<byte[], Long> value: values.entrySet()) {
valueBuilder.setQualifier(ByteString.copyFrom(value.getKey()));
valueBuilder.setValue(ByteString.copyFrom(
Bytes.toBytes(value.getValue().longValue())));
columnBuilder.addQualifierValue(valueBuilder.build());
}
}
builder.addColumnValue(columnBuilder.build());
}
return builder.build();
}
/**
* Create a protocol buffer Mutate based on a client Mutation
*
* @param mutateType
* @param mutation
* @return a mutate
* @throws IOException
*/
public static Mutate toMutate(final MutateType mutateType,
final Mutation mutation) throws IOException {
Mutate.Builder mutateBuilder = Mutate.newBuilder();
mutateBuilder.setRow(ByteString.copyFrom(mutation.getRow()));
mutateBuilder.setMutateType(mutateType);
mutateBuilder.setWriteToWAL(mutation.getWriteToWAL());
if (mutation.getLockId() >= 0) {
mutateBuilder.setLockId(mutation.getLockId());
}
mutateBuilder.setTimestamp(mutation.getTimeStamp());
Map<String, byte[]> attributes = mutation.getAttributesMap();
if (!attributes.isEmpty()) {
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
attributeBuilder.setName(attribute.getKey());
attributeBuilder.setValue(ByteString.copyFrom(attribute.getValue()));
mutateBuilder.addAttribute(attributeBuilder.build());
}
}
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[],List<KeyValue>>
family: mutation.getFamilyMap().entrySet()) {
columnBuilder.setFamily(ByteString.copyFrom(family.getKey()));
columnBuilder.clearQualifierValue();
for (KeyValue value: family.getValue()) {
valueBuilder.setQualifier(ByteString.copyFrom(value.getQualifier()));
valueBuilder.setValue(ByteString.copyFrom(value.getValue()));
valueBuilder.setTimestamp(value.getTimestamp());
if (mutateType == MutateType.DELETE) {
KeyValue.Type keyValueType = KeyValue.Type.codeToType(value.getType());
valueBuilder.setDeleteType(toDeleteType(keyValueType));
}
columnBuilder.addQualifierValue(valueBuilder.build());
}
mutateBuilder.addColumnValue(columnBuilder.build());
}
return mutateBuilder.build();
}
/**
* Convert a client Result to a protocol buffer Result
*
@ -689,6 +941,27 @@ public final class ProtobufUtil {
return entries.toArray(new HLog.Entry[entries.size()]);
}
/**
* Convert a delete KeyValue type to protocol buffer DeleteType.
*
* @param type
* @return
* @throws IOException
*/
public static DeleteType toDeleteType(
KeyValue.Type type) throws IOException {
switch (type) {
case Delete:
return DeleteType.DELETE_ONE_VERSION;
case DeleteColumn:
return DeleteType.DELETE_MULTIPLE_VERSIONS;
case DeleteFamily:
return DeleteType.DELETE_FAMILY;
default:
throw new IOException("Unknown delete type: " + type);
}
}
/**
* Convert a protocol buffer Parameter to a Java object
*

View File

@ -18,16 +18,12 @@
package org.apache.hadoop.hbase.protobuf;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
@ -43,7 +39,6 @@ import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@ -58,9 +53,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterReque
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALEdit.FamilyScope;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALEdit.ScopeType;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALEdit.FamilyScope;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
@ -74,14 +69,10 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.DeleteType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@ -160,7 +151,7 @@ public final class RequestConverter {
RegionSpecifierType.REGION_NAME, regionName);
builder.setExistenceOnly(existenceOnly);
builder.setRegion(region);
builder.setGet(buildGet(get));
builder.setGet(ProtobufUtil.toGet(get));
return builder.build();
}
@ -222,7 +213,7 @@ public final class RequestConverter {
builder.setRegion(region);
Condition condition = buildCondition(
row, family, qualifier, comparator, compareType);
builder.setMutate(buildMutate(MutateType.PUT, put));
builder.setMutate(ProtobufUtil.toMutate(MutateType.PUT, put));
builder.setCondition(condition);
return builder.build();
}
@ -250,7 +241,7 @@ public final class RequestConverter {
builder.setRegion(region);
Condition condition = buildCondition(
row, family, qualifier, comparator, compareType);
builder.setMutate(buildMutate(MutateType.DELETE, delete));
builder.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, delete));
builder.setCondition(condition);
return builder.build();
}
@ -269,7 +260,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setMutate(buildMutate(MutateType.PUT, put));
builder.setMutate(ProtobufUtil.toMutate(MutateType.PUT, put));
return builder.build();
}
@ -287,7 +278,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setMutate(buildMutate(MutateType.APPEND, append));
builder.setMutate(ProtobufUtil.toMutate(MutateType.APPEND, append));
return builder.build();
}
@ -304,7 +295,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setMutate(buildMutate(increment));
builder.setMutate(ProtobufUtil.toMutate(increment));
return builder.build();
}
@ -322,7 +313,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setMutate(buildMutate(MutateType.DELETE, delete));
builder.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, delete));
return builder.build();
}
@ -352,7 +343,7 @@ public final class RequestConverter {
"RowMutations supports only put and delete, not "
+ mutation.getClass().getName());
}
Mutate mutate = buildMutate(mutateType, mutation);
Mutate mutate = ProtobufUtil.toMutate(mutateType, mutation);
builder.addAction(ProtobufUtil.toParameter(mutate));
}
return builder.build();
@ -377,56 +368,7 @@ public final class RequestConverter {
builder.setNumberOfRows(numberOfRows);
builder.setCloseScanner(closeScanner);
builder.setRegion(region);
ClientProtos.Scan.Builder scanBuilder =
ClientProtos.Scan.newBuilder();
scanBuilder.setCacheBlocks(scan.getCacheBlocks());
scanBuilder.setBatchSize(scan.getBatch());
scanBuilder.setMaxVersions(scan.getMaxVersions());
TimeRange timeRange = scan.getTimeRange();
if (!timeRange.isAllTime()) {
HBaseProtos.TimeRange.Builder timeRangeBuilder =
HBaseProtos.TimeRange.newBuilder();
timeRangeBuilder.setFrom(timeRange.getMin());
timeRangeBuilder.setTo(timeRange.getMax());
scanBuilder.setTimeRange(timeRangeBuilder.build());
}
Map<String, byte[]> attributes = scan.getAttributesMap();
if (!attributes.isEmpty()) {
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
attributeBuilder.setName(attribute.getKey());
attributeBuilder.setValue(ByteString.copyFrom(attribute.getValue()));
scanBuilder.addAttribute(attributeBuilder.build());
}
}
byte[] startRow = scan.getStartRow();
if (startRow != null && startRow.length > 0) {
scanBuilder.setStartRow(ByteString.copyFrom(startRow));
}
byte[] stopRow = scan.getStopRow();
if (stopRow != null && stopRow.length > 0) {
scanBuilder.setStopRow(ByteString.copyFrom(stopRow));
}
if (scan.hasFilter()) {
scanBuilder.setFilter(ProtobufUtil.toParameter(scan.getFilter()));
}
Column.Builder columnBuilder = Column.newBuilder();
for (Map.Entry<byte[],NavigableSet<byte []>>
family: scan.getFamilyMap().entrySet()) {
columnBuilder.setFamily(ByteString.copyFrom(family.getKey()));
NavigableSet<byte []> columns = family.getValue();
columnBuilder.clearQualifier();
if (columns != null && columns.size() > 0) {
for (byte [] qualifier: family.getValue()) {
if (qualifier != null) {
columnBuilder.addQualifier(ByteString.copyFrom(qualifier));
}
}
}
scanBuilder.addColumn(columnBuilder.build());
}
builder.setScan(scanBuilder.build());
builder.setScan(ProtobufUtil.toScan(scan));
return builder.build();
}
@ -517,7 +459,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setCall(buildExec(exec));
builder.setCall(ProtobufUtil.toExec(exec));
return builder.build();
}
@ -540,17 +482,17 @@ public final class RequestConverter {
Message protoAction = null;
Row row = action.getAction();
if (row instanceof Get) {
protoAction = buildGet((Get)row);
protoAction = ProtobufUtil.toGet((Get)row);
} else if (row instanceof Put) {
protoAction = buildMutate(MutateType.PUT, (Put)row);
protoAction = ProtobufUtil.toMutate(MutateType.PUT, (Put)row);
} else if (row instanceof Delete) {
protoAction = buildMutate(MutateType.DELETE, (Delete)row);
protoAction = ProtobufUtil.toMutate(MutateType.DELETE, (Delete)row);
} else if (row instanceof Exec) {
protoAction = buildExec((Exec)row);
protoAction = ProtobufUtil.toExec((Exec)row);
} else if (row instanceof Append) {
protoAction = buildMutate(MutateType.APPEND, (Append)row);
protoAction = ProtobufUtil.toMutate(MutateType.APPEND, (Append)row);
} else if (row instanceof Increment) {
protoAction = buildMutate((Increment)row);
protoAction = ProtobufUtil.toMutate((Increment)row);
} else if (row instanceof RowMutations) {
continue; // ignore RowMutations
} else {
@ -875,202 +817,4 @@ public final class RequestConverter {
builder.setCompareType(compareType);
return builder.build();
}
/**
* Create a new protocol buffer Exec based on a client Exec
*
* @param exec
* @return
* @throws IOException
*/
private static ClientProtos.Exec buildExec(
final Exec exec) throws IOException {
ClientProtos.Exec.Builder
builder = ClientProtos.Exec.newBuilder();
Configuration conf = exec.getConf();
if (conf != null) {
NameStringPair.Builder propertyBuilder = NameStringPair.newBuilder();
Iterator<Entry<String, String>> iterator = conf.iterator();
while (iterator.hasNext()) {
Entry<String, String> entry = iterator.next();
propertyBuilder.setName(entry.getKey());
propertyBuilder.setValue(entry.getValue());
builder.addProperty(propertyBuilder.build());
}
}
builder.setProtocolName(exec.getProtocolName());
builder.setMethodName(exec.getMethodName());
builder.setRow(ByteString.copyFrom(exec.getRow()));
Object[] parameters = exec.getParameters();
if (parameters != null && parameters.length > 0) {
Class<?>[] declaredClasses = exec.getParameterClasses();
for (int i = 0, n = parameters.length; i < n; i++) {
builder.addParameter(
ProtobufUtil.toParameter(declaredClasses[i], parameters[i]));
}
}
return builder.build();
}
/**
* Create a protocol buffer Get based on a client Get.
*
* @param get the client Get
* @return a protocol buffer Get
* @throws IOException
*/
private static ClientProtos.Get buildGet(
final Get get) throws IOException {
ClientProtos.Get.Builder builder =
ClientProtos.Get.newBuilder();
builder.setRow(ByteString.copyFrom(get.getRow()));
builder.setCacheBlocks(get.getCacheBlocks());
builder.setMaxVersions(get.getMaxVersions());
if (get.getLockId() >= 0) {
builder.setLockId(get.getLockId());
}
if (get.getFilter() != null) {
builder.setFilter(ProtobufUtil.toParameter(get.getFilter()));
}
TimeRange timeRange = get.getTimeRange();
if (!timeRange.isAllTime()) {
HBaseProtos.TimeRange.Builder timeRangeBuilder =
HBaseProtos.TimeRange.newBuilder();
timeRangeBuilder.setFrom(timeRange.getMin());
timeRangeBuilder.setTo(timeRange.getMax());
builder.setTimeRange(timeRangeBuilder.build());
}
Map<String, byte[]> attributes = get.getAttributesMap();
if (!attributes.isEmpty()) {
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
attributeBuilder.setName(attribute.getKey());
attributeBuilder.setValue(ByteString.copyFrom(attribute.getValue()));
builder.addAttribute(attributeBuilder.build());
}
}
if (get.hasFamilies()) {
Column.Builder columnBuilder = Column.newBuilder();
Map<byte[], NavigableSet<byte[]>> families = get.getFamilyMap();
for (Map.Entry<byte[], NavigableSet<byte[]>> family: families.entrySet()) {
NavigableSet<byte[]> qualifiers = family.getValue();
columnBuilder.setFamily(ByteString.copyFrom(family.getKey()));
columnBuilder.clearQualifier();
if (qualifiers != null && qualifiers.size() > 0) {
for (byte[] qualifier: qualifiers) {
if (qualifier != null) {
columnBuilder.addQualifier(ByteString.copyFrom(qualifier));
}
}
}
builder.addColumn(columnBuilder.build());
}
}
return builder.build();
}
private static Mutate buildMutate(final Increment increment) {
Mutate.Builder builder = Mutate.newBuilder();
builder.setRow(ByteString.copyFrom(increment.getRow()));
builder.setMutateType(MutateType.INCREMENT);
builder.setWriteToWAL(increment.getWriteToWAL());
if (increment.getLockId() >= 0) {
builder.setLockId(increment.getLockId());
}
TimeRange timeRange = increment.getTimeRange();
if (!timeRange.isAllTime()) {
HBaseProtos.TimeRange.Builder timeRangeBuilder =
HBaseProtos.TimeRange.newBuilder();
timeRangeBuilder.setFrom(timeRange.getMin());
timeRangeBuilder.setTo(timeRange.getMax());
builder.setTimeRange(timeRangeBuilder.build());
}
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[],NavigableMap<byte[], Long>>
family: increment.getFamilyMap().entrySet()) {
columnBuilder.setFamily(ByteString.copyFrom(family.getKey()));
columnBuilder.clearQualifierValue();
NavigableMap<byte[], Long> values = family.getValue();
if (values != null && values.size() > 0) {
for (Map.Entry<byte[], Long> value: values.entrySet()) {
valueBuilder.setQualifier(ByteString.copyFrom(value.getKey()));
valueBuilder.setValue(ByteString.copyFrom(
Bytes.toBytes(value.getValue().longValue())));
columnBuilder.addQualifierValue(valueBuilder.build());
}
}
builder.addColumnValue(columnBuilder.build());
}
return builder.build();
}
/**
* Create a protocol buffer Mutate based on a client Mutation
*
* @param mutateType
* @param mutation
* @return a mutate
* @throws IOException
*/
private static Mutate buildMutate(final MutateType mutateType,
final Mutation mutation) throws IOException {
Mutate.Builder mutateBuilder = Mutate.newBuilder();
mutateBuilder.setRow(ByteString.copyFrom(mutation.getRow()));
mutateBuilder.setMutateType(mutateType);
mutateBuilder.setWriteToWAL(mutation.getWriteToWAL());
if (mutation.getLockId() >= 0) {
mutateBuilder.setLockId(mutation.getLockId());
}
mutateBuilder.setTimestamp(mutation.getTimeStamp());
Map<String, byte[]> attributes = mutation.getAttributesMap();
if (!attributes.isEmpty()) {
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
attributeBuilder.setName(attribute.getKey());
attributeBuilder.setValue(ByteString.copyFrom(attribute.getValue()));
mutateBuilder.addAttribute(attributeBuilder.build());
}
}
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[],List<KeyValue>>
family: mutation.getFamilyMap().entrySet()) {
columnBuilder.setFamily(ByteString.copyFrom(family.getKey()));
columnBuilder.clearQualifierValue();
for (KeyValue value: family.getValue()) {
valueBuilder.setQualifier(ByteString.copyFrom(value.getQualifier()));
valueBuilder.setValue(ByteString.copyFrom(value.getValue()));
valueBuilder.setTimestamp(value.getTimestamp());
if (mutateType == MutateType.DELETE) {
KeyValue.Type keyValueType = KeyValue.Type.codeToType(value.getType());
valueBuilder.setDeleteType(toDeleteType(keyValueType));
}
columnBuilder.addQualifierValue(valueBuilder.build());
}
mutateBuilder.addColumnValue(columnBuilder.build());
}
return mutateBuilder.build();
}
/**
* Convert a delete KeyValue type to protocol buffer DeleteType.
*
* @param type
* @return
* @throws IOException
*/
private static DeleteType toDeleteType(
KeyValue.Type type) throws IOException {
switch (type) {
case Delete:
return DeleteType.DELETE_ONE_VERSION;
case DeleteColumn:
return DeleteType.DELETE_MULTIPLE_VERSIONS;
case DeleteFamily:
return DeleteType.DELETE_FAMILY;
default:
throw new IOException("Unknown delete type: " + type);
}
}
}

View File

@ -0,0 +1,317 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.protobuf;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.DeleteType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.ByteString;
/**
* Class to test ProtobufUtil.
*/
@Category(SmallTests.class)
public class TestProtobufUtil {
/**
* Test basic Get conversions.
*
* @throws IOException
*/
@Test
public void testGet() throws IOException {
ClientProtos.Get.Builder getBuilder = ClientProtos.Get.newBuilder();
getBuilder.setRow(ByteString.copyFromUtf8("row"));
Column.Builder columnBuilder = Column.newBuilder();
columnBuilder.setFamily(ByteString.copyFromUtf8("f1"));
columnBuilder.addQualifier(ByteString.copyFromUtf8("c1"));
columnBuilder.addQualifier(ByteString.copyFromUtf8("c2"));
getBuilder.addColumn(columnBuilder.build());
columnBuilder.clear();
columnBuilder.setFamily(ByteString.copyFromUtf8("f2"));
getBuilder.addColumn(columnBuilder.build());
ClientProtos.Get proto = getBuilder.build();
// default fields
assertEquals(1, proto.getMaxVersions());
assertEquals(true, proto.getCacheBlocks());
// set the default value for equal comparison
getBuilder = ClientProtos.Get.newBuilder(proto);
getBuilder.setMaxVersions(1);
getBuilder.setCacheBlocks(true);
Get get = ProtobufUtil.toGet(proto);
assertEquals(getBuilder.build(), ProtobufUtil.toGet(get));
}
/**
* Test Append Mutate conversions.
*
* @throws IOException
*/
@Test
public void testAppend() throws IOException {
Mutate.Builder mutateBuilder = Mutate.newBuilder();
mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
mutateBuilder.setMutateType(MutateType.APPEND);
mutateBuilder.setTimestamp(111111);
ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
qualifierBuilder.setValue(ByteString.copyFromUtf8("v1"));
valueBuilder.addQualifierValue(qualifierBuilder.build());
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
qualifierBuilder.setValue(ByteString.copyFromUtf8("v2"));
valueBuilder.addQualifierValue(qualifierBuilder.build());
mutateBuilder.addColumnValue(valueBuilder.build());
Mutate proto = mutateBuilder.build();
// default fields
assertEquals(true, proto.getWriteToWAL());
// set the default value for equal comparison
mutateBuilder = Mutate.newBuilder(proto);
mutateBuilder.setWriteToWAL(true);
Append append = ProtobufUtil.toAppend(proto);
// append always use the latest timestamp,
// add the timestamp to the original mutate
long timestamp = append.getTimeStamp();
mutateBuilder.setTimestamp(timestamp);
for (ColumnValue.Builder column:
mutateBuilder.getColumnValueBuilderList()) {
for (QualifierValue.Builder qualifier:
column.getQualifierValueBuilderList()) {
qualifier.setTimestamp(timestamp);
}
}
assertEquals(mutateBuilder.build(),
ProtobufUtil.toMutate(MutateType.APPEND, append));
}
/**
* Test Delete Mutate conversions.
*
* @throws IOException
*/
@Test
public void testDelete() throws IOException {
Mutate.Builder mutateBuilder = Mutate.newBuilder();
mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
mutateBuilder.setMutateType(MutateType.DELETE);
mutateBuilder.setTimestamp(111111);
ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
qualifierBuilder.setDeleteType(DeleteType.DELETE_ONE_VERSION);
qualifierBuilder.setTimestamp(111222);
valueBuilder.addQualifierValue(qualifierBuilder.build());
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
qualifierBuilder.setDeleteType(DeleteType.DELETE_MULTIPLE_VERSIONS);
qualifierBuilder.setTimestamp(111333);
valueBuilder.addQualifierValue(qualifierBuilder.build());
mutateBuilder.addColumnValue(valueBuilder.build());
Mutate proto = mutateBuilder.build();
// default fields
assertEquals(true, proto.getWriteToWAL());
// set the default value for equal comparison
mutateBuilder = Mutate.newBuilder(proto);
mutateBuilder.setWriteToWAL(true);
Delete delete = ProtobufUtil.toDelete(proto);
// delete always have empty value,
// add empty value to the original mutate
for (ColumnValue.Builder column:
mutateBuilder.getColumnValueBuilderList()) {
for (QualifierValue.Builder qualifier:
column.getQualifierValueBuilderList()) {
qualifier.setValue(ByteString.EMPTY);
}
}
assertEquals(mutateBuilder.build(),
ProtobufUtil.toMutate(MutateType.DELETE, delete));
}
/**
* Test Increment Mutate conversions.
*
* @throws IOException
*/
@Test
public void testIncrement() throws IOException {
Mutate.Builder mutateBuilder = Mutate.newBuilder();
mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
mutateBuilder.setMutateType(MutateType.INCREMENT);
ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(11L)));
valueBuilder.addQualifierValue(qualifierBuilder.build());
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
qualifierBuilder.setValue(ByteString.copyFrom(Bytes.toBytes(22L)));
valueBuilder.addQualifierValue(qualifierBuilder.build());
mutateBuilder.addColumnValue(valueBuilder.build());
Mutate proto = mutateBuilder.build();
// default fields
assertEquals(true, proto.getWriteToWAL());
// set the default value for equal comparison
mutateBuilder = Mutate.newBuilder(proto);
mutateBuilder.setWriteToWAL(true);
Increment increment = ProtobufUtil.toIncrement(proto);
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutate(increment));
}
/**
* Test Put Mutate conversions.
*
* @throws IOException
*/
@Test
public void testPut() throws IOException {
Mutate.Builder mutateBuilder = Mutate.newBuilder();
mutateBuilder.setRow(ByteString.copyFromUtf8("row"));
mutateBuilder.setMutateType(MutateType.PUT);
mutateBuilder.setTimestamp(111111);
ColumnValue.Builder valueBuilder = ColumnValue.newBuilder();
valueBuilder.setFamily(ByteString.copyFromUtf8("f1"));
QualifierValue.Builder qualifierBuilder = QualifierValue.newBuilder();
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c1"));
qualifierBuilder.setValue(ByteString.copyFromUtf8("v1"));
valueBuilder.addQualifierValue(qualifierBuilder.build());
qualifierBuilder.setQualifier(ByteString.copyFromUtf8("c2"));
qualifierBuilder.setValue(ByteString.copyFromUtf8("v2"));
qualifierBuilder.setTimestamp(222222);
valueBuilder.addQualifierValue(qualifierBuilder.build());
mutateBuilder.addColumnValue(valueBuilder.build());
Mutate proto = mutateBuilder.build();
// default fields
assertEquals(true, proto.getWriteToWAL());
// set the default value for equal comparison
mutateBuilder = Mutate.newBuilder(proto);
mutateBuilder.setWriteToWAL(true);
Put put = ProtobufUtil.toPut(proto);
// put value always use the default timestamp if no
// value level timestamp specified,
// add the timestamp to the original mutate
long timestamp = put.getTimeStamp();
for (ColumnValue.Builder column:
mutateBuilder.getColumnValueBuilderList()) {
for (QualifierValue.Builder qualifier:
column.getQualifierValueBuilderList()) {
if (!qualifier.hasTimestamp()) {
qualifier.setTimestamp(timestamp);
}
}
}
assertEquals(mutateBuilder.build(),
ProtobufUtil.toMutate(MutateType.PUT, put));
}
/**
* Test basic Scan conversions.
*
* @throws IOException
*/
@Test
public void testScan() throws IOException {
ClientProtos.Scan.Builder scanBuilder = ClientProtos.Scan.newBuilder();
scanBuilder.setStartRow(ByteString.copyFromUtf8("row1"));
scanBuilder.setStopRow(ByteString.copyFromUtf8("row2"));
Column.Builder columnBuilder = Column.newBuilder();
columnBuilder.setFamily(ByteString.copyFromUtf8("f1"));
columnBuilder.addQualifier(ByteString.copyFromUtf8("c1"));
columnBuilder.addQualifier(ByteString.copyFromUtf8("c2"));
scanBuilder.addColumn(columnBuilder.build());
columnBuilder.clear();
columnBuilder.setFamily(ByteString.copyFromUtf8("f2"));
scanBuilder.addColumn(columnBuilder.build());
ClientProtos.Scan proto = scanBuilder.build();
// default fields
assertEquals(1, proto.getMaxVersions());
assertEquals(true, proto.getCacheBlocks());
scanBuilder = ClientProtos.Scan.newBuilder(proto);
scanBuilder.setMaxVersions(1);
scanBuilder.setCacheBlocks(true);
Scan scan = ProtobufUtil.toScan(proto);
assertEquals(scanBuilder.build(), ProtobufUtil.toScan(scan));
}
/**
* Test basic Exec conversions.
*
* @throws IOException
*/
@Test
public void testExec() throws IOException {
ClientProtos.Exec.Builder execBuilder = ClientProtos.Exec.newBuilder();
execBuilder.setRow(ByteString.copyFromUtf8("row"));
execBuilder.setProtocolName(ColumnAggregationEndpoint.class.getName());
execBuilder.setMethodName("sum");
execBuilder.addParameter(ProtobufUtil.toParameter(Bytes.toBytes("f")));
execBuilder.addParameter(ProtobufUtil.toParameter(Bytes.toBytes("c")));
ClientProtos.Exec proto = execBuilder.build();
Exec exec = ProtobufUtil.toExec(proto);
execBuilder = ClientProtos.Exec.newBuilder(ProtobufUtil.toExec(exec));
execBuilder.clearProperty(); // remove properties added by default
assertEquals(proto, execBuilder.build());
}
}