HBASE-22622 WALKey Extended Attributes (#352)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Geoffrey Jacoby 2019-07-02 14:32:24 -07:00 committed by Andrew Purtell
parent 48c623c808
commit 1ad48c1ebc
No known key found for this signature in database
GPG Key ID: 8597754DD5365CCD
6 changed files with 159 additions and 30 deletions

View File

@ -63,14 +63,20 @@ message WALKey {
optional uint64 nonceGroup = 9;
optional uint64 nonce = 10;
optional uint64 orig_sequence_number = 11;
repeated Attribute extended_attributes = 12;
/*
optional CustomEntryType custom_entry_type = 9;
/*
optional CustomEntryType custom_entry_type = 9;
enum CustomEntryType {
COMPACTION = 0;
}
*/
enum CustomEntryType {
COMPACTION = 0;
}
*/
}
message Attribute {
required string key = 1;
required bytes value = 2;
}
enum ScopeType {

View File

@ -62,7 +62,7 @@ message WALKey {
optional uint64 nonceGroup = 9;
optional uint64 nonce = 10;
optional uint64 orig_sequence_number = 11;
repeated Attribute extended_attributes = 12;
/*
optional CustomEntryType custom_entry_type = 9;
@ -71,6 +71,10 @@ message WALKey {
}
*/
}
message Attribute {
required string key = 1;
required bytes value = 2;
}
enum ScopeType {
REPLICATION_SCOPE_LOCAL = 0;

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.NavigableMap;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
@ -71,7 +72,7 @@ public class WALUtil {
MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKeyImpl walKey =
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc);
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}
@ -87,7 +88,7 @@ public class WALUtil {
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKeyImpl walKey = doFullAppendTransaction(wal, replicationScope, hri,
WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
}
@ -103,7 +104,7 @@ public class WALUtil {
final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
WALEdit.createRegionEventWALEdit(hri, r), mvcc);
WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
}
@ -125,7 +126,7 @@ public class WALUtil {
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKeyImpl walKey =
writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc);
writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
}
@ -133,11 +134,15 @@ public class WALUtil {
}
private static WALKeyImpl writeMarker(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
final NavigableMap<byte[], Integer> replicationScope,
final RegionInfo hri,
final WALEdit edit,
final MultiVersionConcurrencyControl mvcc,
final Map<String, byte[]> extendedAttributes)
throws IOException {
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
return doFullAppendTransaction(wal, replicationScope, hri, edit, mvcc, true);
return doFullAppendTransaction(wal, replicationScope, hri,
edit, mvcc, extendedAttributes, true);
}
/**
@ -150,11 +155,12 @@ public class WALUtil {
*/
public static WALKeyImpl doFullAppendTransaction(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync)
final WALEdit edit, final MultiVersionConcurrencyControl mvcc,
final Map<String, byte[]> extendedAttributes, final boolean sync)
throws IOException {
// TODO: Pass in current time to use?
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
System.currentTimeMillis(), mvcc, replicationScope);
System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes);
long trx = MultiVersionConcurrencyControl.NONE;
try {
trx = wal.append(hri, walKey, edit, false);

View File

@ -86,6 +86,21 @@ public interface WALKey extends SequenceId, Comparable<WALKey> {
*/
long getOrigLogSeqNum();
/**
* Return a named String value injected into the WALKey during processing, such as by a
* coprocessor
* @param attributeKey The key of a key / value pair
*/
default byte[] getExtendedAttribute(String attributeKey){
return null;
}
/**
* Returns a map of all extended attributes injected into this WAL key.
*/
default Map<String, byte[]> getExtendedAttributes() {
return new HashMap<>();
}
/**
* Produces a string map for this key. Useful for programmatic use and
* manipulation of the data stored in an WALKeyImpl, for example, printing
@ -98,6 +113,12 @@ public interface WALKey extends SequenceId, Comparable<WALKey> {
stringMap.put("table", getTableName());
stringMap.put("region", Bytes.toStringBinary(getEncodedRegionName()));
stringMap.put("sequence", getSequenceId());
Map<String, byte[]> extendedAttributes = getExtendedAttributes();
if (extendedAttributes != null){
for (Map.Entry<String, byte[]> entry : extendedAttributes.entrySet()){
stringMap.put(entry.getKey(), Bytes.toStringBinary(entry.getValue()));
}
}
return stringMap;
}
}

View File

@ -19,11 +19,13 @@ package org.apache.hadoop.hbase.wal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@ -116,14 +118,16 @@ public class WALKeyImpl implements WALKey {
*/
private MultiVersionConcurrencyControl.WriteEntry writeEntry;
private Map<String, byte[]> extendedAttributes;
public WALKeyImpl() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null, null);
}
public WALKeyImpl(final NavigableMap<byte[], Integer> replicationScope) {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope);
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope, null);
}
@VisibleForTesting
@ -132,7 +136,7 @@ public class WALKeyImpl implements WALKey {
List<UUID> clusterIds = new ArrayList<>(1);
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, HConstants.NO_NONCE,
HConstants.NO_NONCE, null, null);
HConstants.NO_NONCE, null, null, null);
}
@VisibleForTesting
@ -141,7 +145,7 @@ public class WALKeyImpl implements WALKey {
List<UUID> clusterIds = new ArrayList<>(1);
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, null);
HConstants.NO_NONCE, mvcc, null, null);
}
// TODO: Fix being able to pass in sequenceid.
@ -153,20 +157,28 @@ public class WALKeyImpl implements WALKey {
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
null, null);
null, null, null);
}
// TODO: Fix being able to pass in sequenceid.
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, null, replicationScope);
HConstants.NO_NONCE, null, replicationScope, null);
}
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, replicationScope);
HConstants.NO_NONCE, mvcc, replicationScope, null);
}
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
MultiVersionConcurrencyControl mvcc,
final NavigableMap<byte[], Integer> replicationScope,
Map<String, byte[]> extendedAttributes) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, replicationScope, extendedAttributes);
}
public WALKeyImpl(final byte[] encodedRegionName,
@ -180,7 +192,7 @@ public class WALKeyImpl implements WALKey {
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
mvcc, null);
mvcc, null, null);
}
/**
@ -206,7 +218,7 @@ public class WALKeyImpl implements WALKey {
final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
replicationScope, null);
}
/**
@ -231,7 +243,8 @@ public class WALKeyImpl implements WALKey {
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup,
nonce, mvcc, null, null);
}
/**
@ -252,7 +265,7 @@ public class WALKeyImpl implements WALKey {
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
null);
null, null);
}
/**
@ -275,7 +288,7 @@ public class WALKeyImpl implements WALKey {
final long nonce, final MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
replicationScope, null);
}
/**
@ -304,7 +317,22 @@ public class WALKeyImpl implements WALKey {
EMPTY_UUIDS,
nonceGroup,
nonce,
mvcc, null);
mvcc, null, null);
}
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope,
Map<String, byte[]> extendedAttributes){
init(encodedRegionName,
tablename,
NO_SEQUENCE_ID,
now,
clusterIds,
nonceGroup,
nonce,
mvcc, replicationScope, extendedAttributes);
}
@InterfaceAudience.Private
@ -316,7 +344,8 @@ public class WALKeyImpl implements WALKey {
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope) {
NavigableMap<byte[], Integer> replicationScope,
Map<String, byte[]> extendedAttributes) {
this.sequenceId = logSeqNum;
this.writeTime = now;
this.clusterIds = clusterIds;
@ -329,6 +358,7 @@ public class WALKeyImpl implements WALKey {
setSequenceId(logSeqNum);
}
this.replicationScope = replicationScope;
this.extendedAttributes = extendedAttributes;
}
// For deserialization. DO NOT USE. See setWriteEntry below.
@ -434,6 +464,17 @@ public class WALKeyImpl implements WALKey {
return clusterIds.isEmpty()? HConstants.DEFAULT_CLUSTER_ID: clusterIds.get(0);
}
@Override
public byte[] getExtendedAttribute(String attributeKey){
return extendedAttributes != null ? extendedAttributes.get(attributeKey) : null;
}
@Override
public Map<String, byte[]> getExtendedAttributes(){
return extendedAttributes != null ? new HashMap<String, byte[]>(extendedAttributes) :
new HashMap<String, byte[]>();
}
@Override
public String toString() {
return tablename + "/" + Bytes.toString(encodedRegionName) + "/" + sequenceId;
@ -539,6 +580,14 @@ public class WALKeyImpl implements WALKey {
.setScopeType(ScopeType.forNumber(e.getValue())));
}
}
if (extendedAttributes != null){
for (Map.Entry<String, byte[]> e : extendedAttributes.entrySet()){
WALProtos.Attribute attr = WALProtos.Attribute.newBuilder().
setKey(e.getKey()).setValue(compressor.compress(e.getValue(),
CompressionContext.DictionaryIndex.TABLE)).build();
builder.addExtendedAttributes(attr);
}
}
return builder;
}
@ -573,6 +622,14 @@ public class WALKeyImpl implements WALKey {
if (walKey.hasOrigSequenceNumber()) {
this.origLogSeqNum = walKey.getOrigSequenceNumber();
}
if (walKey.getExtendedAttributesCount() > 0){
this.extendedAttributes = new HashMap<>(walKey.getExtendedAttributesCount());
for (WALProtos.Attribute attr : walKey.getExtendedAttributesList()){
byte[] value =
uncompressor.uncompress(attr.getValue(), CompressionContext.DictionaryIndex.TABLE);
extendedAttributes.put(attr.getKey(), value);
}
}
}
@Override

View File

@ -27,9 +27,13 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
@ -52,6 +56,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@ -64,6 +69,7 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@ -72,6 +78,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestWALEntryStream {
@ -333,6 +341,33 @@ public class TestWALEntryStream {
}
}
@Test
public void testWALKeySerialization() throws Exception {
Map<String, byte[]> attributes = new HashMap<String, byte[]>();
attributes.put("foo", Bytes.toBytes("foo-value"));
attributes.put("bar", Bytes.toBytes("bar-value"));
WALKeyImpl key = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), new ArrayList<UUID>(), 0L, 0L,
mvcc, scopes, attributes);
Assert.assertEquals(attributes, key.getExtendedAttributes());
WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor());
WALProtos.WALKey serializedKey = builder.build();
WALKeyImpl deserializedKey = new WALKeyImpl();
deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor());
//equals() only checks region name, sequence id and write time
Assert.assertEquals(key, deserializedKey);
//can't use Map.equals() because byte arrays use reference equality
Assert.assertEquals(key.getExtendedAttributes().keySet(),
deserializedKey.getExtendedAttributes().keySet());
for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()){
Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
}
Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
}
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));