HBASE-15669 HFile size is not considered correctly in a replication request

Ashish Singhi 2016-05-06 17:26:17 +05:30
parent bec81b1977
commit 34e9a6ff30
7 changed files with 229 additions and 34 deletions

ProtobufUtil.java

@@ -3063,13 +3063,16 @@ public final class ProtobufUtil {
* @param tableName The table into which the bulk load is being imported.
* @param encodedRegionName Encoded region name of the region which is being bulk loaded.
* @param storeFiles The set of store files of a column family that are bulk loaded.
* @param storeFilesSize Map of store files and their lengths
* @param bulkloadSeqId sequence ID (by a force flush) used to create bulk load hfile
* name
* @return The WAL log marker for bulk loads.
*/
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles, long bulkloadSeqId) {
BulkLoadDescriptor.Builder desc = BulkLoadDescriptor.newBuilder()
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
BulkLoadDescriptor.Builder desc =
BulkLoadDescriptor.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
@@ -3078,7 +3081,10 @@ public final class ProtobufUtil {
.setFamilyName(ByteStringer.wrap(entry.getKey()))
.setStoreHomeDir(Bytes.toString(entry.getKey())); // relative to region
for (Path path : entry.getValue()) {
builder.addStoreFile(path.getName());
String name = path.getName();
builder.addStoreFile(name);
Long size = storeFilesSize.get(name) == null ? (Long) 0L : storeFilesSize.get(name);
builder.setStoreFileSize(size);
}
desc.addStores(builder);
}
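For orientation, here is a minimal sketch of how a caller feeds the new overload (imports elided; it assumes the regenerated WALProtos below, the table, family, and file names are made up, and encodedRegionName stands for the wrapped region-name bytes):

Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
storeFiles.put(Bytes.toBytes("cf"), Collections.singletonList(new Path("hfile-1")));
// Keys must be HFile names, because the loop above resolves sizes via path.getName().
Map<String, Long> storeFilesSize = new HashMap<String, Long>();
storeFilesSize.put("hfile-1", 1024L);
WALProtos.BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(
    TableName.valueOf("t1"), encodedRegionName, storeFiles, storeFilesSize, 1L);

Note that store_file_size is a singular field on StoreDescriptor, so when a family carries several HFiles the loop above leaves only the last file's size in the descriptor.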

WALProtos.java (generated)

@@ -7821,6 +7821,24 @@ public final class WALProtos {
*/
com.google.protobuf.ByteString
getStoreFileBytes(int index);
// optional uint64 store_file_size = 4;
/**
* <code>optional uint64 store_file_size = 4;</code>
*
* <pre>
* size of store file
* </pre>
*/
boolean hasStoreFileSize();
/**
* <code>optional uint64 store_file_size = 4;</code>
*
* <pre>
* size of store file
* </pre>
*/
long getStoreFileSize();
}
/**
* Protobuf type {@code hbase.pb.StoreDescriptor}
@@ -7891,6 +7909,11 @@ public final class WALProtos {
storeFile_.add(input.readBytes());
break;
}
case 32: {
bitField0_ |= 0x00000004;
storeFileSize_ = input.readUInt64();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
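The new parse branch keys off the protobuf wire tag: for field number 4 with varint encoding it is (4 << 3) | 0 = 32, which is where case 32 comes from. A quick check, not part of the patch:

int tag = (4 << 3) | 0; // field number 4, wire type 0 (varint)
assert tag == 32;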
@@ -8051,10 +8074,35 @@ public final class WALProtos {
return storeFile_.getByteString(index);
}
// optional uint64 store_file_size = 4;
public static final int STORE_FILE_SIZE_FIELD_NUMBER = 4;
private long storeFileSize_;
/**
* <code>optional uint64 store_file_size = 4;</code>
*
* <pre>
* size of store file
* </pre>
*/
public boolean hasStoreFileSize() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional uint64 store_file_size = 4;</code>
*
* <pre>
* size of store file
* </pre>
*/
public long getStoreFileSize() {
return storeFileSize_;
}
private void initFields() {
familyName_ = com.google.protobuf.ByteString.EMPTY;
storeHomeDir_ = "";
storeFile_ = com.google.protobuf.LazyStringArrayList.EMPTY;
storeFileSize_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -8085,6 +8133,9 @@ public final class WALProtos {
for (int i = 0; i < storeFile_.size(); i++) {
output.writeBytes(3, storeFile_.getByteString(i));
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeUInt64(4, storeFileSize_);
}
getUnknownFields().writeTo(output);
}
@@ -8111,6 +8162,10 @@ public final class WALProtos {
size += dataSize;
size += 1 * getStoreFileList().size();
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(4, storeFileSize_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
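As a worked example of the wire cost (not from the patch): a 100 MB HFile needs one tag byte plus a four-byte varint, so the new field adds five bytes per store descriptor:

long len = 100L * 1024 * 1024; // 104857600 fits in 27 bits -> 4 varint bytes
int cost = com.google.protobuf.CodedOutputStream.computeUInt64Size(4, len);
assert cost == 5; // 1 tag byte + 4 varint bytes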
@@ -8146,6 +8201,11 @@ public final class WALProtos {
}
result = result && getStoreFileList()
.equals(other.getStoreFileList());
result = result && (hasStoreFileSize() == other.hasStoreFileSize());
if (hasStoreFileSize()) {
result = result && (getStoreFileSize()
== other.getStoreFileSize());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -8171,6 +8231,10 @@ public final class WALProtos {
hash = (37 * hash) + STORE_FILE_FIELD_NUMBER;
hash = (53 * hash) + getStoreFileList().hashCode();
}
if (hasStoreFileSize()) {
hash = (37 * hash) + STORE_FILE_SIZE_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getStoreFileSize());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -8286,6 +8350,8 @@ public final class WALProtos {
bitField0_ = (bitField0_ & ~0x00000002);
storeFile_ = com.google.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000004);
storeFileSize_ = 0L;
bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -8328,6 +8394,10 @@ public final class WALProtos {
bitField0_ = (bitField0_ & ~0x00000004);
}
result.storeFile_ = storeFile_;
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000004;
}
result.storeFileSize_ = storeFileSize_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -8362,6 +8432,9 @@ public final class WALProtos {
}
onChanged();
}
if (other.hasStoreFileSize()) {
setStoreFileSize(other.getStoreFileSize());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -8660,6 +8733,55 @@ public final class WALProtos {
return this;
}
// optional uint64 store_file_size = 4;
private long storeFileSize_ ;
/**
* <code>optional uint64 store_file_size = 4;</code>
*
* <pre>
* size of store file
* </pre>
*/
public boolean hasStoreFileSize() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
/**
* <code>optional uint64 store_file_size = 4;</code>
*
* <pre>
* size of store file
* </pre>
*/
public long getStoreFileSize() {
return storeFileSize_;
}
/**
* <code>optional uint64 store_file_size = 4;</code>
*
* <pre>
* size of store file
* </pre>
*/
public Builder setStoreFileSize(long value) {
bitField0_ |= 0x00000008;
storeFileSize_ = value;
onChanged();
return this;
}
/**
* <code>optional uint64 store_file_size = 4;</code>
*
* <pre>
* size of store file
* </pre>
*/
public Builder clearStoreFileSize() {
bitField0_ = (bitField0_ & ~0x00000008);
storeFileSize_ = 0L;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:hbase.pb.StoreDescriptor)
}
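A short round trip through the regenerated builder (a sketch; the family and file names are invented):

WALProtos.StoreDescriptor sd = WALProtos.StoreDescriptor.newBuilder()
    .setFamilyName(ByteString.copyFromUtf8("cf"))
    .setStoreHomeDir("cf")
    .addStoreFile("hfile-1")
    .setStoreFileSize(1024L) // new optional field 4
    .build();
assert sd.hasStoreFileSize() && sd.getStoreFileSize() == 1024L;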
@@ -11877,24 +11999,25 @@ public final class WALProtos {
"ome_dir\030\002 \002(\t\022\024\n\014flush_output\030\003 \003(\t\"S\n\013F" +
"lushAction\022\017\n\013START_FLUSH\020\000\022\020\n\014COMMIT_FL" +
"USH\020\001\022\017\n\013ABORT_FLUSH\020\002\022\020\n\014CANNOT_FLUSH\020\003",
"\"R\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014" +
"\"k\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014" +
"\022\026\n\016store_home_dir\030\002 \002(\t\022\022\n\nstore_file\030\003" +
" \003(\t\"\237\001\n\022BulkLoadDescriptor\022\'\n\ntable_nam" +
"e\030\001 \002(\0132\023.hbase.pb.TableName\022\033\n\023encoded_" +
"region_name\030\002 \002(\014\022)\n\006stores\030\003 \003(\0132\031.hbas" +
"e.pb.StoreDescriptor\022\030\n\020bulkload_seq_num" +
"\030\004 \002(\003\"\272\002\n\025RegionEventDescriptor\022=\n\neven" +
"t_type\030\001 \002(\0162).hbase.pb.RegionEventDescr" +
"iptor.EventType\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023e" +
"ncoded_region_name\030\003 \002(\014\022\033\n\023log_sequence",
"_number\030\004 \001(\004\022)\n\006stores\030\005 \003(\0132\031.hbase.pb" +
".StoreDescriptor\022$\n\006server\030\006 \001(\0132\024.hbase" +
".pb.ServerName\022\023\n\013region_name\030\007 \001(\014\".\n\tE" +
"ventType\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOS" +
"E\020\001\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLIC" +
"ATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE" +
"_GLOBAL\020\001B?\n*org.apache.hadoop.hbase.pro" +
"tobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
" \003(\t\022\027\n\017store_file_size\030\004 \001(\004\"\237\001\n\022BulkLo" +
"adDescriptor\022\'\n\ntable_name\030\001 \002(\0132\023.hbase" +
".pb.TableName\022\033\n\023encoded_region_name\030\002 \002" +
"(\014\022)\n\006stores\030\003 \003(\0132\031.hbase.pb.StoreDescr" +
"iptor\022\030\n\020bulkload_seq_num\030\004 \002(\003\"\272\002\n\025Regi" +
"onEventDescriptor\022=\n\nevent_type\030\001 \002(\0162)." +
"hbase.pb.RegionEventDescriptor.EventType" +
"\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_region_n",
"ame\030\003 \002(\014\022\033\n\023log_sequence_number\030\004 \001(\004\022)" +
"\n\006stores\030\005 \003(\0132\031.hbase.pb.StoreDescripto" +
"r\022$\n\006server\030\006 \001(\0132\024.hbase.pb.ServerName\022" +
"\023\n\013region_name\030\007 \001(\014\".\n\tEventType\022\017\n\013REG" +
"ION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrail" +
"er*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE_LOC" +
"AL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*or" +
"g.apache.hadoop.hbase.protobuf.generated" +
"B\tWALProtosH\001\210\001\000\240\001\001"
};
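The only semantic change in this block is the StoreDescriptor entry growing from "R (0x52 = 82 payload bytes) to "k (0x6b = 107): the extra 25 bytes are the serialized FieldDescriptorProto for store_file_size. A back-of-the-envelope check, not from the patch:

// 2 (tag + length of the field entry) + 2 + 15 (name) + 2 (number) + 2 (label) + 2 (type)
int added = 2 + (2 + "store_file_size".length()) + 2 + 2 + 2;
assert 0x52 + added == 0x6b; // 82 + 25 == 107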
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -11942,7 +12065,7 @@ public final class WALProtos {
internal_static_hbase_pb_StoreDescriptor_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_StoreDescriptor_descriptor,
new java.lang.String[] { "FamilyName", "StoreHomeDir", "StoreFile", });
new java.lang.String[] { "FamilyName", "StoreHomeDir", "StoreFile", "StoreFileSize", });
internal_static_hbase_pb_BulkLoadDescriptor_descriptor =
getDescriptor().getMessageTypes().get(6);
internal_static_hbase_pb_BulkLoadDescriptor_fieldAccessorTable = new

WAL.proto

@@ -132,6 +132,7 @@ message StoreDescriptor {
required bytes family_name = 1;
required string store_home_dir = 2; //relative to region dir
repeated string store_file = 3; // relative to store dir
optional uint64 store_file_size = 4; // size of store file
}
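Making the field optional keeps old WAL entries readable: a descriptor written before this change simply lacks field 4, and the regenerated class parses it with the proto2 default of 0. A hedged sketch, where oldBytes stands for bytes written by a pre-patch server:

WALProtos.StoreDescriptor old = WALProtos.StoreDescriptor.parseFrom(oldBytes);
long size = old.hasStoreFileSize() ? old.getStoreFileSize() : 0L; // 0L for old entries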

HRegion.java

@@ -5315,6 +5315,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
BulkLoadListener bulkLoadListener) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<String, Long>();
Preconditions.checkNotNull(familyPaths);
// we need writeLock for multi-family bulk load
startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
@@ -5397,6 +5398,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
// Note the size of the store file
try {
FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile)
.getLen());
} catch (IOException e) {
LOG.warn("Failed to find the size of hfile " + commitedStoreFile);
storeFilesSizes.put(commitedStoreFile.getName(), 0L);
}
if(storeFiles.containsKey(familyName)) {
storeFiles.get(familyName).add(commitedStoreFile);
} else {
@@ -5431,9 +5442,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (wal != null && !storeFiles.isEmpty()) {
// Write a bulk load event for hfiles that are loaded
try {
WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
this.getRegionInfo().getTable(),
ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles,
storeFilesSizes, seqId);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {
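The size map is keyed by the committed file's name rather than its full path, which is exactly what makes the lookup in toBulkLoadDescriptor (first hunk of this commit) line up. A compressed view of the round trip, using the names from the patch:

// HRegion side: record the length under the HFile's name.
storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile).getLen());
// ProtobufUtil side: resolve the same name, falling back to 0L if the lookup failed.
Long size = storeFilesSize.get(name) == null ? (Long) 0L : storeFilesSize.get(name);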

ReplicationSource.java

@@ -712,6 +712,7 @@ public class ReplicationSource extends Thread
currentNbOperations += countDistinctRowKeys(edit);
entries.add(entry);
currentSize += entry.getEdit().heapSize();
currentSize += calculateTotalSizeOfStoreFiles(edit);
} else {
metrics.incrLogEditsFiltered();
}
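This added line is the heart of the fix: the bytes of bulk-loaded HFiles now count toward the batch size that ReplicationSource ships at once, instead of only the WALEdit heap size. A simplified sketch of the gate it feeds (the "replication.source.size.capacity" key and its 64 MB default exist in ReplicationSource; the surrounding read loop is abbreviated):

long sizeCapacity = conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
if (currentSize >= sizeCapacity) {
  break; // ship this batch; HFile bytes no longer slip past the limit
}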
@@ -738,6 +739,35 @@ public class ReplicationSource extends Thread
return seenEntries == 0 && processEndOfFile();
}
/**
* Calculate the total size of all the store files
* @param edit edit whose bulk-load store file sizes are summed
* @return the total size of the store files
*/
private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
List<Cell> cells = edit.getCells();
int totalStoreFilesSize = 0;
int totalCells = edit.size();
for (int i = 0; i < totalCells; i++) {
if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
try {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
List<StoreDescriptor> stores = bld.getStoresList();
int totalStores = stores.size();
for (int j = 0; j < totalStores; j++) {
totalStoreFilesSize += stores.get(j).getStoreFileSize();
}
} catch (IOException e) {
LOG.error("Failed to deserialize bulk load entry from wal edit. "
+ "Size of HFiles part of cell will not be considered in replication "
+ "request size calculation.", e);
}
}
}
return totalStoreFilesSize;
}
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = peerClusterZnode;
if (peerId.contains("-")) {
@@ -746,12 +776,14 @@ public class ReplicationSource extends Thread
peerId = peerClusterZnode.split("-")[0];
}
List<Cell> cells = edit.getCells();
for (int i = 0; i < cells.size(); i++) {
int totalCells = cells.size();
for (int i = 0; i < totalCells; i++) {
Cell cell = cells.get(i);
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
List<StoreDescriptor> stores = bld.getStoresList();
for (int j = 0; j < stores.size(); j++) {
int totalStores = stores.size();
for (int j = 0; j < totalStores; j++) {
List<String> storeFileList = stores.get(j).getStoreFileList();
manager.cleanUpHFileRefs(peerId, storeFileList);
metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
@@ -934,18 +966,20 @@ public class ReplicationSource extends Thread
int totalHFileEntries = 0;
Cell lastCell = cells.get(0);
for (int i = 0; i < edit.size(); i++) {
int totalCells = edit.size();
for (int i = 0; i < totalCells; i++) {
// Count HFiles to be replicated
if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
try {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
List<StoreDescriptor> stores = bld.getStoresList();
for (int j = 0; j < stores.size(); j++) {
int totalStores = stores.size();
for (int j = 0; j < totalStores; j++) {
totalHFileEntries += stores.get(j).getStoreFileList().size();
}
} catch (IOException e) {
LOG.error("Failed to deserialize bulk load entry from wal edit. "
+ "This its hfiles count will not be added into metric.");
+ "Then its hfiles count will not be added into metric.");
}
}

TestReplicationSink.java

@@ -287,6 +287,7 @@ public class TestReplicationSink {
}
List<Integer> numberList = new ArrayList<>(numbers);
Collections.sort(numberList);
Map<String, Long> storeFilesSize = new HashMap<String, Long>(1);
// 2. Create 25 hfiles
Configuration conf = TEST_UTIL.getConfiguration();
@@ -297,6 +298,7 @@ public class TestReplicationSink {
HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
p.add(hfilePath);
storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen());
}
// 3. Create a BulkLoadDescriptor and a WALEdit
@@ -310,7 +312,7 @@ public class TestReplicationSink {
HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo();
loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, 1);
ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
}
List<WALEntry> entries = new ArrayList<WALEntry>(1);
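A follow-up assertion one could add at this point (hypothetical, not in the patch) to check that the sizes survived into the descriptor:

for (WALProtos.StoreDescriptor store : loadDescriptor.getStoresList()) {
  assertTrue(store.hasStoreFileSize() && store.getStoreFileSize() > 0);
}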

TestReplicationSourceManager.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
@@ -479,16 +480,32 @@ public class TestReplicationSourceManager {
private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
// 1. Create store files for the families
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
Map<String, Long> storeFilesSize = new HashMap<>(1);
List<Path> p = new ArrayList<>(1);
p.add(new Path(Bytes.toString(f1)));
Path hfilePath1 = new Path(Bytes.toString(f1));
p.add(hfilePath1);
try {
storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen());
} catch (IOException e) {
LOG.debug("Failed to calculate the size of hfile " + hfilePath1);
storeFilesSize.put(hfilePath1.getName(), 0L);
}
storeFiles.put(f1, p);
scope.put(f1, 1);
p = new ArrayList<>(1);
p.add(new Path(Bytes.toString(f2)));
Path hfilePath2 = new Path(Bytes.toString(f2));
p.add(hfilePath2);
try {
storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen());
} catch (IOException e) {
LOG.debug("Failed to calculate the size of hfile " + hfilePath2);
storeFilesSize.put(hfilePath2.getName(), 0L);
}
storeFiles.put(f2, p);
// 2. Create bulk load descriptor
BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, 1);
BulkLoadDescriptor desc =
ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
// 3. create bulk load wal edit event
WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
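One detail worth knowing about this helper: the paths are synthesized from family names and normally do not exist on the test filesystem, so the catch branches above are the expected path and the map records 0L; the test exercises the new toBulkLoadDescriptor signature rather than real file sizes. In effect each branch reduces to:

// fs.getFileStatus(new Path("f1")) throws FileNotFoundException (an IOException) here,
// so the size recorded for the synthetic HFile defaults to zero.
storeFilesSize.put(hfilePath1.getName(), 0L);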