HBASE-16672 Add option for bulk load to copy hfile(s) instead of renaming

tedyu 2016-09-26 18:56:38 -07:00
parent b9ec59ebbe
commit 219c786457
11 changed files with 313 additions and 106 deletions

SecureBulkLoadClient.java

@@ -113,9 +113,30 @@ public class SecureBulkLoadClient {
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) throws IOException {
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
false);
}
/**
* Securely bulk load a list of HFiles using client protocol.
*
* @param client the ClientService stub of the region server hosting the region
* @param familyPaths pairs of column family and HFile path
* @param regionName name of the region the hfiles are loaded into
* @param assignSeqNum true to assign a new sequence id to the loaded hfiles
* @param userToken the user's HDFS delegation token
* @param bulkToken the staging directory token from prepareBulkLoad
* @param copyFiles true to always copy the hfiles instead of renaming them
* @return true if all are loaded
* @throws IOException
*/
public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
userToken, bulkToken);
userToken, bulkToken, copyFiles);
try {
BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);

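For orientation, a minimal sketch of a caller passing the new flag; the stub, family/path pairs, tokens and region name are assumed to come from the usual bulk-load plumbing (as in LoadIncrementalHFiles#tryAtomicRegionLoad further down):

// Sketch only: conf, table, client, famPaths, regionName, userToken and bulkToken
// are assumed to be obtained the way LoadIncrementalHFiles#tryAtomicRegionLoad obtains them.
SecureBulkLoadClient secureClient = new SecureBulkLoadClient(conf, table);
boolean loaded = secureClient.secureBulkLoadHFiles(client, famPaths, regionName,
    true /* assignSeqNum */, userToken, bulkToken, true /* copyFiles */);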
RequestConverter.java

@@ -522,12 +522,33 @@ public final class RequestConverter {
* @param familyPaths pairs of column family and HFile path
* @param regionName name of the region the hfiles are loaded into
* @param assignSeqNum true to assign a new sequence id to the loaded hfiles
* @param userToken the user's HDFS delegation token
* @param bulkToken the staging directory token from prepareBulkLoad
* @return a bulk load request
*/
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) {
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
false);
}
/**
* Create a protocol buffer bulk load request
*
* @param familyPaths pairs of column family and HFile path
* @param regionName name of the region the hfiles are loaded into
* @param assignSeqNum true to assign a new sequence id to the loaded hfiles
* @param userToken the user's HDFS delegation token
* @param bulkToken the staging directory token from prepareBulkLoad
* @param copyFiles true to always copy the hfiles instead of renaming them
* @return a bulk load request
*/
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken, boolean copyFiles) {
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
@@ -560,6 +581,7 @@ public final class RequestConverter {
if (bulkToken != null) {
request.setBulkToken(bulkToken);
}
request.setCopyFile(copyFiles);
return request.build();
}

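The converter simply stamps the flag onto every request it builds. An illustrative call (the family/path pairs, region name and tokens are placeholders from the caller's context):

// Illustrative only; all arguments are placeholders.
BulkLoadHFileRequest request = RequestConverter.buildBulkLoadHFileRequest(
    famPaths, regionName, true /* assignSeqNum */, userToken, bulkToken, true /* copyFiles */);
assert request.getCopyFile();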
ClientProtos.java

@@ -21077,6 +21077,16 @@ public final class ClientProtos {
*/
com.google.protobuf.ByteString
getBulkTokenBytes();
// optional bool copy_file = 6 [default = false];
/**
* <code>optional bool copy_file = 6 [default = false];</code>
*/
boolean hasCopyFile();
/**
* <code>optional bool copy_file = 6 [default = false];</code>
*/
boolean getCopyFile();
}
/**
* Protobuf type {@code hbase.pb.BulkLoadHFileRequest}
@@ -21179,6 +21189,11 @@ public final class ClientProtos {
bulkToken_ = input.readBytes();
break;
}
case 48: {
bitField0_ |= 0x00000010;
copyFile_ = input.readBool();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -21979,12 +21994,29 @@ public final class ClientProtos {
}
}
// optional bool copy_file = 6 [default = false];
public static final int COPY_FILE_FIELD_NUMBER = 6;
private boolean copyFile_;
/**
* <code>optional bool copy_file = 6 [default = false];</code>
*/
public boolean hasCopyFile() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
* <code>optional bool copy_file = 6 [default = false];</code>
*/
public boolean getCopyFile() {
return copyFile_;
}
private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
familyPath_ = java.util.Collections.emptyList();
assignSeqNum_ = false;
fsToken_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken.getDefaultInstance();
bulkToken_ = "";
copyFile_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -22027,6 +22059,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(5, getBulkTokenBytes());
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeBool(6, copyFile_);
}
getUnknownFields().writeTo(output);
}
@@ -22056,6 +22091,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(5, getBulkTokenBytes());
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(6, copyFile_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -22101,6 +22140,11 @@ public final class ClientProtos {
result = result && getBulkToken()
.equals(other.getBulkToken());
}
result = result && (hasCopyFile() == other.hasCopyFile());
if (hasCopyFile()) {
result = result && (getCopyFile()
== other.getCopyFile());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -22134,6 +22178,10 @@ public final class ClientProtos {
hash = (37 * hash) + BULK_TOKEN_FIELD_NUMBER;
hash = (53 * hash) + getBulkToken().hashCode();
}
if (hasCopyFile()) {
hash = (37 * hash) + COPY_FILE_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getCopyFile());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -22274,6 +22322,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000008);
bulkToken_ = "";
bitField0_ = (bitField0_ & ~0x00000010);
copyFile_ = false;
bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -22335,6 +22385,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000008;
}
result.bulkToken_ = bulkToken_;
if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
to_bitField0_ |= 0x00000010;
}
result.copyFile_ = copyFile_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -22391,6 +22445,9 @@ public final class ClientProtos {
bulkToken_ = other.bulkToken_;
onChanged();
}
if (other.hasCopyFile()) {
setCopyFile(other.getCopyFile());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -23013,6 +23070,39 @@ public final class ClientProtos {
return this;
}
// optional bool copy_file = 6 [default = false];
private boolean copyFile_ ;
/**
* <code>optional bool copy_file = 6 [default = false];</code>
*/
public boolean hasCopyFile() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
/**
* <code>optional bool copy_file = 6 [default = false];</code>
*/
public boolean getCopyFile() {
return copyFile_;
}
/**
* <code>optional bool copy_file = 6 [default = false];</code>
*/
public Builder setCopyFile(boolean value) {
bitField0_ |= 0x00000020;
copyFile_ = value;
onChanged();
return this;
}
/**
* <code>optional bool copy_file = 6 [default = false];</code>
*/
public Builder clearCopyFile() {
bitField0_ = (bitField0_ & ~0x00000020);
copyFile_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:hbase.pb.BulkLoadHFileRequest)
}
@@ -39320,81 +39410,81 @@ public final class ClientProtos {
"_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" +
"ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" +
"scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" +
"s\"\206\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" +
"s\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" +
"(\0132\031.hbase.pb.RegionSpecifier\022>\n\013family_" +
"path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReque" +
"st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022+\n" +
"\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationTok" +
"en\022\022\n\nbulk_token\030\005 \001(\t\032*\n\nFamilyPath\022\016\n\006" +
"family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHF",
"ileResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017Delegatio" +
"nToken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002" +
" \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026P" +
"repareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(" +
"\0132\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\0132\031" +
".hbase.pb.RegionSpecifier\"-\n\027PrepareBulk" +
"LoadResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Clea" +
"nupBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\022)" +
"\n\006region\030\002 \001(\0132\031.hbase.pb.RegionSpecifie" +
"r\"\031\n\027CleanupBulkLoadResponse\"a\n\026Coproces",
"sorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_n" +
"ame\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007reques" +
"t\030\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n\005v" +
"alue\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031C" +
"oprocessorServiceRequest\022)\n\006region\030\001 \002(\013" +
"2\031.hbase.pb.RegionSpecifier\022.\n\004call\030\002 \002(" +
"\0132 .hbase.pb.CoprocessorServiceCall\"o\n\032C" +
"oprocessorServiceResponse\022)\n\006region\030\001 \002(" +
"\0132\031.hbase.pb.RegionSpecifier\022&\n\005value\030\002 " +
"\002(\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Action\022",
"\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase" +
".pb.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.p" +
"b.Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb.C" +
"oprocessorServiceCall\"k\n\014RegionAction\022)\n" +
"\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifier" +
"\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase" +
".pb.Action\"c\n\017RegionLoadStats\022\027\n\014memstor" +
"eLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010" +
"\022\035\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024Multi" +
"RegionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.",
"pb.RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase" +
".pb.RegionLoadStats\"\336\001\n\021ResultOrExceptio" +
"n\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase" +
".pb.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb" +
".NameBytesPair\022:\n\016service_result\030\004 \001(\0132\"" +
".hbase.pb.CoprocessorServiceResult\0220\n\tlo" +
"adStats\030\005 \001(\0132\031.hbase.pb.RegionLoadStats" +
"B\002\030\001\"x\n\022RegionActionResult\0226\n\021resultOrEx" +
"ception\030\001 \003(\0132\033.hbase.pb.ResultOrExcepti" +
"on\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameByt",
"esPair\"x\n\014MultiRequest\022,\n\014regionAction\030\001" +
" \003(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGro" +
"up\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.C" +
"ondition\"\226\001\n\rMultiResponse\0228\n\022regionActi" +
"onResult\030\001 \003(\0132\034.hbase.pb.RegionActionRe" +
"sult\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStatist" +
"ics\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadStat" +
"s*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE" +
"\020\0012\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb.G" +
"etRequest\032\025.hbase.pb.GetResponse\022;\n\006Muta",
"te\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.M" +
"utateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReq" +
"uest\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoad" +
"HFile\022\036.hbase.pb.BulkLoadHFileRequest\032\037." +
"hbase.pb.BulkLoadHFileResponse\022V\n\017Prepar" +
"eBulkLoad\022 .hbase.pb.PrepareBulkLoadRequ" +
"est\032!.hbase.pb.PrepareBulkLoadResponse\022V" +
"\n\017CleanupBulkLoad\022 .hbase.pb.CleanupBulk" +
"LoadRequest\032!.hbase.pb.CleanupBulkLoadRe" +
"sponse\022X\n\013ExecService\022#.hbase.pb.Coproce",
"ssorServiceRequest\032$.hbase.pb.Coprocesso" +
"rServiceResponse\022d\n\027ExecRegionServerServ" +
"ice\022#.hbase.pb.CoprocessorServiceRequest" +
"\032$.hbase.pb.CoprocessorServiceResponse\0228" +
"\n\005Multi\022\026.hbase.pb.MultiRequest\032\027.hbase." +
"pb.MultiResponseBB\n*org.apache.hadoop.hb" +
"ase.protobuf.generatedB\014ClientProtosH\001\210\001" +
"\001\240\001\001"
"en\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 \001(" +
"\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014",
"\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n" +
"\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\niden" +
"tifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003" +
" \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLoad" +
"Request\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.T" +
"ableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Regi" +
"onSpecifier\"-\n\027PrepareBulkLoadResponse\022\022" +
"\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadReq" +
"uest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001(\013" +
"2\031.hbase.pb.RegionSpecifier\"\031\n\027CleanupBu",
"lkLoadResponse\"a\n\026CoprocessorServiceCall" +
"\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013m" +
"ethod_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030Cop" +
"rocessorServiceResult\022&\n\005value\030\001 \001(\0132\027.h" +
"base.pb.NameBytesPair\"v\n\031CoprocessorServ" +
"iceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Re" +
"gionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb.C" +
"oprocessorServiceCall\"o\n\032CoprocessorServ" +
"iceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R" +
"egionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase.pb",
".NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r" +
"\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.MutationPr" +
"oto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014servi" +
"ce_call\030\004 \001(\0132 .hbase.pb.CoprocessorServ" +
"iceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(\0132" +
"\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002 \001" +
"(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017" +
"RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010" +
"\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compaction" +
"Pressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadStat",
"s\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpeci" +
"fier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLoad" +
"Stats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001 \001" +
"(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\t" +
"exception\030\003 \001(\0132\027.hbase.pb.NameBytesPair" +
"\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Copr" +
"ocessorServiceResult\0220\n\tloadStats\030\005 \001(\0132" +
"\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Region" +
"ActionResult\0226\n\021resultOrException\030\001 \003(\0132" +
"\033.hbase.pb.ResultOrException\022*\n\texceptio",
"n\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mult" +
"iRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.p" +
"b.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tco" +
"ndition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n\rM" +
"ultiResponse\0228\n\022regionActionResult\030\001 \003(\013" +
"2\034.hbase.pb.RegionActionResult\022\021\n\tproces" +
"sed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036.hb" +
"ase.pb.MultiRegionLoadStats*\'\n\013Consisten" +
"cy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClientS" +
"ervice\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025.hb",
"ase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.pb." +
"MutateRequest\032\030.hbase.pb.MutateResponse\022" +
"5\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase.p" +
"b.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbase." +
"pb.BulkLoadHFileRequest\032\037.hbase.pb.BulkL" +
"oadHFileResponse\022V\n\017PrepareBulkLoad\022 .hb" +
"ase.pb.PrepareBulkLoadRequest\032!.hbase.pb" +
".PrepareBulkLoadResponse\022V\n\017CleanupBulkL" +
"oad\022 .hbase.pb.CleanupBulkLoadRequest\032!." +
"hbase.pb.CleanupBulkLoadResponse\022X\n\013Exec",
"Service\022#.hbase.pb.CoprocessorServiceReq" +
"uest\032$.hbase.pb.CoprocessorServiceRespon" +
"se\022d\n\027ExecRegionServerService\022#.hbase.pb" +
".CoprocessorServiceRequest\032$.hbase.pb.Co" +
"processorServiceResponse\0228\n\005Multi\022\026.hbas" +
"e.pb.MultiRequest\032\027.hbase.pb.MultiRespon" +
"seBB\n*org.apache.hadoop.hbase.protobuf.g" +
"eneratedB\014ClientProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -39502,7 +39592,7 @@ public final class ClientProtos {
internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_BulkLoadHFileRequest_descriptor,
new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", });
new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", "CopyFile", });
internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_descriptor =
internal_static_hbase_pb_BulkLoadHFileRequest_descriptor.getNestedTypes().get(0);
internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_fieldAccessorTable = new

Client.proto

@@ -339,6 +339,7 @@ message BulkLoadHFileRequest {
optional bool assign_seq_num = 3;
optional DelegationToken fs_token = 4;
optional string bulk_token = 5;
optional bool copy_file = 6 [default = false];
message FamilyPath {
required bytes family = 1;

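A note on the wire format: a varint-encoded field with number 6 and wire type 0 gets the tag byte (6 << 3) | 0 = 48, which is exactly the case 48 the regenerated parser in ClientProtos.java switches on above. Because the field is optional with a false default, requests from older clients that never set it continue to parse, and the server falls back to the existing rename behavior.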
LoadIncrementalHFiles.java

@@ -115,6 +115,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
public final static String CREATE_TABLE_CONF_KEY = "create.table";
public final static String SILENCE_CONF_KEY = "ignore.unmatched.families";
public final static String ALWAYS_COPY_FILES = "always.copy.files";
// We use a '.' prefix which is ignored when walking directory trees
// above. It is invalid family name.
@@ -328,7 +329,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/
public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
RegionLocator regionLocator) throws TableNotFoundException, IOException {
doBulkLoad(hfofDir, admin, table, regionLocator, false);
doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
}
void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
@@ -360,10 +361,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @param table the table to load into
* @param regionLocator region locator
* @param silence true to ignore unmatched column families
* @param copyFile always copy hfiles if true
* @throws TableNotFoundException if table does not yet exist
*/
public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
RegionLocator regionLocator, boolean silence, boolean copyFile)
throws TableNotFoundException, IOException {
if (!admin.isTableAvailable(regionLocator.getName())) {
throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
}
@@ -386,7 +389,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
break;
}
}
performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
} finally {
cleanup(admin, queue, pool, secureClient);
}
@@ -402,10 +405,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @param table the table to load into
* @param regionLocator region locator
* @param silence true to ignore unmatched column families
* @param copyFile always copy hfiles if true
* @throws TableNotFoundException if table does not yet exist
*/
public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
RegionLocator regionLocator, boolean silence, boolean copyFile)
throws TableNotFoundException, IOException {
if (!admin.isTableAvailable(regionLocator.getName())) {
throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
}
@@ -437,7 +442,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
pool = createExecutorService();
secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
} finally {
cleanup(admin, queue, pool, secureClient);
}
@@ -445,7 +450,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
Deque<LoadQueueItem> queue, ExecutorService pool,
SecureBulkLoadClient secureClient) throws IOException {
SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
int count = 0;
if(isSecureBulkLoadEndpointAvailable()) {
@@ -486,7 +491,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
+ " hfiles to one family of one region");
}
bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile);
// NOTE: The next iteration's split / group could happen in parallel to
// atomic bulkloads assuming that there are splits and no merges, and
@@ -599,12 +604,29 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/
public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
Pair<byte[][], byte[][]> startEndKeys) throws IOException {
loadHFileQueue(table, conn, queue, startEndKeys, false);
}
/**
* Used by the replication sink to load the hfiles from the source cluster. It does the following:
* <ol>
* <li>{@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
* <li>{@link LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}
* </li>
* </ol>
* @param table Table to which these hfiles should be loaded
* @param conn Connection to use
* @param queue queue of {@link LoadQueueItem}s with hfiles yet to be loaded
* @param startEndKeys starting and ending row keys of the region
* @param copyFile always copy hfiles if true
*/
public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
ExecutorService pool = null;
try {
pool = createExecutorService();
Multimap<ByteBuffer, LoadQueueItem> regionGroups =
groupOrSplitPhase(table, pool, queue, startEndKeys);
bulkLoadPhase(table, conn, pool, queue, regionGroups);
bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile);
} finally {
if (pool != null) {
pool.shutdown();
@@ -619,7 +641,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/
protected void bulkLoadPhase(final Table table, final Connection conn,
ExecutorService pool, Deque<LoadQueueItem> queue,
final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile) throws IOException {
// atomically bulk load the groups.
Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()){
@ -630,7 +652,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
@Override
public List<LoadQueueItem> call() throws Exception {
List<LoadQueueItem> toRetry =
tryAtomicRegionLoad(conn, table.getName(), first, lqis);
tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile);
return toRetry;
}
};
@@ -890,8 +912,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* failure
*/
protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
throws IOException {
final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis,
boolean copyFile) throws IOException {
final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
for (LoadQueueItem lqi : lqis) {
if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
@@ -911,7 +933,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(getConf(), table);
success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
assignSeqIds, fsDelegationToken.getUserToken(), bulkToken);
assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
}
return success;
} finally {
@@ -1172,10 +1194,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = connection.getTable(tableName);
RegionLocator locator = connection.getRegionLocator(tableName)) {
boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
boolean copyFiles = getConf().getBoolean(ALWAYS_COPY_FILES, false);
if (dirPath != null) {
doBulkLoad(hfofDir, admin, table, locator, silence);
doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
} else {
doBulkLoad(map, admin, table, locator, silence);
doBulkLoad(map, admin, table, locator, silence, copyFiles);
}
}
}

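A minimal sketch of driving the tool with the new switch, mirroring the test below; the hfile directory and table name are placeholders and a running cluster is assumed:

// Sketch only: "/bulk/output" and "mytable" are placeholders.
Configuration conf = HBaseConfiguration.create();
// Copy hfiles into the staging area instead of renaming them, so the
// source files survive the load.
conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
int exitCode = new LoadIncrementalHFiles(conf).run(new String[] { "/bulk/output", "mytable" });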
HRegion.java

@@ -5422,6 +5422,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
}
@Override
public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<String, Long>();
@@ -5503,7 +5509,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
String finalPath = path;
if (bulkLoadListener != null) {
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile);
}
Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);

RSRpcServices.java

@@ -2078,7 +2078,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
}
if (!bypass) {
loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
request.getCopyFile());
}
if (region.getCoprocessorHost() != null) {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);

Region.java

@@ -512,7 +512,6 @@ public interface Region extends ConfigurationObserver {
* pre/post processing of a given bulkload call
*/
interface BulkLoadListener {
/**
* Called before an HFile is actually loaded
* @param family family being loaded to
@@ -520,7 +519,8 @@ public interface Region extends ConfigurationObserver {
* @param copyFile true to copy the hfile into place instead of renaming it
* @return final path to be used for actual loading
* @throws IOException
*/
String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
throws IOException;
/**
* Called after a successful HFile load
@@ -553,6 +553,21 @@ public interface Region extends ConfigurationObserver {
boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException;
/**
* Attempts to atomically load a group of hfiles. This is critical for loading
* rows with multiple column families atomically.
*
* @param familyPaths List of Pair&lt;byte[] column family, String hfilePath&gt;
* @param assignSeqId true to assign a new sequence id to the loaded hfiles
* @param bulkLoadListener Internal hooks enabling massaging/preparation of a
* file about to be bulk loaded
* @param copyFile always copy hfiles if true
* @return true if successful, false if failed recoverably
* @throws IOException if failed unrecoverably.
*/
boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException;
///////////////////////////////////////////////////////////////////////////
// Coprocessors

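Outside implementations of BulkLoadListener must adapt to the widened callback. A minimal illustrative stub of the new shape (the class name is hypothetical; doneBulkLoad and failedBulkLoad are the interface's other callbacks):

// Illustrative only: passes the path through unchanged.
class PassThroughBulkLoadListener implements Region.BulkLoadListener {
  @Override
  public String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
      throws IOException {
    // A real implementation should copy rather than rename when copyFile is true,
    // as SecureBulkLoadListener does below.
    return srcPath;
  }
  @Override
  public void doneBulkLoad(byte[] family, String srcPath) throws IOException {}
  @Override
  public void failedBulkLoad(byte[] family, String srcPath) throws IOException {}
}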
SecureBulkLoadManager.java

@@ -233,7 +233,7 @@ public class SecureBulkLoadManager {
//We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf));
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
@@ -305,7 +305,8 @@ public class SecureBulkLoadManager {
}
@Override
public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile)
throws IOException {
Path p = new Path(srcPath);
Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
@@ -329,6 +330,9 @@ public class SecureBulkLoadManager {
LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
"the destination filesystem. Copying file over to destination staging dir.");
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
} else if (copyFile) {
LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir.");
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
} else {
LOG.debug("Moving " + p + " to " + stageP);
FileStatus origFileStatus = fs.getFileStatus(p);

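Note that in both copy branches the fifth argument to FileUtil.copy is deleteSource = false, so the source hfile is left in place after staging; only the final else branch keeps the original same-filesystem rename fast path.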
TestLoadIncrementalHFiles.java

@@ -131,6 +131,17 @@ public class TestLoadIncrementalHFiles {
});
}
@Test(timeout = 120000)
public void testSimpleLoadWithFileCopy() throws Exception {
String testName = "mytable_testSimpleLoadWithFileCopy";
final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE,
false, null, new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
}, false, true);
}
/**
* Test case that creates some regions and loads
* HFiles that cross the boundaries of those regions
@@ -291,12 +302,12 @@ public class TestLoadIncrementalHFiles {
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
throws Exception {
HTableDescriptor htd = buildHTD(tableName, bloomType);
runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap);
runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
}
private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
throws Exception {
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
boolean copyFiles) throws Exception {
Path dir = util.getDataTestDirOnTestFS(testName);
FileSystem fs = util.getTestFileSystem();
dir = dir.makeQualified(fs);
@@ -305,9 +316,11 @@ public class TestLoadIncrementalHFiles {
int hfileIdx = 0;
Map<byte[], List<Path>> map = null;
List<Path> list = null;
if (useMap || copyFiles) {
list = new ArrayList<>();
}
if (useMap) {
map = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
list = new ArrayList<>();
map.put(FAMILY, list);
}
for (byte[][] range : hfileRanges) {
@@ -326,7 +339,11 @@ public class TestLoadIncrementalHFiles {
}
final TableName tableName = htd.getTableName();
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
Configuration conf = util.getConfiguration();
if (copyFiles) {
conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
}
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
String [] args= {dir.toString(), tableName.toString()};
if (useMap) {
loader.run(null, map, tableName);
@ -334,6 +351,12 @@ public class TestLoadIncrementalHFiles {
loader.run(args);
}
if (copyFiles) {
for (Path p : list) {
assertTrue(fs.exists(p));
}
}
Table table = util.getConnection().getTable(tableName);
try {
assertEquals(expectedRows, util.countRows(table));
@@ -419,7 +442,7 @@ public class TestLoadIncrementalHFiles {
htd.addFamily(family);
try {
runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false);
runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
assertTrue("Loading into table with non-existent family should have failed", false);
} catch (Exception e) {
assertTrue("IOException expected", e instanceof IOException);

TestLoadIncrementalHFilesSplitRecovery.java

@@ -281,8 +281,8 @@ public class TestLoadIncrementalHFilesSplitRecovery {
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
@Override
protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
throws IOException {
TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis,
boolean copyFile) throws IOException {
int i = attmptedCalls.incrementAndGet();
if (i == 1) {
Connection errConn;
@@ -293,10 +293,10 @@ public class TestLoadIncrementalHFilesSplitRecovery {
throw new RuntimeException("mocking cruft, should never happen");
}
failedCalls.incrementAndGet();
return super.tryAtomicRegionLoad(errConn, tableName, first, lqis);
return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, copyFile);
}
return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
return super.tryAtomicRegionLoad(conn, tableName, first, lqis, copyFile);
}
};
try {
@@ -359,13 +359,14 @@ public class TestLoadIncrementalHFilesSplitRecovery {
@Override
protected void bulkLoadPhase(final Table htable, final Connection conn,
ExecutorService pool, Deque<LoadQueueItem> queue,
final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile)
throws IOException {
int i = attemptedCalls.incrementAndGet();
if (i == 1) {
// On first attempt force a split.
forceSplit(table);
}
super.bulkLoadPhase(htable, conn, pool, queue, regionGroups);
super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile);
}
};