HBASE-11161 Provide example of POJO encoding with protobuf (Nick Dimiduk)

This commit is contained in:
Andrew Purtell 2014-05-23 13:53:23 -07:00
parent 30aab8b5ea
commit 8b145419ed
3 changed files with 224 additions and 0 deletions

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.types;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import org.apache.hadoop.hbase.util.Order;
import org.apache.hadoop.hbase.util.PositionedByteRange;
/**
* A base-class for {@link DataType} implementations backed by protobuf. See
* {@code PBKeyValue} in {@code hbase-examples} module.
*/
public abstract class PBType<T extends Message> implements DataType<T> {
@Override
public boolean isOrderPreserving() {
return false;
}
@Override
public Order getOrder() {
return null;
}
@Override
public boolean isNullable() {
return false;
}
@Override
public boolean isSkippable() {
return true;
}
@Override
public int encodedLength(T val) {
return val.getSerializedSize();
}
/**
* Create a {@link CodedInputStream} from a {@link PositionedByteRange}. Be sure to update
* {@code src}'s position after consuming from the stream.
* <p>For example:
* <pre>
* Foo.Builder builder = ...
* CodedInputStream is = inputStreamFromByteRange(src);
* Foo ret = builder.mergeFrom(is).build();
* src.setPosition(src.getPosition() + is.getTotalBytesRead());
* </pre>
*/
public static CodedInputStream inputStreamFromByteRange(PositionedByteRange src) {
return CodedInputStream.newInstance(
src.getBytes(),
src.getOffset() + src.getPosition(),
src.getRemaining());
}
/**
* Create a {@link CodedOutputStream} from a {@link PositionedByteRange}. Be sure to update
* {@code dst}'s position after writing to the stream.
* <p>For example:
* <pre>
* CodedOutputStream os = outputStreamFromByteRange(dst);
* int before = os.spaceLeft(), after, written;
* val.writeTo(os);
* after = os.spaceLeft();
* written = before - after;
* dst.setPosition(dst.getPosition() + written);
* </pre>
*/
public static CodedOutputStream outputStreamFromByteRange(PositionedByteRange dst) {
return CodedOutputStream.newInstance(
dst.getBytes(),
dst.getOffset() + dst.getPosition(),
dst.getRemaining()
);
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.types;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.util.PositionedByteRange;
import java.io.IOException;
/**
* An example for using protobuf objects with {@link DataType} API.
*/
public class PBCell extends PBType<CellProtos.Cell> {
@Override
public Class<CellProtos.Cell> encodedClass() {
return CellProtos.Cell.class;
}
@Override
public int skip(PositionedByteRange src) {
CellProtos.Cell.Builder builder = CellProtos.Cell.newBuilder();
CodedInputStream is = inputStreamFromByteRange(src);
try {
builder.mergeFrom(is);
int consumed = is.getTotalBytesRead();
src.setPosition(src.getPosition() + consumed);
return consumed;
} catch (IOException e) {
throw new RuntimeException("Error while skipping type.", e);
}
}
@Override
public CellProtos.Cell decode(PositionedByteRange src) {
CellProtos.Cell.Builder builder = CellProtos.Cell.newBuilder();
CodedInputStream is = inputStreamFromByteRange(src);
try {
CellProtos.Cell ret = builder.mergeFrom(is).build();
src.setPosition(src.getPosition() + is.getTotalBytesRead());
return ret;
} catch (IOException e) {
throw new RuntimeException("Error while decoding type.", e);
}
}
@Override
public int encode(PositionedByteRange dst, CellProtos.Cell val) {
CodedOutputStream os = outputStreamFromByteRange(dst);
try {
int before = os.spaceLeft(), after, written;
val.writeTo(os);
after = os.spaceLeft();
written = before - after;
dst.setPosition(dst.getPosition() + written);
return written;
} catch (IOException e) {
throw new RuntimeException("Error while encoding type.", e);
}
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.types;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PositionedByteRange;
import org.apache.hadoop.hbase.util.SimplePositionedByteRange;
import org.junit.Test;
public class TestPBCell {
private static final PBCell CODEC = new PBCell();
/**
* Basic test to verify utility methods in {@link PBType} and delegation to protobuf works.
*/
@Test
public void testRoundTrip() {
final Cell cell = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"),
Bytes.toBytes("qual"), Bytes.toBytes("val"));
CellProtos.Cell c = ProtobufUtil.toCell(cell), decoded;
PositionedByteRange pbr = new SimplePositionedByteRange(c.getSerializedSize());
pbr.setPosition(0);
int encodedLength = CODEC.encode(pbr, c);
pbr.setPosition(0);
decoded = CODEC.decode(pbr);
assertEquals(encodedLength, pbr.getPosition());
assertTrue(CellComparator.equals(cell, ProtobufUtil.toCell(decoded)));
}
}