HBASE-10322 Strip tags from KV while sending back to client on reads.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1560265 13f79535-47bb-0310-9956-ffa450edef68
anoopsamjohn 2014-01-22 07:19:45 +00:00
parent c079ba4660
commit 508f7697a5
20 changed files with 401 additions and 98 deletions

View File

@ -47,7 +47,8 @@ class HConnectionKey {
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
HConstants.HBASE_META_SCANNER_CACHING,
- HConstants.HBASE_CLIENT_INSTANCE_ID };
+ HConstants.HBASE_CLIENT_INSTANCE_ID,
+ HConstants.RPC_CODEC_CONF_KEY };
private Map<String, String> properties;
private String username;
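
The codec key is added to HConnectionKey because cached connections are shared per distinct property set: two clients that want different RPC codecs must not land on the same cached connection. A minimal sketch of the effect, assuming the 0.98-era HConnectionManager entry points used elsewhere in this commit (the codec choice here is only illustrative):

Configuration withCodec = HBaseConfiguration.create();
withCodec.set(HConstants.RPC_CODEC_CONF_KEY, KeyValueCodec.class.getName());

Configuration noCodec = HBaseConfiguration.create();
noCodec.set(HConstants.RPC_CODEC_CONF_KEY, "");  // empty string = no cellblocks

// Because RPC_CODEC_CONF_KEY is now part of the key, these two configurations
// resolve to two distinct cached connections instead of sharing one.
HConnection c1 = HConnectionManager.getConnection(withCodec);
HConnection c2 = HConnectionManager.getConnection(noCodec);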

View File

@ -126,7 +126,7 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
if (connection == null) return true; // Default is to do cellblocks.
Configuration configuration = connection.getConfiguration();
if (configuration == null) return true;
String codec = configuration.get("hbase.client.rpc.codec", "");
String codec = configuration.get(HConstants.RPC_CODEC_CONF_KEY, "");
return codec != null && codec.length() > 0;
}

View File

@ -147,13 +147,8 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
return addImmutable(family, qualifier, this.ts, value);
}
- public Put add(byte[] family, byte [] qualifier, byte [] value, Tag[] tag) {
- return add(family, qualifier, this.ts, value, tag);
- }
/**
- * See {@link #add(byte[], byte[], byte[], Tag[] tag)}. This version expects
- * that the underlying arrays won't change. It's intended
+ * This expects that the underlying arrays won't change. It's intended
* for usage internal to HBase and for advanced client applications.
*/
public Put addImmutable(byte[] family, byte [] qualifier, byte [] value, Tag[] tag) {
@ -197,20 +192,7 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
}
- /**
- * Forms a keyvalue with tags
- */
- @SuppressWarnings("unchecked")
- public Put add(byte[] family, byte[] qualifier, long ts, byte[] value, Tag[] tag) {
- List<Cell> list = getCellList(family);
- KeyValue kv = createPutKeyValue(family, qualifier, ts, value, tag);
- list.add(kv);
- familyMap.put(CellUtil.cloneFamily(kv), list);
- return this;
- }
/**
- * See {@link #add(byte[], byte[], long, byte[], Tag[] tag)}. This version expects
- * that the underlying arrays won't change. It's intended
+ * This expects that the underlying arrays won't change. It's intended
* for usage internal to HBase and for advanced client applications.
*/
@SuppressWarnings("unchecked")
@ -223,30 +205,7 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
}
- /**
- * Add the specified column and value, with the specified timestamp as
- * its version to this Put operation.
- * @param family family name
- * @param qualifier column qualifier
- * @param ts version timestamp
- * @param value column value
- * @param tag the tags
- * @return this
- */
- public Put add(byte[] family, ByteBuffer qualifier, long ts, ByteBuffer value, Tag[] tag) {
- if (ts < 0) {
- throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
- }
- List<Cell> list = getCellList(family);
- KeyValue kv = createPutKeyValue(family, qualifier, ts, value, tag);
- list.add(kv);
- familyMap.put(CellUtil.cloneFamily(kv), list);
- return this;
- }
/**
- * See {@link #add(byte[], ByteBuffer, long, ByteBuffer, Tag[] tag)}. This version expects
- * that the underlying arrays won't change. It's intended
+ * This expects that the underlying arrays won't change. It's intended
* for usage internal to HBase and for advanced client applications.
*/
public Put addImmutable(byte[] family, ByteBuffer qualifier, long ts, ByteBuffer value,
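
With the tag-accepting Put#add overloads removed, callers that still need tags (server-internal code and the tests further down) build the KeyValue themselves and hand it to Put#add(KeyValue). A hedged sketch; family, qualifier and value are assumed to be byte[] arrays already in scope:

byte[] row = Bytes.toBytes("r1");
Put put = new Put(row);
Tag[] tags = new Tag[] { new Tag((byte) 1, Bytes.toBytes("tag-value")) };
// Build the cell directly; its row must match the Put's row.
KeyValue kv = new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
put.add(kv);

This is exactly the rewrite applied to PerformanceEvaluation and the tag tests later in this commit.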

View File

@ -1298,9 +1298,9 @@ public class RpcClient {
* @return Codec to use on this client.
*/
Codec getCodec() {
// For NO CODEC, "hbase.client.rpc.codec" must be the empty string AND
// "hbase.client.default.rpc.codec" -- because default is to do cell block encoding.
String className = conf.get("hbase.client.rpc.codec", getDefaultCodec(this.conf));
// For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
// "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
if (className == null || className.length() == 0) return null;
try {
return (Codec)Class.forName(className).newInstance();
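
As the rewritten comment says, disabling cellblock encoding entirely takes two settings, because an unset client codec falls back to the default-codec property. A sketch (the second key is quoted from the comment above):

Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.RPC_CODEC_CONF_KEY, "");     // no client codec...
conf.set("hbase.client.default.rpc.codec", "");  // ...and no fallback default
// getCodec() then returns null and request/response cells travel as pure protobuf.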

View File

@ -675,7 +675,10 @@ public final class ProtobufUtil {
"Missing required field: qualifer value");
}
byte[] value = qv.getValue().toByteArray();
- byte[] tags = qv.getTags().toByteArray();
+ byte[] tags = null;
+ if (qv.hasTags()) {
+ tags = qv.getTags().toByteArray();
+ }
append.add(CellUtil.createCell(row, family, qualifier, append.getTimeStamp(),
KeyValue.Type.Put, value, tags));
}
@ -750,7 +753,10 @@ public final class ProtobufUtil {
throw new DoNotRetryIOException("Missing required field: qualifier value");
}
byte[] value = qv.getValue().toByteArray();
- byte[] tags = qv.getTags().toByteArray();
+ byte[] tags = null;
+ if (qv.hasTags()) {
+ tags = qv.getTags().toByteArray();
+ }
increment.add(CellUtil.createCell(row, family, qualifier, increment.getTimeStamp(),
KeyValue.Type.Put, value, tags));
}

View File

@ -973,6 +973,12 @@ public final class HConstants {
/** Configuration key for enabling HLog encryption, a boolean */
public static final String ENABLE_WAL_ENCRYPTION = "hbase.regionserver.wal.encryption";
/** Configuration key for setting RPC codec class name */
public static final String RPC_CODEC_CONF_KEY = "hbase.client.rpc.codec";
/** Configuration key for setting replication codec class name */
public static final String REPLICATION_CODEC_CONF_KEY = "hbase.replication.rpc.codec";
private HConstants() {
// Can't be instantiated with this ctor.
}

View File

@ -2840,7 +2840,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
* @see #create(DataInput) for the inverse function
* @see #write(KeyValue, DataOutput)
*/
- public static long oswrite(final KeyValue kv, final OutputStream out) throws IOException {
+ @Deprecated
+ public static long oswrite(final KeyValue kv, final OutputStream out)
+ throws IOException {
int length = kv.getLength();
// This does the same as DataOutput#writeInt (big-endian, etc.)
out.write(Bytes.toBytes(length));
@ -2848,6 +2850,30 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
return length + Bytes.SIZEOF_INT;
}
/**
* Write out a KeyValue in the manner in which we used to when KeyValue was a Writable, but
* without requiring a {@link DataOutput}; it takes a plain {@link OutputStream}.
* Named <code>oswrite</code> so it does not clash with {@link #write(KeyValue, DataOutput)}.
* @param kv the KeyValue to serialize
* @param out the stream to write to
* @param withTags whether to include the tags in what is written out
* @return Length written on stream
* @throws IOException
* @see #create(DataInput) for the inverse function
* @see #write(KeyValue, DataOutput)
*/
public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
throws IOException {
int length = kv.getLength();
if (!withTags) {
length = kv.getKeyLength() + kv.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
}
// This does the same as DataOutput#writeInt (big-endian, etc.)
out.write(Bytes.toBytes(length));
out.write(kv.getBuffer(), kv.getOffset(), length);
return length + Bytes.SIZEOF_INT;
}
/**
* Comparator that compares row component only of a KeyValue.
*/
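
When withTags is false, the write simply truncates the serialized length to key + value + the fixed infrastructure (the two length ints), so the trailing tag block never reaches the stream. A rough illustration, assuming row/fam/qual/value byte[] arrays in scope:

KeyValue kv = new KeyValue(row, fam, qual, HConstants.LATEST_TIMESTAMP, value,
    new Tag[] { new Tag((byte) 1, Bytes.toBytes("t")) });
ByteArrayOutputStream keepTags = new ByteArrayOutputStream();
ByteArrayOutputStream dropTags = new ByteArrayOutputStream();
KeyValue.oswrite(kv, keepTags, true);   // whole backing array, tags included
KeyValue.oswrite(kv, dropTags, false);  // stops after the value bytes
// keepTags.size() - dropTags.size() == size of the serialized tag block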

View File

@ -30,9 +30,10 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* Basic Cell codec that just writes out all the individual elements of a Cell including the tags.
* Uses ints to delimit all lengths. Profligate. Needs a tune-up.
* <b>Use this Codec only at server side.</b>
*/
@InterfaceAudience.Private
- public class CellCodecV2 implements Codec {
+ public class CellCodecWithTags implements Codec {
static class CellEncoder extends BaseEncoder {
CellEncoder(final OutputStream out) {
super(out);
@ -61,7 +62,7 @@ public class CellCodecV2 implements Codec {
/**
* Write int length followed by array bytes.
*
* @param bytes
* @param offset
* @param length
@ -118,4 +119,4 @@ public class CellCodecV2 implements Codec {
public Encoder getEncoder(OutputStream os) {
return new CellEncoder(os);
}
}

View File

@ -55,7 +55,8 @@ public class KeyValueCodec implements Codec {
checkFlushed();
// This is crass and will not work when KV changes. Also if passed a non-kv Cell, it will
// make an expensive copy.
- KeyValue.oswrite((KeyValue)KeyValueUtil.ensureKeyValue(cell), this.out);
+ // Do not write tags over RPC
+ KeyValue.oswrite((KeyValue) KeyValueUtil.ensureKeyValue(cell), this.out, false);
}
}
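
The net effect for the default client codec: cells round-tripped through it come back tag-less, which is the point of this issue. A hedged sketch of that behavior, with kvWithTags standing in for an arbitrary tagged KeyValue:

Codec codec = new KeyValueCodec();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Codec.Encoder encoder = codec.getEncoder(out);
encoder.write(kvWithTags);  // tags are silently not serialized
encoder.flush();
Codec.Decoder decoder = codec.getDecoder(new ByteArrayInputStream(out.toByteArray()));
decoder.advance();
assert decoder.current().getTagsLength() == 0;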

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
/**
* Codec that does KeyValue version 1 serialization and also serializes tags.
*
* <p>
* Encodes by casting Cell to KeyValue and writing out the backing array with a length prefix. This
* is how KVs were serialized in Puts, Deletes and Results pre-0.96. It's what would happen if you
* called the Writable#write KeyValue implementation. This encoder will fail if the passed Cell is
* not an old-school pre-0.96 KeyValue. Does not copy bytes when writing; it just writes them
* directly to the passed stream.
*
* <p>
* If you wrote two KeyValues to this encoder, it would look like this in the stream:
*
* <pre>
* length-of-KeyValue1 // A java int with the length of KeyValue1 backing array
* KeyValue1 backing array filled with a KeyValue serialized in its particular format
* length-of-KeyValue2
* KeyValue2 backing array
* </pre>
*
* Note: The only difference between this and KeyValueCodec is that the latter ignores tags in KeyValues.
* <b>Use this Codec only at server side.</b>
*/
@InterfaceAudience.Private
public class KeyValueCodecWithTags implements Codec {
public static class KeyValueEncoder extends BaseEncoder {
public KeyValueEncoder(final OutputStream out) {
super(out);
}
@Override
public void write(Cell cell) throws IOException {
checkFlushed();
// This is crass and will not work when KV changes. Also if passed a non-kv Cell, it will
// make an expensive copy.
// Write tags
KeyValue.oswrite((KeyValue) KeyValueUtil.ensureKeyValue(cell), this.out, true);
}
}
public static class KeyValueDecoder extends BaseDecoder {
public KeyValueDecoder(final InputStream in) {
super(in);
}
protected Cell parseCell() throws IOException {
return KeyValue.iscreate(in);
}
}
/**
* Implementation depends on {@link InputStream#available()}
*/
@Override
public Decoder getDecoder(final InputStream is) {
return new KeyValueDecoder(is);
}
@Override
public Encoder getEncoder(OutputStream os) {
return new KeyValueEncoder(os);
}
}
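
Tags still have to cross cluster boundaries for replication, which is why this codec exists and why ReplicationSource/ReplicationSink below honor a separate configuration key. Enabling it is one setting on both clusters:

// hbase-site.xml equivalent, on source and peer clusters:
conf.set(HConstants.REPLICATION_CODEC_CONF_KEY,   // "hbase.replication.rpc.codec"
    KeyValueCodecWithTags.class.getName());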

View File

@ -42,14 +42,14 @@ import com.google.common.io.CountingInputStream;
import com.google.common.io.CountingOutputStream;
@Category(SmallTests.class)
- public class TestCellCodecV2 {
+ public class TestCellCodecWithTags {
@Test
public void testCellWithTag() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CountingOutputStream cos = new CountingOutputStream(baos);
DataOutputStream dos = new DataOutputStream(cos);
- Codec codec = new CellCodecV2();
+ Codec codec = new CellCodecWithTags();
Codec.Encoder encoder = codec.getEncoder(dos);
final Cell cell1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"),
HConstants.LATEST_TIMESTAMP, Bytes.toBytes("1"), new Tag[] {
@ -110,4 +110,4 @@ public class TestCellCodecV2 {
dis.close();
assertEquals(offset, cis.getCount());
}
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.codec;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.io.CountingInputStream;
import com.google.common.io.CountingOutputStream;
@Category(SmallTests.class)
public class TestKeyValueCodecWithTags {
@Test
public void testKeyValueWithTag() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CountingOutputStream cos = new CountingOutputStream(baos);
DataOutputStream dos = new DataOutputStream(cos);
Codec codec = new KeyValueCodecWithTags();
Codec.Encoder encoder = codec.getEncoder(dos);
final KeyValue kv1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"),
HConstants.LATEST_TIMESTAMP, Bytes.toBytes("1"), new Tag[] {
new Tag((byte) 1, Bytes.toBytes("teststring1")),
new Tag((byte) 2, Bytes.toBytes("teststring2")) });
final KeyValue kv2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("2"),
HConstants.LATEST_TIMESTAMP, Bytes.toBytes("2"), new Tag[] { new Tag((byte) 1,
Bytes.toBytes("teststring3")), });
final KeyValue kv3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("3"),
HConstants.LATEST_TIMESTAMP, Bytes.toBytes("3"), new Tag[] {
new Tag((byte) 2, Bytes.toBytes("teststring4")),
new Tag((byte) 2, Bytes.toBytes("teststring5")),
new Tag((byte) 1, Bytes.toBytes("teststring6")) });
encoder.write(kv1);
encoder.write(kv2);
encoder.write(kv3);
encoder.flush();
dos.close();
long offset = cos.getCount();
CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
DataInputStream dis = new DataInputStream(cis);
Codec.Decoder decoder = codec.getDecoder(dis);
assertTrue(decoder.advance());
Cell c = decoder.current();
assertTrue(CellComparator.equals(c, kv1));
List<Tag> tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
assertEquals(2, tags.size());
Tag tag = tags.get(0);
assertEquals(1, tag.getType());
assertTrue(Bytes.equals(Bytes.toBytes("teststring1"), tag.getValue()));
tag = tags.get(1);
assertEquals(2, tag.getType());
assertTrue(Bytes.equals(Bytes.toBytes("teststring2"), tag.getValue()));
assertTrue(decoder.advance());
c = decoder.current();
assertTrue(CellComparator.equals(c, kv2));
tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
assertEquals(1, tags.size());
tag = tags.get(0);
assertEquals(1, tag.getType());
assertTrue(Bytes.equals(Bytes.toBytes("teststring3"), tag.getValue()));
assertTrue(decoder.advance());
c = decoder.current();
assertTrue(CellComparator.equals(c, kv3));
tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
assertEquals(3, tags.size());
tag = tags.get(0);
assertEquals(2, tag.getType());
assertTrue(Bytes.equals(Bytes.toBytes("teststring4"), tag.getValue()));
tag = tags.get(1);
assertEquals(2, tag.getType());
assertTrue(Bytes.equals(Bytes.toBytes("teststring5"), tag.getValue()));
tag = tags.get(2);
assertEquals(1, tag.getType());
assertTrue(Bytes.equals(Bytes.toBytes("teststring6"), tag.getValue()));
assertFalse(decoder.advance());
dis.close();
assertEquals(offset, cis.getCount());
}
}

View File

@ -293,14 +293,16 @@ public class WALCellCodec implements Codec {
}
}
- public class EnsureKvEncoder extends KeyValueCodec.KeyValueEncoder {
+ public class EnsureKvEncoder extends BaseEncoder {
public EnsureKvEncoder(OutputStream out) {
super(out);
}
@Override
public void write(Cell cell) throws IOException {
if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
- super.write(cell);
+ checkFlushed();
+ // Make sure to write tags into WAL
+ KeyValue.oswrite((KeyValue) cell, this.out, true);
}
}
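
Taken together, the three oswrite call sites in this commit draw the boundary for where tags survive:

KeyValue.oswrite(kv, out, false);  // KeyValueCodec         - client RPC: strip tags
KeyValue.oswrite(kv, out, true);   // KeyValueCodecWithTags - replication RPC: keep tags
KeyValue.oswrite(kv, out, true);   // WALCellCodec encoder  - WAL: tags must be persisted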

View File

@ -29,6 +29,7 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -98,6 +99,10 @@ public class ReplicationSink {
this.conf.getInt("replication.sink.client.retries.number", 4));
this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
this.conf.getInt("replication.sink.client.ops.timeout", 10000));
String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
if (StringUtils.isNotEmpty(replicationCodec)) {
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
}
}
/**

View File

@ -31,6 +31,7 @@ import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -38,6 +39,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
@ -144,7 +147,8 @@ public class ReplicationSource extends Thread
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId) throws IOException {
this.stopper = stopper;
- this.conf = conf;
+ this.conf = HBaseConfiguration.create(conf);
+ decorateConf();
this.replicationQueueSizeCapacity =
this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
this.replicationQueueNbCapacity =
@ -154,12 +158,12 @@ public class ReplicationSource extends Thread
maxRetriesMultiplier * maxRetriesMultiplier);
this.queue =
new PriorityBlockingQueue<Path>(
conf.getInt("hbase.regionserver.maxlogs", 32),
this.conf.getInt("hbase.regionserver.maxlogs", 32),
new LogsComparator());
// TODO: This connection is replication specific, or we should make it particular to
// replication and allow replication-specific settings such as the compression or codec
// to use when passing Cells.
- this.conn = HConnectionManager.getConnection(conf);
+ this.conn = HConnectionManager.getConnection(this.conf);
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.manager = manager;
@ -174,10 +178,16 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
- this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, conf);
+ this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
}
private void decorateConf() {
String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
if (StringUtils.isNotEmpty(replicationCodec)) {
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
}
}
@Override
public void enqueueLog(Path log) {
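
Copying the Configuration before decorating it matters here: the RPC codec override must apply only to this replication source's connection, not leak into the shared server configuration it was derived from. The pattern in isolation (serverConf stands in for the conf passed to the constructor):

Configuration local = HBaseConfiguration.create(serverConf);  // private copy
String replicationCodec = local.get(HConstants.REPLICATION_CODEC_CONF_KEY);
if (StringUtils.isNotEmpty(replicationCodec)) {
  local.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);  // affects only this copy
}
HConnection conn = HConnectionManager.getConnection(local);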

View File

@ -1180,7 +1180,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
- put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+ KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
+ value, tags);
+ put.add(kv);
} else {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
@ -1239,7 +1241,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
@Override
void testRow(final int i) throws IOException {
- Put put = new Put(format(i));
+ byte[] row = format(i);
+ Put put = new Put(row);
byte[] value = generateData(this.rand, VALUE_LENGTH);
if (useTags) {
byte[] tag = generateData(this.rand, TAG_LENGTH);
@ -1248,7 +1251,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
- put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+ KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
+ value, tags);
+ put.add(kv);
} else {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}

View File

@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Tag;
@ -148,11 +149,13 @@ public class TestEncodedSeekers {
Put put = new Put(key);
byte[] col = Bytes.toBytes(String.valueOf(j));
byte[] value = dataGenerator.generateRandomSizeValue(key, col);
- put.add(CF_BYTES, col, value);
- if(includeTags) {
+ if (includeTags) {
Tag[] tag = new Tag[1];
- tag[0] = new Tag((byte)1, "Visibility");
- put.add(CF_BYTES, col, value, tag);
+ tag[0] = new Tag((byte) 1, "Visibility");
+ KeyValue kv = new KeyValue(key, CF_BYTES, col, HConstants.LATEST_TIMESTAMP, value, tag);
+ put.add(kv);
+ } else {
+ put.add(CF_BYTES, col, value);
}
if(VERBOSE){
KeyValue kvPut = new KeyValue(key, CF_BYTES, col, value);

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Tag;
@ -389,7 +390,9 @@ public class TestCacheOnWrite {
Tag t = new Tag((byte) 1, "visibility");
Tag[] tags = new Tag[1];
tags[0] = t;
- p.add(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr), tags);
+ KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
+ HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
+ p.add(kv);
} else {
p.add(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -300,10 +301,8 @@ public class TestTags {
table = new HTable(TEST_UTIL.getConfiguration(), tableName);
Put put = new Put(row);
byte[] value = Bytes.toBytes("value");
- Tag[] tags = new Tag[1];
- tags[0] = new Tag((byte) 1, "ram");
- put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value, tags);
- // put.setAttribute("visibility", Bytes.toBytes("myTag"));
+ put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
+ put.setAttribute("visibility", Bytes.toBytes("ram"));
table.put(put);
Put put1 = new Put(row1);
byte[] value1 = Bytes.toBytes("1000dfsdf");
@ -336,8 +335,8 @@ public class TestTags {
table.put(put2);
put2 = new Put(rowe);
value2 = Bytes.toBytes("1000dfsddfdf");
- put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2, tags);
- // put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
+ put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
+ put2.setAttribute("visibility", Bytes.toBytes("ram"));
table.put(put2);
admin.flush(tableName.getName());
regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
@ -418,90 +417,116 @@ public class TestTags {
table = new HTable(TEST_UTIL.getConfiguration(), tableName);
Put put = new Put(row1);
byte[] v = Bytes.toBytes(2L);
- put.add(f, q, v, new Tag[] { new Tag((byte) 1, "tag1") });
+ put.add(f, q, v);
+ put.setAttribute("visibility", Bytes.toBytes("tag1"));
table.put(put);
Increment increment = new Increment(row1);
increment.addColumn(f, q, 1L);
table.increment(increment);
TestCoprocessorForTags.checkTagPresence = true;
ResultScanner scanner = table.getScanner(new Scan());
Result result = scanner.next();
KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
- List<Tag> tags = kv.getTags();
+ List<Tag> tags = TestCoprocessorForTags.tags;
assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(1, tags.size());
assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
increment = new Increment(row1);
- increment.add(new KeyValue(row1, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+ increment.add(new KeyValue(row1, f, q, 1234L, v));
+ increment.setAttribute("visibility", Bytes.toBytes("tag2"));
table.increment(increment);
TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(new Scan());
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
- tags = kv.getTags();
+ tags = TestCoprocessorForTags.tags;
assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(2, tags.size());
assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
put = new Put(row2);
v = Bytes.toBytes(2L);
put.add(f, q, v);
table.put(put);
increment = new Increment(row2);
- increment.add(new KeyValue(row2, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+ increment.add(new KeyValue(row2, f, q, 1234L, v));
+ increment.setAttribute("visibility", Bytes.toBytes("tag2"));
table.increment(increment);
Scan scan = new Scan();
scan.setStartRow(row2);
TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(scan);
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
- tags = kv.getTags();
+ tags = TestCoprocessorForTags.tags;
assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(1, tags.size());
assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
// Test Append
byte[] row3 = Bytes.toBytes("r3");
put = new Put(row3);
put.add(f, q, Bytes.toBytes("a"), new Tag[] { new Tag((byte) 1, "tag1") });
put.add(f, q, Bytes.toBytes("a"));
put.setAttribute("visibility", Bytes.toBytes("tag1"));
table.put(put);
Append append = new Append(row3);
append.add(f, q, Bytes.toBytes("b"));
table.append(append);
scan = new Scan();
scan.setStartRow(row3);
TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(scan);
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
- tags = kv.getTags();
+ tags = TestCoprocessorForTags.tags;
assertEquals(1, tags.size());
assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
append = new Append(row3);
- append.add(new KeyValue(row3, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+ append.add(new KeyValue(row3, f, q, 1234L, v));
+ append.setAttribute("visibility", Bytes.toBytes("tag2"));
table.append(append);
TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(scan);
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
- tags = kv.getTags();
+ tags = TestCoprocessorForTags.tags;
assertEquals(2, tags.size());
assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
byte[] row4 = Bytes.toBytes("r4");
put = new Put(row4);
put.add(f, q, Bytes.toBytes("a"));
table.put(put);
append = new Append(row4);
- append.add(new KeyValue(row4, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+ append.add(new KeyValue(row4, f, q, 1234L, v));
+ append.setAttribute("visibility", Bytes.toBytes("tag2"));
table.append(append);
scan = new Scan();
scan.setStartRow(row4);
TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(scan);
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
- tags = kv.getTags();
+ tags = TestCoprocessorForTags.tags;
assertEquals(1, tags.size());
assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
} finally {
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
if (table != null) {
table.close();
}
@ -543,14 +568,22 @@ public class TestTags {
}
public static class TestCoprocessorForTags extends BaseRegionObserver {
public static boolean checkTagPresence = false;
public static List<Tag> tags = null;
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
final WALEdit edit, final Durability durability) throws IOException {
byte[] attribute = put.getAttribute("visibility");
updateMutationAddingTags(put);
}
private void updateMutationAddingTags(final Mutation m) {
byte[] attribute = m.getAttribute("visibility");
byte[] cf = null;
List<Cell> updatedCells = new ArrayList<Cell>();
if (attribute != null) {
- for (List<? extends Cell> edits : put.getFamilyCellMap().values()) {
+ for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
for (Cell cell : edits) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (cf == null) {
@ -567,10 +600,41 @@ public class TestTags {
((List<Cell>) updatedCells).add(newKV);
}
}
- put.getFamilyCellMap().remove(cf);
+ m.getFamilyCellMap().remove(cf);
// Update the family map
- put.getFamilyCellMap().put(cf, updatedCells);
+ m.getFamilyCellMap().put(cf, updatedCells);
}
}
@Override
public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
throws IOException {
updateMutationAddingTags(increment);
return super.preIncrement(e, increment);
}
@Override
public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
throws IOException {
updateMutationAddingTags(append);
return super.preAppend(e, append);
}
@Override
public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
if (checkTagPresence) {
if (results.size() > 0) {
// Check tag presence in the 1st cell of the 1st Result
Result result = results.get(0);
CellScanner cellScanner = result.cellScanner();
if (cellScanner.advance()) {
Cell cell = cellScanner.current();
tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
}
}
}
return hasMore;
}
}
}

View File

@ -44,7 +44,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Durability;
@ -1100,7 +1102,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
- put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+ KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
+ value, tags);
+ put.add(kv);
} else {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
@ -1159,7 +1163,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
@Override
void testRow(final int i) throws IOException {
- Put put = new Put(format(i));
+ byte[] row = format(i);
+ Put put = new Put(row);
byte[] value = generateData(this.rand, ROW_LENGTH);
if (useTags) {
byte[] tag = generateData(this.rand, TAG_LENGTH);
@ -1168,7 +1173,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
- put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+ KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
+ value, tags);
+ put.add(kv);
} else {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}