HBASE-23101 Backport HBASE-22380 to branch-1
Fixes #680 Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
dd9eadb00f
commit
0ffbf9c759
|
@ -117,10 +117,24 @@ public class SecureBulkLoadClient {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
* @param familyPaths
|
||||
* @param userToken
|
||||
* @param bulkToken
|
||||
* @param startRow
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
|
||||
final Token<?> userToken,
|
||||
final String bulkToken,
|
||||
final byte[] startRow) throws IOException {
|
||||
final Token<?> userToken, final String bulkToken,
|
||||
final byte[] startRow) throws IOException {
|
||||
return this.bulkLoadHFiles(familyPaths, userToken, bulkToken, startRow, null);
|
||||
}
|
||||
|
||||
public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
|
||||
final Token<?> userToken, final String bulkToken,
|
||||
final byte[] startRow, List<String> clusterIds) throws IOException {
|
||||
// we never want to send a batch of HFiles to all regions, thus cannot call
|
||||
// HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
|
||||
try {
|
||||
|
@ -151,6 +165,7 @@ public class SecureBulkLoadClient {
|
|||
SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
|
||||
.setFsToken(protoDT)
|
||||
.addAllFamilyPath(protoFamilyPaths)
|
||||
.addAllClusterIds(clusterIds != null ? clusterIds : new ArrayList<String>())
|
||||
.setBulkToken(bulkToken).build();
|
||||
|
||||
ServerRpcController controller = new ServerRpcController();
|
||||
|
|
|
@ -1807,12 +1807,31 @@ public final class ProtobufUtil {
|
|||
* @param assignSeqNum
|
||||
* @return true if all are loaded
|
||||
* @throws IOException
|
||||
* @deprecated use bulkLoadHFile(final ClientService.BlockingInterface client,
|
||||
* final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
|
||||
* List<String> clusterIds) instead.
|
||||
*/
|
||||
public static boolean bulkLoadHFile(final ClientService.BlockingInterface client,
|
||||
final List<Pair<byte[], String>> familyPaths,
|
||||
final byte[] regionName, boolean assignSeqNum) throws IOException {
|
||||
return bulkLoadHFile(client, familyPaths, regionName, assignSeqNum, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper to bulk load a list of HFiles using client protocol.
|
||||
*
|
||||
* @param client
|
||||
* @param familyPaths
|
||||
* @param regionName
|
||||
* @param assignSeqNum
|
||||
* @return true if all are loaded
|
||||
* @throws IOException
|
||||
*/
|
||||
public static boolean bulkLoadHFile(final ClientService.BlockingInterface client,
|
||||
final List<Pair<byte[], String>> familyPaths,
|
||||
final byte[] regionName, boolean assignSeqNum, List<String> clusterIds) throws IOException {
|
||||
BulkLoadHFileRequest request =
|
||||
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum);
|
||||
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, clusterIds);
|
||||
try {
|
||||
BulkLoadHFileResponse response =
|
||||
client.bulkLoadHFile(null, request);
|
||||
|
@ -3358,6 +3377,10 @@ public final class ProtobufUtil {
|
|||
* Generates a marker for the WAL so that we propagate the notion of a bulk region load
|
||||
* throughout the WAL.
|
||||
*
|
||||
* @deprecated use toBulkLoadDescriptor(TableName tableName, ByteString encodedRegionName,
|
||||
* Map<byte[], List<Path>> storeFiles, Map<String, Long> storeFilesSize, long bulkloadSeqId,
|
||||
* List<String> clusterIds) instead.
|
||||
*
|
||||
* @param tableName The tableName into which the bulk load is being imported into.
|
||||
* @param encodedRegionName Encoded region name of the region which is being bulk loaded.
|
||||
* @param storeFiles A set of store files of a column family are bulk loaded.
|
||||
|
@ -3367,17 +3390,42 @@ public final class ProtobufUtil {
|
|||
* @return The WAL log marker for bulk loads.
|
||||
*/
|
||||
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
|
||||
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
|
||||
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
|
||||
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
|
||||
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
|
||||
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
|
||||
storeFilesSize, bulkloadSeqId, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a marker for the WAL so that we propagate the notion of a bulk region load
|
||||
* throughout the WAL, keeping track of clusters who already applied the bulk event via
|
||||
* the passed clusterIds parameter.
|
||||
*
|
||||
* @param tableName The tableName into which the bulk load is being imported into.
|
||||
* @param encodedRegionName Encoded region name of the region which is being bulk loaded.
|
||||
* @param storeFiles A set of store files of a column family are bulk loaded.
|
||||
* @param storeFilesSize Map of store files and their lengths
|
||||
* @param bulkloadSeqId sequence ID (by a force flush) used to create bulk load hfile name
|
||||
* @param clusterIds The list of cluster Ids with the clusters where the bulk even had
|
||||
* already been processed.
|
||||
*
|
||||
* @return The WAL log marker for bulk loads.
|
||||
*/
|
||||
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
|
||||
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
|
||||
Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
|
||||
BulkLoadDescriptor.Builder desc =
|
||||
BulkLoadDescriptor.newBuilder()
|
||||
BulkLoadDescriptor.newBuilder()
|
||||
.setTableName(ProtobufUtil.toProtoTableName(tableName))
|
||||
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
|
||||
if(clusterIds != null) {
|
||||
desc.addAllClusterIds(clusterIds);
|
||||
}
|
||||
|
||||
for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
|
||||
WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
|
||||
.setFamilyName(ByteStringer.wrap(entry.getKey()))
|
||||
.setStoreHomeDir(Bytes.toString(entry.getKey())); // relative to region
|
||||
.setFamilyName(ByteStringer.wrap(entry.getKey()))
|
||||
.setStoreHomeDir(Bytes.toString(entry.getKey())); // relative to region
|
||||
for (Path path : entry.getValue()) {
|
||||
String name = path.getName();
|
||||
builder.addStoreFile(name);
|
||||
|
|
|
@ -561,14 +561,32 @@ public final class RequestConverter {
|
|||
/**
|
||||
* Create a protocol buffer bulk load request
|
||||
*
|
||||
* @deprecated use buildBulkLoadHFileRequest(final List<Pair<byte[], String>> familyPaths,
|
||||
* final byte[] regionName, boolean assignSeqNum, List<String> clusterIds)
|
||||
*
|
||||
* @param familyPaths
|
||||
* @param regionName
|
||||
* @param assignSeqNum
|
||||
* @return a bulk load request
|
||||
*/
|
||||
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
|
||||
final List<Pair<byte[], String>> familyPaths,
|
||||
final byte[] regionName, boolean assignSeqNum) {
|
||||
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a protocol buffer bulk load request
|
||||
*
|
||||
* @param familyPaths
|
||||
* @param regionName
|
||||
* @param assignSeqNum
|
||||
* @param clusterIds
|
||||
* @return a bulk load request
|
||||
*/
|
||||
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
|
||||
final List<Pair<byte[], String>> familyPaths,
|
||||
final byte[] regionName, boolean assignSeqNum) {
|
||||
final byte[] regionName, boolean assignSeqNum, List<String> clusterIds) {
|
||||
BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder();
|
||||
RegionSpecifier region = buildRegionSpecifier(
|
||||
RegionSpecifierType.REGION_NAME, regionName);
|
||||
|
@ -579,6 +597,9 @@ public final class RequestConverter {
|
|||
familyPathBuilder.setPath(familyPath.getSecond());
|
||||
builder.addFamilyPath(familyPathBuilder.build());
|
||||
}
|
||||
if(clusterIds!=null) {
|
||||
builder.addAllClusterIds(clusterIds);
|
||||
}
|
||||
builder.setAssignSeqNum(assignSeqNum);
|
||||
return builder.build();
|
||||
}
|
||||
|
|
|
@ -22843,6 +22843,26 @@ public final class ClientProtos {
|
|||
* <code>optional bool assign_seq_num = 3;</code>
|
||||
*/
|
||||
boolean getAssignSeqNum();
|
||||
|
||||
// repeated string cluster_ids = 4;
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
java.util.List<java.lang.String>
|
||||
getClusterIdsList();
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
int getClusterIdsCount();
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
java.lang.String getClusterIds(int index);
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
com.google.protobuf.ByteString
|
||||
getClusterIdsBytes(int index);
|
||||
}
|
||||
/**
|
||||
* Protobuf type {@code hbase.pb.BulkLoadHFileRequest}
|
||||
|
@ -22927,6 +22947,14 @@ public final class ClientProtos {
|
|||
assignSeqNum_ = input.readBool();
|
||||
break;
|
||||
}
|
||||
case 34: {
|
||||
if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
|
||||
clusterIds_ = new com.google.protobuf.LazyStringArrayList();
|
||||
mutable_bitField0_ |= 0x00000008;
|
||||
}
|
||||
clusterIds_.add(input.readBytes());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
||||
|
@ -22938,6 +22966,9 @@ public final class ClientProtos {
|
|||
if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
|
||||
familyPath_ = java.util.Collections.unmodifiableList(familyPath_);
|
||||
}
|
||||
if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
|
||||
clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(clusterIds_);
|
||||
}
|
||||
this.unknownFields = unknownFields.build();
|
||||
makeExtensionsImmutable();
|
||||
}
|
||||
|
@ -23662,10 +23693,41 @@ public final class ClientProtos {
|
|||
return assignSeqNum_;
|
||||
}
|
||||
|
||||
// repeated string cluster_ids = 4;
|
||||
public static final int CLUSTER_IDS_FIELD_NUMBER = 4;
|
||||
private com.google.protobuf.LazyStringList clusterIds_;
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public java.util.List<java.lang.String>
|
||||
getClusterIdsList() {
|
||||
return clusterIds_;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public int getClusterIdsCount() {
|
||||
return clusterIds_.size();
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public java.lang.String getClusterIds(int index) {
|
||||
return clusterIds_.get(index);
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public com.google.protobuf.ByteString
|
||||
getClusterIdsBytes(int index) {
|
||||
return clusterIds_.getByteString(index);
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
|
||||
familyPath_ = java.util.Collections.emptyList();
|
||||
assignSeqNum_ = false;
|
||||
clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -23702,6 +23764,9 @@ public final class ClientProtos {
|
|||
if (((bitField0_ & 0x00000002) == 0x00000002)) {
|
||||
output.writeBool(3, assignSeqNum_);
|
||||
}
|
||||
for (int i = 0; i < clusterIds_.size(); i++) {
|
||||
output.writeBytes(4, clusterIds_.getByteString(i));
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -23723,6 +23788,15 @@ public final class ClientProtos {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBoolSize(3, assignSeqNum_);
|
||||
}
|
||||
{
|
||||
int dataSize = 0;
|
||||
for (int i = 0; i < clusterIds_.size(); i++) {
|
||||
dataSize += com.google.protobuf.CodedOutputStream
|
||||
.computeBytesSizeNoTag(clusterIds_.getByteString(i));
|
||||
}
|
||||
size += dataSize;
|
||||
size += 1 * getClusterIdsList().size();
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -23758,6 +23832,8 @@ public final class ClientProtos {
|
|||
result = result && (getAssignSeqNum()
|
||||
== other.getAssignSeqNum());
|
||||
}
|
||||
result = result && getClusterIdsList()
|
||||
.equals(other.getClusterIdsList());
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -23783,6 +23859,10 @@ public final class ClientProtos {
|
|||
hash = (37 * hash) + ASSIGN_SEQ_NUM_FIELD_NUMBER;
|
||||
hash = (53 * hash) + hashBoolean(getAssignSeqNum());
|
||||
}
|
||||
if (getClusterIdsCount() > 0) {
|
||||
hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getClusterIdsList().hashCode();
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
memoizedHashCode = hash;
|
||||
return hash;
|
||||
|
@ -23914,6 +23994,8 @@ public final class ClientProtos {
|
|||
}
|
||||
assignSeqNum_ = false;
|
||||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
|
||||
bitField0_ = (bitField0_ & ~0x00000008);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -23963,6 +24045,12 @@ public final class ClientProtos {
|
|||
to_bitField0_ |= 0x00000002;
|
||||
}
|
||||
result.assignSeqNum_ = assignSeqNum_;
|
||||
if (((bitField0_ & 0x00000008) == 0x00000008)) {
|
||||
clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(
|
||||
clusterIds_);
|
||||
bitField0_ = (bitField0_ & ~0x00000008);
|
||||
}
|
||||
result.clusterIds_ = clusterIds_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -24011,6 +24099,16 @@ public final class ClientProtos {
|
|||
if (other.hasAssignSeqNum()) {
|
||||
setAssignSeqNum(other.getAssignSeqNum());
|
||||
}
|
||||
if (!other.clusterIds_.isEmpty()) {
|
||||
if (clusterIds_.isEmpty()) {
|
||||
clusterIds_ = other.clusterIds_;
|
||||
bitField0_ = (bitField0_ & ~0x00000008);
|
||||
} else {
|
||||
ensureClusterIdsIsMutable();
|
||||
clusterIds_.addAll(other.clusterIds_);
|
||||
}
|
||||
onChanged();
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -24442,6 +24540,99 @@ public final class ClientProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// repeated string cluster_ids = 4;
|
||||
private com.google.protobuf.LazyStringList clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
|
||||
private void ensureClusterIdsIsMutable() {
|
||||
if (!((bitField0_ & 0x00000008) == 0x00000008)) {
|
||||
clusterIds_ = new com.google.protobuf.LazyStringArrayList(clusterIds_);
|
||||
bitField0_ |= 0x00000008;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public java.util.List<java.lang.String>
|
||||
getClusterIdsList() {
|
||||
return java.util.Collections.unmodifiableList(clusterIds_);
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public int getClusterIdsCount() {
|
||||
return clusterIds_.size();
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public java.lang.String getClusterIds(int index) {
|
||||
return clusterIds_.get(index);
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public com.google.protobuf.ByteString
|
||||
getClusterIdsBytes(int index) {
|
||||
return clusterIds_.getByteString(index);
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public Builder setClusterIds(
|
||||
int index, java.lang.String value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
ensureClusterIdsIsMutable();
|
||||
clusterIds_.set(index, value);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public Builder addClusterIds(
|
||||
java.lang.String value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
ensureClusterIdsIsMutable();
|
||||
clusterIds_.add(value);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public Builder addAllClusterIds(
|
||||
java.lang.Iterable<java.lang.String> values) {
|
||||
ensureClusterIdsIsMutable();
|
||||
super.addAll(values, clusterIds_);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public Builder clearClusterIds() {
|
||||
clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
|
||||
bitField0_ = (bitField0_ & ~0x00000008);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 4;</code>
|
||||
*/
|
||||
public Builder addClusterIdsBytes(
|
||||
com.google.protobuf.ByteString value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
ensureClusterIdsIsMutable();
|
||||
clusterIds_.add(value);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:hbase.pb.BulkLoadHFileRequest)
|
||||
}
|
||||
|
||||
|
@ -37378,66 +37569,66 @@ public final class ClientProtos {
|
|||
"(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014scan_me" +
|
||||
"trics\030\n \001(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mv" +
|
||||
"cc_read_point\030\013 \001(\004:\0010\022 \n\006cursor\030\014 \001(\0132\020" +
|
||||
".hbase.pb.Cursor\"\305\001\n\024BulkLoadHFileReques" +
|
||||
".hbase.pb.Cursor\"\332\001\n\024BulkLoadHFileReques" +
|
||||
"t\022)\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpeci" +
|
||||
"fier\022>\n\013family_path\030\002 \003(\0132).hbase.pb.Bul" +
|
||||
"kLoadHFileRequest.FamilyPath\022\026\n\016assign_s" +
|
||||
"eq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002" +
|
||||
"(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRespons" +
|
||||
"e\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServiceC",
|
||||
"all\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023" +
|
||||
"\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030" +
|
||||
"CoprocessorServiceResult\022&\n\005value\030\001 \001(\0132" +
|
||||
"\027.hbase.pb.NameBytesPair\"v\n\031CoprocessorS" +
|
||||
"erviceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb" +
|
||||
".RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.p" +
|
||||
"b.CoprocessorServiceCall\"o\n\032CoprocessorS" +
|
||||
"erviceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.p" +
|
||||
"b.RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase" +
|
||||
".pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 ",
|
||||
"\001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mutatio" +
|
||||
"nProto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014se" +
|
||||
"rvice_call\030\004 \001(\0132 .hbase.pb.CoprocessorS" +
|
||||
"erviceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002" +
|
||||
"(\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030" +
|
||||
"\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"" +
|
||||
"c\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005" +
|
||||
":\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compact" +
|
||||
"ionPressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadS" +
|
||||
"tats\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSp",
|
||||
"ecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionL" +
|
||||
"oadStats\"\336\001\n\021ResultOrException\022\r\n\005index\030" +
|
||||
"\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022" +
|
||||
"*\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytesP" +
|
||||
"air\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.C" +
|
||||
"oprocessorServiceResult\0220\n\tloadStats\030\005 \001" +
|
||||
"(\0132\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Reg" +
|
||||
"ionActionResult\0226\n\021resultOrException\030\001 \003" +
|
||||
"(\0132\033.hbase.pb.ResultOrException\022*\n\texcep" +
|
||||
"tion\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014M",
|
||||
"ultiRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbas" +
|
||||
"e.pb.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n" +
|
||||
"\tcondition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001" +
|
||||
"\n\rMultiResponse\0228\n\022regionActionResult\030\001 " +
|
||||
"\003(\0132\034.hbase.pb.RegionActionResult\022\021\n\tpro" +
|
||||
"cessed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036" +
|
||||
".hbase.pb.MultiRegionLoadStats*\'\n\013Consis" +
|
||||
"tency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\203\004\n\rClie" +
|
||||
"ntService\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025" +
|
||||
".hbase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.",
|
||||
"pb.MutateRequest\032\030.hbase.pb.MutateRespon" +
|
||||
"se\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbas" +
|
||||
"e.pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hba" +
|
||||
"se.pb.BulkLoadHFileRequest\032\037.hbase.pb.Bu" +
|
||||
"lkLoadHFileResponse\022X\n\013ExecService\022#.hba" +
|
||||
"se.pb.CoprocessorServiceRequest\032$.hbase." +
|
||||
"pb.CoprocessorServiceResponse\022d\n\027ExecReg" +
|
||||
"ionServerService\022#.hbase.pb.CoprocessorS" +
|
||||
"erviceRequest\032$.hbase.pb.CoprocessorServ" +
|
||||
"iceResponse\0228\n\005Multi\022\026.hbase.pb.MultiReq",
|
||||
"uest\032\027.hbase.pb.MultiResponseBB\n*org.apa" +
|
||||
"che.hadoop.hbase.protobuf.generatedB\014Cli" +
|
||||
"entProtosH\001\210\001\001\240\001\001"
|
||||
"eq_num\030\003 \001(\010\022\023\n\013cluster_ids\030\004 \003(\t\032*\n\nFam" +
|
||||
"ilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n" +
|
||||
"\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"a",
|
||||
"\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n" +
|
||||
"\014service_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t" +
|
||||
"\022\017\n\007request\030\004 \002(\014\"B\n\030CoprocessorServiceR" +
|
||||
"esult\022&\n\005value\030\001 \001(\0132\027.hbase.pb.NameByte" +
|
||||
"sPair\"v\n\031CoprocessorServiceRequest\022)\n\006re" +
|
||||
"gion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022.\n" +
|
||||
"\004call\030\002 \002(\0132 .hbase.pb.CoprocessorServic" +
|
||||
"eCall\"o\n\032CoprocessorServiceResponse\022)\n\006r" +
|
||||
"egion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022&" +
|
||||
"\n\005value\030\002 \002(\0132\027.hbase.pb.NameBytesPair\"\226",
|
||||
"\001\n\006Action\022\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001" +
|
||||
"(\0132\027.hbase.pb.MutationProto\022\032\n\003get\030\003 \001(\013" +
|
||||
"2\r.hbase.pb.Get\0226\n\014service_call\030\004 \001(\0132 ." +
|
||||
"hbase.pb.CoprocessorServiceCall\"k\n\014Regio" +
|
||||
"nAction\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Regio" +
|
||||
"nSpecifier\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003" +
|
||||
"(\0132\020.hbase.pb.Action\"c\n\017RegionLoadStats\022" +
|
||||
"\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupanc" +
|
||||
"y\030\002 \001(\005:\0010\022\035\n\022compactionPressure\030\003 \001(\005:\001" +
|
||||
"0\"j\n\024MultiRegionLoadStats\022)\n\006region\030\001 \003(",
|
||||
"\0132\031.hbase.pb.RegionSpecifier\022\'\n\004stat\030\002 \003" +
|
||||
"(\0132\031.hbase.pb.RegionLoadStats\"\336\001\n\021Result" +
|
||||
"OrException\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001" +
|
||||
"(\0132\020.hbase.pb.Result\022*\n\texception\030\003 \001(\0132" +
|
||||
"\027.hbase.pb.NameBytesPair\022:\n\016service_resu" +
|
||||
"lt\030\004 \001(\0132\".hbase.pb.CoprocessorServiceRe" +
|
||||
"sult\0220\n\tloadStats\030\005 \001(\0132\031.hbase.pb.Regio" +
|
||||
"nLoadStatsB\002\030\001\"x\n\022RegionActionResult\0226\n\021" +
|
||||
"resultOrException\030\001 \003(\0132\033.hbase.pb.Resul" +
|
||||
"tOrException\022*\n\texception\030\002 \001(\0132\027.hbase.",
|
||||
"pb.NameBytesPair\"x\n\014MultiRequest\022,\n\014regi" +
|
||||
"onAction\030\001 \003(\0132\026.hbase.pb.RegionAction\022\022" +
|
||||
"\n\nnonceGroup\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023." +
|
||||
"hbase.pb.Condition\"\226\001\n\rMultiResponse\0228\n\022" +
|
||||
"regionActionResult\030\001 \003(\0132\034.hbase.pb.Regi" +
|
||||
"onActionResult\022\021\n\tprocessed\030\002 \001(\010\0228\n\020reg" +
|
||||
"ionStatistics\030\003 \001(\0132\036.hbase.pb.MultiRegi" +
|
||||
"onLoadStats*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014" +
|
||||
"\n\010TIMELINE\020\0012\203\004\n\rClientService\0222\n\003Get\022\024." +
|
||||
"hbase.pb.GetRequest\032\025.hbase.pb.GetRespon",
|
||||
"se\022;\n\006Mutate\022\027.hbase.pb.MutateRequest\032\030." +
|
||||
"hbase.pb.MutateResponse\0225\n\004Scan\022\025.hbase." +
|
||||
"pb.ScanRequest\032\026.hbase.pb.ScanResponse\022P" +
|
||||
"\n\rBulkLoadHFile\022\036.hbase.pb.BulkLoadHFile" +
|
||||
"Request\032\037.hbase.pb.BulkLoadHFileResponse" +
|
||||
"\022X\n\013ExecService\022#.hbase.pb.CoprocessorSe" +
|
||||
"rviceRequest\032$.hbase.pb.CoprocessorServi" +
|
||||
"ceResponse\022d\n\027ExecRegionServerService\022#." +
|
||||
"hbase.pb.CoprocessorServiceRequest\032$.hba" +
|
||||
"se.pb.CoprocessorServiceResponse\0228\n\005Mult",
|
||||
"i\022\026.hbase.pb.MultiRequest\032\027.hbase.pb.Mul" +
|
||||
"tiResponseBB\n*org.apache.hadoop.hbase.pr" +
|
||||
"otobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -37551,7 +37742,7 @@ public final class ClientProtos {
|
|||
internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_hbase_pb_BulkLoadHFileRequest_descriptor,
|
||||
new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", });
|
||||
new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "ClusterIds", });
|
||||
internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_descriptor =
|
||||
internal_static_hbase_pb_BulkLoadHFileRequest_descriptor.getNestedTypes().get(0);
|
||||
internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_fieldAccessorTable = new
|
||||
|
|
|
@ -74,6 +74,26 @@ public final class SecureBulkLoadProtos {
|
|||
*/
|
||||
com.google.protobuf.ByteString
|
||||
getBulkTokenBytes();
|
||||
|
||||
// repeated string cluster_ids = 5;
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
java.util.List<java.lang.String>
|
||||
getClusterIdsList();
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
int getClusterIdsCount();
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
java.lang.String getClusterIds(int index);
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
com.google.protobuf.ByteString
|
||||
getClusterIdsBytes(int index);
|
||||
}
|
||||
/**
|
||||
* Protobuf type {@code hbase.pb.SecureBulkLoadHFilesRequest}
|
||||
|
@ -157,6 +177,14 @@ public final class SecureBulkLoadProtos {
|
|||
bulkToken_ = input.readBytes();
|
||||
break;
|
||||
}
|
||||
case 42: {
|
||||
if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
clusterIds_ = new com.google.protobuf.LazyStringArrayList();
|
||||
mutable_bitField0_ |= 0x00000010;
|
||||
}
|
||||
clusterIds_.add(input.readBytes());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
||||
|
@ -168,6 +196,9 @@ public final class SecureBulkLoadProtos {
|
|||
if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
|
||||
familyPath_ = java.util.Collections.unmodifiableList(familyPath_);
|
||||
}
|
||||
if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(clusterIds_);
|
||||
}
|
||||
this.unknownFields = unknownFields.build();
|
||||
makeExtensionsImmutable();
|
||||
}
|
||||
|
@ -317,11 +348,42 @@ public final class SecureBulkLoadProtos {
|
|||
}
|
||||
}
|
||||
|
||||
// repeated string cluster_ids = 5;
|
||||
public static final int CLUSTER_IDS_FIELD_NUMBER = 5;
|
||||
private com.google.protobuf.LazyStringList clusterIds_;
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public java.util.List<java.lang.String>
|
||||
getClusterIdsList() {
|
||||
return clusterIds_;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public int getClusterIdsCount() {
|
||||
return clusterIds_.size();
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public java.lang.String getClusterIds(int index) {
|
||||
return clusterIds_.get(index);
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public com.google.protobuf.ByteString
|
||||
getClusterIdsBytes(int index) {
|
||||
return clusterIds_.getByteString(index);
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
familyPath_ = java.util.Collections.emptyList();
|
||||
assignSeqNum_ = false;
|
||||
fsToken_ = org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.DelegationToken.getDefaultInstance();
|
||||
bulkToken_ = "";
|
||||
clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -361,6 +423,9 @@ public final class SecureBulkLoadProtos {
|
|||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
output.writeBytes(4, getBulkTokenBytes());
|
||||
}
|
||||
for (int i = 0; i < clusterIds_.size(); i++) {
|
||||
output.writeBytes(5, clusterIds_.getByteString(i));
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -386,6 +451,15 @@ public final class SecureBulkLoadProtos {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBytesSize(4, getBulkTokenBytes());
|
||||
}
|
||||
{
|
||||
int dataSize = 0;
|
||||
for (int i = 0; i < clusterIds_.size(); i++) {
|
||||
dataSize += com.google.protobuf.CodedOutputStream
|
||||
.computeBytesSizeNoTag(clusterIds_.getByteString(i));
|
||||
}
|
||||
size += dataSize;
|
||||
size += 1 * getClusterIdsList().size();
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -426,6 +500,8 @@ public final class SecureBulkLoadProtos {
|
|||
result = result && getBulkToken()
|
||||
.equals(other.getBulkToken());
|
||||
}
|
||||
result = result && getClusterIdsList()
|
||||
.equals(other.getClusterIdsList());
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -455,6 +531,10 @@ public final class SecureBulkLoadProtos {
|
|||
hash = (37 * hash) + BULK_TOKEN_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getBulkToken().hashCode();
|
||||
}
|
||||
if (getClusterIdsCount() > 0) {
|
||||
hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getClusterIdsList().hashCode();
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
memoizedHashCode = hash;
|
||||
return hash;
|
||||
|
@ -582,6 +662,8 @@ public final class SecureBulkLoadProtos {
|
|||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
bulkToken_ = "";
|
||||
bitField0_ = (bitField0_ & ~0x00000008);
|
||||
clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -635,6 +717,12 @@ public final class SecureBulkLoadProtos {
|
|||
to_bitField0_ |= 0x00000004;
|
||||
}
|
||||
result.bulkToken_ = bulkToken_;
|
||||
if (((bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(
|
||||
clusterIds_);
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
}
|
||||
result.clusterIds_ = clusterIds_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -688,6 +776,16 @@ public final class SecureBulkLoadProtos {
|
|||
bulkToken_ = other.bulkToken_;
|
||||
onChanged();
|
||||
}
|
||||
if (!other.clusterIds_.isEmpty()) {
|
||||
if (clusterIds_.isEmpty()) {
|
||||
clusterIds_ = other.clusterIds_;
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
} else {
|
||||
ensureClusterIdsIsMutable();
|
||||
clusterIds_.addAll(other.clusterIds_);
|
||||
}
|
||||
onChanged();
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -1193,6 +1291,99 @@ public final class SecureBulkLoadProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// repeated string cluster_ids = 5;
|
||||
private com.google.protobuf.LazyStringList clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
|
||||
private void ensureClusterIdsIsMutable() {
|
||||
if (!((bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
clusterIds_ = new com.google.protobuf.LazyStringArrayList(clusterIds_);
|
||||
bitField0_ |= 0x00000010;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public java.util.List<java.lang.String>
|
||||
getClusterIdsList() {
|
||||
return java.util.Collections.unmodifiableList(clusterIds_);
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public int getClusterIdsCount() {
|
||||
return clusterIds_.size();
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public java.lang.String getClusterIds(int index) {
|
||||
return clusterIds_.get(index);
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public com.google.protobuf.ByteString
|
||||
getClusterIdsBytes(int index) {
|
||||
return clusterIds_.getByteString(index);
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public Builder setClusterIds(
|
||||
int index, java.lang.String value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
ensureClusterIdsIsMutable();
|
||||
clusterIds_.set(index, value);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public Builder addClusterIds(
|
||||
java.lang.String value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
ensureClusterIdsIsMutable();
|
||||
clusterIds_.add(value);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public Builder addAllClusterIds(
|
||||
java.lang.Iterable<java.lang.String> values) {
|
||||
ensureClusterIdsIsMutable();
|
||||
super.addAll(values, clusterIds_);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public Builder clearClusterIds() {
|
||||
clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public Builder addClusterIdsBytes(
|
||||
com.google.protobuf.ByteString value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
ensureClusterIdsIsMutable();
|
||||
clusterIds_.add(value);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:hbase.pb.SecureBulkLoadHFilesRequest)
|
||||
}
|
||||
|
||||
|
@ -4858,30 +5049,30 @@ public final class SecureBulkLoadProtos {
|
|||
static {
|
||||
java.lang.String[] descriptorData = {
|
||||
"\n\024SecureBulkLoad.proto\022\010hbase.pb\032\013Table." +
|
||||
"proto\032\013HBase.proto\032\014Client.proto\"\266\001\n\033Sec" +
|
||||
"proto\032\013HBase.proto\032\014Client.proto\"\313\001\n\033Sec" +
|
||||
"ureBulkLoadHFilesRequest\022>\n\013family_path\030" +
|
||||
"\001 \003(\0132).hbase.pb.BulkLoadHFileRequest.Fa" +
|
||||
"milyPath\022\026\n\016assign_seq_num\030\002 \001(\010\022+\n\010fs_t" +
|
||||
"oken\030\003 \002(\0132\031.hbase.pb.DelegationToken\022\022\n" +
|
||||
"\nbulk_token\030\004 \002(\t\".\n\034SecureBulkLoadHFile" +
|
||||
"sResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationT" +
|
||||
"oken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002 \001" +
|
||||
"(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"A\n\026Pre",
|
||||
"pareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(\0132" +
|
||||
"\023.hbase.pb.TableName\"-\n\027PrepareBulkLoadR" +
|
||||
"esponse\022\022\n\nbulk_token\030\001 \002(\t\",\n\026CleanupBu" +
|
||||
"lkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\"\031\n\027Cle" +
|
||||
"anupBulkLoadResponse2\256\002\n\025SecureBulkLoadS" +
|
||||
"ervice\022V\n\017PrepareBulkLoad\022 .hbase.pb.Pre" +
|
||||
"pareBulkLoadRequest\032!.hbase.pb.PrepareBu" +
|
||||
"lkLoadResponse\022e\n\024SecureBulkLoadHFiles\022%" +
|
||||
".hbase.pb.SecureBulkLoadHFilesRequest\032&." +
|
||||
"hbase.pb.SecureBulkLoadHFilesResponse\022V\n",
|
||||
"\017CleanupBulkLoad\022 .hbase.pb.CleanupBulkL" +
|
||||
"oadRequest\032!.hbase.pb.CleanupBulkLoadRes" +
|
||||
"ponseBJ\n*org.apache.hadoop.hbase.protobu" +
|
||||
"f.generatedB\024SecureBulkLoadProtosH\001\210\001\001\240\001" +
|
||||
"\001"
|
||||
"\nbulk_token\030\004 \002(\t\022\023\n\013cluster_ids\030\005 \003(\t\"." +
|
||||
"\n\034SecureBulkLoadHFilesResponse\022\016\n\006loaded" +
|
||||
"\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\nidentifier\030" +
|
||||
"\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n",
|
||||
"\007service\030\004 \001(\t\"A\n\026PrepareBulkLoadRequest" +
|
||||
"\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.TableNam" +
|
||||
"e\"-\n\027PrepareBulkLoadResponse\022\022\n\nbulk_tok" +
|
||||
"en\030\001 \002(\t\",\n\026CleanupBulkLoadRequest\022\022\n\nbu" +
|
||||
"lk_token\030\001 \002(\t\"\031\n\027CleanupBulkLoadRespons" +
|
||||
"e2\256\002\n\025SecureBulkLoadService\022V\n\017PrepareBu" +
|
||||
"lkLoad\022 .hbase.pb.PrepareBulkLoadRequest" +
|
||||
"\032!.hbase.pb.PrepareBulkLoadResponse\022e\n\024S" +
|
||||
"ecureBulkLoadHFiles\022%.hbase.pb.SecureBul" +
|
||||
"kLoadHFilesRequest\032&.hbase.pb.SecureBulk",
|
||||
"LoadHFilesResponse\022V\n\017CleanupBulkLoad\022 ." +
|
||||
"hbase.pb.CleanupBulkLoadRequest\032!.hbase." +
|
||||
"pb.CleanupBulkLoadResponseBJ\n*org.apache" +
|
||||
".hadoop.hbase.protobuf.generatedB\024Secure" +
|
||||
"BulkLoadProtosH\001\210\001\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -4893,7 +5084,7 @@ public final class SecureBulkLoadProtos {
|
|||
internal_static_hbase_pb_SecureBulkLoadHFilesRequest_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_hbase_pb_SecureBulkLoadHFilesRequest_descriptor,
|
||||
new java.lang.String[] { "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", });
|
||||
new java.lang.String[] { "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", "ClusterIds", });
|
||||
internal_static_hbase_pb_SecureBulkLoadHFilesResponse_descriptor =
|
||||
getDescriptor().getMessageTypes().get(1);
|
||||
internal_static_hbase_pb_SecureBulkLoadHFilesResponse_fieldAccessorTable = new
|
||||
|
|
|
@ -9852,6 +9852,26 @@ public final class WALProtos {
|
|||
* <code>required int64 bulkload_seq_num = 4;</code>
|
||||
*/
|
||||
long getBulkloadSeqNum();
|
||||
|
||||
// repeated string cluster_ids = 5;
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
java.util.List<java.lang.String>
|
||||
getClusterIdsList();
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
int getClusterIdsCount();
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
java.lang.String getClusterIds(int index);
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
com.google.protobuf.ByteString
|
||||
getClusterIdsBytes(int index);
|
||||
}
|
||||
/**
|
||||
* Protobuf type {@code hbase.pb.BulkLoadDescriptor}
|
||||
|
@ -9940,6 +9960,14 @@ public final class WALProtos {
|
|||
bulkloadSeqNum_ = input.readInt64();
|
||||
break;
|
||||
}
|
||||
case 42: {
|
||||
if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
clusterIds_ = new com.google.protobuf.LazyStringArrayList();
|
||||
mutable_bitField0_ |= 0x00000010;
|
||||
}
|
||||
clusterIds_.add(input.readBytes());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
||||
|
@ -9951,6 +9979,9 @@ public final class WALProtos {
|
|||
if (((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
stores_ = java.util.Collections.unmodifiableList(stores_);
|
||||
}
|
||||
if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(clusterIds_);
|
||||
}
|
||||
this.unknownFields = unknownFields.build();
|
||||
makeExtensionsImmutable();
|
||||
}
|
||||
|
@ -10073,11 +10104,42 @@ public final class WALProtos {
|
|||
return bulkloadSeqNum_;
|
||||
}
|
||||
|
||||
// repeated string cluster_ids = 5;
|
||||
public static final int CLUSTER_IDS_FIELD_NUMBER = 5;
|
||||
private com.google.protobuf.LazyStringList clusterIds_;
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public java.util.List<java.lang.String>
|
||||
getClusterIdsList() {
|
||||
return clusterIds_;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public int getClusterIdsCount() {
|
||||
return clusterIds_.size();
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public java.lang.String getClusterIds(int index) {
|
||||
return clusterIds_.get(index);
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public com.google.protobuf.ByteString
|
||||
getClusterIdsBytes(int index) {
|
||||
return clusterIds_.getByteString(index);
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
tableName_ = org.apache.hadoop.hbase.protobuf.generated.TableProtos.TableName.getDefaultInstance();
|
||||
encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
|
||||
stores_ = java.util.Collections.emptyList();
|
||||
bulkloadSeqNum_ = 0L;
|
||||
clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -10125,6 +10187,9 @@ public final class WALProtos {
|
|||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
output.writeInt64(4, bulkloadSeqNum_);
|
||||
}
|
||||
for (int i = 0; i < clusterIds_.size(); i++) {
|
||||
output.writeBytes(5, clusterIds_.getByteString(i));
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -10150,6 +10215,15 @@ public final class WALProtos {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeInt64Size(4, bulkloadSeqNum_);
|
||||
}
|
||||
{
|
||||
int dataSize = 0;
|
||||
for (int i = 0; i < clusterIds_.size(); i++) {
|
||||
dataSize += com.google.protobuf.CodedOutputStream
|
||||
.computeBytesSizeNoTag(clusterIds_.getByteString(i));
|
||||
}
|
||||
size += dataSize;
|
||||
size += 1 * getClusterIdsList().size();
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -10190,6 +10264,8 @@ public final class WALProtos {
|
|||
result = result && (getBulkloadSeqNum()
|
||||
== other.getBulkloadSeqNum());
|
||||
}
|
||||
result = result && getClusterIdsList()
|
||||
.equals(other.getClusterIdsList());
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -10219,6 +10295,10 @@ public final class WALProtos {
|
|||
hash = (37 * hash) + BULKLOAD_SEQ_NUM_FIELD_NUMBER;
|
||||
hash = (53 * hash) + hashLong(getBulkloadSeqNum());
|
||||
}
|
||||
if (getClusterIdsCount() > 0) {
|
||||
hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getClusterIdsList().hashCode();
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
memoizedHashCode = hash;
|
||||
return hash;
|
||||
|
@ -10351,6 +10431,8 @@ public final class WALProtos {
|
|||
}
|
||||
bulkloadSeqNum_ = 0L;
|
||||
bitField0_ = (bitField0_ & ~0x00000008);
|
||||
clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -10404,6 +10486,12 @@ public final class WALProtos {
|
|||
to_bitField0_ |= 0x00000004;
|
||||
}
|
||||
result.bulkloadSeqNum_ = bulkloadSeqNum_;
|
||||
if (((bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(
|
||||
clusterIds_);
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
}
|
||||
result.clusterIds_ = clusterIds_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -10455,6 +10543,16 @@ public final class WALProtos {
|
|||
if (other.hasBulkloadSeqNum()) {
|
||||
setBulkloadSeqNum(other.getBulkloadSeqNum());
|
||||
}
|
||||
if (!other.clusterIds_.isEmpty()) {
|
||||
if (clusterIds_.isEmpty()) {
|
||||
clusterIds_ = other.clusterIds_;
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
} else {
|
||||
ensureClusterIdsIsMutable();
|
||||
clusterIds_.addAll(other.clusterIds_);
|
||||
}
|
||||
onChanged();
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -10930,6 +11028,99 @@ public final class WALProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// repeated string cluster_ids = 5;
|
||||
private com.google.protobuf.LazyStringList clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
|
||||
private void ensureClusterIdsIsMutable() {
|
||||
if (!((bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
clusterIds_ = new com.google.protobuf.LazyStringArrayList(clusterIds_);
|
||||
bitField0_ |= 0x00000010;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public java.util.List<java.lang.String>
|
||||
getClusterIdsList() {
|
||||
return java.util.Collections.unmodifiableList(clusterIds_);
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public int getClusterIdsCount() {
|
||||
return clusterIds_.size();
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public java.lang.String getClusterIds(int index) {
|
||||
return clusterIds_.get(index);
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public com.google.protobuf.ByteString
|
||||
getClusterIdsBytes(int index) {
|
||||
return clusterIds_.getByteString(index);
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public Builder setClusterIds(
|
||||
int index, java.lang.String value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
ensureClusterIdsIsMutable();
|
||||
clusterIds_.set(index, value);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public Builder addClusterIds(
|
||||
java.lang.String value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
ensureClusterIdsIsMutable();
|
||||
clusterIds_.add(value);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public Builder addAllClusterIds(
|
||||
java.lang.Iterable<java.lang.String> values) {
|
||||
ensureClusterIdsIsMutable();
|
||||
super.addAll(values, clusterIds_);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public Builder clearClusterIds() {
|
||||
clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>repeated string cluster_ids = 5;</code>
|
||||
*/
|
||||
public Builder addClusterIdsBytes(
|
||||
com.google.protobuf.ByteString value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
ensureClusterIdsIsMutable();
|
||||
clusterIds_.add(value);
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:hbase.pb.BulkLoadDescriptor)
|
||||
}
|
||||
|
||||
|
@ -13007,23 +13198,23 @@ public final class WALProtos {
|
|||
"LUSH\020\002\022\020\n\014CANNOT_FLUSH\020\003\"q\n\017StoreDescrip" +
|
||||
"tor\022\023\n\013family_name\030\001 \002(\014\022\026\n\016store_home_d" +
|
||||
"ir\030\002 \002(\t\022\022\n\nstore_file\030\003 \003(\t\022\035\n\025store_fi" +
|
||||
"le_size_bytes\030\004 \001(\004\"\237\001\n\022BulkLoadDescript" +
|
||||
"le_size_bytes\030\004 \001(\004\"\264\001\n\022BulkLoadDescript" +
|
||||
"or\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.TableN" +
|
||||
"ame\022\033\n\023encoded_region_name\030\002 \002(\014\022)\n\006stor" +
|
||||
"es\030\003 \003(\0132\031.hbase.pb.StoreDescriptor\022\030\n\020b" +
|
||||
"ulkload_seq_num\030\004 \002(\003\"\272\002\n\025RegionEventDes",
|
||||
"criptor\022=\n\nevent_type\030\001 \002(\0162).hbase.pb.R" +
|
||||
"egionEventDescriptor.EventType\022\022\n\ntable_" +
|
||||
"name\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002(\014\022" +
|
||||
"\033\n\023log_sequence_number\030\004 \001(\004\022)\n\006stores\030\005" +
|
||||
" \003(\0132\031.hbase.pb.StoreDescriptor\022$\n\006serve" +
|
||||
"r\030\006 \001(\0132\024.hbase.pb.ServerName\022\023\n\013region_" +
|
||||
"name\030\007 \001(\014\".\n\tEventType\022\017\n\013REGION_OPEN\020\000" +
|
||||
"\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrailer*F\n\tScop" +
|
||||
"eType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030RE" +
|
||||
"PLICATION_SCOPE_GLOBAL\020\001B?\n*org.apache.h",
|
||||
"adoop.hbase.protobuf.generatedB\tWALProto" +
|
||||
"sH\001\210\001\000\240\001\001"
|
||||
"ulkload_seq_num\030\004 \002(\003\022\023\n\013cluster_ids\030\005 \003",
|
||||
"(\t\"\272\002\n\025RegionEventDescriptor\022=\n\nevent_ty" +
|
||||
"pe\030\001 \002(\0162).hbase.pb.RegionEventDescripto" +
|
||||
"r.EventType\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encod" +
|
||||
"ed_region_name\030\003 \002(\014\022\033\n\023log_sequence_num" +
|
||||
"ber\030\004 \001(\004\022)\n\006stores\030\005 \003(\0132\031.hbase.pb.Sto" +
|
||||
"reDescriptor\022$\n\006server\030\006 \001(\0132\024.hbase.pb." +
|
||||
"ServerName\022\023\n\013region_name\030\007 \001(\014\".\n\tEvent" +
|
||||
"Type\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"" +
|
||||
"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICATIO" +
|
||||
"N_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLO",
|
||||
"BAL\020\001B?\n*org.apache.hadoop.hbase.protobu" +
|
||||
"f.generatedB\tWALProtosH\001\210\001\000\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -13083,7 +13274,7 @@ public final class WALProtos {
|
|||
internal_static_hbase_pb_BulkLoadDescriptor_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_hbase_pb_BulkLoadDescriptor_descriptor,
|
||||
new java.lang.String[] { "TableName", "EncodedRegionName", "Stores", "BulkloadSeqNum", });
|
||||
new java.lang.String[] { "TableName", "EncodedRegionName", "Stores", "BulkloadSeqNum", "ClusterIds", });
|
||||
internal_static_hbase_pb_RegionEventDescriptor_descriptor =
|
||||
getDescriptor().getMessageTypes().get(8);
|
||||
internal_static_hbase_pb_RegionEventDescriptor_fieldAccessorTable = new
|
||||
|
|
|
@ -373,6 +373,7 @@ message BulkLoadHFileRequest {
|
|||
required RegionSpecifier region = 1;
|
||||
repeated FamilyPath family_path = 2;
|
||||
optional bool assign_seq_num = 3;
|
||||
repeated string cluster_ids = 4;
|
||||
|
||||
message FamilyPath {
|
||||
required bytes family = 1;
|
||||
|
|
|
@ -32,6 +32,7 @@ message SecureBulkLoadHFilesRequest {
|
|||
optional bool assign_seq_num = 2;
|
||||
required DelegationToken fs_token = 3;
|
||||
required string bulk_token = 4;
|
||||
repeated string cluster_ids = 5;
|
||||
}
|
||||
|
||||
message SecureBulkLoadHFilesResponse {
|
||||
|
|
|
@ -150,6 +150,7 @@ message BulkLoadDescriptor {
|
|||
required bytes encoded_region_name = 2;
|
||||
repeated StoreDescriptor stores = 3;
|
||||
required int64 bulkload_seq_num = 4;
|
||||
repeated string cluster_ids = 5;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -139,6 +139,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
private int nrThreads;
|
||||
private int depth = 2;
|
||||
|
||||
private List<String> clusterIds = new ArrayList<>();
|
||||
|
||||
private LoadIncrementalHFiles() {}
|
||||
|
||||
public LoadIncrementalHFiles(Configuration conf) throws Exception {
|
||||
|
@ -146,6 +148,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
initialize();
|
||||
}
|
||||
|
||||
public void setClusterIds(List<String> clusterIds) {
|
||||
this.clusterIds = clusterIds;
|
||||
}
|
||||
|
||||
public void setDepth(int depth) {
|
||||
this.depth = depth;
|
||||
}
|
||||
|
@ -875,7 +881,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
+ Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
|
||||
byte[] regionName = getLocation().getRegionInfo().getRegionName();
|
||||
if (!isSecureBulkLoadEndpointAvailable()) {
|
||||
success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
|
||||
success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds,
|
||||
clusterIds);
|
||||
} else {
|
||||
try (Table table = conn.getTable(getTableName())) {
|
||||
secureClient = new SecureBulkLoadClient(table);
|
||||
|
|
|
@ -5843,7 +5843,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
@Override
|
||||
public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
|
||||
BulkLoadListener bulkLoadListener) throws IOException {
|
||||
BulkLoadListener bulkLoadListener, List<String> clusterIds) throws IOException {
|
||||
long seqId = -1;
|
||||
Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
|
||||
Map<String, Long> storeFilesSizes = new HashMap<String, Long>();
|
||||
|
@ -6019,7 +6019,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
WALProtos.BulkLoadDescriptor loadDescriptor =
|
||||
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
|
||||
ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles,
|
||||
storeFilesSizes, seqId);
|
||||
storeFilesSizes, seqId, clusterIds);
|
||||
WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
|
||||
loadDescriptor, mvcc);
|
||||
} catch (IOException ioe) {
|
||||
|
|
|
@ -461,7 +461,7 @@ public class HRegionServer extends HasThread implements
|
|||
/**
|
||||
* Unique identifier for the cluster we are a part of.
|
||||
*/
|
||||
private String clusterId;
|
||||
String clusterId;
|
||||
|
||||
/**
|
||||
* MX Bean for RegionServerInfo
|
||||
|
|
|
@ -2212,6 +2212,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
|
||||
final BulkLoadHFileRequest request) throws ServiceException {
|
||||
long start = EnvironmentEdgeManager.currentTime();
|
||||
List<String> clusterIds = new ArrayList<String>(request.getClusterIdsList());
|
||||
if(clusterIds.contains(this.regionServer.clusterId)){
|
||||
return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
|
||||
} else {
|
||||
clusterIds.add(this.regionServer.clusterId);
|
||||
}
|
||||
try {
|
||||
checkOpen();
|
||||
requestCount.increment();
|
||||
|
@ -2228,7 +2234,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
boolean loaded = false;
|
||||
try {
|
||||
if (!bypass) {
|
||||
loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
|
||||
loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, clusterIds);
|
||||
}
|
||||
} finally {
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
|
|
|
@ -552,14 +552,15 @@ public interface Region extends ConfigurationObserver {
|
|||
* rows with multiple column families atomically.
|
||||
*
|
||||
* @param familyPaths List of Pair<byte[] column family, String hfilePath>
|
||||
* @param assignSeqId
|
||||
* @param bulkLoadListener Internal hooks enabling massaging/preparation of a
|
||||
* file about to be bulk loaded
|
||||
* @param assignSeqId
|
||||
* @param clusterIds
|
||||
* @return true if successful, false if failed recoverably
|
||||
* @throws IOException if failed unrecoverably.
|
||||
*/
|
||||
boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
|
||||
BulkLoadListener bulkLoadListener) throws IOException;
|
||||
BulkLoadListener bulkLoadListener, List<String> clusterIds) throws IOException;
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Coprocessors
|
||||
|
|
|
@ -86,17 +86,19 @@ public class HFileReplicator {
|
|||
private ThreadPoolExecutor exec;
|
||||
private int maxCopyThreads;
|
||||
private int copiesPerThread;
|
||||
private List<String> sourceClusterIds;
|
||||
|
||||
public HFileReplicator(Configuration sourceClusterConf,
|
||||
String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
|
||||
Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
|
||||
Connection connection) throws IOException {
|
||||
Connection connection, List<String> sourceClusterIds) throws IOException {
|
||||
this.sourceClusterConf = sourceClusterConf;
|
||||
this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
|
||||
this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
|
||||
this.bulkLoadHFileMap = tableQueueMap;
|
||||
this.conf = conf;
|
||||
this.connection = connection;
|
||||
this.sourceClusterIds = sourceClusterIds;
|
||||
|
||||
userProvider = UserProvider.instantiate(conf);
|
||||
fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
|
||||
|
@ -130,6 +132,7 @@ public class HFileReplicator {
|
|||
LoadIncrementalHFiles loadHFiles = null;
|
||||
try {
|
||||
loadHFiles = new LoadIncrementalHFiles(conf);
|
||||
loadHFiles.setClusterIds(sourceClusterIds);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
|
||||
+ " data.", e);
|
||||
|
|
|
@ -157,9 +157,7 @@ public class ReplicationSink {
|
|||
Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
|
||||
new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
|
||||
|
||||
// Map of table name Vs list of pair of family and list of hfile paths from its namespace
|
||||
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
|
||||
|
||||
Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
|
||||
for (WALEntry entry : entries) {
|
||||
TableName table =
|
||||
TableName.valueOf(entry.getKey().getTableName().toByteArray());
|
||||
|
@ -174,10 +172,19 @@ public class ReplicationSink {
|
|||
Cell cell = cells.current();
|
||||
// Handle bulk load hfiles replication
|
||||
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
|
||||
if (bulkLoadHFileMap == null) {
|
||||
bulkLoadHFileMap = new HashMap<String, List<Pair<byte[], List<String>>>>();
|
||||
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
|
||||
if(bulkLoadsPerClusters == null) {
|
||||
bulkLoadsPerClusters = new HashMap<>();
|
||||
}
|
||||
buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
|
||||
// Map of table name Vs list of pair of family and list of
|
||||
// hfile paths from its namespace
|
||||
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
|
||||
bulkLoadsPerClusters.get(bld.getClusterIdsList());
|
||||
if (bulkLoadHFileMap == null) {
|
||||
bulkLoadHFileMap = new HashMap<>();
|
||||
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
|
||||
}
|
||||
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
|
||||
} else {
|
||||
// Handle wal replication
|
||||
if (isNewRowOrType(previousCell, cell)) {
|
||||
|
@ -213,14 +220,26 @@ public class ReplicationSink {
|
|||
LOG.debug("Finished replicating mutations.");
|
||||
}
|
||||
|
||||
if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
|
||||
LOG.debug("Started replicating bulk loaded data.");
|
||||
HFileReplicator hFileReplicator =
|
||||
new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
|
||||
if(bulkLoadsPerClusters != null) {
|
||||
for (Entry<List<String>, Map<String, List<Pair<byte[],
|
||||
List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
|
||||
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
|
||||
if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Started replicating bulk loaded data from cluster ids: " +
|
||||
entry.getKey().toString());
|
||||
}
|
||||
HFileReplicator hFileReplicator =
|
||||
new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
|
||||
sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
|
||||
getConnection());
|
||||
hFileReplicator.replicate();
|
||||
LOG.debug("Finished replicating bulk loaded data.");
|
||||
getConnection(), entry.getKey());
|
||||
hFileReplicator.replicate();
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Finished replicating bulk loaded data from cluster id: " +
|
||||
entry.getKey().toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int size = entries.size();
|
||||
|
@ -235,8 +254,7 @@ public class ReplicationSink {
|
|||
|
||||
private void buildBulkLoadHFileMap(
|
||||
final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
|
||||
Cell cell) throws IOException {
|
||||
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
|
||||
BulkLoadDescriptor bld) throws IOException {
|
||||
List<StoreDescriptor> storesList = bld.getStoresList();
|
||||
int storesSize = storesList.size();
|
||||
for (int j = 0; j < storesSize; j++) {
|
||||
|
|
|
@ -298,7 +298,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
|
|||
|
||||
@Override
|
||||
public void secureBulkLoadHFiles(RpcController controller,
|
||||
SecureBulkLoadHFilesRequest request,
|
||||
final SecureBulkLoadHFilesRequest request,
|
||||
RpcCallback<SecureBulkLoadHFilesResponse> done) {
|
||||
final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
|
||||
for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
|
||||
|
@ -391,7 +391,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
|
|||
//We call bulkLoadHFiles as requesting user
|
||||
//To enable access prior to staging
|
||||
return env.getRegion().bulkLoadHFiles(familyPaths, true,
|
||||
new SecureBulkLoadListener(fs, bulkToken, conf));
|
||||
new SecureBulkLoadListener(fs, bulkToken, conf), request.getClusterIdsList());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to complete bulk load", e);
|
||||
}
|
||||
|
|
|
@ -122,14 +122,14 @@ public class TestBulkLoad {
|
|||
};
|
||||
});
|
||||
testRegionWithFamiliesAndSpecifiedTableName(tableName, family1)
|
||||
.bulkLoadHFiles(familyPaths, false, null);
|
||||
.bulkLoadHFiles(familyPaths, false, null, null);
|
||||
verify(log).sync(anyLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException {
|
||||
testRegionWithFamilies(family1).bulkLoadHFiles(new ArrayList<Pair<byte[], String>>(),
|
||||
false, null);
|
||||
false, null, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -147,7 +147,7 @@ public class TestBulkLoad {
|
|||
return 01L;
|
||||
};
|
||||
});
|
||||
testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
|
||||
testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null, null);
|
||||
verify(log).sync(anyLong());
|
||||
}
|
||||
|
||||
|
@ -167,7 +167,7 @@ public class TestBulkLoad {
|
|||
};
|
||||
});
|
||||
testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2),
|
||||
false, null);
|
||||
false, null, null);
|
||||
verify(log).sync(anyLong());
|
||||
}
|
||||
|
||||
|
@ -188,33 +188,33 @@ public class TestBulkLoad {
|
|||
});
|
||||
TableName tableName = TableName.valueOf("test", "test");
|
||||
testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
|
||||
.bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null);
|
||||
.bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null, null);
|
||||
verify(log).sync(anyLong());
|
||||
}
|
||||
|
||||
@Test(expected = DoNotRetryIOException.class)
|
||||
public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException {
|
||||
testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false,
|
||||
null);
|
||||
null, null);
|
||||
}
|
||||
|
||||
@Test(expected = DoNotRetryIOException.class)
|
||||
public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor()
|
||||
throws IOException {
|
||||
testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
|
||||
testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null, null);
|
||||
}
|
||||
|
||||
@Test(expected = DoNotRetryIOException.class)
|
||||
public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException {
|
||||
testRegionWithFamilies()
|
||||
.bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)),
|
||||
false, null);
|
||||
false, null, null);
|
||||
}
|
||||
|
||||
@Test(expected = FileNotFoundException.class)
|
||||
public void shouldThrowErrorIfHFileDoesNotExist() throws IOException {
|
||||
List<Pair<byte[], String>> list = asList(withMissingHFileForFamily(family1));
|
||||
testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null);
|
||||
testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null, null);
|
||||
}
|
||||
|
||||
private Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
|
||||
|
|
|
@ -0,0 +1,299 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
|
||||
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.TestReplicationBase;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Integration test for bulk load replication. Defines three clusters, with the following
|
||||
* replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
|
||||
* 2 and 3).
|
||||
*
|
||||
* For each of defined test clusters, it performs a bulk load, asserting values on bulk loaded file
|
||||
* gets replicated to other two peers. Since we are doing 3 bulk loads, with the given replication
|
||||
* topology all these bulk loads should get replicated only once on each peer. To assert this,
|
||||
* this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each of the
|
||||
* clusters. This CP counts the amount of times bulk load actually gets invoked, certifying
|
||||
* we are not entering the infinite loop condition addressed by HBASE-22380.
|
||||
*/
|
||||
@Category({ ReplicationTests.class, MediumTests.class})
|
||||
public class TestBulkLoadReplication extends TestReplicationBase {
|
||||
|
||||
protected static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestBulkLoadReplication.class);
|
||||
|
||||
private static final String PEER1_CLUSTER_ID = "peer1";
|
||||
private static final String PEER4_CLUSTER_ID = "peer4";
|
||||
private static final String PEER3_CLUSTER_ID = "peer3";
|
||||
|
||||
private static final String PEER_ID1 = "1";
|
||||
private static final String PEER_ID3 = "3";
|
||||
private static final String PEER_ID4 = "4";
|
||||
|
||||
private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
|
||||
private static CountDownLatch BULK_LOAD_LATCH;
|
||||
|
||||
private static HBaseTestingUtility utility3;
|
||||
private static HBaseTestingUtility utility4;
|
||||
private static Configuration conf3;
|
||||
private static Configuration conf4;
|
||||
// private static Table htable3;
|
||||
// private static Table htable4;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@ClassRule
|
||||
public static TemporaryFolder testFolder = new TemporaryFolder();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
setupBulkLoadConfigsForCluster(conf1, PEER1_CLUSTER_ID);
|
||||
conf3 = HBaseConfiguration.create(conf1);
|
||||
setupBulkLoadConfigsForCluster(conf3, PEER3_CLUSTER_ID);
|
||||
conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
|
||||
utility3 = new HBaseTestingUtility(conf3);
|
||||
conf4 = HBaseConfiguration.create(conf1);
|
||||
setupBulkLoadConfigsForCluster(conf4, PEER4_CLUSTER_ID);
|
||||
conf4.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/4");
|
||||
utility4 = new HBaseTestingUtility(conf4);
|
||||
TestReplicationBase.setUpBeforeClass();
|
||||
startCluster(utility3, conf3);
|
||||
startCluster(utility4, conf4);
|
||||
}
|
||||
|
||||
private static void startCluster(HBaseTestingUtility util, Configuration configuration)
|
||||
throws Exception {
|
||||
LOG.info("Setup Zk to same one from utility1 and utility4");
|
||||
util.setZkCluster(utility1.getZkCluster());
|
||||
util.startMiniCluster(2);
|
||||
|
||||
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor columnDesc = new HColumnDescriptor(famName);
|
||||
columnDesc.setScope(1);
|
||||
tableDesc.addFamily(columnDesc);
|
||||
|
||||
Connection connection = ConnectionFactory.createConnection(configuration);
|
||||
try (Admin admin = connection.getAdmin()) {
|
||||
admin.createTable(tableDesc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
|
||||
}
|
||||
util.waitUntilAllRegionsAssigned(tableName);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUpBase() throws Exception {
|
||||
ReplicationPeerConfig peer1Config = getPeerConfigForCluster(utility1);
|
||||
ReplicationPeerConfig peer4Config = getPeerConfigForCluster(utility4);
|
||||
ReplicationPeerConfig peer3Config = getPeerConfigForCluster(utility3);
|
||||
//adds cluster4 as a remote peer on cluster1
|
||||
getReplicationAdmin(utility1.getConfiguration()).addPeer(PEER_ID4, peer4Config);
|
||||
//adds cluster1 as a remote peer on cluster4
|
||||
ReplicationAdmin admin4 = getReplicationAdmin(utility4.getConfiguration());
|
||||
admin4.addPeer(PEER_ID1, peer1Config);
|
||||
//adds cluster3 as a remote peer on cluster4
|
||||
admin4.addPeer(PEER_ID3, peer3Config);
|
||||
//adds cluster4 as a remote peer on cluster3
|
||||
getReplicationAdmin(utility3.getConfiguration()).addPeer(PEER_ID4, peer4Config);
|
||||
setupCoprocessor(utility1);
|
||||
setupCoprocessor(utility4);
|
||||
setupCoprocessor(utility3);
|
||||
}
|
||||
|
||||
private ReplicationAdmin getReplicationAdmin(Configuration configuration) throws IOException {
|
||||
return new ReplicationAdmin(configuration);
|
||||
}
|
||||
|
||||
private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
|
||||
ReplicationPeerConfig config = new ReplicationPeerConfig();
|
||||
config.setClusterKey(util.getClusterKey());
|
||||
return config;
|
||||
}
|
||||
|
||||
private void setupCoprocessor(HBaseTestingUtility cluster) throws IOException {
|
||||
for(HRegion region : cluster.getHBaseCluster().getRegions(tableName)){
|
||||
region.getCoprocessorHost().load(TestBulkLoadReplication.BulkReplicationTestObserver.class,
|
||||
0, cluster.getConfiguration());
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownBase() throws Exception {
|
||||
getReplicationAdmin(utility4.getConfiguration()).removePeer(PEER_ID1);
|
||||
getReplicationAdmin(utility4.getConfiguration()).removePeer(PEER_ID3);
|
||||
getReplicationAdmin(utility3.getConfiguration()).removePeer(PEER_ID4);
|
||||
}
|
||||
|
||||
private static void setupBulkLoadConfigsForCluster(Configuration config,
|
||||
String clusterReplicationId) throws Exception {
|
||||
config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
|
||||
config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
|
||||
File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
|
||||
File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath()
|
||||
+ "/hbase-site.xml");
|
||||
config.writeXml(new FileOutputStream(sourceConfigFile));
|
||||
config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBulkLoadReplicationActiveActive() throws Exception {
|
||||
Table peer1TestTable = utility1.getConnection().getTable(TestReplicationBase.tableName);
|
||||
Table peer4TestTable = utility4.getConnection().getTable(TestReplicationBase.tableName);
|
||||
Table peer3TestTable = utility3.getConnection().getTable(TestReplicationBase.tableName);
|
||||
byte[] row = Bytes.toBytes("001");
|
||||
byte[] value = Bytes.toBytes("v1");
|
||||
assertBulkLoadConditions(row, value, utility1, peer1TestTable, peer4TestTable, peer3TestTable);
|
||||
row = Bytes.toBytes("002");
|
||||
value = Bytes.toBytes("v2");
|
||||
assertBulkLoadConditions(row, value, utility4, peer4TestTable, peer1TestTable, peer3TestTable);
|
||||
row = Bytes.toBytes("003");
|
||||
value = Bytes.toBytes("v3");
|
||||
assertBulkLoadConditions(row, value, utility3, peer3TestTable, peer4TestTable, peer1TestTable);
|
||||
//Additional wait to make sure no extra bulk load happens
|
||||
Thread.sleep(400);
|
||||
//We have 3 bulk load events (1 initiated on each cluster).
|
||||
//Each event gets 3 counts (the originator cluster, plus the two peers),
|
||||
//so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
|
||||
assertEquals(9, BULK_LOADS_COUNT.get());
|
||||
}
|
||||
|
||||
private void assertBulkLoadConditions(byte[] row, byte[] value,
|
||||
HBaseTestingUtility utility, Table...tables) throws Exception {
|
||||
BULK_LOAD_LATCH = new CountDownLatch(3);
|
||||
bulkLoadOnCluster(row, value, utility);
|
||||
assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
|
||||
assertTableHasValue(tables[0], row, value);
|
||||
assertTableHasValue(tables[1], row, value);
|
||||
assertTableHasValue(tables[2], row, value);
|
||||
}
|
||||
|
||||
private void bulkLoadOnCluster(byte[] row, byte[] value,
|
||||
HBaseTestingUtility cluster) throws Exception {
|
||||
String bulkLoadFile = createHFileForFamilies(row, value, cluster.getConfiguration());
|
||||
copyToHdfs(bulkLoadFile, cluster.getDFSCluster());
|
||||
LoadIncrementalHFiles bulkLoadHFilesTool =
|
||||
new LoadIncrementalHFiles(cluster.getConfiguration());
|
||||
bulkLoadHFilesTool.run(new String[]{"/bulk_dir/region1/", tableName.getNameAsString()});
|
||||
}
|
||||
|
||||
private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
|
||||
Path bulkLoadDir = new Path("/bulk_dir/region1/f");
|
||||
cluster.getFileSystem().mkdirs(bulkLoadDir);
|
||||
cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
|
||||
}
|
||||
|
||||
private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
|
||||
Get get = new Get(row);
|
||||
Result result = table.get(get);
|
||||
assertTrue(result.advance());
|
||||
assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
|
||||
}
|
||||
|
||||
private String createHFileForFamilies(byte[] row, byte[] value,
|
||||
Configuration clusterConfig) throws IOException {
|
||||
final KeyValue kv = new KeyValue(row, famName, Bytes.toBytes("1"), System.currentTimeMillis(),
|
||||
KeyValue.Type.Put, value);
|
||||
final HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
|
||||
// TODO We need a way to do this without creating files
|
||||
final File hFileLocation = testFolder.newFile();
|
||||
final FSDataOutputStream out =
|
||||
new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
|
||||
try {
|
||||
hFileFactory.withOutputStream(out);
|
||||
hFileFactory.withFileContext(new HFileContext());
|
||||
HFile.Writer writer = hFileFactory.create();
|
||||
try {
|
||||
writer.append(kv);
|
||||
} finally {
|
||||
writer.close();
|
||||
}
|
||||
} finally {
|
||||
out.close();
|
||||
}
|
||||
return hFileLocation.getAbsoluteFile().getAbsolutePath();
|
||||
}
|
||||
|
||||
public static class BulkReplicationTestObserver extends BaseRegionObserver {
|
||||
|
||||
@Override
|
||||
public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
|
||||
List<Pair<byte[], String>> familyPaths) throws IOException {
|
||||
BULK_LOADS_COUNT.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
|
||||
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
|
||||
if(hasLoaded) {
|
||||
BULK_LOAD_LATCH.countDown();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -1480,7 +1480,7 @@ public class TestHRegionReplayEvents {
|
|||
randomValues)));
|
||||
expectedLoadFileCount++;
|
||||
}
|
||||
primaryRegion.bulkLoadHFiles(familyPaths, false, null);
|
||||
primaryRegion.bulkLoadHFiles(familyPaths, false, null, null);
|
||||
|
||||
// now replay the edits and the bulk load marker
|
||||
reader = createWALReaderForPrimary();
|
||||
|
|
|
@ -361,7 +361,7 @@ public class TestWALReplay {
|
|||
Bytes.toBytes("z"), 10);
|
||||
List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
|
||||
hfs.add(Pair.newPair(family, f.toString()));
|
||||
region.bulkLoadHFiles(hfs, true, null);
|
||||
region.bulkLoadHFiles(hfs, true, null, null);
|
||||
|
||||
// Add an edit so something in the WAL
|
||||
byte [] row = tableName.getName();
|
||||
|
@ -435,7 +435,7 @@ public class TestWALReplay {
|
|||
Bytes.toBytes(i + "50"), 10);
|
||||
hfs.add(Pair.newPair(family, f.toString()));
|
||||
}
|
||||
region.bulkLoadHFiles(hfs, true, null);
|
||||
region.bulkLoadHFiles(hfs, true, null, null);
|
||||
final int rowsInsertedCount = 31;
|
||||
assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
|
||||
|
||||
|
|
Loading…
Reference in New Issue