Revert "HBASE-16478 Rename WALKey in PB to WALEdit This is a rebase of Enis's original patch"

Not worth the difference it introduces; means hbase-protocol can no
longer parse a WAL entry.

This reverts commit 9a2e680cae.
This commit is contained in:
Michael Stack 2017-09-20 15:27:37 -07:00
parent e3896cfcc3
commit 37696fffe9
9 changed files with 67 additions and 54 deletions

View File

@ -172,7 +172,7 @@ message UpdateFavoredNodesResponse {
// Protocol buffer version of WAL for replication // Protocol buffer version of WAL for replication
message WALEntry { message WALEntry {
required WALEdit edit = 1; required WALKey key = 1;
// Following may be null if the KVs/Cells are carried along the side in a cellblock (See // Following may be null if the KVs/Cells are carried along the side in a cellblock (See
// RPC for more on cellblocks). If Cells/KVs are in a cellblock, this next field is null // RPC for more on cellblocks). If Cells/KVs are in a cellblock, this next field is null
// and associated_cell_count has count of Cells associated w/ this WALEntry // and associated_cell_count has count of Cells associated w/ this WALEntry

View File

@ -34,9 +34,10 @@ message WALHeader {
} }
/* /*
* Protocol buffer version of WALEdit; * Protocol buffer version of WALKey; see WALKey comment, not really a key but WALEdit header
* for some KVs
*/ */
message WALEdit { message WALKey {
required bytes encoded_region_name = 1; required bytes encoded_region_name = 1;
required bytes table_name = 2; required bytes table_name = 2;
required uint64 log_sequence_number = 3; required uint64 log_sequence_number = 3;
@ -61,6 +62,14 @@ message WALEdit {
optional uint64 nonceGroup = 9; optional uint64 nonceGroup = 9;
optional uint64 nonce = 10; optional uint64 nonce = 10;
optional uint64 orig_sequence_number = 11; optional uint64 orig_sequence_number = 11;
/*
optional CustomEntryType custom_entry_type = 9;
enum CustomEntryType {
COMPACTION = 0;
}
*/
} }
enum ScopeType { enum ScopeType {

View File

@ -109,7 +109,7 @@ public class ReplicationProtbufUtil {
for (Entry entry: entries) { for (Entry entry: entries) {
entryBuilder.clear(); entryBuilder.clear();
// TODO: this duplicates a lot in WALKey#getBuilder // TODO: this duplicates a lot in WALKey#getBuilder
WALProtos.WALEdit.Builder keyBuilder = entryBuilder.getEditBuilder(); WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
WALKey key = entry.getKey(); WALKey key = entry.getKey();
keyBuilder.setEncodedRegionName( keyBuilder.setEncodedRegionName(
UnsafeByteOperations.unsafeWrap(encodedRegionName == null UnsafeByteOperations.unsafeWrap(encodedRegionName == null

View File

@ -597,7 +597,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @param row * @param row
* @param family * @param family
* @param qualifier * @param qualifier
* @param op * @param compareOp
* @param comparator @throws IOException * @param comparator @throws IOException
*/ */
private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions, private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
@ -648,6 +648,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
/** /**
* Execute an append mutation. * Execute an append mutation.
*
* @param region
* @param m
* @param cellScanner
* @return result to return to client if default operation should be * @return result to return to client if default operation should be
* bypassed as indicated by RegionObserver, null otherwise * bypassed as indicated by RegionObserver, null otherwise
* @throws IOException * @throws IOException
@ -2073,7 +2077,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// empty input // empty input
return ReplicateWALEntryResponse.newBuilder().build(); return ReplicateWALEntryResponse.newBuilder().build();
} }
ByteString regionName = entries.get(0).getEdit().getEncodedRegionName(); ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8()); Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
RegionCoprocessorHost coprocessorHost = RegionCoprocessorHost coprocessorHost =
ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
@ -2086,19 +2090,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL; Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
for (WALEntry entry : entries) { for (WALEntry entry : entries) {
if (!regionName.equals(entry.getEdit().getEncodedRegionName())) { if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
throw new NotServingRegionException("Replay request contains entries from multiple " + throw new NotServingRegionException("Replay request contains entries from multiple " +
"regions. First region:" + regionName.toStringUtf8() + " , other region:" "regions. First region:" + regionName.toStringUtf8() + " , other region:"
+ entry.getEdit().getEncodedRegionName()); + entry.getKey().getEncodedRegionName());
} }
if (regionServer.nonceManager != null && isPrimary) { if (regionServer.nonceManager != null && isPrimary) {
long nonceGroup = entry.getEdit().hasNonceGroup() long nonceGroup = entry.getKey().hasNonceGroup()
? entry.getEdit().getNonceGroup() : HConstants.NO_NONCE; ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
long nonce = entry.getEdit().hasNonce() ? entry.getEdit().getNonce() : HConstants.NO_NONCE; long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
regionServer.nonceManager.reportOperationFromWal( regionServer.nonceManager.reportOperationFromWal(
nonceGroup, nonceGroup,
nonce, nonce,
entry.getEdit().getWriteTime()); entry.getKey().getWriteTime());
} }
Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>(); Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry, List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
@ -2117,8 +2121,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// HBASE-17924 // HBASE-17924
// sort to improve lock efficiency // sort to improve lock efficiency
Collections.sort(edits); Collections.sort(edits);
long replaySeqId = (entry.getEdit().hasOrigSequenceNumber()) ? long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
entry.getEdit().getOrigSequenceNumber() : entry.getEdit().getLogSequenceNumber(); entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId); OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
// check if it's a partial success // check if it's a partial success
for (int i = 0; result != null && i < result.length; i++) { for (int i = 0; result != null && i < result.length; i++) {

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader.Builder; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALEdit; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
@ -333,7 +333,7 @@ public class ProtobufLogReader extends ReaderBase {
} }
return false; return false;
} }
WALEdit.Builder builder = WALEdit.newBuilder(); WALKey.Builder builder = WALKey.newBuilder();
long size = 0; long size = 0;
try { try {
long available = -1; long available = -1;
@ -363,12 +363,11 @@ public class ProtobufLogReader extends ReaderBase {
throw new EOFException("Partial PB while reading WAL, " + throw new EOFException("Partial PB while reading WAL, " +
"probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos()); "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
} }
WALEdit walKey = builder.build(); WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor); entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) { if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + this.inputStream.getPos());
this.inputStream.getPos());
} }
continue; continue;
} }

View File

@ -159,7 +159,7 @@ public class ReplicationSink {
for (WALEntry entry : entries) { for (WALEntry entry : entries) {
TableName table = TableName table =
TableName.valueOf(entry.getEdit().getTableName().toByteArray()); TableName.valueOf(entry.getKey().getTableName().toByteArray());
Cell previousCell = null; Cell previousCell = null;
Mutation m = null; Mutation m = null;
int count = entry.getAssociatedCellCount(); int count = entry.getAssociatedCellCount();
@ -183,8 +183,8 @@ public class ReplicationSink {
CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(), CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength()); cell.getRowLength());
List<UUID> clusterIds = new ArrayList<>(entry.getEdit().getClusterIdsList().size()); List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
for (HBaseProtos.UUID clusterId : entry.getEdit().getClusterIdsList()) { for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
clusterIds.add(toUUID(clusterId)); clusterIds.add(toUUID(clusterId));
} }
m.setClusterIds(clusterIds); m.setClusterIds(clusterIds);
@ -221,7 +221,7 @@ public class ReplicationSink {
} }
int size = entries.size(); int size = entries.size();
this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getEdit().getWriteTime()); this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated); this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
this.totalReplicatedEdits.addAndGet(totalReplicated); this.totalReplicatedEdits.addAndGet(totalReplicated);
} catch (IOException ex) { } catch (IOException ex) {

View File

@ -539,9 +539,9 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
this.encodedRegionName = encodedRegionName; this.encodedRegionName = encodedRegionName;
} }
public WALProtos.WALEdit.Builder getBuilder( public WALProtos.WALKey.Builder getBuilder(
WALCellCodec.ByteStringCompressor compressor) throws IOException { WALCellCodec.ByteStringCompressor compressor) throws IOException {
WALProtos.WALEdit.Builder builder = WALProtos.WALEdit.newBuilder(); WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder();
if (compressionContext == null) { if (compressionContext == null) {
builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName)); builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName));
builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName())); builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName()));
@ -580,42 +580,42 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
return builder; return builder;
} }
public void readFieldsFromPb(WALProtos.WALEdit walEdit, public void readFieldsFromPb(WALProtos.WALKey walKey,
WALCellCodec.ByteStringUncompressor uncompressor) WALCellCodec.ByteStringUncompressor uncompressor)
throws IOException { throws IOException {
if (this.compressionContext != null) { if (this.compressionContext != null) {
this.encodedRegionName = uncompressor.uncompress( this.encodedRegionName = uncompressor.uncompress(
walEdit.getEncodedRegionName(), compressionContext.regionDict); walKey.getEncodedRegionName(), compressionContext.regionDict);
byte[] tablenameBytes = uncompressor.uncompress( byte[] tablenameBytes = uncompressor.uncompress(
walEdit.getTableName(), compressionContext.tableDict); walKey.getTableName(), compressionContext.tableDict);
this.tablename = TableName.valueOf(tablenameBytes); this.tablename = TableName.valueOf(tablenameBytes);
} else { } else {
this.encodedRegionName = walEdit.getEncodedRegionName().toByteArray(); this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
this.tablename = TableName.valueOf(walEdit.getTableName().toByteArray()); this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
} }
clusterIds.clear(); clusterIds.clear();
for (HBaseProtos.UUID clusterId : walEdit.getClusterIdsList()) { for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits())); clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
} }
if (walEdit.hasNonceGroup()) { if (walKey.hasNonceGroup()) {
this.nonceGroup = walEdit.getNonceGroup(); this.nonceGroup = walKey.getNonceGroup();
} }
if (walEdit.hasNonce()) { if (walKey.hasNonce()) {
this.nonce = walEdit.getNonce(); this.nonce = walKey.getNonce();
} }
this.replicationScope = null; this.replicationScope = null;
if (walEdit.getScopesCount() > 0) { if (walKey.getScopesCount() > 0) {
this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR); this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (FamilyScope scope : walEdit.getScopesList()) { for (FamilyScope scope : walKey.getScopesList()) {
byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() : byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict); uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
this.replicationScope.put(family, scope.getScopeType().getNumber()); this.replicationScope.put(family, scope.getScopeType().getNumber());
} }
} }
setSequenceId(walEdit.getLogSequenceNumber()); setSequenceId(walKey.getLogSequenceNumber());
this.writeTime = walEdit.getWriteTime(); this.writeTime = walKey.getWriteTime();
if(walEdit.hasOrigSequenceNumber()) { if(walKey.hasOrigSequenceNumber()) {
this.origLogSeqNum = walEdit.getOrigSequenceNumber(); this.origLogSeqNum = walKey.getOrigSequenceNumber();
} }
} }

View File

@ -2337,8 +2337,8 @@ public class WALSplitter {
return new ArrayList<>(); return new ArrayList<>();
} }
long replaySeqId = (entry.getEdit().hasOrigSequenceNumber()) ? long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
entry.getEdit().getOrigSequenceNumber() : entry.getEdit().getLogSequenceNumber(); entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
int count = entry.getAssociatedCellCount(); int count = entry.getAssociatedCellCount();
List<MutationReplay> mutations = new ArrayList<>(); List<MutationReplay> mutations = new ArrayList<>();
Cell previousCell = null; Cell previousCell = null;
@ -2368,9 +2368,9 @@ public class WALSplitter {
} else { } else {
m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
// Puts might come from increment or append, thus we need nonces. // Puts might come from increment or append, thus we need nonces.
long nonceGroup = entry.getEdit().hasNonceGroup() long nonceGroup = entry.getKey().hasNonceGroup()
? entry.getEdit().getNonceGroup() : HConstants.NO_NONCE; ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
long nonce = entry.getEdit().hasNonce() ? entry.getEdit().getNonce() : HConstants.NO_NONCE; long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce)); mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
} }
} }
@ -2385,10 +2385,10 @@ public class WALSplitter {
// reconstruct WALKey // reconstruct WALKey
if (logEntry != null) { if (logEntry != null) {
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALEdit walKeyProto = org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
entry.getEdit(); entry.getKey();
List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount()); List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
for (HBaseProtos.UUID uuid : entry.getEdit().getClusterIdsList()) { for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
} }
key = new WALKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf( key = new WALKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -375,7 +376,7 @@ public class TestReplicationSink {
private WALEntry.Builder createWALEntryBuilder(TableName table) { private WALEntry.Builder createWALEntryBuilder(TableName table) {
WALEntry.Builder builder = WALEntry.newBuilder(); WALEntry.Builder builder = WALEntry.newBuilder();
builder.setAssociatedCellCount(1); builder.setAssociatedCellCount(1);
WALProtos.WALEdit.Builder keyBuilder = WALProtos.WALEdit.newBuilder(); WALKey.Builder keyBuilder = WALKey.newBuilder();
UUID.Builder uuidBuilder = UUID.newBuilder(); UUID.Builder uuidBuilder = UUID.newBuilder();
uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits()); uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits());
uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits()); uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
@ -384,7 +385,7 @@ public class TestReplicationSink {
keyBuilder.setWriteTime(System.currentTimeMillis()); keyBuilder.setWriteTime(System.currentTimeMillis());
keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(HConstants.EMPTY_BYTE_ARRAY)); keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(HConstants.EMPTY_BYTE_ARRAY));
keyBuilder.setLogSequenceNumber(-1); keyBuilder.setLogSequenceNumber(-1);
builder.setEdit(keyBuilder.build()); builder.setKey(keyBuilder.build());
return builder; return builder;
} }
} }