From 0ffbf9c75902209ffae0d2dbe86ed3024da0144d Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Wed, 2 Oct 2019 17:49:14 +0100 Subject: [PATCH] HBASE-23101 Backport HBASE-22380 to branch-1 Fixes #680 Signed-off-by: Andrew Purtell --- .../coprocessor/SecureBulkLoadClient.java | 21 +- .../hadoop/hbase/protobuf/ProtobufUtil.java | 60 +++- .../hbase/protobuf/RequestConverter.java | 23 +- .../protobuf/generated/ClientProtos.java | 307 ++++++++++++++---- .../generated/SecureBulkLoadProtos.java | 233 +++++++++++-- .../hbase/protobuf/generated/WALProtos.java | 221 ++++++++++++- hbase-protocol/src/main/protobuf/Client.proto | 1 + .../src/main/protobuf/SecureBulkLoad.proto | 1 + hbase-protocol/src/main/protobuf/WAL.proto | 1 + .../mapreduce/LoadIncrementalHFiles.java | 9 +- .../hadoop/hbase/regionserver/HRegion.java | 4 +- .../hbase/regionserver/HRegionServer.java | 2 +- .../hbase/regionserver/RSRpcServices.java | 8 +- .../hadoop/hbase/regionserver/Region.java | 5 +- .../regionserver/HFileReplicator.java | 5 +- .../regionserver/ReplicationSink.java | 48 ++- .../access/SecureBulkLoadEndpoint.java | 4 +- .../hbase/regionserver/TestBulkLoad.java | 18 +- .../regionserver/TestBulkLoadReplication.java | 299 +++++++++++++++++ .../regionserver/TestHRegionReplayEvents.java | 2 +- .../hbase/regionserver/wal/TestWALReplay.java | 4 +- 21 files changed, 1135 insertions(+), 141 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java index 05db8d4140e..bfbd8ddc4d0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java @@ -117,10 +117,24 @@ public class SecureBulkLoadClient { } } + /** + * @deprecated + * @param familyPaths + * @param userToken + * @param bulkToken + * @param startRow + * @return + * @throws IOException + */ public boolean bulkLoadHFiles(final List> familyPaths, - final Token userToken, - final String bulkToken, - final byte[] startRow) throws IOException { + final Token userToken, final String bulkToken, + final byte[] startRow) throws IOException { + return this.bulkLoadHFiles(familyPaths, userToken, bulkToken, startRow, null); + } + + public boolean bulkLoadHFiles(final List> familyPaths, + final Token userToken, final String bulkToken, + final byte[] startRow, List clusterIds) throws IOException { // we never want to send a batch of HFiles to all regions, thus cannot call // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639 try { @@ -151,6 +165,7 @@ public class SecureBulkLoadClient { SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder() .setFsToken(protoDT) .addAllFamilyPath(protoFamilyPaths) + .addAllClusterIds(clusterIds != null ? clusterIds : new ArrayList()) .setBulkToken(bulkToken).build(); ServerRpcController controller = new ServerRpcController(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 2c70ad70d0f..975cf4458c2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -1807,12 +1807,31 @@ public final class ProtobufUtil { * @param assignSeqNum * @return true if all are loaded * @throws IOException + * @deprecated use bulkLoadHFile(final ClientService.BlockingInterface client, + * final List> familyPaths, final byte[] regionName, boolean assignSeqNum, + * List clusterIds) instead. */ public static boolean bulkLoadHFile(final ClientService.BlockingInterface client, final List> familyPaths, final byte[] regionName, boolean assignSeqNum) throws IOException { + return bulkLoadHFile(client, familyPaths, regionName, assignSeqNum, null); + } + + /** + * A helper to bulk load a list of HFiles using client protocol. + * + * @param client + * @param familyPaths + * @param regionName + * @param assignSeqNum + * @return true if all are loaded + * @throws IOException + */ + public static boolean bulkLoadHFile(final ClientService.BlockingInterface client, + final List> familyPaths, + final byte[] regionName, boolean assignSeqNum, List clusterIds) throws IOException { BulkLoadHFileRequest request = - RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum); + RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, clusterIds); try { BulkLoadHFileResponse response = client.bulkLoadHFile(null, request); @@ -3358,6 +3377,10 @@ public final class ProtobufUtil { * Generates a marker for the WAL so that we propagate the notion of a bulk region load * throughout the WAL. * + * @deprecated use toBulkLoadDescriptor(TableName tableName, ByteString encodedRegionName, + * Map> storeFiles, Map storeFilesSize, long bulkloadSeqId, + * List clusterIds) instead. + * * @param tableName The tableName into which the bulk load is being imported into. * @param encodedRegionName Encoded region name of the region which is being bulk loaded. * @param storeFiles A set of store files of a column family are bulk loaded. @@ -3367,17 +3390,42 @@ public final class ProtobufUtil { * @return The WAL log marker for bulk loads. */ public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName, - ByteString encodedRegionName, Map> storeFiles, - Map storeFilesSize, long bulkloadSeqId) { + ByteString encodedRegionName, Map> storeFiles, + Map storeFilesSize, long bulkloadSeqId) { + return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles, + storeFilesSize, bulkloadSeqId, null); + } + + /** + * Generates a marker for the WAL so that we propagate the notion of a bulk region load + * throughout the WAL, keeping track of clusters who already applied the bulk event via + * the passed clusterIds parameter. + * + * @param tableName The tableName into which the bulk load is being imported into. + * @param encodedRegionName Encoded region name of the region which is being bulk loaded. + * @param storeFiles A set of store files of a column family are bulk loaded. + * @param storeFilesSize Map of store files and their lengths + * @param bulkloadSeqId sequence ID (by a force flush) used to create bulk load hfile name + * @param clusterIds The list of cluster Ids with the clusters where the bulk even had + * already been processed. + * + * @return The WAL log marker for bulk loads. + */ + public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName, + ByteString encodedRegionName, Map> storeFiles, + Map storeFilesSize, long bulkloadSeqId, List clusterIds) { BulkLoadDescriptor.Builder desc = - BulkLoadDescriptor.newBuilder() + BulkLoadDescriptor.newBuilder() .setTableName(ProtobufUtil.toProtoTableName(tableName)) .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId); + if(clusterIds != null) { + desc.addAllClusterIds(clusterIds); + } for (Map.Entry> entry : storeFiles.entrySet()) { WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder() - .setFamilyName(ByteStringer.wrap(entry.getKey())) - .setStoreHomeDir(Bytes.toString(entry.getKey())); // relative to region + .setFamilyName(ByteStringer.wrap(entry.getKey())) + .setStoreHomeDir(Bytes.toString(entry.getKey())); // relative to region for (Path path : entry.getValue()) { String name = path.getName(); builder.addStoreFile(name); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 9c6fea7483d..63b8af260c0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -561,14 +561,32 @@ public final class RequestConverter { /** * Create a protocol buffer bulk load request * + * @deprecated use buildBulkLoadHFileRequest(final List> familyPaths, + * final byte[] regionName, boolean assignSeqNum, List clusterIds) + * * @param familyPaths * @param regionName * @param assignSeqNum * @return a bulk load request */ + public static BulkLoadHFileRequest buildBulkLoadHFileRequest( + final List> familyPaths, + final byte[] regionName, boolean assignSeqNum) { + return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, null); + } + + /** + * Create a protocol buffer bulk load request + * + * @param familyPaths + * @param regionName + * @param assignSeqNum + * @param clusterIds + * @return a bulk load request + */ public static BulkLoadHFileRequest buildBulkLoadHFileRequest( final List> familyPaths, - final byte[] regionName, boolean assignSeqNum) { + final byte[] regionName, boolean assignSeqNum, List clusterIds) { BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder(); RegionSpecifier region = buildRegionSpecifier( RegionSpecifierType.REGION_NAME, regionName); @@ -579,6 +597,9 @@ public final class RequestConverter { familyPathBuilder.setPath(familyPath.getSecond()); builder.addFamilyPath(familyPathBuilder.build()); } + if(clusterIds!=null) { + builder.addAllClusterIds(clusterIds); + } builder.setAssignSeqNum(assignSeqNum); return builder.build(); } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 35fddc281c0..3c905e8a8c7 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -22843,6 +22843,26 @@ public final class ClientProtos { * optional bool assign_seq_num = 3; */ boolean getAssignSeqNum(); + + // repeated string cluster_ids = 4; + /** + * repeated string cluster_ids = 4; + */ + java.util.List + getClusterIdsList(); + /** + * repeated string cluster_ids = 4; + */ + int getClusterIdsCount(); + /** + * repeated string cluster_ids = 4; + */ + java.lang.String getClusterIds(int index); + /** + * repeated string cluster_ids = 4; + */ + com.google.protobuf.ByteString + getClusterIdsBytes(int index); } /** * Protobuf type {@code hbase.pb.BulkLoadHFileRequest} @@ -22927,6 +22947,14 @@ public final class ClientProtos { assignSeqNum_ = input.readBool(); break; } + case 34: { + if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) { + clusterIds_ = new com.google.protobuf.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000008; + } + clusterIds_.add(input.readBytes()); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -22938,6 +22966,9 @@ public final class ClientProtos { if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) { familyPath_ = java.util.Collections.unmodifiableList(familyPath_); } + if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) { + clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(clusterIds_); + } this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } @@ -23662,10 +23693,41 @@ public final class ClientProtos { return assignSeqNum_; } + // repeated string cluster_ids = 4; + public static final int CLUSTER_IDS_FIELD_NUMBER = 4; + private com.google.protobuf.LazyStringList clusterIds_; + /** + * repeated string cluster_ids = 4; + */ + public java.util.List + getClusterIdsList() { + return clusterIds_; + } + /** + * repeated string cluster_ids = 4; + */ + public int getClusterIdsCount() { + return clusterIds_.size(); + } + /** + * repeated string cluster_ids = 4; + */ + public java.lang.String getClusterIds(int index) { + return clusterIds_.get(index); + } + /** + * repeated string cluster_ids = 4; + */ + public com.google.protobuf.ByteString + getClusterIdsBytes(int index) { + return clusterIds_.getByteString(index); + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); familyPath_ = java.util.Collections.emptyList(); assignSeqNum_ = false; + clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -23702,6 +23764,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBool(3, assignSeqNum_); } + for (int i = 0; i < clusterIds_.size(); i++) { + output.writeBytes(4, clusterIds_.getByteString(i)); + } getUnknownFields().writeTo(output); } @@ -23723,6 +23788,15 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(3, assignSeqNum_); } + { + int dataSize = 0; + for (int i = 0; i < clusterIds_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(clusterIds_.getByteString(i)); + } + size += dataSize; + size += 1 * getClusterIdsList().size(); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -23758,6 +23832,8 @@ public final class ClientProtos { result = result && (getAssignSeqNum() == other.getAssignSeqNum()); } + result = result && getClusterIdsList() + .equals(other.getClusterIdsList()); result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -23783,6 +23859,10 @@ public final class ClientProtos { hash = (37 * hash) + ASSIGN_SEQ_NUM_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getAssignSeqNum()); } + if (getClusterIdsCount() > 0) { + hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER; + hash = (53 * hash) + getClusterIdsList().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -23914,6 +23994,8 @@ public final class ClientProtos { } assignSeqNum_ = false; bitField0_ = (bitField0_ & ~0x00000004); + clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -23963,6 +24045,12 @@ public final class ClientProtos { to_bitField0_ |= 0x00000002; } result.assignSeqNum_ = assignSeqNum_; + if (((bitField0_ & 0x00000008) == 0x00000008)) { + clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList( + clusterIds_); + bitField0_ = (bitField0_ & ~0x00000008); + } + result.clusterIds_ = clusterIds_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -24011,6 +24099,16 @@ public final class ClientProtos { if (other.hasAssignSeqNum()) { setAssignSeqNum(other.getAssignSeqNum()); } + if (!other.clusterIds_.isEmpty()) { + if (clusterIds_.isEmpty()) { + clusterIds_ = other.clusterIds_; + bitField0_ = (bitField0_ & ~0x00000008); + } else { + ensureClusterIdsIsMutable(); + clusterIds_.addAll(other.clusterIds_); + } + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -24442,6 +24540,99 @@ public final class ClientProtos { return this; } + // repeated string cluster_ids = 4; + private com.google.protobuf.LazyStringList clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + private void ensureClusterIdsIsMutable() { + if (!((bitField0_ & 0x00000008) == 0x00000008)) { + clusterIds_ = new com.google.protobuf.LazyStringArrayList(clusterIds_); + bitField0_ |= 0x00000008; + } + } + /** + * repeated string cluster_ids = 4; + */ + public java.util.List + getClusterIdsList() { + return java.util.Collections.unmodifiableList(clusterIds_); + } + /** + * repeated string cluster_ids = 4; + */ + public int getClusterIdsCount() { + return clusterIds_.size(); + } + /** + * repeated string cluster_ids = 4; + */ + public java.lang.String getClusterIds(int index) { + return clusterIds_.get(index); + } + /** + * repeated string cluster_ids = 4; + */ + public com.google.protobuf.ByteString + getClusterIdsBytes(int index) { + return clusterIds_.getByteString(index); + } + /** + * repeated string cluster_ids = 4; + */ + public Builder setClusterIds( + int index, java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string cluster_ids = 4; + */ + public Builder addClusterIds( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.add(value); + onChanged(); + return this; + } + /** + * repeated string cluster_ids = 4; + */ + public Builder addAllClusterIds( + java.lang.Iterable values) { + ensureClusterIdsIsMutable(); + super.addAll(values, clusterIds_); + onChanged(); + return this; + } + /** + * repeated string cluster_ids = 4; + */ + public Builder clearClusterIds() { + clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000008); + onChanged(); + return this; + } + /** + * repeated string cluster_ids = 4; + */ + public Builder addClusterIdsBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.add(value); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.BulkLoadHFileRequest) } @@ -37378,66 +37569,66 @@ public final class ClientProtos { "(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014scan_me" + "trics\030\n \001(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mv" + "cc_read_point\030\013 \001(\004:\0010\022 \n\006cursor\030\014 \001(\0132\020" + - ".hbase.pb.Cursor\"\305\001\n\024BulkLoadHFileReques" + + ".hbase.pb.Cursor\"\332\001\n\024BulkLoadHFileReques" + "t\022)\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpeci" + "fier\022>\n\013family_path\030\002 \003(\0132).hbase.pb.Bul" + "kLoadHFileRequest.FamilyPath\022\026\n\016assign_s" + - "eq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002" + - "(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRespons" + - "e\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServiceC", - "all\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023" + - "\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030" + - "CoprocessorServiceResult\022&\n\005value\030\001 \001(\0132" + - "\027.hbase.pb.NameBytesPair\"v\n\031CoprocessorS" + - "erviceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb" + - ".RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.p" + - "b.CoprocessorServiceCall\"o\n\032CoprocessorS" + - "erviceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.p" + - "b.RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase" + - ".pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 ", - "\001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mutatio" + - "nProto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014se" + - "rvice_call\030\004 \001(\0132 .hbase.pb.CoprocessorS" + - "erviceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002" + - "(\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030" + - "\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"" + - "c\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005" + - ":\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compact" + - "ionPressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadS" + - "tats\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSp", - "ecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionL" + - "oadStats\"\336\001\n\021ResultOrException\022\r\n\005index\030" + - "\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022" + - "*\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytesP" + - "air\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.C" + - "oprocessorServiceResult\0220\n\tloadStats\030\005 \001" + - "(\0132\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Reg" + - "ionActionResult\0226\n\021resultOrException\030\001 \003" + - "(\0132\033.hbase.pb.ResultOrException\022*\n\texcep" + - "tion\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014M", - "ultiRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbas" + - "e.pb.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n" + - "\tcondition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001" + - "\n\rMultiResponse\0228\n\022regionActionResult\030\001 " + - "\003(\0132\034.hbase.pb.RegionActionResult\022\021\n\tpro" + - "cessed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036" + - ".hbase.pb.MultiRegionLoadStats*\'\n\013Consis" + - "tency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\203\004\n\rClie" + - "ntService\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025" + - ".hbase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.", - "pb.MutateRequest\032\030.hbase.pb.MutateRespon" + - "se\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbas" + - "e.pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hba" + - "se.pb.BulkLoadHFileRequest\032\037.hbase.pb.Bu" + - "lkLoadHFileResponse\022X\n\013ExecService\022#.hba" + - "se.pb.CoprocessorServiceRequest\032$.hbase." + - "pb.CoprocessorServiceResponse\022d\n\027ExecReg" + - "ionServerService\022#.hbase.pb.CoprocessorS" + - "erviceRequest\032$.hbase.pb.CoprocessorServ" + - "iceResponse\0228\n\005Multi\022\026.hbase.pb.MultiReq", - "uest\032\027.hbase.pb.MultiResponseBB\n*org.apa" + - "che.hadoop.hbase.protobuf.generatedB\014Cli" + - "entProtosH\001\210\001\001\240\001\001" + "eq_num\030\003 \001(\010\022\023\n\013cluster_ids\030\004 \003(\t\032*\n\nFam" + + "ilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n" + + "\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"a", + "\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n" + + "\014service_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t" + + "\022\017\n\007request\030\004 \002(\014\"B\n\030CoprocessorServiceR" + + "esult\022&\n\005value\030\001 \001(\0132\027.hbase.pb.NameByte" + + "sPair\"v\n\031CoprocessorServiceRequest\022)\n\006re" + + "gion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022.\n" + + "\004call\030\002 \002(\0132 .hbase.pb.CoprocessorServic" + + "eCall\"o\n\032CoprocessorServiceResponse\022)\n\006r" + + "egion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022&" + + "\n\005value\030\002 \002(\0132\027.hbase.pb.NameBytesPair\"\226", + "\001\n\006Action\022\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001" + + "(\0132\027.hbase.pb.MutationProto\022\032\n\003get\030\003 \001(\013" + + "2\r.hbase.pb.Get\0226\n\014service_call\030\004 \001(\0132 ." + + "hbase.pb.CoprocessorServiceCall\"k\n\014Regio" + + "nAction\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Regio" + + "nSpecifier\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003" + + "(\0132\020.hbase.pb.Action\"c\n\017RegionLoadStats\022" + + "\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupanc" + + "y\030\002 \001(\005:\0010\022\035\n\022compactionPressure\030\003 \001(\005:\001" + + "0\"j\n\024MultiRegionLoadStats\022)\n\006region\030\001 \003(", + "\0132\031.hbase.pb.RegionSpecifier\022\'\n\004stat\030\002 \003" + + "(\0132\031.hbase.pb.RegionLoadStats\"\336\001\n\021Result" + + "OrException\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001" + + "(\0132\020.hbase.pb.Result\022*\n\texception\030\003 \001(\0132" + + "\027.hbase.pb.NameBytesPair\022:\n\016service_resu" + + "lt\030\004 \001(\0132\".hbase.pb.CoprocessorServiceRe" + + "sult\0220\n\tloadStats\030\005 \001(\0132\031.hbase.pb.Regio" + + "nLoadStatsB\002\030\001\"x\n\022RegionActionResult\0226\n\021" + + "resultOrException\030\001 \003(\0132\033.hbase.pb.Resul" + + "tOrException\022*\n\texception\030\002 \001(\0132\027.hbase.", + "pb.NameBytesPair\"x\n\014MultiRequest\022,\n\014regi" + + "onAction\030\001 \003(\0132\026.hbase.pb.RegionAction\022\022" + + "\n\nnonceGroup\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023." + + "hbase.pb.Condition\"\226\001\n\rMultiResponse\0228\n\022" + + "regionActionResult\030\001 \003(\0132\034.hbase.pb.Regi" + + "onActionResult\022\021\n\tprocessed\030\002 \001(\010\0228\n\020reg" + + "ionStatistics\030\003 \001(\0132\036.hbase.pb.MultiRegi" + + "onLoadStats*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014" + + "\n\010TIMELINE\020\0012\203\004\n\rClientService\0222\n\003Get\022\024." + + "hbase.pb.GetRequest\032\025.hbase.pb.GetRespon", + "se\022;\n\006Mutate\022\027.hbase.pb.MutateRequest\032\030." + + "hbase.pb.MutateResponse\0225\n\004Scan\022\025.hbase." + + "pb.ScanRequest\032\026.hbase.pb.ScanResponse\022P" + + "\n\rBulkLoadHFile\022\036.hbase.pb.BulkLoadHFile" + + "Request\032\037.hbase.pb.BulkLoadHFileResponse" + + "\022X\n\013ExecService\022#.hbase.pb.CoprocessorSe" + + "rviceRequest\032$.hbase.pb.CoprocessorServi" + + "ceResponse\022d\n\027ExecRegionServerService\022#." + + "hbase.pb.CoprocessorServiceRequest\032$.hba" + + "se.pb.CoprocessorServiceResponse\0228\n\005Mult", + "i\022\026.hbase.pb.MultiRequest\032\027.hbase.pb.Mul" + + "tiResponseBB\n*org.apache.hadoop.hbase.pr" + + "otobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -37551,7 +37742,7 @@ public final class ClientProtos { internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_BulkLoadHFileRequest_descriptor, - new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", }); + new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "ClusterIds", }); internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_descriptor = internal_static_hbase_pb_BulkLoadHFileRequest_descriptor.getNestedTypes().get(0); internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/SecureBulkLoadProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/SecureBulkLoadProtos.java index 8521ba883fa..7891480c4df 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/SecureBulkLoadProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/SecureBulkLoadProtos.java @@ -74,6 +74,26 @@ public final class SecureBulkLoadProtos { */ com.google.protobuf.ByteString getBulkTokenBytes(); + + // repeated string cluster_ids = 5; + /** + * repeated string cluster_ids = 5; + */ + java.util.List + getClusterIdsList(); + /** + * repeated string cluster_ids = 5; + */ + int getClusterIdsCount(); + /** + * repeated string cluster_ids = 5; + */ + java.lang.String getClusterIds(int index); + /** + * repeated string cluster_ids = 5; + */ + com.google.protobuf.ByteString + getClusterIdsBytes(int index); } /** * Protobuf type {@code hbase.pb.SecureBulkLoadHFilesRequest} @@ -157,6 +177,14 @@ public final class SecureBulkLoadProtos { bulkToken_ = input.readBytes(); break; } + case 42: { + if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) { + clusterIds_ = new com.google.protobuf.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000010; + } + clusterIds_.add(input.readBytes()); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -168,6 +196,9 @@ public final class SecureBulkLoadProtos { if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) { familyPath_ = java.util.Collections.unmodifiableList(familyPath_); } + if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) { + clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(clusterIds_); + } this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } @@ -317,11 +348,42 @@ public final class SecureBulkLoadProtos { } } + // repeated string cluster_ids = 5; + public static final int CLUSTER_IDS_FIELD_NUMBER = 5; + private com.google.protobuf.LazyStringList clusterIds_; + /** + * repeated string cluster_ids = 5; + */ + public java.util.List + getClusterIdsList() { + return clusterIds_; + } + /** + * repeated string cluster_ids = 5; + */ + public int getClusterIdsCount() { + return clusterIds_.size(); + } + /** + * repeated string cluster_ids = 5; + */ + public java.lang.String getClusterIds(int index) { + return clusterIds_.get(index); + } + /** + * repeated string cluster_ids = 5; + */ + public com.google.protobuf.ByteString + getClusterIdsBytes(int index) { + return clusterIds_.getByteString(index); + } + private void initFields() { familyPath_ = java.util.Collections.emptyList(); assignSeqNum_ = false; fsToken_ = org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.DelegationToken.getDefaultInstance(); bulkToken_ = ""; + clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -361,6 +423,9 @@ public final class SecureBulkLoadProtos { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBytes(4, getBulkTokenBytes()); } + for (int i = 0; i < clusterIds_.size(); i++) { + output.writeBytes(5, clusterIds_.getByteString(i)); + } getUnknownFields().writeTo(output); } @@ -386,6 +451,15 @@ public final class SecureBulkLoadProtos { size += com.google.protobuf.CodedOutputStream .computeBytesSize(4, getBulkTokenBytes()); } + { + int dataSize = 0; + for (int i = 0; i < clusterIds_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(clusterIds_.getByteString(i)); + } + size += dataSize; + size += 1 * getClusterIdsList().size(); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -426,6 +500,8 @@ public final class SecureBulkLoadProtos { result = result && getBulkToken() .equals(other.getBulkToken()); } + result = result && getClusterIdsList() + .equals(other.getClusterIdsList()); result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -455,6 +531,10 @@ public final class SecureBulkLoadProtos { hash = (37 * hash) + BULK_TOKEN_FIELD_NUMBER; hash = (53 * hash) + getBulkToken().hashCode(); } + if (getClusterIdsCount() > 0) { + hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER; + hash = (53 * hash) + getClusterIdsList().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -582,6 +662,8 @@ public final class SecureBulkLoadProtos { bitField0_ = (bitField0_ & ~0x00000004); bulkToken_ = ""; bitField0_ = (bitField0_ & ~0x00000008); + clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -635,6 +717,12 @@ public final class SecureBulkLoadProtos { to_bitField0_ |= 0x00000004; } result.bulkToken_ = bulkToken_; + if (((bitField0_ & 0x00000010) == 0x00000010)) { + clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList( + clusterIds_); + bitField0_ = (bitField0_ & ~0x00000010); + } + result.clusterIds_ = clusterIds_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -688,6 +776,16 @@ public final class SecureBulkLoadProtos { bulkToken_ = other.bulkToken_; onChanged(); } + if (!other.clusterIds_.isEmpty()) { + if (clusterIds_.isEmpty()) { + clusterIds_ = other.clusterIds_; + bitField0_ = (bitField0_ & ~0x00000010); + } else { + ensureClusterIdsIsMutable(); + clusterIds_.addAll(other.clusterIds_); + } + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1193,6 +1291,99 @@ public final class SecureBulkLoadProtos { return this; } + // repeated string cluster_ids = 5; + private com.google.protobuf.LazyStringList clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + private void ensureClusterIdsIsMutable() { + if (!((bitField0_ & 0x00000010) == 0x00000010)) { + clusterIds_ = new com.google.protobuf.LazyStringArrayList(clusterIds_); + bitField0_ |= 0x00000010; + } + } + /** + * repeated string cluster_ids = 5; + */ + public java.util.List + getClusterIdsList() { + return java.util.Collections.unmodifiableList(clusterIds_); + } + /** + * repeated string cluster_ids = 5; + */ + public int getClusterIdsCount() { + return clusterIds_.size(); + } + /** + * repeated string cluster_ids = 5; + */ + public java.lang.String getClusterIds(int index) { + return clusterIds_.get(index); + } + /** + * repeated string cluster_ids = 5; + */ + public com.google.protobuf.ByteString + getClusterIdsBytes(int index) { + return clusterIds_.getByteString(index); + } + /** + * repeated string cluster_ids = 5; + */ + public Builder setClusterIds( + int index, java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string cluster_ids = 5; + */ + public Builder addClusterIds( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.add(value); + onChanged(); + return this; + } + /** + * repeated string cluster_ids = 5; + */ + public Builder addAllClusterIds( + java.lang.Iterable values) { + ensureClusterIdsIsMutable(); + super.addAll(values, clusterIds_); + onChanged(); + return this; + } + /** + * repeated string cluster_ids = 5; + */ + public Builder clearClusterIds() { + clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); + onChanged(); + return this; + } + /** + * repeated string cluster_ids = 5; + */ + public Builder addClusterIdsBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.add(value); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.SecureBulkLoadHFilesRequest) } @@ -4858,30 +5049,30 @@ public final class SecureBulkLoadProtos { static { java.lang.String[] descriptorData = { "\n\024SecureBulkLoad.proto\022\010hbase.pb\032\013Table." + - "proto\032\013HBase.proto\032\014Client.proto\"\266\001\n\033Sec" + + "proto\032\013HBase.proto\032\014Client.proto\"\313\001\n\033Sec" + "ureBulkLoadHFilesRequest\022>\n\013family_path\030" + "\001 \003(\0132).hbase.pb.BulkLoadHFileRequest.Fa" + "milyPath\022\026\n\016assign_seq_num\030\002 \001(\010\022+\n\010fs_t" + "oken\030\003 \002(\0132\031.hbase.pb.DelegationToken\022\022\n" + - "\nbulk_token\030\004 \002(\t\".\n\034SecureBulkLoadHFile" + - "sResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationT" + - "oken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002 \001" + - "(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"A\n\026Pre", - "pareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(\0132" + - "\023.hbase.pb.TableName\"-\n\027PrepareBulkLoadR" + - "esponse\022\022\n\nbulk_token\030\001 \002(\t\",\n\026CleanupBu" + - "lkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\"\031\n\027Cle" + - "anupBulkLoadResponse2\256\002\n\025SecureBulkLoadS" + - "ervice\022V\n\017PrepareBulkLoad\022 .hbase.pb.Pre" + - "pareBulkLoadRequest\032!.hbase.pb.PrepareBu" + - "lkLoadResponse\022e\n\024SecureBulkLoadHFiles\022%" + - ".hbase.pb.SecureBulkLoadHFilesRequest\032&." + - "hbase.pb.SecureBulkLoadHFilesResponse\022V\n", - "\017CleanupBulkLoad\022 .hbase.pb.CleanupBulkL" + - "oadRequest\032!.hbase.pb.CleanupBulkLoadRes" + - "ponseBJ\n*org.apache.hadoop.hbase.protobu" + - "f.generatedB\024SecureBulkLoadProtosH\001\210\001\001\240\001" + - "\001" + "\nbulk_token\030\004 \002(\t\022\023\n\013cluster_ids\030\005 \003(\t\"." + + "\n\034SecureBulkLoadHFilesResponse\022\016\n\006loaded" + + "\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\nidentifier\030" + + "\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n", + "\007service\030\004 \001(\t\"A\n\026PrepareBulkLoadRequest" + + "\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.TableNam" + + "e\"-\n\027PrepareBulkLoadResponse\022\022\n\nbulk_tok" + + "en\030\001 \002(\t\",\n\026CleanupBulkLoadRequest\022\022\n\nbu" + + "lk_token\030\001 \002(\t\"\031\n\027CleanupBulkLoadRespons" + + "e2\256\002\n\025SecureBulkLoadService\022V\n\017PrepareBu" + + "lkLoad\022 .hbase.pb.PrepareBulkLoadRequest" + + "\032!.hbase.pb.PrepareBulkLoadResponse\022e\n\024S" + + "ecureBulkLoadHFiles\022%.hbase.pb.SecureBul" + + "kLoadHFilesRequest\032&.hbase.pb.SecureBulk", + "LoadHFilesResponse\022V\n\017CleanupBulkLoad\022 ." + + "hbase.pb.CleanupBulkLoadRequest\032!.hbase." + + "pb.CleanupBulkLoadResponseBJ\n*org.apache" + + ".hadoop.hbase.protobuf.generatedB\024Secure" + + "BulkLoadProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4893,7 +5084,7 @@ public final class SecureBulkLoadProtos { internal_static_hbase_pb_SecureBulkLoadHFilesRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_SecureBulkLoadHFilesRequest_descriptor, - new java.lang.String[] { "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", }); + new java.lang.String[] { "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", "ClusterIds", }); internal_static_hbase_pb_SecureBulkLoadHFilesResponse_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_hbase_pb_SecureBulkLoadHFilesResponse_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index 259a1ace3e3..728b5587773 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -9852,6 +9852,26 @@ public final class WALProtos { * required int64 bulkload_seq_num = 4; */ long getBulkloadSeqNum(); + + // repeated string cluster_ids = 5; + /** + * repeated string cluster_ids = 5; + */ + java.util.List + getClusterIdsList(); + /** + * repeated string cluster_ids = 5; + */ + int getClusterIdsCount(); + /** + * repeated string cluster_ids = 5; + */ + java.lang.String getClusterIds(int index); + /** + * repeated string cluster_ids = 5; + */ + com.google.protobuf.ByteString + getClusterIdsBytes(int index); } /** * Protobuf type {@code hbase.pb.BulkLoadDescriptor} @@ -9940,6 +9960,14 @@ public final class WALProtos { bulkloadSeqNum_ = input.readInt64(); break; } + case 42: { + if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) { + clusterIds_ = new com.google.protobuf.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000010; + } + clusterIds_.add(input.readBytes()); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -9951,6 +9979,9 @@ public final class WALProtos { if (((mutable_bitField0_ & 0x00000004) == 0x00000004)) { stores_ = java.util.Collections.unmodifiableList(stores_); } + if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) { + clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(clusterIds_); + } this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } @@ -10073,11 +10104,42 @@ public final class WALProtos { return bulkloadSeqNum_; } + // repeated string cluster_ids = 5; + public static final int CLUSTER_IDS_FIELD_NUMBER = 5; + private com.google.protobuf.LazyStringList clusterIds_; + /** + * repeated string cluster_ids = 5; + */ + public java.util.List + getClusterIdsList() { + return clusterIds_; + } + /** + * repeated string cluster_ids = 5; + */ + public int getClusterIdsCount() { + return clusterIds_.size(); + } + /** + * repeated string cluster_ids = 5; + */ + public java.lang.String getClusterIds(int index) { + return clusterIds_.get(index); + } + /** + * repeated string cluster_ids = 5; + */ + public com.google.protobuf.ByteString + getClusterIdsBytes(int index) { + return clusterIds_.getByteString(index); + } + private void initFields() { tableName_ = org.apache.hadoop.hbase.protobuf.generated.TableProtos.TableName.getDefaultInstance(); encodedRegionName_ = com.google.protobuf.ByteString.EMPTY; stores_ = java.util.Collections.emptyList(); bulkloadSeqNum_ = 0L; + clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -10125,6 +10187,9 @@ public final class WALProtos { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeInt64(4, bulkloadSeqNum_); } + for (int i = 0; i < clusterIds_.size(); i++) { + output.writeBytes(5, clusterIds_.getByteString(i)); + } getUnknownFields().writeTo(output); } @@ -10150,6 +10215,15 @@ public final class WALProtos { size += com.google.protobuf.CodedOutputStream .computeInt64Size(4, bulkloadSeqNum_); } + { + int dataSize = 0; + for (int i = 0; i < clusterIds_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(clusterIds_.getByteString(i)); + } + size += dataSize; + size += 1 * getClusterIdsList().size(); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -10190,6 +10264,8 @@ public final class WALProtos { result = result && (getBulkloadSeqNum() == other.getBulkloadSeqNum()); } + result = result && getClusterIdsList() + .equals(other.getClusterIdsList()); result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -10219,6 +10295,10 @@ public final class WALProtos { hash = (37 * hash) + BULKLOAD_SEQ_NUM_FIELD_NUMBER; hash = (53 * hash) + hashLong(getBulkloadSeqNum()); } + if (getClusterIdsCount() > 0) { + hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER; + hash = (53 * hash) + getClusterIdsList().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -10351,6 +10431,8 @@ public final class WALProtos { } bulkloadSeqNum_ = 0L; bitField0_ = (bitField0_ & ~0x00000008); + clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -10404,6 +10486,12 @@ public final class WALProtos { to_bitField0_ |= 0x00000004; } result.bulkloadSeqNum_ = bulkloadSeqNum_; + if (((bitField0_ & 0x00000010) == 0x00000010)) { + clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList( + clusterIds_); + bitField0_ = (bitField0_ & ~0x00000010); + } + result.clusterIds_ = clusterIds_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -10455,6 +10543,16 @@ public final class WALProtos { if (other.hasBulkloadSeqNum()) { setBulkloadSeqNum(other.getBulkloadSeqNum()); } + if (!other.clusterIds_.isEmpty()) { + if (clusterIds_.isEmpty()) { + clusterIds_ = other.clusterIds_; + bitField0_ = (bitField0_ & ~0x00000010); + } else { + ensureClusterIdsIsMutable(); + clusterIds_.addAll(other.clusterIds_); + } + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -10930,6 +11028,99 @@ public final class WALProtos { return this; } + // repeated string cluster_ids = 5; + private com.google.protobuf.LazyStringList clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + private void ensureClusterIdsIsMutable() { + if (!((bitField0_ & 0x00000010) == 0x00000010)) { + clusterIds_ = new com.google.protobuf.LazyStringArrayList(clusterIds_); + bitField0_ |= 0x00000010; + } + } + /** + * repeated string cluster_ids = 5; + */ + public java.util.List + getClusterIdsList() { + return java.util.Collections.unmodifiableList(clusterIds_); + } + /** + * repeated string cluster_ids = 5; + */ + public int getClusterIdsCount() { + return clusterIds_.size(); + } + /** + * repeated string cluster_ids = 5; + */ + public java.lang.String getClusterIds(int index) { + return clusterIds_.get(index); + } + /** + * repeated string cluster_ids = 5; + */ + public com.google.protobuf.ByteString + getClusterIdsBytes(int index) { + return clusterIds_.getByteString(index); + } + /** + * repeated string cluster_ids = 5; + */ + public Builder setClusterIds( + int index, java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string cluster_ids = 5; + */ + public Builder addClusterIds( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.add(value); + onChanged(); + return this; + } + /** + * repeated string cluster_ids = 5; + */ + public Builder addAllClusterIds( + java.lang.Iterable values) { + ensureClusterIdsIsMutable(); + super.addAll(values, clusterIds_); + onChanged(); + return this; + } + /** + * repeated string cluster_ids = 5; + */ + public Builder clearClusterIds() { + clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); + onChanged(); + return this; + } + /** + * repeated string cluster_ids = 5; + */ + public Builder addClusterIdsBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.add(value); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.BulkLoadDescriptor) } @@ -13007,23 +13198,23 @@ public final class WALProtos { "LUSH\020\002\022\020\n\014CANNOT_FLUSH\020\003\"q\n\017StoreDescrip" + "tor\022\023\n\013family_name\030\001 \002(\014\022\026\n\016store_home_d" + "ir\030\002 \002(\t\022\022\n\nstore_file\030\003 \003(\t\022\035\n\025store_fi" + - "le_size_bytes\030\004 \001(\004\"\237\001\n\022BulkLoadDescript" + + "le_size_bytes\030\004 \001(\004\"\264\001\n\022BulkLoadDescript" + "or\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.TableN" + "ame\022\033\n\023encoded_region_name\030\002 \002(\014\022)\n\006stor" + "es\030\003 \003(\0132\031.hbase.pb.StoreDescriptor\022\030\n\020b" + - "ulkload_seq_num\030\004 \002(\003\"\272\002\n\025RegionEventDes", - "criptor\022=\n\nevent_type\030\001 \002(\0162).hbase.pb.R" + - "egionEventDescriptor.EventType\022\022\n\ntable_" + - "name\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002(\014\022" + - "\033\n\023log_sequence_number\030\004 \001(\004\022)\n\006stores\030\005" + - " \003(\0132\031.hbase.pb.StoreDescriptor\022$\n\006serve" + - "r\030\006 \001(\0132\024.hbase.pb.ServerName\022\023\n\013region_" + - "name\030\007 \001(\014\".\n\tEventType\022\017\n\013REGION_OPEN\020\000" + - "\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrailer*F\n\tScop" + - "eType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030RE" + - "PLICATION_SCOPE_GLOBAL\020\001B?\n*org.apache.h", - "adoop.hbase.protobuf.generatedB\tWALProto" + - "sH\001\210\001\000\240\001\001" + "ulkload_seq_num\030\004 \002(\003\022\023\n\013cluster_ids\030\005 \003", + "(\t\"\272\002\n\025RegionEventDescriptor\022=\n\nevent_ty" + + "pe\030\001 \002(\0162).hbase.pb.RegionEventDescripto" + + "r.EventType\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encod" + + "ed_region_name\030\003 \002(\014\022\033\n\023log_sequence_num" + + "ber\030\004 \001(\004\022)\n\006stores\030\005 \003(\0132\031.hbase.pb.Sto" + + "reDescriptor\022$\n\006server\030\006 \001(\0132\024.hbase.pb." + + "ServerName\022\023\n\013region_name\030\007 \001(\014\".\n\tEvent" + + "Type\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"" + + "\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICATIO" + + "N_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLO", + "BAL\020\001B?\n*org.apache.hadoop.hbase.protobu" + + "f.generatedB\tWALProtosH\001\210\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -13083,7 +13274,7 @@ public final class WALProtos { internal_static_hbase_pb_BulkLoadDescriptor_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_BulkLoadDescriptor_descriptor, - new java.lang.String[] { "TableName", "EncodedRegionName", "Stores", "BulkloadSeqNum", }); + new java.lang.String[] { "TableName", "EncodedRegionName", "Stores", "BulkloadSeqNum", "ClusterIds", }); internal_static_hbase_pb_RegionEventDescriptor_descriptor = getDescriptor().getMessageTypes().get(8); internal_static_hbase_pb_RegionEventDescriptor_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 83f4e25d844..5740669a610 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -373,6 +373,7 @@ message BulkLoadHFileRequest { required RegionSpecifier region = 1; repeated FamilyPath family_path = 2; optional bool assign_seq_num = 3; + repeated string cluster_ids = 4; message FamilyPath { required bytes family = 1; diff --git a/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto b/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto index 12e7cf766c5..09b911fb346 100644 --- a/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto +++ b/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto @@ -32,6 +32,7 @@ message SecureBulkLoadHFilesRequest { optional bool assign_seq_num = 2; required DelegationToken fs_token = 3; required string bulk_token = 4; + repeated string cluster_ids = 5; } message SecureBulkLoadHFilesResponse { diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index ce5b2c66ab1..985ac93c2dd 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -150,6 +150,7 @@ message BulkLoadDescriptor { required bytes encoded_region_name = 2; repeated StoreDescriptor stores = 3; required int64 bulkload_seq_num = 4; + repeated string cluster_ids = 5; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 2945214c579..fea492fd289 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -139,6 +139,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private int nrThreads; private int depth = 2; + private List clusterIds = new ArrayList<>(); + private LoadIncrementalHFiles() {} public LoadIncrementalHFiles(Configuration conf) throws Exception { @@ -146,6 +148,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { initialize(); } + public void setClusterIds(List clusterIds) { + this.clusterIds = clusterIds; + } + public void setDepth(int depth) { this.depth = depth; } @@ -875,7 +881,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths); byte[] regionName = getLocation().getRegionInfo().getRegionName(); if (!isSecureBulkLoadEndpointAvailable()) { - success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds); + success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds, + clusterIds); } else { try (Table table = conn.getTable(getTableName())) { secureClient = new SecureBulkLoadClient(table); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index eb747fe51e5..6c27ad8e121 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5843,7 +5843,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, - BulkLoadListener bulkLoadListener) throws IOException { + BulkLoadListener bulkLoadListener, List clusterIds) throws IOException { long seqId = -1; Map> storeFiles = new TreeMap>(Bytes.BYTES_COMPARATOR); Map storeFilesSizes = new HashMap(); @@ -6019,7 +6019,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(), ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, - storeFilesSizes, seqId); + storeFilesSizes, seqId, clusterIds); WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(), loadDescriptor, mvcc); } catch (IOException ioe) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 573a64aa398..cdcdb1efc67 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -461,7 +461,7 @@ public class HRegionServer extends HasThread implements /** * Unique identifier for the cluster we are a part of. */ - private String clusterId; + String clusterId; /** * MX Bean for RegionServerInfo diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 1da3c8d9bbd..18c2f8ef517 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2212,6 +2212,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, final BulkLoadHFileRequest request) throws ServiceException { long start = EnvironmentEdgeManager.currentTime(); + List clusterIds = new ArrayList(request.getClusterIdsList()); + if(clusterIds.contains(this.regionServer.clusterId)){ + return BulkLoadHFileResponse.newBuilder().setLoaded(true).build(); + } else { + clusterIds.add(this.regionServer.clusterId); + } try { checkOpen(); requestCount.increment(); @@ -2228,7 +2234,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean loaded = false; try { if (!bypass) { - loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null); + loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, clusterIds); } } finally { if (region.getCoprocessorHost() != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 6a06bd8481d..9654e675278 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -552,14 +552,15 @@ public interface Region extends ConfigurationObserver { * rows with multiple column families atomically. * * @param familyPaths List of Pair<byte[] column family, String hfilePath> + * @param assignSeqId * @param bulkLoadListener Internal hooks enabling massaging/preparation of a * file about to be bulk loaded - * @param assignSeqId + * @param clusterIds * @return true if successful, false if failed recoverably * @throws IOException if failed unrecoverably. */ boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, - BulkLoadListener bulkLoadListener) throws IOException; + BulkLoadListener bulkLoadListener, List clusterIds) throws IOException; /////////////////////////////////////////////////////////////////////////// // Coprocessors diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java index fc9f3d1f903..1a26568bff2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java @@ -86,17 +86,19 @@ public class HFileReplicator { private ThreadPoolExecutor exec; private int maxCopyThreads; private int copiesPerThread; + private List sourceClusterIds; public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath, Map>>> tableQueueMap, Configuration conf, - Connection connection) throws IOException { + Connection connection, List sourceClusterIds) throws IOException { this.sourceClusterConf = sourceClusterConf; this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath; this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath; this.bulkLoadHFileMap = tableQueueMap; this.conf = conf; this.connection = connection; + this.sourceClusterIds = sourceClusterIds; userProvider = UserProvider.instantiate(conf); fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); @@ -130,6 +132,7 @@ public class HFileReplicator { LoadIncrementalHFiles loadHFiles = null; try { loadHFiles = new LoadIncrementalHFiles(conf); + loadHFiles.setClusterIds(sourceClusterIds); } catch (Exception e) { LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded" + " data.", e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 30c7b0b74c9..9143f3dd805 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -157,9 +157,7 @@ public class ReplicationSink { Map, List>> rowMap = new TreeMap, List>>(); - // Map of table name Vs list of pair of family and list of hfile paths from its namespace - Map>>> bulkLoadHFileMap = null; - + Map, Map>>>> bulkLoadsPerClusters = null; for (WALEntry entry : entries) { TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); @@ -174,10 +172,19 @@ public class ReplicationSink { Cell cell = cells.current(); // Handle bulk load hfiles replication if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { - if (bulkLoadHFileMap == null) { - bulkLoadHFileMap = new HashMap>>>(); + BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); + if(bulkLoadsPerClusters == null) { + bulkLoadsPerClusters = new HashMap<>(); } - buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell); + // Map of table name Vs list of pair of family and list of + // hfile paths from its namespace + Map>>> bulkLoadHFileMap = + bulkLoadsPerClusters.get(bld.getClusterIdsList()); + if (bulkLoadHFileMap == null) { + bulkLoadHFileMap = new HashMap<>(); + bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap); + } + buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld); } else { // Handle wal replication if (isNewRowOrType(previousCell, cell)) { @@ -213,14 +220,26 @@ public class ReplicationSink { LOG.debug("Finished replicating mutations."); } - if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) { - LOG.debug("Started replicating bulk loaded data."); - HFileReplicator hFileReplicator = - new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId), + if(bulkLoadsPerClusters != null) { + for (Entry, Map>>>> entry : bulkLoadsPerClusters.entrySet()) { + Map>>> bulkLoadHFileMap = entry.getValue(); + if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) { + if(LOG.isDebugEnabled()) { + LOG.debug("Started replicating bulk loaded data from cluster ids: " + + entry.getKey().toString()); + } + HFileReplicator hFileReplicator = + new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId), sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf, - getConnection()); - hFileReplicator.replicate(); - LOG.debug("Finished replicating bulk loaded data."); + getConnection(), entry.getKey()); + hFileReplicator.replicate(); + if(LOG.isDebugEnabled()) { + LOG.debug("Finished replicating bulk loaded data from cluster id: " + + entry.getKey().toString()); + } + } + } } int size = entries.size(); @@ -235,8 +254,7 @@ public class ReplicationSink { private void buildBulkLoadHFileMap( final Map>>> bulkLoadHFileMap, TableName table, - Cell cell) throws IOException { - BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); + BulkLoadDescriptor bld) throws IOException { List storesList = bld.getStoresList(); int storesSize = storesList.size(); for (int j = 0; j < storesSize; j++) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java index 2cd5c500566..37ca5943490 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java @@ -298,7 +298,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService @Override public void secureBulkLoadHFiles(RpcController controller, - SecureBulkLoadHFilesRequest request, + final SecureBulkLoadHFilesRequest request, RpcCallback done) { final List> familyPaths = new ArrayList>(); for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) { @@ -391,7 +391,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService //We call bulkLoadHFiles as requesting user //To enable access prior to staging return env.getRegion().bulkLoadHFiles(familyPaths, true, - new SecureBulkLoadListener(fs, bulkToken, conf)); + new SecureBulkLoadListener(fs, bulkToken, conf), request.getClusterIdsList()); } catch (Exception e) { LOG.error("Failed to complete bulk load", e); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index 5649d8acb11..d71d0811263 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -122,14 +122,14 @@ public class TestBulkLoad { }; }); testRegionWithFamiliesAndSpecifiedTableName(tableName, family1) - .bulkLoadHFiles(familyPaths, false, null); + .bulkLoadHFiles(familyPaths, false, null, null); verify(log).sync(anyLong()); } @Test public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException { testRegionWithFamilies(family1).bulkLoadHFiles(new ArrayList>(), - false, null); + false, null, null); } @Test @@ -147,7 +147,7 @@ public class TestBulkLoad { return 01L; }; }); - testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null); + testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null, null); verify(log).sync(anyLong()); } @@ -167,7 +167,7 @@ public class TestBulkLoad { }; }); testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2), - false, null); + false, null, null); verify(log).sync(anyLong()); } @@ -188,33 +188,33 @@ public class TestBulkLoad { }); TableName tableName = TableName.valueOf("test", "test"); testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2) - .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null); + .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null, null); verify(log).sync(anyLong()); } @Test(expected = DoNotRetryIOException.class) public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException { testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, - null); + null, null); } @Test(expected = DoNotRetryIOException.class) public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor() throws IOException { - testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null); + testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null, null); } @Test(expected = DoNotRetryIOException.class) public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException { testRegionWithFamilies() .bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)), - false, null); + false, null, null); } @Test(expected = FileNotFoundException.class) public void shouldThrowErrorIfHFileDoesNotExist() throws IOException { List> list = asList(withMissingHFileForFamily(family1)); - testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null); + testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null, null); } private Pair withMissingHFileForFamily(byte[] family) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java new file mode 100644 index 00000000000..d8d4930033e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.TestReplicationBase; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for bulk load replication. Defines three clusters, with the following + * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between + * 2 and 3). + * + * For each of defined test clusters, it performs a bulk load, asserting values on bulk loaded file + * gets replicated to other two peers. Since we are doing 3 bulk loads, with the given replication + * topology all these bulk loads should get replicated only once on each peer. To assert this, + * this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each of the + * clusters. This CP counts the amount of times bulk load actually gets invoked, certifying + * we are not entering the infinite loop condition addressed by HBASE-22380. + */ +@Category({ ReplicationTests.class, MediumTests.class}) +public class TestBulkLoadReplication extends TestReplicationBase { + + protected static final Logger LOG = + LoggerFactory.getLogger(TestBulkLoadReplication.class); + + private static final String PEER1_CLUSTER_ID = "peer1"; + private static final String PEER4_CLUSTER_ID = "peer4"; + private static final String PEER3_CLUSTER_ID = "peer3"; + + private static final String PEER_ID1 = "1"; + private static final String PEER_ID3 = "3"; + private static final String PEER_ID4 = "4"; + + private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0); + private static CountDownLatch BULK_LOAD_LATCH; + + private static HBaseTestingUtility utility3; + private static HBaseTestingUtility utility4; + private static Configuration conf3; + private static Configuration conf4; +// private static Table htable3; +// private static Table htable4; + + @Rule + public TestName name = new TestName(); + + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + setupBulkLoadConfigsForCluster(conf1, PEER1_CLUSTER_ID); + conf3 = HBaseConfiguration.create(conf1); + setupBulkLoadConfigsForCluster(conf3, PEER3_CLUSTER_ID); + conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3"); + utility3 = new HBaseTestingUtility(conf3); + conf4 = HBaseConfiguration.create(conf1); + setupBulkLoadConfigsForCluster(conf4, PEER4_CLUSTER_ID); + conf4.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/4"); + utility4 = new HBaseTestingUtility(conf4); + TestReplicationBase.setUpBeforeClass(); + startCluster(utility3, conf3); + startCluster(utility4, conf4); + } + + private static void startCluster(HBaseTestingUtility util, Configuration configuration) + throws Exception { + LOG.info("Setup Zk to same one from utility1 and utility4"); + util.setZkCluster(utility1.getZkCluster()); + util.startMiniCluster(2); + + HTableDescriptor tableDesc = new HTableDescriptor(tableName); + HColumnDescriptor columnDesc = new HColumnDescriptor(famName); + columnDesc.setScope(1); + tableDesc.addFamily(columnDesc); + + Connection connection = ConnectionFactory.createConnection(configuration); + try (Admin admin = connection.getAdmin()) { + admin.createTable(tableDesc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + } + util.waitUntilAllRegionsAssigned(tableName); + } + + @Before + public void setUpBase() throws Exception { + ReplicationPeerConfig peer1Config = getPeerConfigForCluster(utility1); + ReplicationPeerConfig peer4Config = getPeerConfigForCluster(utility4); + ReplicationPeerConfig peer3Config = getPeerConfigForCluster(utility3); + //adds cluster4 as a remote peer on cluster1 + getReplicationAdmin(utility1.getConfiguration()).addPeer(PEER_ID4, peer4Config); + //adds cluster1 as a remote peer on cluster4 + ReplicationAdmin admin4 = getReplicationAdmin(utility4.getConfiguration()); + admin4.addPeer(PEER_ID1, peer1Config); + //adds cluster3 as a remote peer on cluster4 + admin4.addPeer(PEER_ID3, peer3Config); + //adds cluster4 as a remote peer on cluster3 + getReplicationAdmin(utility3.getConfiguration()).addPeer(PEER_ID4, peer4Config); + setupCoprocessor(utility1); + setupCoprocessor(utility4); + setupCoprocessor(utility3); + } + + private ReplicationAdmin getReplicationAdmin(Configuration configuration) throws IOException { + return new ReplicationAdmin(configuration); + } + + private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) { + ReplicationPeerConfig config = new ReplicationPeerConfig(); + config.setClusterKey(util.getClusterKey()); + return config; + } + + private void setupCoprocessor(HBaseTestingUtility cluster) throws IOException { + for(HRegion region : cluster.getHBaseCluster().getRegions(tableName)){ + region.getCoprocessorHost().load(TestBulkLoadReplication.BulkReplicationTestObserver.class, + 0, cluster.getConfiguration()); + } + } + + @After + public void tearDownBase() throws Exception { + getReplicationAdmin(utility4.getConfiguration()).removePeer(PEER_ID1); + getReplicationAdmin(utility4.getConfiguration()).removePeer(PEER_ID3); + getReplicationAdmin(utility3.getConfiguration()).removePeer(PEER_ID4); + } + + private static void setupBulkLoadConfigsForCluster(Configuration config, + String clusterReplicationId) throws Exception { + config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + config.set(REPLICATION_CLUSTER_ID, clusterReplicationId); + File sourceConfigFolder = testFolder.newFolder(clusterReplicationId); + File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + + "/hbase-site.xml"); + config.writeXml(new FileOutputStream(sourceConfigFile)); + config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath()); + } + + @Test + public void testBulkLoadReplicationActiveActive() throws Exception { + Table peer1TestTable = utility1.getConnection().getTable(TestReplicationBase.tableName); + Table peer4TestTable = utility4.getConnection().getTable(TestReplicationBase.tableName); + Table peer3TestTable = utility3.getConnection().getTable(TestReplicationBase.tableName); + byte[] row = Bytes.toBytes("001"); + byte[] value = Bytes.toBytes("v1"); + assertBulkLoadConditions(row, value, utility1, peer1TestTable, peer4TestTable, peer3TestTable); + row = Bytes.toBytes("002"); + value = Bytes.toBytes("v2"); + assertBulkLoadConditions(row, value, utility4, peer4TestTable, peer1TestTable, peer3TestTable); + row = Bytes.toBytes("003"); + value = Bytes.toBytes("v3"); + assertBulkLoadConditions(row, value, utility3, peer3TestTable, peer4TestTable, peer1TestTable); + //Additional wait to make sure no extra bulk load happens + Thread.sleep(400); + //We have 3 bulk load events (1 initiated on each cluster). + //Each event gets 3 counts (the originator cluster, plus the two peers), + //so BULK_LOADS_COUNT expected value is 3 * 3 = 9. + assertEquals(9, BULK_LOADS_COUNT.get()); + } + + private void assertBulkLoadConditions(byte[] row, byte[] value, + HBaseTestingUtility utility, Table...tables) throws Exception { + BULK_LOAD_LATCH = new CountDownLatch(3); + bulkLoadOnCluster(row, value, utility); + assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES)); + assertTableHasValue(tables[0], row, value); + assertTableHasValue(tables[1], row, value); + assertTableHasValue(tables[2], row, value); + } + + private void bulkLoadOnCluster(byte[] row, byte[] value, + HBaseTestingUtility cluster) throws Exception { + String bulkLoadFile = createHFileForFamilies(row, value, cluster.getConfiguration()); + copyToHdfs(bulkLoadFile, cluster.getDFSCluster()); + LoadIncrementalHFiles bulkLoadHFilesTool = + new LoadIncrementalHFiles(cluster.getConfiguration()); + bulkLoadHFilesTool.run(new String[]{"/bulk_dir/region1/", tableName.getNameAsString()}); + } + + private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception { + Path bulkLoadDir = new Path("/bulk_dir/region1/f"); + cluster.getFileSystem().mkdirs(bulkLoadDir); + cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir); + } + + private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception { + Get get = new Get(row); + Result result = table.get(get); + assertTrue(result.advance()); + assertEquals(Bytes.toString(value), Bytes.toString(result.value())); + } + + private String createHFileForFamilies(byte[] row, byte[] value, + Configuration clusterConfig) throws IOException { + final KeyValue kv = new KeyValue(row, famName, Bytes.toBytes("1"), System.currentTimeMillis(), + KeyValue.Type.Put, value); + final HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig); + // TODO We need a way to do this without creating files + final File hFileLocation = testFolder.newFile(); + final FSDataOutputStream out = + new FSDataOutputStream(new FileOutputStream(hFileLocation), null); + try { + hFileFactory.withOutputStream(out); + hFileFactory.withFileContext(new HFileContext()); + HFile.Writer writer = hFileFactory.create(); + try { + writer.append(kv); + } finally { + writer.close(); + } + } finally { + out.close(); + } + return hFileLocation.getAbsoluteFile().getAbsolutePath(); + } + + public static class BulkReplicationTestObserver extends BaseRegionObserver { + + @Override + public void preBulkLoadHFile(ObserverContext ctx, + List> familyPaths) throws IOException { + BULK_LOADS_COUNT.incrementAndGet(); + } + + @Override + public boolean postBulkLoadHFile(ObserverContext ctx, + List> familyPaths, boolean hasLoaded) throws IOException { + if(hasLoaded) { + BULK_LOAD_LATCH.countDown(); + } + return true; + } + + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 397387a1b85..02009c011e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -1480,7 +1480,7 @@ public class TestHRegionReplayEvents { randomValues))); expectedLoadFileCount++; } - primaryRegion.bulkLoadHFiles(familyPaths, false, null); + primaryRegion.bulkLoadHFiles(familyPaths, false, null, null); // now replay the edits and the bulk load marker reader = createWALReaderForPrimary(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 018bab6ee36..c571eb85c41 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -361,7 +361,7 @@ public class TestWALReplay { Bytes.toBytes("z"), 10); List > hfs= new ArrayList>(1); hfs.add(Pair.newPair(family, f.toString())); - region.bulkLoadHFiles(hfs, true, null); + region.bulkLoadHFiles(hfs, true, null, null); // Add an edit so something in the WAL byte [] row = tableName.getName(); @@ -435,7 +435,7 @@ public class TestWALReplay { Bytes.toBytes(i + "50"), 10); hfs.add(Pair.newPair(family, f.toString())); } - region.bulkLoadHFiles(hfs, true, null); + region.bulkLoadHFiles(hfs, true, null, null); final int rowsInsertedCount = 31; assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));