Port HBASE-6590 to trunk : Asssign sequence number to bulk loaded files (Amitanand)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1382351 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2c270de896
commit
564fc2e7c3
|
@ -97,11 +97,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
private Configuration cfg;
|
private Configuration cfg;
|
||||||
|
|
||||||
public static String NAME = "completebulkload";
|
public static String NAME = "completebulkload";
|
||||||
|
private static String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
|
||||||
|
private boolean assignSeqIds;
|
||||||
|
|
||||||
public LoadIncrementalHFiles(Configuration conf) throws Exception {
|
public LoadIncrementalHFiles(Configuration conf) throws Exception {
|
||||||
super(conf);
|
super(conf);
|
||||||
this.cfg = conf;
|
this.cfg = conf;
|
||||||
this.hbAdmin = new HBaseAdmin(conf);
|
this.hbAdmin = new HBaseAdmin(conf);
|
||||||
|
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void usage() {
|
private void usage() {
|
||||||
|
@ -482,7 +485,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
LOG.debug("Going to connect to server " + location + " for row "
|
LOG.debug("Going to connect to server " + location + " for row "
|
||||||
+ Bytes.toStringBinary(row));
|
+ Bytes.toStringBinary(row));
|
||||||
byte[] regionName = location.getRegionInfo().getRegionName();
|
byte[] regionName = location.getRegionInfo().getRegionName();
|
||||||
return ProtobufUtil.bulkLoadHFile(server, famPaths, regionName);
|
return ProtobufUtil.bulkLoadHFile(server, famPaths, regionName,
|
||||||
|
assignSeqIds);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -1265,14 +1265,15 @@ public final class ProtobufUtil {
|
||||||
* @param client
|
* @param client
|
||||||
* @param familyPaths
|
* @param familyPaths
|
||||||
* @param regionName
|
* @param regionName
|
||||||
|
* @param assignSeqNum
|
||||||
* @return true if all are loaded
|
* @return true if all are loaded
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static boolean bulkLoadHFile(final ClientProtocol client,
|
public static boolean bulkLoadHFile(final ClientProtocol client,
|
||||||
final List<Pair<byte[], String>> familyPaths,
|
final List<Pair<byte[], String>> familyPaths,
|
||||||
final byte[] regionName) throws IOException {
|
final byte[] regionName, boolean assignSeqNum) throws IOException {
|
||||||
BulkLoadHFileRequest request =
|
BulkLoadHFileRequest request =
|
||||||
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName);
|
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum);
|
||||||
try {
|
try {
|
||||||
BulkLoadHFileResponse response =
|
BulkLoadHFileResponse response =
|
||||||
client.bulkLoadHFile(null, request);
|
client.bulkLoadHFile(null, request);
|
||||||
|
|
|
@ -454,10 +454,12 @@ public final class RequestConverter {
|
||||||
*
|
*
|
||||||
* @param familyPaths
|
* @param familyPaths
|
||||||
* @param regionName
|
* @param regionName
|
||||||
|
* @param assignSeqNum
|
||||||
* @return a bulk load request
|
* @return a bulk load request
|
||||||
*/
|
*/
|
||||||
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
|
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
|
||||||
final List<Pair<byte[], String>> familyPaths, final byte[] regionName) {
|
final List<Pair<byte[], String>> familyPaths,
|
||||||
|
final byte[] regionName, boolean assignSeqNum) {
|
||||||
BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder();
|
BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder();
|
||||||
RegionSpecifier region = buildRegionSpecifier(
|
RegionSpecifier region = buildRegionSpecifier(
|
||||||
RegionSpecifierType.REGION_NAME, regionName);
|
RegionSpecifierType.REGION_NAME, regionName);
|
||||||
|
@ -468,6 +470,7 @@ public final class RequestConverter {
|
||||||
familyPathBuilder.setPath(familyPath.getSecond());
|
familyPathBuilder.setPath(familyPath.getSecond());
|
||||||
builder.addFamilyPath(familyPathBuilder.build());
|
builder.addFamilyPath(familyPathBuilder.build());
|
||||||
}
|
}
|
||||||
|
builder.setAssignSeqNum(assignSeqNum);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13967,6 +13967,10 @@ public final class ClientProtos {
|
||||||
getFamilyPathOrBuilderList();
|
getFamilyPathOrBuilderList();
|
||||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPathOrBuilder getFamilyPathOrBuilder(
|
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPathOrBuilder getFamilyPathOrBuilder(
|
||||||
int index);
|
int index);
|
||||||
|
|
||||||
|
// optional bool assignSeqNum = 3;
|
||||||
|
boolean hasAssignSeqNum();
|
||||||
|
boolean getAssignSeqNum();
|
||||||
}
|
}
|
||||||
public static final class BulkLoadHFileRequest extends
|
public static final class BulkLoadHFileRequest extends
|
||||||
com.google.protobuf.GeneratedMessage
|
com.google.protobuf.GeneratedMessage
|
||||||
|
@ -14524,9 +14528,20 @@ public final class ClientProtos {
|
||||||
return familyPath_.get(index);
|
return familyPath_.get(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// optional bool assignSeqNum = 3;
|
||||||
|
public static final int ASSIGNSEQNUM_FIELD_NUMBER = 3;
|
||||||
|
private boolean assignSeqNum_;
|
||||||
|
public boolean hasAssignSeqNum() {
|
||||||
|
return ((bitField0_ & 0x00000002) == 0x00000002);
|
||||||
|
}
|
||||||
|
public boolean getAssignSeqNum() {
|
||||||
|
return assignSeqNum_;
|
||||||
|
}
|
||||||
|
|
||||||
private void initFields() {
|
private void initFields() {
|
||||||
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
|
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
|
||||||
familyPath_ = java.util.Collections.emptyList();
|
familyPath_ = java.util.Collections.emptyList();
|
||||||
|
assignSeqNum_ = false;
|
||||||
}
|
}
|
||||||
private byte memoizedIsInitialized = -1;
|
private byte memoizedIsInitialized = -1;
|
||||||
public final boolean isInitialized() {
|
public final boolean isInitialized() {
|
||||||
|
@ -14560,6 +14575,9 @@ public final class ClientProtos {
|
||||||
for (int i = 0; i < familyPath_.size(); i++) {
|
for (int i = 0; i < familyPath_.size(); i++) {
|
||||||
output.writeMessage(2, familyPath_.get(i));
|
output.writeMessage(2, familyPath_.get(i));
|
||||||
}
|
}
|
||||||
|
if (((bitField0_ & 0x00000002) == 0x00000002)) {
|
||||||
|
output.writeBool(3, assignSeqNum_);
|
||||||
|
}
|
||||||
getUnknownFields().writeTo(output);
|
getUnknownFields().writeTo(output);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -14577,6 +14595,10 @@ public final class ClientProtos {
|
||||||
size += com.google.protobuf.CodedOutputStream
|
size += com.google.protobuf.CodedOutputStream
|
||||||
.computeMessageSize(2, familyPath_.get(i));
|
.computeMessageSize(2, familyPath_.get(i));
|
||||||
}
|
}
|
||||||
|
if (((bitField0_ & 0x00000002) == 0x00000002)) {
|
||||||
|
size += com.google.protobuf.CodedOutputStream
|
||||||
|
.computeBoolSize(3, assignSeqNum_);
|
||||||
|
}
|
||||||
size += getUnknownFields().getSerializedSize();
|
size += getUnknownFields().getSerializedSize();
|
||||||
memoizedSerializedSize = size;
|
memoizedSerializedSize = size;
|
||||||
return size;
|
return size;
|
||||||
|
@ -14607,6 +14629,11 @@ public final class ClientProtos {
|
||||||
}
|
}
|
||||||
result = result && getFamilyPathList()
|
result = result && getFamilyPathList()
|
||||||
.equals(other.getFamilyPathList());
|
.equals(other.getFamilyPathList());
|
||||||
|
result = result && (hasAssignSeqNum() == other.hasAssignSeqNum());
|
||||||
|
if (hasAssignSeqNum()) {
|
||||||
|
result = result && (getAssignSeqNum()
|
||||||
|
== other.getAssignSeqNum());
|
||||||
|
}
|
||||||
result = result &&
|
result = result &&
|
||||||
getUnknownFields().equals(other.getUnknownFields());
|
getUnknownFields().equals(other.getUnknownFields());
|
||||||
return result;
|
return result;
|
||||||
|
@ -14624,6 +14651,10 @@ public final class ClientProtos {
|
||||||
hash = (37 * hash) + FAMILYPATH_FIELD_NUMBER;
|
hash = (37 * hash) + FAMILYPATH_FIELD_NUMBER;
|
||||||
hash = (53 * hash) + getFamilyPathList().hashCode();
|
hash = (53 * hash) + getFamilyPathList().hashCode();
|
||||||
}
|
}
|
||||||
|
if (hasAssignSeqNum()) {
|
||||||
|
hash = (37 * hash) + ASSIGNSEQNUM_FIELD_NUMBER;
|
||||||
|
hash = (53 * hash) + hashBoolean(getAssignSeqNum());
|
||||||
|
}
|
||||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||||
return hash;
|
return hash;
|
||||||
}
|
}
|
||||||
|
@ -14754,6 +14785,8 @@ public final class ClientProtos {
|
||||||
} else {
|
} else {
|
||||||
familyPathBuilder_.clear();
|
familyPathBuilder_.clear();
|
||||||
}
|
}
|
||||||
|
assignSeqNum_ = false;
|
||||||
|
bitField0_ = (bitField0_ & ~0x00000004);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -14809,6 +14842,10 @@ public final class ClientProtos {
|
||||||
} else {
|
} else {
|
||||||
result.familyPath_ = familyPathBuilder_.build();
|
result.familyPath_ = familyPathBuilder_.build();
|
||||||
}
|
}
|
||||||
|
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
|
||||||
|
to_bitField0_ |= 0x00000002;
|
||||||
|
}
|
||||||
|
result.assignSeqNum_ = assignSeqNum_;
|
||||||
result.bitField0_ = to_bitField0_;
|
result.bitField0_ = to_bitField0_;
|
||||||
onBuilt();
|
onBuilt();
|
||||||
return result;
|
return result;
|
||||||
|
@ -14854,6 +14891,9 @@ public final class ClientProtos {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (other.hasAssignSeqNum()) {
|
||||||
|
setAssignSeqNum(other.getAssignSeqNum());
|
||||||
|
}
|
||||||
this.mergeUnknownFields(other.getUnknownFields());
|
this.mergeUnknownFields(other.getUnknownFields());
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
@ -14914,6 +14954,11 @@ public final class ClientProtos {
|
||||||
addFamilyPath(subBuilder.buildPartial());
|
addFamilyPath(subBuilder.buildPartial());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case 24: {
|
||||||
|
bitField0_ |= 0x00000004;
|
||||||
|
assignSeqNum_ = input.readBool();
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15196,6 +15241,27 @@ public final class ClientProtos {
|
||||||
return familyPathBuilder_;
|
return familyPathBuilder_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// optional bool assignSeqNum = 3;
|
||||||
|
private boolean assignSeqNum_ ;
|
||||||
|
public boolean hasAssignSeqNum() {
|
||||||
|
return ((bitField0_ & 0x00000004) == 0x00000004);
|
||||||
|
}
|
||||||
|
public boolean getAssignSeqNum() {
|
||||||
|
return assignSeqNum_;
|
||||||
|
}
|
||||||
|
public Builder setAssignSeqNum(boolean value) {
|
||||||
|
bitField0_ |= 0x00000004;
|
||||||
|
assignSeqNum_ = value;
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
public Builder clearAssignSeqNum() {
|
||||||
|
bitField0_ = (bitField0_ & ~0x00000004);
|
||||||
|
assignSeqNum_ = false;
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
// @@protoc_insertion_point(builder_scope:BulkLoadHFileRequest)
|
// @@protoc_insertion_point(builder_scope:BulkLoadHFileRequest)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21617,38 +21683,39 @@ public final class ClientProtos {
|
||||||
"\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n\006lockId\030" +
|
"\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n\006lockId\030" +
|
||||||
"\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowRequest\022 " +
|
"\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowRequest\022 " +
|
||||||
"\n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006lock" +
|
"\n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006lock" +
|
||||||
"Id\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\232\001\n\024BulkLo" +
|
"Id\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\260\001\n\024BulkLo" +
|
||||||
"adHFileRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" +
|
"adHFileRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" +
|
||||||
"pecifier\0224\n\nfamilyPath\030\002 \003(\0132 .BulkLoadH" +
|
"pecifier\0224\n\nfamilyPath\030\002 \003(\0132 .BulkLoadH" +
|
||||||
"FileRequest.FamilyPath\032*\n\nFamilyPath\022\016\n\006" +
|
"FileRequest.FamilyPath\022\024\n\014assignSeqNum\030\003" +
|
||||||
"family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHF" +
|
" \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004pa" +
|
||||||
"ileResponse\022\016\n\006loaded\030\001 \002(\010\"\203\001\n\004Exec\022\013\n\003" +
|
"th\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loa" +
|
||||||
"row\030\001 \002(\014\022\024\n\014protocolName\030\002 \002(\t\022\022\n\nmetho",
|
"ded\030\001 \002(\010\"\203\001\n\004Exec\022\013\n\003row\030\001 \002(\014\022\024\n\014proto",
|
||||||
"dName\030\003 \002(\t\022!\n\010property\030\004 \003(\0132\017.NameStri" +
|
"colName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022!\n\010pro" +
|
||||||
"ngPair\022!\n\tparameter\030\005 \003(\0132\016.NameBytesPai" +
|
"perty\030\004 \003(\0132\017.NameStringPair\022!\n\tparamete" +
|
||||||
"r\"O\n\026ExecCoprocessorRequest\022 \n\006region\030\001 " +
|
"r\030\005 \003(\0132\016.NameBytesPair\"O\n\026ExecCoprocess" +
|
||||||
"\002(\0132\020.RegionSpecifier\022\023\n\004call\030\002 \002(\0132\005.Ex" +
|
"orRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif" +
|
||||||
"ec\"8\n\027ExecCoprocessorResponse\022\035\n\005value\030\001" +
|
"ier\022\023\n\004call\030\002 \002(\0132\005.Exec\"8\n\027ExecCoproces" +
|
||||||
" \002(\0132\016.NameBytesPair\"N\n\013MultiAction\022\027\n\006m" +
|
"sorResponse\022\035\n\005value\030\001 \002(\0132\016.NameBytesPa" +
|
||||||
"utate\030\001 \001(\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\022" +
|
"ir\"N\n\013MultiAction\022\027\n\006mutate\030\001 \001(\0132\007.Muta" +
|
||||||
"\023\n\004exec\030\003 \001(\0132\005.Exec\"P\n\014ActionResult\022\035\n\005" +
|
"te\022\021\n\003get\030\002 \001(\0132\004.Get\022\023\n\004exec\030\003 \001(\0132\005.Ex" +
|
||||||
"value\030\001 \001(\0132\016.NameBytesPair\022!\n\texception" +
|
"ec\"P\n\014ActionResult\022\035\n\005value\030\001 \001(\0132\016.Name" +
|
||||||
"\030\002 \001(\0132\016.NameBytesPair\"^\n\014MultiRequest\022 ",
|
"BytesPair\022!\n\texception\030\002 \001(\0132\016.NameBytes",
|
||||||
"\n\006region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006acti" +
|
"Pair\"^\n\014MultiRequest\022 \n\006region\030\001 \002(\0132\020.R" +
|
||||||
"on\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\"." +
|
"egionSpecifier\022\034\n\006action\030\002 \003(\0132\014.MultiAc" +
|
||||||
"\n\rMultiResponse\022\035\n\006result\030\001 \003(\0132\r.Action" +
|
"tion\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiResponse\022\035\n" +
|
||||||
"Result2\221\003\n\rClientService\022 \n\003get\022\013.GetReq" +
|
"\006result\030\001 \003(\0132\r.ActionResult2\221\003\n\rClientS" +
|
||||||
"uest\032\014.GetResponse\022)\n\006mutate\022\016.MutateReq" +
|
"ervice\022 \n\003get\022\013.GetRequest\032\014.GetResponse" +
|
||||||
"uest\032\017.MutateResponse\022#\n\004scan\022\014.ScanRequ" +
|
"\022)\n\006mutate\022\016.MutateRequest\032\017.MutateRespo" +
|
||||||
"est\032\r.ScanResponse\022,\n\007lockRow\022\017.LockRowR" +
|
"nse\022#\n\004scan\022\014.ScanRequest\032\r.ScanResponse" +
|
||||||
"equest\032\020.LockRowResponse\0222\n\tunlockRow\022\021." +
|
"\022,\n\007lockRow\022\017.LockRowRequest\032\020.LockRowRe" +
|
||||||
"UnlockRowRequest\032\022.UnlockRowResponse\022>\n\r" +
|
"sponse\0222\n\tunlockRow\022\021.UnlockRowRequest\032\022" +
|
||||||
"bulkLoadHFile\022\025.BulkLoadHFileRequest\032\026.B",
|
".UnlockRowResponse\022>\n\rbulkLoadHFile\022\025.Bu",
|
||||||
"ulkLoadHFileResponse\022D\n\017execCoprocessor\022" +
|
"lkLoadHFileRequest\032\026.BulkLoadHFileRespon" +
|
||||||
"\027.ExecCoprocessorRequest\032\030.ExecCoprocess" +
|
"se\022D\n\017execCoprocessor\022\027.ExecCoprocessorR" +
|
||||||
"orResponse\022&\n\005multi\022\r.MultiRequest\032\016.Mul" +
|
"equest\032\030.ExecCoprocessorResponse\022&\n\005mult" +
|
||||||
"tiResponseBB\n*org.apache.hadoop.hbase.pr" +
|
"i\022\r.MultiRequest\032\016.MultiResponseBB\n*org." +
|
||||||
"otobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
|
"apache.hadoop.hbase.protobuf.generatedB\014" +
|
||||||
|
"ClientProtosH\001\210\001\001\240\001\001"
|
||||||
};
|
};
|
||||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||||
|
@ -21804,7 +21871,7 @@ public final class ClientProtos {
|
||||||
internal_static_BulkLoadHFileRequest_fieldAccessorTable = new
|
internal_static_BulkLoadHFileRequest_fieldAccessorTable = new
|
||||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||||
internal_static_BulkLoadHFileRequest_descriptor,
|
internal_static_BulkLoadHFileRequest_descriptor,
|
||||||
new java.lang.String[] { "Region", "FamilyPath", },
|
new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", },
|
||||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.class,
|
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.class,
|
||||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.Builder.class);
|
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.Builder.class);
|
||||||
internal_static_BulkLoadHFileRequest_FamilyPath_descriptor =
|
internal_static_BulkLoadHFileRequest_FamilyPath_descriptor =
|
||||||
|
|
|
@ -564,11 +564,14 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
HStore store = future.get();
|
HStore store = future.get();
|
||||||
|
|
||||||
this.stores.put(store.getColumnFamilyName().getBytes(), store);
|
this.stores.put(store.getColumnFamilyName().getBytes(), store);
|
||||||
long storeSeqId = store.getMaxSequenceId();
|
// Do not include bulk loaded files when determining seqIdForReplay
|
||||||
|
long storeSeqIdForReplay = store.getMaxSequenceId(false);
|
||||||
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
|
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
|
||||||
storeSeqId);
|
storeSeqIdForReplay);
|
||||||
if (maxSeqId == -1 || storeSeqId > maxSeqId) {
|
// Include bulk loaded files when determining seqIdForAssignment
|
||||||
maxSeqId = storeSeqId;
|
long storeSeqIdForAssignment = store.getMaxSequenceId(true);
|
||||||
|
if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
|
||||||
|
maxSeqId = storeSeqIdForAssignment;
|
||||||
}
|
}
|
||||||
long maxStoreMemstoreTS = store.getMaxMemstoreTS();
|
long maxStoreMemstoreTS = store.getMaxMemstoreTS();
|
||||||
if (maxStoreMemstoreTS > maxMemstoreTS) {
|
if (maxStoreMemstoreTS > maxMemstoreTS) {
|
||||||
|
@ -3314,11 +3317,12 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
* rows with multiple column families atomically.
|
* rows with multiple column families atomically.
|
||||||
*
|
*
|
||||||
* @param familyPaths List of Pair<byte[] column family, String hfilePath>
|
* @param familyPaths List of Pair<byte[] column family, String hfilePath>
|
||||||
|
* @param assignSeqId
|
||||||
* @return true if successful, false if failed recoverably
|
* @return true if successful, false if failed recoverably
|
||||||
* @throws IOException if failed unrecoverably.
|
* @throws IOException if failed unrecoverably.
|
||||||
*/
|
*/
|
||||||
public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths)
|
public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
|
||||||
throws IOException {
|
boolean assignSeqId) throws IOException {
|
||||||
Preconditions.checkNotNull(familyPaths);
|
Preconditions.checkNotNull(familyPaths);
|
||||||
// we need writeLock for multi-family bulk load
|
// we need writeLock for multi-family bulk load
|
||||||
startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
|
startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
|
||||||
|
@ -3378,7 +3382,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
String path = p.getSecond();
|
String path = p.getSecond();
|
||||||
Store store = getStore(familyName);
|
Store store = getStore(familyName);
|
||||||
try {
|
try {
|
||||||
store.bulkLoadHFile(path);
|
store.bulkLoadHFile(path, assignSeqId ? this.log.obtainSeqNum() : -1);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
// A failure here can cause an atomicity violation that we currently
|
// A failure here can cause an atomicity violation that we currently
|
||||||
// cannot recover from since it is likely a failed HDFS operation.
|
// cannot recover from since it is likely a failed HDFS operation.
|
||||||
|
|
|
@ -3284,7 +3284,7 @@ public class HRegionServer implements ClientProtocol,
|
||||||
}
|
}
|
||||||
boolean loaded = false;
|
boolean loaded = false;
|
||||||
if (!bypass) {
|
if (!bypass) {
|
||||||
loaded = region.bulkLoadHFiles(familyPaths);
|
loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum());
|
||||||
}
|
}
|
||||||
if (region.getCoprocessorHost() != null) {
|
if (region.getCoprocessorHost() != null) {
|
||||||
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
|
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
|
||||||
|
|
|
@ -311,8 +311,8 @@ public class HStore extends SchemaConfigured implements Store {
|
||||||
/**
|
/**
|
||||||
* @return The maximum sequence id in all store files.
|
* @return The maximum sequence id in all store files.
|
||||||
*/
|
*/
|
||||||
long getMaxSequenceId() {
|
long getMaxSequenceId(boolean includeBulkFiles) {
|
||||||
return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
|
return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -532,7 +532,7 @@ public class HStore extends SchemaConfigured implements Store {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void bulkLoadHFile(String srcPathStr) throws IOException {
|
public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
|
||||||
Path srcPath = new Path(srcPathStr);
|
Path srcPath = new Path(srcPathStr);
|
||||||
|
|
||||||
// Copy the file if it's on another filesystem
|
// Copy the file if it's on another filesystem
|
||||||
|
@ -547,7 +547,8 @@ public class HStore extends SchemaConfigured implements Store {
|
||||||
srcPath = tmpPath;
|
srcPath = tmpPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
Path dstPath = StoreFile.getRandomFilename(fs, homedir);
|
Path dstPath = StoreFile.getRandomFilename(fs, homedir,
|
||||||
|
(seqNum == -1) ? null : "_SeqId_" + seqNum + "_");
|
||||||
LOG.debug("Renaming bulk load file " + srcPath + " to " + dstPath);
|
LOG.debug("Renaming bulk load file " + srcPath + " to " + dstPath);
|
||||||
StoreFile.rename(fs, srcPath, dstPath);
|
StoreFile.rename(fs, srcPath, dstPath);
|
||||||
|
|
||||||
|
@ -990,7 +991,7 @@ public class HStore extends SchemaConfigured implements Store {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Max-sequenceID is the last key in the files we're compacting
|
// Max-sequenceID is the last key in the files we're compacting
|
||||||
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
|
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
|
||||||
|
|
||||||
// Ready to go. Have list of files to compact.
|
// Ready to go. Have list of files to compact.
|
||||||
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
|
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
|
||||||
|
@ -1057,10 +1058,10 @@ public class HStore extends SchemaConfigured implements Store {
|
||||||
}
|
}
|
||||||
|
|
||||||
filesToCompact = filesToCompact.subList(count - N, count);
|
filesToCompact = filesToCompact.subList(count - N, count);
|
||||||
maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
|
maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
|
||||||
isMajor = (filesToCompact.size() == storefiles.size());
|
isMajor = (filesToCompact.size() == storefiles.size());
|
||||||
filesCompacting.addAll(filesToCompact);
|
filesCompacting.addAll(filesToCompact);
|
||||||
Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
|
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.lock.readLock().unlock();
|
this.lock.readLock().unlock();
|
||||||
|
@ -1275,7 +1276,7 @@ public class HStore extends SchemaConfigured implements Store {
|
||||||
filesToCompact, filesCompacting);
|
filesToCompact, filesCompacting);
|
||||||
}
|
}
|
||||||
filesCompacting.addAll(filesToCompact.getFilesToCompact());
|
filesCompacting.addAll(filesToCompact.getFilesToCompact());
|
||||||
Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
|
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
|
||||||
|
|
||||||
// major compaction iff all StoreFiles are included
|
// major compaction iff all StoreFiles are included
|
||||||
boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
|
boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
|
||||||
|
@ -1616,7 +1617,7 @@ public class HStore extends SchemaConfigured implements Store {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
|
public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
|
||||||
Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME);
|
Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
|
||||||
ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
|
ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
|
||||||
return newList;
|
return newList;
|
||||||
}
|
}
|
||||||
|
|
|
@ -191,8 +191,11 @@ public interface Store extends SchemaAware, HeapSize {
|
||||||
/**
|
/**
|
||||||
* This method should only be called from HRegion. It is assumed that the ranges of values in the
|
* This method should only be called from HRegion. It is assumed that the ranges of values in the
|
||||||
* HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
|
* HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
|
||||||
|
*
|
||||||
|
* @param srcPathStr
|
||||||
|
* @param sequenceId sequence Id associated with the HFile
|
||||||
*/
|
*/
|
||||||
public void bulkLoadHFile(String srcPathStr) throws IOException;
|
public void bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException;
|
||||||
|
|
||||||
// General accessors into the state of the store
|
// General accessors into the state of the store
|
||||||
// TODO abstract some of this out into a metrics class
|
// TODO abstract some of this out into a metrics class
|
||||||
|
|
|
@ -201,7 +201,7 @@ public class StoreFile extends SchemaConfigured {
|
||||||
* this files id. Group 2 the referenced region name, etc.
|
* this files id. Group 2 the referenced region name, etc.
|
||||||
*/
|
*/
|
||||||
private static final Pattern REF_NAME_PARSER =
|
private static final Pattern REF_NAME_PARSER =
|
||||||
Pattern.compile("^([0-9a-f]+)(?:\\.(.+))?$");
|
Pattern.compile("^([0-9a-f]+(?:_SeqId_[0-9]+_)?)(?:\\.(.+))?$");
|
||||||
|
|
||||||
// StoreFile.Reader
|
// StoreFile.Reader
|
||||||
private volatile Reader reader;
|
private volatile Reader reader;
|
||||||
|
@ -390,13 +390,16 @@ public class StoreFile extends SchemaConfigured {
|
||||||
* the given list. Store files that were created by a mapreduce
|
* the given list. Store files that were created by a mapreduce
|
||||||
* bulk load are ignored, as they do not correspond to any edit
|
* bulk load are ignored, as they do not correspond to any edit
|
||||||
* log items.
|
* log items.
|
||||||
|
* @param sfs
|
||||||
|
* @param includeBulkLoadedFiles
|
||||||
* @return 0 if no non-bulk-load files are provided or, this is Store that
|
* @return 0 if no non-bulk-load files are provided or, this is Store that
|
||||||
* does not yet have any store files.
|
* does not yet have any store files.
|
||||||
*/
|
*/
|
||||||
public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
|
public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
|
||||||
|
boolean includeBulkLoadedFiles) {
|
||||||
long max = 0;
|
long max = 0;
|
||||||
for (StoreFile sf : sfs) {
|
for (StoreFile sf : sfs) {
|
||||||
if (!sf.isBulkLoadResult()) {
|
if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
|
||||||
max = Math.max(max, sf.getMaxSequenceId());
|
max = Math.max(max, sf.getMaxSequenceId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -539,6 +542,24 @@ public class StoreFile extends SchemaConfigured {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isBulkLoadResult()){
|
||||||
|
// generate the sequenceId from the fileName
|
||||||
|
// fileName is of the form <randomName>_SeqId_<id-when-loaded>_
|
||||||
|
String fileName = this.path.getName();
|
||||||
|
int startPos = fileName.indexOf("SeqId_");
|
||||||
|
if (startPos != -1) {
|
||||||
|
this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
|
||||||
|
fileName.indexOf('_', startPos + 6)));
|
||||||
|
// Handle reference files as done above.
|
||||||
|
if (isReference()) {
|
||||||
|
if (Reference.isTopFileRegion(this.reference.getFileRegion())) {
|
||||||
|
this.sequenceid += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
this.reader.setSequenceID(this.sequenceid);
|
this.reader.setSequenceID(this.sequenceid);
|
||||||
|
|
||||||
b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
|
b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
|
||||||
|
@ -1718,18 +1739,27 @@ public class StoreFile extends SchemaConfigured {
|
||||||
*/
|
*/
|
||||||
abstract static class Comparators {
|
abstract static class Comparators {
|
||||||
/**
|
/**
|
||||||
* Comparator that compares based on the flush time of
|
* Comparator that compares based on the Sequence Ids of the
|
||||||
* the StoreFiles. All bulk loads are placed before all non-
|
* the StoreFiles. Bulk loads that did not request a seq ID
|
||||||
* bulk loads, and then all files are sorted by sequence ID.
|
* are given a seq id of -1; thus, they are placed before all non-
|
||||||
|
* bulk loads, and bulk loads with sequence Id. Among these files,
|
||||||
|
* the bulkLoadTime is used to determine the ordering.
|
||||||
* If there are ties, the path name is used as a tie-breaker.
|
* If there are ties, the path name is used as a tie-breaker.
|
||||||
*/
|
*/
|
||||||
static final Comparator<StoreFile> FLUSH_TIME =
|
static final Comparator<StoreFile> SEQ_ID =
|
||||||
Ordering.compound(ImmutableList.of(
|
Ordering.compound(ImmutableList.of(
|
||||||
Ordering.natural().onResultOf(new GetBulkTime()),
|
|
||||||
Ordering.natural().onResultOf(new GetSeqId()),
|
Ordering.natural().onResultOf(new GetSeqId()),
|
||||||
|
Ordering.natural().onResultOf(new GetBulkTime()),
|
||||||
Ordering.natural().onResultOf(new GetPathName())
|
Ordering.natural().onResultOf(new GetPathName())
|
||||||
));
|
));
|
||||||
|
|
||||||
|
private static class GetSeqId implements Function<StoreFile, Long> {
|
||||||
|
@Override
|
||||||
|
public Long apply(StoreFile sf) {
|
||||||
|
return sf.getMaxSequenceId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class GetBulkTime implements Function<StoreFile, Long> {
|
private static class GetBulkTime implements Function<StoreFile, Long> {
|
||||||
@Override
|
@Override
|
||||||
public Long apply(StoreFile sf) {
|
public Long apply(StoreFile sf) {
|
||||||
|
@ -1737,13 +1767,7 @@ public class StoreFile extends SchemaConfigured {
|
||||||
return sf.getBulkLoadTimestamp();
|
return sf.getBulkLoadTimestamp();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private static class GetSeqId implements Function<StoreFile, Long> {
|
|
||||||
@Override
|
|
||||||
public Long apply(StoreFile sf) {
|
|
||||||
if (sf.isBulkLoadResult()) return -1L;
|
|
||||||
return sf.getMaxSequenceId();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
private static class GetPathName implements Function<StoreFile, String> {
|
private static class GetPathName implements Function<StoreFile, String> {
|
||||||
@Override
|
@Override
|
||||||
public String apply(StoreFile sf) {
|
public String apply(StoreFile sf) {
|
||||||
|
|
|
@ -1499,7 +1499,7 @@ public class HLog implements Syncable {
|
||||||
/**
|
/**
|
||||||
* Obtain a log sequence number.
|
* Obtain a log sequence number.
|
||||||
*/
|
*/
|
||||||
private long obtainSeqNum() {
|
public long obtainSeqNum() {
|
||||||
return this.logSeqNum.incrementAndGet();
|
return this.logSeqNum.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -247,6 +247,7 @@ message UnlockRowResponse {
|
||||||
message BulkLoadHFileRequest {
|
message BulkLoadHFileRequest {
|
||||||
required RegionSpecifier region = 1;
|
required RegionSpecifier region = 1;
|
||||||
repeated FamilyPath familyPath = 2;
|
repeated FamilyPath familyPath = 2;
|
||||||
|
optional bool assignSeqNum = 3;
|
||||||
|
|
||||||
message FamilyPath {
|
message FamilyPath {
|
||||||
required bytes family = 1;
|
required bytes family = 1;
|
||||||
|
|
|
@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -31,10 +32,12 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.*;
|
import org.apache.hadoop.hbase.*;
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
import org.apache.hadoop.hbase.io.hfile.Compression;
|
import org.apache.hadoop.hbase.io.hfile.Compression;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
|
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.*;
|
import org.junit.*;
|
||||||
|
@ -155,6 +158,55 @@ public class TestLoadIncrementalHFiles {
|
||||||
assertEquals(expectedRows, util.countRows(table));
|
assertEquals(expectedRows, util.countRows(table));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyAssignedSequenceNumber(String testName,
|
||||||
|
byte[][][] hfileRanges, boolean nonZero) throws Exception {
|
||||||
|
Path dir = util.getDataTestDir(testName);
|
||||||
|
FileSystem fs = util.getTestFileSystem();
|
||||||
|
dir = dir.makeQualified(fs);
|
||||||
|
Path familyDir = new Path(dir, Bytes.toString(FAMILY));
|
||||||
|
|
||||||
|
int hfileIdx = 0;
|
||||||
|
for (byte[][] range : hfileRanges) {
|
||||||
|
byte[] from = range[0];
|
||||||
|
byte[] to = range[1];
|
||||||
|
createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
|
||||||
|
+ hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
final byte[] TABLE = Bytes.toBytes("mytable_"+testName);
|
||||||
|
|
||||||
|
HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(TABLE);
|
||||||
|
HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
|
||||||
|
htd.addFamily(familyDesc);
|
||||||
|
admin.createTable(htd, SPLIT_KEYS);
|
||||||
|
|
||||||
|
HTable table = new HTable(util.getConfiguration(), TABLE);
|
||||||
|
util.waitTableAvailable(TABLE, 30000);
|
||||||
|
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
|
||||||
|
util.getConfiguration());
|
||||||
|
|
||||||
|
// Do a dummy put to increase the hlog sequence number
|
||||||
|
Put put = new Put(Bytes.toBytes("row"));
|
||||||
|
put.add(FAMILY, QUALIFIER, Bytes.toBytes("value"));
|
||||||
|
table.put(put);
|
||||||
|
|
||||||
|
loader.doBulkLoad(dir, table);
|
||||||
|
|
||||||
|
// Get the store files
|
||||||
|
List<StoreFile> files = util.getHBaseCluster().
|
||||||
|
getRegions(TABLE).get(0).getStore(FAMILY).getStorefiles();
|
||||||
|
for (StoreFile file: files) {
|
||||||
|
// the sequenceId gets initialized during createReader
|
||||||
|
file.createReader();
|
||||||
|
|
||||||
|
if (nonZero)
|
||||||
|
assertTrue(file.getMaxSequenceId() > 0);
|
||||||
|
else
|
||||||
|
assertTrue(file.getMaxSequenceId() == -1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSplitStoreFile() throws IOException {
|
public void testSplitStoreFile() throws IOException {
|
||||||
Path dir = util.getDataTestDir("testSplitHFile");
|
Path dir = util.getDataTestDir("testSplitHFile");
|
||||||
|
@ -220,6 +272,8 @@ public class TestLoadIncrementalHFiles {
|
||||||
writer.append(kv);
|
writer.append(kv);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
|
||||||
|
Bytes.toBytes(System.currentTimeMillis()));
|
||||||
writer.close();
|
writer.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -587,7 +587,7 @@ public class TestCompaction extends HBaseTestCase {
|
||||||
HStore store = (HStore) r.getStore(COLUMN_FAMILY);
|
HStore store = (HStore) r.getStore(COLUMN_FAMILY);
|
||||||
|
|
||||||
List<StoreFile> storeFiles = store.getStorefiles();
|
List<StoreFile> storeFiles = store.getStorefiles();
|
||||||
long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
|
long maxId = StoreFile.getMaxSequenceIdInList(storeFiles, true);
|
||||||
Compactor tool = new Compactor(this.conf);
|
Compactor tool = new Compactor(this.conf);
|
||||||
|
|
||||||
StoreFile.Writer compactedFile =
|
StoreFile.Writer compactedFile =
|
||||||
|
|
|
@ -150,7 +150,7 @@ public class TestHRegionServerBulkLoad {
|
||||||
+ Bytes.toStringBinary(row));
|
+ Bytes.toStringBinary(row));
|
||||||
byte[] regionName = location.getRegionInfo().getRegionName();
|
byte[] regionName = location.getRegionInfo().getRegionName();
|
||||||
BulkLoadHFileRequest request =
|
BulkLoadHFileRequest request =
|
||||||
RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName);
|
RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
|
||||||
server.bulkLoadHFile(null, request);
|
server.bulkLoadHFile(null, request);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -608,8 +608,8 @@ public class TestStoreFile extends HBaseTestCase {
|
||||||
fs.delete(f, true);
|
fs.delete(f, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFlushTimeComparator() {
|
public void testSeqIdComparator() {
|
||||||
assertOrdering(StoreFile.Comparators.FLUSH_TIME,
|
assertOrdering(StoreFile.Comparators.SEQ_ID,
|
||||||
mockStoreFile(true, 1000, -1, "/foo/123"),
|
mockStoreFile(true, 1000, -1, "/foo/123"),
|
||||||
mockStoreFile(true, 1000, -1, "/foo/126"),
|
mockStoreFile(true, 1000, -1, "/foo/126"),
|
||||||
mockStoreFile(true, 2000, -1, "/foo/126"),
|
mockStoreFile(true, 2000, -1, "/foo/126"),
|
||||||
|
@ -640,13 +640,7 @@ public class TestStoreFile extends HBaseTestCase {
|
||||||
StoreFile mock = Mockito.mock(StoreFile.class);
|
StoreFile mock = Mockito.mock(StoreFile.class);
|
||||||
Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
|
Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
|
||||||
Mockito.doReturn(bulkTimestamp).when(mock).getBulkLoadTimestamp();
|
Mockito.doReturn(bulkTimestamp).when(mock).getBulkLoadTimestamp();
|
||||||
if (bulkLoad) {
|
Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
|
||||||
// Bulk load files will throw if you ask for their sequence ID
|
|
||||||
Mockito.doThrow(new IllegalAccessError("bulk load"))
|
|
||||||
.when(mock).getMaxSequenceId();
|
|
||||||
} else {
|
|
||||||
Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
|
|
||||||
}
|
|
||||||
Mockito.doReturn(new Path(path)).when(mock).getPath();
|
Mockito.doReturn(new Path(path)).when(mock).getPath();
|
||||||
String name = "mock storefile, bulkLoad=" + bulkLoad +
|
String name = "mock storefile, bulkLoad=" + bulkLoad +
|
||||||
" bulkTimestamp=" + bulkTimestamp +
|
" bulkTimestamp=" + bulkTimestamp +
|
||||||
|
|
|
@ -310,7 +310,7 @@ public class TestWALReplay {
|
||||||
writer.close();
|
writer.close();
|
||||||
List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
|
List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
|
||||||
hfs.add(Pair.newPair(family, f.toString()));
|
hfs.add(Pair.newPair(family, f.toString()));
|
||||||
region.bulkLoadHFiles(hfs);
|
region.bulkLoadHFiles(hfs, true);
|
||||||
// Add an edit so something in the WAL
|
// Add an edit so something in the WAL
|
||||||
region.put((new Put(row)).add(family, family, family));
|
region.put((new Put(row)).add(family, family, family));
|
||||||
wal.sync();
|
wal.sync();
|
||||||
|
|
Loading…
Reference in New Issue