HBASE-10321 CellCodec has broken the 96 client to 98 server compatibility

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1557781 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
anoopsamjohn 2014-01-13 17:30:13 +00:00
parent a041afacff
commit 634b1a4f1a
4 changed files with 238 additions and 51 deletions

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* Basic Cell codec that just writes out all the individual elements of a Cell. Uses ints
* delimiting all lengths. Profligate. Needs tune up.
* Note: This will not write tags of a Cell.
*/
@InterfaceAudience.Private
public class CellCodec implements Codec {
@ -53,8 +54,6 @@ public class CellCodec implements Codec {
this.out.write(cell.getTypeByte());
// Value
write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
// Write tags
write(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
// MvccVersion
this.out.write(Bytes.toBytes(cell.getMvccVersion()));
}
@ -87,12 +86,11 @@ public class CellCodec implements Codec {
long timestamp = Bytes.toLong(longArray);
byte type = (byte) this.in.read();
byte[] value = readByteArray(in);
byte[] tags = readByteArray(in);
// Read memstore version
byte[] memstoreTSArray = new byte[Bytes.SIZEOF_LONG];
IOUtils.readFully(this.in, memstoreTSArray);
long memstoreTS = Bytes.toLong(memstoreTSArray);
return CellUtil.createCell(row, family, qualifier, timestamp, type, value, tags, memstoreTS);
return CellUtil.createCell(row, family, qualifier, timestamp, type, value, memstoreTS);
}
/**

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Basic Cell codec that just writes out all the individual elements of a Cell including the tags.
* Uses ints delimiting all lengths. Profligate. Needs tune up.
*/
@InterfaceAudience.Private
public class CellCodecV2 implements Codec {
static class CellEncoder extends BaseEncoder {
CellEncoder(final OutputStream out) {
super(out);
}
@Override
public void write(Cell cell) throws IOException {
checkFlushed();
// Row
write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
// Column family
write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
// Qualifier
write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
// Version
this.out.write(Bytes.toBytes(cell.getTimestamp()));
// Type
this.out.write(cell.getTypeByte());
// Value
write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
// Tags
write(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
// MvccVersion
this.out.write(Bytes.toBytes(cell.getMvccVersion()));
}
/**
* Write int length followed by array bytes.
*
* @param bytes
* @param offset
* @param length
* @throws IOException
*/
private void write(final byte[] bytes, final int offset, final int length) throws IOException {
this.out.write(Bytes.toBytes(length));
this.out.write(bytes, offset, length);
}
}
static class CellDecoder extends BaseDecoder {
public CellDecoder(final InputStream in) {
super(in);
}
protected Cell parseCell() throws IOException {
byte[] row = readByteArray(this.in);
byte[] family = readByteArray(in);
byte[] qualifier = readByteArray(in);
byte[] longArray = new byte[Bytes.SIZEOF_LONG];
IOUtils.readFully(this.in, longArray);
long timestamp = Bytes.toLong(longArray);
byte type = (byte) this.in.read();
byte[] value = readByteArray(in);
byte[] tags = readByteArray(in);
// Read memstore version
byte[] memstoreTSArray = new byte[Bytes.SIZEOF_LONG];
IOUtils.readFully(this.in, memstoreTSArray);
long memstoreTS = Bytes.toLong(memstoreTSArray);
return CellUtil.createCell(row, family, qualifier, timestamp, type, value, tags, memstoreTS);
}
/**
* @return Byte array read from the stream.
* @throws IOException
*/
private byte[] readByteArray(final InputStream in) throws IOException {
byte[] intArray = new byte[Bytes.SIZEOF_INT];
IOUtils.readFully(in, intArray);
int length = Bytes.toInt(intArray);
byte[] bytes = new byte[length];
IOUtils.readFully(in, bytes);
return bytes;
}
}
@Override
public Decoder getDecoder(InputStream is) {
return new CellDecoder(is);
}
@Override
public Encoder getEncoder(OutputStream os) {
return new CellEncoder(os);
}
}

View File

@ -27,10 +27,8 @@ import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.codec.CellCodec;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.util.Bytes;
@ -124,47 +122,4 @@ public class TestCellCodec {
dis.close();
assertEquals(offset, cis.getCount());
}
@Test
public void testThreeWithTag() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CountingOutputStream cos = new CountingOutputStream(baos);
DataOutputStream dos = new DataOutputStream(cos);
Codec codec = new CellCodec();
Codec.Encoder encoder = codec.getEncoder(dos);
final KeyValue kv1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"),
HConstants.LATEST_TIMESTAMP, Bytes.toBytes("1"), new Tag[] {
new Tag((byte) 1, Bytes.toBytes("teststring1")),
new Tag((byte) 2, Bytes.toBytes("testString2")) });
final KeyValue kv2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("2"),
HConstants.LATEST_TIMESTAMP, Bytes.toBytes("2"), new Tag[] { new Tag((byte) 1,
Bytes.toBytes("teststring3")), });
final KeyValue kv3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("3"),
HConstants.LATEST_TIMESTAMP, Bytes.toBytes("3"), new Tag[] {
new Tag((byte) 2, Bytes.toBytes("teststring4")),
new Tag((byte) 2, Bytes.toBytes("teststring5")),
new Tag((byte) 1, Bytes.toBytes("teststring6")) });
encoder.write(kv1);
encoder.write(kv2);
encoder.write(kv3);
encoder.flush();
dos.close();
long offset = cos.getCount();
CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
DataInputStream dis = new DataInputStream(cis);
Codec.Decoder decoder = codec.getDecoder(dis);
assertTrue(decoder.advance());
Cell c = decoder.current();
assertTrue(CellComparator.equals(c, kv1));
assertTrue(decoder.advance());
c = decoder.current();
assertTrue(CellComparator.equals(c, kv2));
assertTrue(decoder.advance());
c = decoder.current();
assertTrue(CellComparator.equals(c, kv3));
assertFalse(decoder.advance());
dis.close();
assertEquals(offset, cis.getCount());
}
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.codec;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.io.CountingInputStream;
import com.google.common.io.CountingOutputStream;
@Category(SmallTests.class)
public class TestCellCodecV2 {
@Test
public void testCellWithTag() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CountingOutputStream cos = new CountingOutputStream(baos);
DataOutputStream dos = new DataOutputStream(cos);
Codec codec = new CellCodecV2();
Codec.Encoder encoder = codec.getEncoder(dos);
final Cell cell1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"),
HConstants.LATEST_TIMESTAMP, Bytes.toBytes("1"), new Tag[] {
new Tag((byte) 1, Bytes.toBytes("teststring1")),
new Tag((byte) 2, Bytes.toBytes("teststring2")) });
final Cell cell2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("2"),
HConstants.LATEST_TIMESTAMP, Bytes.toBytes("2"), new Tag[] { new Tag((byte) 1,
Bytes.toBytes("teststring3")), });
final Cell cell3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("3"),
HConstants.LATEST_TIMESTAMP, Bytes.toBytes("3"), new Tag[] {
new Tag((byte) 2, Bytes.toBytes("teststring4")),
new Tag((byte) 2, Bytes.toBytes("teststring5")),
new Tag((byte) 1, Bytes.toBytes("teststring6")) });
encoder.write(cell1);
encoder.write(cell2);
encoder.write(cell3);
encoder.flush();
dos.close();
long offset = cos.getCount();
CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
DataInputStream dis = new DataInputStream(cis);
Codec.Decoder decoder = codec.getDecoder(dis);
assertTrue(decoder.advance());
Cell c = decoder.current();
assertTrue(CellComparator.equals(c, cell1));
List<Tag> tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
assertEquals(2, tags.size());
Tag tag = tags.get(0);
assertEquals(1, tag.getType());
assertTrue(Bytes.equals(Bytes.toBytes("teststring1"), tag.getValue()));
tag = tags.get(1);
assertEquals(2, tag.getType());
assertTrue(Bytes.equals(Bytes.toBytes("teststring2"), tag.getValue()));
assertTrue(decoder.advance());
c = decoder.current();
assertTrue(CellComparator.equals(c, cell2));
tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
assertEquals(1, tags.size());
tag = tags.get(0);
assertEquals(1, tag.getType());
assertTrue(Bytes.equals(Bytes.toBytes("teststring3"), tag.getValue()));
assertTrue(decoder.advance());
c = decoder.current();
assertTrue(CellComparator.equals(c, cell3));
tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
assertEquals(3, tags.size());
tag = tags.get(0);
assertEquals(2, tag.getType());
assertTrue(Bytes.equals(Bytes.toBytes("teststring4"), tag.getValue()));
tag = tags.get(1);
assertEquals(2, tag.getType());
assertTrue(Bytes.equals(Bytes.toBytes("teststring5"), tag.getValue()));
tag = tags.get(2);
assertEquals(1, tag.getType());
assertTrue(Bytes.equals(Bytes.toBytes("teststring6"), tag.getValue()));
assertFalse(decoder.advance());
dis.close();
assertEquals(offset, cis.getCount());
}
}