From 9a2e680caeb8c6627357ff5a0963170f09e65414 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Fri, 8 Sep 2017 16:01:33 -0700 Subject: [PATCH] HBASE-16478 Rename WALKey in PB to WALEdit This is a rebase of Enis's original patch --- .../src/main/protobuf/Admin.proto | 2 +- .../src/main/protobuf/WAL.proto | 13 +------ .../protobuf/ReplicationProtbufUtil.java | 2 +- .../hbase/regionserver/RSRpcServices.java | 30 +++++++-------- .../regionserver/wal/ProtobufLogReader.java | 9 +++-- .../regionserver/ReplicationSink.java | 8 ++-- .../org/apache/hadoop/hbase/wal/WALKey.java | 38 +++++++++---------- .../apache/hadoop/hbase/wal/WALSplitter.java | 16 ++++---- .../regionserver/TestReplicationSink.java | 12 +++--- 9 files changed, 59 insertions(+), 71 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto index 62aac9aa132..db5a3befed0 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto @@ -172,7 +172,7 @@ message UpdateFavoredNodesResponse { // Protocol buffer version of WAL for replication message WALEntry { - required WALKey key = 1; + required WALEdit edit = 1; // Following may be null if the KVs/Cells are carried along the side in a cellblock (See // RPC for more on cellblocks). If Cells/KVs are in a cellblock, this next field is null // and associated_cell_count has count of Cells associated w/ this WALEntry diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto index 81e56504629..4c9a1715777 100644 --- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto +++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto @@ -34,10 +34,9 @@ message WALHeader { } /* - * Protocol buffer version of WALKey; see WALKey comment, not really a key but WALEdit header - * for some KVs + * Protocol buffer version of WALEdit; */ -message WALKey { +message WALEdit { required bytes encoded_region_name = 1; required bytes table_name = 2; required uint64 log_sequence_number = 3; @@ -62,14 +61,6 @@ message WALKey { optional uint64 nonceGroup = 9; optional uint64 nonce = 10; optional uint64 orig_sequence_number = 11; - -/* - optional CustomEntryType custom_entry_type = 9; - - enum CustomEntryType { - COMPACTION = 0; - } -*/ } enum ScopeType { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 8f681f0a341..1e201eee807 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -109,7 +109,7 @@ public class ReplicationProtbufUtil { for (Entry entry: entries) { entryBuilder.clear(); // TODO: this duplicates a lot in WALKey#getBuilder - WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder(); + WALProtos.WALEdit.Builder keyBuilder = entryBuilder.getEditBuilder(); WALKey key = entry.getKey(); keyBuilder.setEncodedRegionName( UnsafeByteOperations.unsafeWrap(encodedRegionName == null diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 1b944dc04c1..1e44d866b24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -596,13 +596,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param row * @param family * @param qualifier - * @param compareOp + * @param op * @param comparator @throws IOException */ private boolean checkAndRowMutate(final Region region, final List actions, - final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder, - ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { + final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, + CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder, + ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { if (!region.getRegionInfo().isMetaTable()) { regionServer.cacheFlusher.reclaimMemStoreMemory(); } @@ -647,10 +647,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, /** * Execute an append mutation. - * - * @param region - * @param m - * @param cellScanner * @return result to return to client if default operation should be * bypassed as indicated by RegionObserver, null otherwise * @throws IOException @@ -2077,7 +2073,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // empty input return ReplicateWALEntryResponse.newBuilder().build(); } - ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); + ByteString regionName = entries.get(0).getEdit().getEncodedRegionName(); Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8()); RegionCoprocessorHost coprocessorHost = ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) @@ -2090,19 +2086,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL; for (WALEntry entry : entries) { - if (!regionName.equals(entry.getKey().getEncodedRegionName())) { + if (!regionName.equals(entry.getEdit().getEncodedRegionName())) { throw new NotServingRegionException("Replay request contains entries from multiple " + "regions. First region:" + regionName.toStringUtf8() + " , other region:" - + entry.getKey().getEncodedRegionName()); + + entry.getEdit().getEncodedRegionName()); } if (regionServer.nonceManager != null && isPrimary) { - long nonceGroup = entry.getKey().hasNonceGroup() - ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; - long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; + long nonceGroup = entry.getEdit().hasNonceGroup() + ? entry.getEdit().getNonceGroup() : HConstants.NO_NONCE; + long nonce = entry.getEdit().hasNonce() ? entry.getEdit().getNonce() : HConstants.NO_NONCE; regionServer.nonceManager.reportOperationFromWal( nonceGroup, nonce, - entry.getKey().getWriteTime()); + entry.getEdit().getWriteTime()); } Pair walEntry = (coprocessorHost == null) ? null : new Pair<>(); List edits = WALSplitter.getMutationsFromWALEntry(entry, @@ -2121,8 +2117,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // HBASE-17924 // sort to improve lock efficiency Collections.sort(edits); - long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? - entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber(); + long replaySeqId = (entry.getEdit().hasOrigSequenceNumber()) ? + entry.getEdit().getOrigSequenceNumber() : entry.getEdit().getLogSequenceNumber(); OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId); // check if it's a partial success for (int i = 0; result != null && i < result.length; i++) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 593a7612358..eb4c460c751 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader.Builder; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALEdit; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -333,7 +333,7 @@ public class ProtobufLogReader extends ReaderBase { } return false; } - WALKey.Builder builder = WALKey.newBuilder(); + WALEdit.Builder builder = WALEdit.newBuilder(); long size = 0; try { long available = -1; @@ -363,11 +363,12 @@ public class ProtobufLogReader extends ReaderBase { throw new EOFException("Partial PB while reading WAL, " + "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos()); } - WALKey walKey = builder.build(); + WALEdit walKey = builder.build(); entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor); if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) { if (LOG.isTraceEnabled()) { - LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + this.inputStream.getPos()); + LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + + this.inputStream.getPos()); } continue; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 80ef6e398a5..a58540104ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -159,7 +159,7 @@ public class ReplicationSink { for (WALEntry entry : entries) { TableName table = - TableName.valueOf(entry.getKey().getTableName().toByteArray()); + TableName.valueOf(entry.getEdit().getTableName().toByteArray()); Cell previousCell = null; Mutation m = null; int count = entry.getAssociatedCellCount(); @@ -183,8 +183,8 @@ public class ReplicationSink { CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - List clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size()); - for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { + List clusterIds = new ArrayList<>(entry.getEdit().getClusterIdsList().size()); + for (HBaseProtos.UUID clusterId : entry.getEdit().getClusterIdsList()) { clusterIds.add(toUUID(clusterId)); } m.setClusterIds(clusterIds); @@ -221,7 +221,7 @@ public class ReplicationSink { } int size = entries.size(); - this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime()); + this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getEdit().getWriteTime()); this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated); this.totalReplicatedEdits.addAndGet(totalReplicated); } catch (IOException ex) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index 751ceba7937..bce510336b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -539,9 +539,9 @@ public class WALKey implements SequenceId, Comparable { this.encodedRegionName = encodedRegionName; } - public WALProtos.WALKey.Builder getBuilder( + public WALProtos.WALEdit.Builder getBuilder( WALCellCodec.ByteStringCompressor compressor) throws IOException { - WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder(); + WALProtos.WALEdit.Builder builder = WALProtos.WALEdit.newBuilder(); if (compressionContext == null) { builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName)); builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName())); @@ -580,42 +580,42 @@ public class WALKey implements SequenceId, Comparable { return builder; } - public void readFieldsFromPb(WALProtos.WALKey walKey, + public void readFieldsFromPb(WALProtos.WALEdit walEdit, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException { if (this.compressionContext != null) { this.encodedRegionName = uncompressor.uncompress( - walKey.getEncodedRegionName(), compressionContext.regionDict); + walEdit.getEncodedRegionName(), compressionContext.regionDict); byte[] tablenameBytes = uncompressor.uncompress( - walKey.getTableName(), compressionContext.tableDict); + walEdit.getTableName(), compressionContext.tableDict); this.tablename = TableName.valueOf(tablenameBytes); } else { - this.encodedRegionName = walKey.getEncodedRegionName().toByteArray(); - this.tablename = TableName.valueOf(walKey.getTableName().toByteArray()); + this.encodedRegionName = walEdit.getEncodedRegionName().toByteArray(); + this.tablename = TableName.valueOf(walEdit.getTableName().toByteArray()); } clusterIds.clear(); - for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) { + for (HBaseProtos.UUID clusterId : walEdit.getClusterIdsList()) { clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits())); } - if (walKey.hasNonceGroup()) { - this.nonceGroup = walKey.getNonceGroup(); + if (walEdit.hasNonceGroup()) { + this.nonceGroup = walEdit.getNonceGroup(); } - if (walKey.hasNonce()) { - this.nonce = walKey.getNonce(); + if (walEdit.hasNonce()) { + this.nonce = walEdit.getNonce(); } this.replicationScope = null; - if (walKey.getScopesCount() > 0) { + if (walEdit.getScopesCount() > 0) { this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (FamilyScope scope : walKey.getScopesList()) { + for (FamilyScope scope : walEdit.getScopesList()) { byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() : uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict); this.replicationScope.put(family, scope.getScopeType().getNumber()); } } - setSequenceId(walKey.getLogSequenceNumber()); - this.writeTime = walKey.getWriteTime(); - if(walKey.hasOrigSequenceNumber()) { - this.origLogSeqNum = walKey.getOrigSequenceNumber(); + setSequenceId(walEdit.getLogSequenceNumber()); + this.writeTime = walEdit.getWriteTime(); + if(walEdit.hasOrigSequenceNumber()) { + this.origLogSeqNum = walEdit.getOrigSequenceNumber(); } } @@ -644,4 +644,4 @@ public class WALKey implements SequenceId, Comparable { } return size; } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 4a9741a4908..986509aa26c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -2338,8 +2338,8 @@ public class WALSplitter { return new ArrayList<>(); } - long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? - entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber(); + long replaySeqId = (entry.getEdit().hasOrigSequenceNumber()) ? + entry.getEdit().getOrigSequenceNumber() : entry.getEdit().getLogSequenceNumber(); int count = entry.getAssociatedCellCount(); List mutations = new ArrayList<>(); Cell previousCell = null; @@ -2369,9 +2369,9 @@ public class WALSplitter { } else { m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); // Puts might come from increment or append, thus we need nonces. - long nonceGroup = entry.getKey().hasNonceGroup() - ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; - long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; + long nonceGroup = entry.getEdit().hasNonceGroup() + ? entry.getEdit().getNonceGroup() : HConstants.NO_NONCE; + long nonce = entry.getEdit().hasNonce() ? entry.getEdit().getNonce() : HConstants.NO_NONCE; mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce)); } } @@ -2386,10 +2386,10 @@ public class WALSplitter { // reconstruct WALKey if (logEntry != null) { - org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto = - entry.getKey(); + org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALEdit walKeyProto = + entry.getEdit(); List clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount()); - for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) { + for (HBaseProtos.UUID uuid : entry.getEdit().getClusterIdsList()) { clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); } key = new WALKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index 0e08c90fd5f..d850993cbbf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -60,8 +60,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALEdit; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -301,7 +300,7 @@ public class TestReplicationSink { // 3. Create a BulkLoadDescriptor and a WALEdit Map> storeFiles = new HashMap<>(1); storeFiles.put(FAM_NAME1, p); - WALEdit edit = null; + org.apache.hadoop.hbase.regionserver.wal.WALEdit edit = null; WALProtos.BulkLoadDescriptor loadDescriptor = null; try (Connection c = ConnectionFactory.createConnection(conf); @@ -311,7 +310,8 @@ public class TestReplicationSink { ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1, UnsafeByteOperations.unsafeWrap(regionInfo.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1); - edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor); + edit = org.apache.hadoop.hbase.regionserver.wal.WALEdit.createBulkLoadEvent(regionInfo, + loadDescriptor); } List entries = new ArrayList<>(1); @@ -376,7 +376,7 @@ public class TestReplicationSink { private WALEntry.Builder createWALEntryBuilder(TableName table) { WALEntry.Builder builder = WALEntry.newBuilder(); builder.setAssociatedCellCount(1); - WALKey.Builder keyBuilder = WALKey.newBuilder(); + WALProtos.WALEdit.Builder keyBuilder = WALProtos.WALEdit.newBuilder(); UUID.Builder uuidBuilder = UUID.newBuilder(); uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits()); uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits()); @@ -385,7 +385,7 @@ public class TestReplicationSink { keyBuilder.setWriteTime(System.currentTimeMillis()); keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(HConstants.EMPTY_BYTE_ARRAY)); keyBuilder.setLogSequenceNumber(-1); - builder.setKey(keyBuilder.build()); + builder.setEdit(keyBuilder.build()); return builder; } }