diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 50a49204e39..62dfd453904 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -3063,13 +3063,16 @@ public final class ProtobufUtil { * @param tableName The tableName into which the bulk load is being imported into. * @param encodedRegionName Encoded region name of the region which is being bulk loaded. * @param storeFiles A set of store files of a column family are bulk loaded. + * @param storeFilesSize Map of store files and their lengths * @param bulkloadSeqId sequence ID (by a force flush) used to create bulk load hfile * name * @return The WAL log marker for bulk loads. */ public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName, - ByteString encodedRegionName, Map> storeFiles, long bulkloadSeqId) { - BulkLoadDescriptor.Builder desc = BulkLoadDescriptor.newBuilder() + ByteString encodedRegionName, Map> storeFiles, + Map storeFilesSize, long bulkloadSeqId) { + BulkLoadDescriptor.Builder desc = + BulkLoadDescriptor.newBuilder() .setTableName(ProtobufUtil.toProtoTableName(tableName)) .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId); @@ -3078,7 +3081,10 @@ public final class ProtobufUtil { .setFamilyName(ByteStringer.wrap(entry.getKey())) .setStoreHomeDir(Bytes.toString(entry.getKey())); // relative to region for (Path path : entry.getValue()) { - builder.addStoreFile(path.getName()); + String name = path.getName(); + builder.addStoreFile(name); + Long size = storeFilesSize.get(name) == null ? (Long) 0L : storeFilesSize.get(name); + builder.setStoreFileSize(size); } desc.addStores(builder); } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index d74688e17bc..6252d5168a5 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -7821,6 +7821,24 @@ public final class WALProtos { */ com.google.protobuf.ByteString getStoreFileBytes(int index); + + // optional uint64 store_file_size = 4; + /** + * optional uint64 store_file_size = 4; + * + *
+     * size of store file
+     * </pre>
+ */ + boolean hasStoreFileSize(); + /** + * optional uint64 store_file_size = 4; + * + *
+     * size of store file
+     * </pre>
+ */ + long getStoreFileSize(); } /** * Protobuf type {@code hbase.pb.StoreDescriptor} @@ -7891,6 +7909,11 @@ public final class WALProtos { storeFile_.add(input.readBytes()); break; } + case 32: { + bitField0_ |= 0x00000004; + storeFileSize_ = input.readUInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -8051,10 +8074,35 @@ public final class WALProtos { return storeFile_.getByteString(index); } + // optional uint64 store_file_size = 4; + public static final int STORE_FILE_SIZE_FIELD_NUMBER = 4; + private long storeFileSize_; + /** + * optional uint64 store_file_size = 4; + * + *
+     * size of store file
+     * </pre>
+ */ + public boolean hasStoreFileSize() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional uint64 store_file_size = 4; + * + *
+     * size of store file
+     * </pre>
+ */ + public long getStoreFileSize() { + return storeFileSize_; + } + private void initFields() { familyName_ = com.google.protobuf.ByteString.EMPTY; storeHomeDir_ = ""; storeFile_ = com.google.protobuf.LazyStringArrayList.EMPTY; + storeFileSize_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -8085,6 +8133,9 @@ public final class WALProtos { for (int i = 0; i < storeFile_.size(); i++) { output.writeBytes(3, storeFile_.getByteString(i)); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeUInt64(4, storeFileSize_); + } getUnknownFields().writeTo(output); } @@ -8111,6 +8162,10 @@ public final class WALProtos { size += dataSize; size += 1 * getStoreFileList().size(); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(4, storeFileSize_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -8146,6 +8201,11 @@ public final class WALProtos { } result = result && getStoreFileList() .equals(other.getStoreFileList()); + result = result && (hasStoreFileSize() == other.hasStoreFileSize()); + if (hasStoreFileSize()) { + result = result && (getStoreFileSize() + == other.getStoreFileSize()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -8171,6 +8231,10 @@ public final class WALProtos { hash = (37 * hash) + STORE_FILE_FIELD_NUMBER; hash = (53 * hash) + getStoreFileList().hashCode(); } + if (hasStoreFileSize()) { + hash = (37 * hash) + STORE_FILE_SIZE_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getStoreFileSize()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -8286,6 +8350,8 @@ public final class WALProtos { bitField0_ = (bitField0_ & ~0x00000002); storeFile_ = com.google.protobuf.LazyStringArrayList.EMPTY; bitField0_ = (bitField0_ & ~0x00000004); + storeFileSize_ = 0L; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -8328,6 +8394,10 @@ public final class WALProtos { bitField0_ = (bitField0_ & ~0x00000004); } result.storeFile_ = storeFile_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000004; + } + result.storeFileSize_ = storeFileSize_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -8362,6 +8432,9 @@ public final class WALProtos { } onChanged(); } + if (other.hasStoreFileSize()) { + setStoreFileSize(other.getStoreFileSize()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -8660,6 +8733,55 @@ public final class WALProtos { return this; } + // optional uint64 store_file_size = 4; + private long storeFileSize_ ; + /** + * optional uint64 store_file_size = 4; + * + *
+       * size of store file
+       * </pre>
+ */ + public boolean hasStoreFileSize() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional uint64 store_file_size = 4; + * + *
+       * size of store file
+       * </pre>
+ */ + public long getStoreFileSize() { + return storeFileSize_; + } + /** + * optional uint64 store_file_size = 4; + * + *
+       * size of store file
+       * </pre>
+ */ + public Builder setStoreFileSize(long value) { + bitField0_ |= 0x00000008; + storeFileSize_ = value; + onChanged(); + return this; + } + /** + * optional uint64 store_file_size = 4; + * + *
+       * size of store file
+       * </pre>
+ */ + public Builder clearStoreFileSize() { + bitField0_ = (bitField0_ & ~0x00000008); + storeFileSize_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.StoreDescriptor) } @@ -11877,24 +11999,25 @@ public final class WALProtos { "ome_dir\030\002 \002(\t\022\024\n\014flush_output\030\003 \003(\t\"S\n\013F" + "lushAction\022\017\n\013START_FLUSH\020\000\022\020\n\014COMMIT_FL" + "USH\020\001\022\017\n\013ABORT_FLUSH\020\002\022\020\n\014CANNOT_FLUSH\020\003", - "\"R\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014" + + "\"k\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014" + "\022\026\n\016store_home_dir\030\002 \002(\t\022\022\n\nstore_file\030\003" + - " \003(\t\"\237\001\n\022BulkLoadDescriptor\022\'\n\ntable_nam" + - "e\030\001 \002(\0132\023.hbase.pb.TableName\022\033\n\023encoded_" + - "region_name\030\002 \002(\014\022)\n\006stores\030\003 \003(\0132\031.hbas" + - "e.pb.StoreDescriptor\022\030\n\020bulkload_seq_num" + - "\030\004 \002(\003\"\272\002\n\025RegionEventDescriptor\022=\n\neven" + - "t_type\030\001 \002(\0162).hbase.pb.RegionEventDescr" + - "iptor.EventType\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023e" + - "ncoded_region_name\030\003 \002(\014\022\033\n\023log_sequence", - "_number\030\004 \001(\004\022)\n\006stores\030\005 \003(\0132\031.hbase.pb" + - ".StoreDescriptor\022$\n\006server\030\006 \001(\0132\024.hbase" + - ".pb.ServerName\022\023\n\013region_name\030\007 \001(\014\".\n\tE" + - "ventType\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOS" + - "E\020\001\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLIC" + - "ATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE" + - "_GLOBAL\020\001B?\n*org.apache.hadoop.hbase.pro" + - "tobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001" + " \003(\t\022\027\n\017store_file_size\030\004 \001(\004\"\237\001\n\022BulkLo" + + "adDescriptor\022\'\n\ntable_name\030\001 \002(\0132\023.hbase" + + ".pb.TableName\022\033\n\023encoded_region_name\030\002 \002" + + "(\014\022)\n\006stores\030\003 \003(\0132\031.hbase.pb.StoreDescr" + + "iptor\022\030\n\020bulkload_seq_num\030\004 \002(\003\"\272\002\n\025Regi" + + "onEventDescriptor\022=\n\nevent_type\030\001 \002(\0162)." 
+ + "hbase.pb.RegionEventDescriptor.EventType" + + "\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_region_n", + "ame\030\003 \002(\014\022\033\n\023log_sequence_number\030\004 \001(\004\022)" + + "\n\006stores\030\005 \003(\0132\031.hbase.pb.StoreDescripto" + + "r\022$\n\006server\030\006 \001(\0132\024.hbase.pb.ServerName\022" + + "\023\n\013region_name\030\007 \001(\014\".\n\tEventType\022\017\n\013REG" + + "ION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrail" + + "er*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE_LOC" + + "AL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*or" + + "g.apache.hadoop.hbase.protobuf.generated" + + "B\tWALProtosH\001\210\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -11942,7 +12065,7 @@ public final class WALProtos { internal_static_hbase_pb_StoreDescriptor_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_StoreDescriptor_descriptor, - new java.lang.String[] { "FamilyName", "StoreHomeDir", "StoreFile", }); + new java.lang.String[] { "FamilyName", "StoreHomeDir", "StoreFile", "StoreFileSize", }); internal_static_hbase_pb_BulkLoadDescriptor_descriptor = getDescriptor().getMessageTypes().get(6); internal_static_hbase_pb_BulkLoadDescriptor_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index cb9bd8f5c35..aeb8a163149 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -132,6 +132,7 @@ message StoreDescriptor { required bytes family_name = 1; required string store_home_dir = 2; //relative to region dir repeated string store_file = 3; // relative to store dir + optional uint64 store_file_size = 4; // size of store file } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 453231713d1..e7a99a90775 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5315,6 +5315,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi BulkLoadListener bulkLoadListener) throws IOException { long seqId = -1; Map> storeFiles = new TreeMap>(Bytes.BYTES_COMPARATOR); + Map storeFilesSizes = new HashMap(); Preconditions.checkNotNull(familyPaths); // we need writeLock for multi-family bulk load startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths)); @@ -5397,6 +5398,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId); + // Note the size of the store file + try { + FileSystem fs = commitedStoreFile.getFileSystem(baseConf); + storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile) + .getLen()); + } catch (IOException e) { + LOG.warn("Failed to find the size of hfile " + commitedStoreFile); + storeFilesSizes.put(commitedStoreFile.getName(), 0L); + } + if(storeFiles.containsKey(familyName)) { storeFiles.get(familyName).add(commitedStoreFile); } else { @@ -5431,9 +5442,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (wal != null && !storeFiles.isEmpty()) { // Write a 
bulk load event for hfiles that are loaded try { - WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor( - this.getRegionInfo().getTable(), - ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId); + WALProtos.BulkLoadDescriptor loadDescriptor = + ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(), + ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, + storeFilesSizes, seqId); WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(), loadDescriptor, mvcc); } catch (IOException ioe) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 7e58e4102aa..0f956c874be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -712,6 +712,7 @@ public class ReplicationSource extends Thread currentNbOperations += countDistinctRowKeys(edit); entries.add(entry); currentSize += entry.getEdit().heapSize(); + currentSize += calculateTotalSizeOfStoreFiles(edit); } else { metrics.incrLogEditsFiltered(); } @@ -738,6 +739,35 @@ public class ReplicationSource extends Thread return seenEntries == 0 && processEndOfFile(); } + /** + * Calculate the total size of all the store files + * @param edit edit to count row keys from + * @return the total size of the store files + */ + private int calculateTotalSizeOfStoreFiles(WALEdit edit) { + List cells = edit.getCells(); + int totalStoreFilesSize = 0; + + int totalCells = edit.size(); + for (int i = 0; i < totalCells; i++) { + if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { + try { + BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); + List stores = bld.getStoresList(); + int totalStores = stores.size(); + for (int j = 0; j < totalStores; j++) { + totalStoreFilesSize += stores.get(j).getStoreFileSize(); + } + } catch (IOException e) { + LOG.error("Failed to deserialize bulk load entry from wal edit. 
" + + "Size of HFiles part of cell will not be considered in replication " + + "request size calculation.", e); + } + } + } + return totalStoreFilesSize; + } + private void cleanUpHFileRefs(WALEdit edit) throws IOException { String peerId = peerClusterZnode; if (peerId.contains("-")) { @@ -746,12 +776,14 @@ public class ReplicationSource extends Thread peerId = peerClusterZnode.split("-")[0]; } List cells = edit.getCells(); - for (int i = 0; i < cells.size(); i++) { + int totalCells = cells.size(); + for (int i = 0; i < totalCells; i++) { Cell cell = cells.get(i); if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); List stores = bld.getStoresList(); - for (int j = 0; j < stores.size(); j++) { + int totalStores = stores.size(); + for (int j = 0; j < totalStores; j++) { List storeFileList = stores.get(j).getStoreFileList(); manager.cleanUpHFileRefs(peerId, storeFileList); metrics.decrSizeOfHFileRefsQueue(storeFileList.size()); @@ -934,18 +966,20 @@ public class ReplicationSource extends Thread int totalHFileEntries = 0; Cell lastCell = cells.get(0); - for (int i = 0; i < edit.size(); i++) { + int totalCells = edit.size(); + for (int i = 0; i < totalCells; i++) { // Count HFiles to be replicated if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) { try { BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i)); List stores = bld.getStoresList(); - for (int j = 0; j < stores.size(); j++) { + int totalStores = stores.size(); + for (int j = 0; j < totalStores; j++) { totalHFileEntries += stores.get(j).getStoreFileList().size(); } } catch (IOException e) { LOG.error("Failed to deserialize bulk load entry from wal edit. " - + "This its hfiles count will not be added into metric."); + + "Then its hfiles count will not be added into metric."); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index f08d2bf340a..1e80548c576 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -287,6 +287,7 @@ public class TestReplicationSink { } List numberList = new ArrayList<>(numbers); Collections.sort(numberList); + Map storeFilesSize = new HashMap(1); // 2. Create 25 hfiles Configuration conf = TEST_UTIL.getConfiguration(); @@ -297,6 +298,7 @@ public class TestReplicationSink { HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1, Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows); p.add(hfilePath); + storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen()); } // 3. 
Create a BulkLoadDescriptor and a WALEdit @@ -310,7 +312,7 @@ public class TestReplicationSink { HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo(); loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1, - ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, 1); + ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1); edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor); } List entries = new ArrayList(1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index fb8cfa0df69..9e950d2bec4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collection; @@ -479,16 +480,32 @@ public class TestReplicationSourceManager { private WALEdit getBulkLoadWALEdit(NavigableMap scope) { // 1. Create store files for the families Map> storeFiles = new HashMap<>(1); + Map storeFilesSize = new HashMap<>(1); List p = new ArrayList<>(1); - p.add(new Path(Bytes.toString(f1))); + Path hfilePath1 = new Path(Bytes.toString(f1)); + p.add(hfilePath1); + try { + storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen()); + } catch (IOException e) { + LOG.debug("Failed to calculate the size of hfile " + hfilePath1); + storeFilesSize.put(hfilePath1.getName(), 0L); + } storeFiles.put(f1, p); scope.put(f1, 1); p = new ArrayList<>(1); - p.add(new Path(Bytes.toString(f2))); + Path hfilePath2 = new Path(Bytes.toString(f2)); + p.add(hfilePath2); + try { + storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen()); + } catch (IOException e) { + LOG.debug("Failed to calculate the size of hfile " + hfilePath2); + storeFilesSize.put(hfilePath2.getName(), 0L); + } storeFiles.put(f2, p); // 2. Create bulk load descriptor - BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(), - ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, 1); + BulkLoadDescriptor desc = + ProtobufUtil.toBulkLoadDescriptor(hri.getTable(), + ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1); // 3. create bulk load wal edit event WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
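For reference, a minimal sketch (not part of the patch) of how a caller of the new toBulkLoadDescriptor overload might assemble the storeFilesSize map of hfile name to length, mirroring what the patch does in HRegion#bulkLoadHFiles and TestReplicationSink. The helper name sizesOf and its standalone-method framing are illustrative assumptions; only the FileSystem/Path calls and the 0L fallback come from the patch itself.

  // Assumed imports: java.io.IOException, java.util.*, org.apache.hadoop.fs.FileSystem,
  // org.apache.hadoop.fs.Path. Illustrative only.
  private static Map<String, Long> sizesOf(FileSystem fs, Map<byte[], List<Path>> storeFiles) {
    Map<String, Long> storeFilesSize = new HashMap<String, Long>();
    for (List<Path> paths : storeFiles.values()) {
      for (Path path : paths) {
        try {
          // Same size lookup the patch adds after bulkLoadHFile commits the file
          storeFilesSize.put(path.getName(), fs.getFileStatus(path).getLen());
        } catch (IOException e) {
          // Patch convention: an unknown size is recorded as 0L rather than failing the load
          storeFilesSize.put(path.getName(), 0L);
        }
      }
    }
    return storeFilesSize;
  }

The resulting map is then passed as the new fourth argument, e.g. ProtobufUtil.toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles, sizesOf(fs, storeFiles), seqId), so that ReplicationSource can add the hfile lengths to its replication-batch size via getStoreFileSize().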