diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 0e8f50d2efe..49335c4d80f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -97,11 +97,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private Configuration cfg;
 
   public static String NAME = "completebulkload";
+  private static String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+  private boolean assignSeqIds;
 
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
     super(conf);
     this.cfg = conf;
     this.hbAdmin = new HBaseAdmin(conf);
+    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
   }
 
   private void usage() {
@@ -482,7 +485,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         LOG.debug("Going to connect to server " + location + " for row "
             + Bytes.toStringBinary(row));
         byte[] regionName = location.getRegionInfo().getRegionName();
-        return ProtobufUtil.bulkLoadHFile(server, famPaths, regionName);
+        return ProtobufUtil.bulkLoadHFile(server, famPaths, regionName,
+            assignSeqIds);
       }
     };
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 17dbddaad5d..bc0df407557 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -1265,14 +1265,15 @@ public final class ProtobufUtil {
    * @param client
    * @param familyPaths
    * @param regionName
+   * @param assignSeqNum
    * @return true if all are loaded
    * @throws IOException
    */
   public static boolean bulkLoadHFile(final ClientProtocol client,
       final List<Pair<byte[], String>> familyPaths,
-      final byte[] regionName) throws IOException {
+      final byte[] regionName, boolean assignSeqNum) throws IOException {
     BulkLoadHFileRequest request =
-      RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName);
+      RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum);
     try {
       BulkLoadHFileResponse response =
         client.bulkLoadHFile(null, request);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index b5bc865ce9b..59741def3db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -454,10 +454,12 @@ public final class RequestConverter {
    *
    * @param familyPaths
    * @param regionName
+   * @param assignSeqNum
    * @return a bulk load request
    */
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
-      final List<Pair<byte[], String>> familyPaths, final byte[] regionName) {
+      final List<Pair<byte[], String>> familyPaths,
+      final byte[] regionName, boolean assignSeqNum) {
     BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder();
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
@@ -468,6 +470,7 @@ public final class RequestConverter {
       familyPathBuilder.setPath(familyPath.getSecond());
       builder.addFamilyPath(familyPathBuilder.build());
     }
+    builder.setAssignSeqNum(assignSeqNum);
     return builder.build();
   }
 
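Note (illustrative, not part of the patch): the new client-side flag is read in the LoadIncrementalHFiles constructor above and defaults to true. A minimal sketch of how a caller might opt out of sequence-number assignment; the table name and HFile directory below are hypothetical placeholders.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadWithoutSeqIds {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Disable the new behaviour; per the constructor above it defaults to true.
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", false);
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // "/tmp/hfiles" and "mytable" stand in for an existing HFile directory and table.
    loader.doBulkLoad(new Path("/tmp/hfiles"), new HTable(conf, "mytable"));
  }
}
```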
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 790c508ffe0..25e07053b63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -13967,6 +13967,10 @@ public final class ClientProtos {
         getFamilyPathOrBuilderList();
     org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPathOrBuilder getFamilyPathOrBuilder(
         int index);
+
+    // optional bool assignSeqNum = 3;
+    boolean hasAssignSeqNum();
+    boolean getAssignSeqNum();
   }
   public static final class BulkLoadHFileRequest extends
       com.google.protobuf.GeneratedMessage
@@ -14524,9 +14528,20 @@ public final class ClientProtos {
       return familyPath_.get(index);
     }
 
+    // optional bool assignSeqNum = 3;
+    public static final int ASSIGNSEQNUM_FIELD_NUMBER = 3;
+    private boolean assignSeqNum_;
+    public boolean hasAssignSeqNum() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public boolean getAssignSeqNum() {
+      return assignSeqNum_;
+    }
+
     private void initFields() {
       region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
       familyPath_ = java.util.Collections.emptyList();
+      assignSeqNum_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -14560,6 +14575,9 @@ public final class ClientProtos {
       for (int i = 0; i < familyPath_.size(); i++) {
         output.writeMessage(2, familyPath_.get(i));
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBool(3, assignSeqNum_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -14577,6 +14595,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(2, familyPath_.get(i));
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(3, assignSeqNum_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -14607,6 +14629,11 @@ public final class ClientProtos {
       }
       result = result && getFamilyPathList()
           .equals(other.getFamilyPathList());
+      result = result && (hasAssignSeqNum() == other.hasAssignSeqNum());
+      if (hasAssignSeqNum()) {
+        result = result && (getAssignSeqNum()
+            == other.getAssignSeqNum());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -14624,6 +14651,10 @@ public final class ClientProtos {
         hash = (37 * hash) + FAMILYPATH_FIELD_NUMBER;
         hash = (53 * hash) + getFamilyPathList().hashCode();
       }
+      if (hasAssignSeqNum()) {
+        hash = (37 * hash) + ASSIGNSEQNUM_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getAssignSeqNum());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -14754,6 +14785,8 @@ public final class ClientProtos {
        } else {
          familyPathBuilder_.clear();
        }
+        assignSeqNum_ = false;
+        bitField0_ = (bitField0_ & ~0x00000004);
        return this;
      }
 
@@ -14809,6 +14842,10 @@ public final class ClientProtos {
        } else {
          result.familyPath_ = familyPathBuilder_.build();
        }
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.assignSeqNum_ = assignSeqNum_;
        result.bitField0_ = to_bitField0_;
        onBuilt();
        return result;
      }
@@ -14854,6 +14891,9 @@ public final class ClientProtos {
            }
          }
        }
+        if (other.hasAssignSeqNum()) {
+          setAssignSeqNum(other.getAssignSeqNum());
+        }
        this.mergeUnknownFields(other.getUnknownFields());
        return this;
      }
 
@@ -14914,6 +14954,11 @@ public final class ClientProtos {
              addFamilyPath(subBuilder.buildPartial());
              break;
            }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              assignSeqNum_ = input.readBool();
+              break;
+            }
          }
        }
      }
@@ -15196,6 +15241,27 @@ public final class ClientProtos {
        return familyPathBuilder_;
      }
 
+      // optional bool assignSeqNum = 3;
+      private boolean assignSeqNum_ ;
+      public boolean hasAssignSeqNum() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public boolean getAssignSeqNum() {
+        return assignSeqNum_;
+      }
+      public Builder setAssignSeqNum(boolean value) {
+        bitField0_ |= 0x00000004;
+        assignSeqNum_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearAssignSeqNum() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        assignSeqNum_ = false;
+        onChanged();
+        return this;
+      }
+
      // @@protoc_insertion_point(builder_scope:BulkLoadHFileRequest)
    }
 
@@ -21617,38 +21683,39 @@ public final class ClientProtos {
      "\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n\006lockId\030" +
      "\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowRequest\022 " +
      "\n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006lock" +
-      "Id\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\232\001\n\024BulkLo" +
+      "Id\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\260\001\n\024BulkLo" +
      "adHFileRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" +
      "pecifier\0224\n\nfamilyPath\030\002 \003(\0132 .BulkLoadH" +
-      "FileRequest.FamilyPath\032*\n\nFamilyPath\022\016\n\006" +
-      "family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHF" +
-      "ileResponse\022\016\n\006loaded\030\001 \002(\010\"\203\001\n\004Exec\022\013\n\003" +
-      "row\030\001 \002(\014\022\024\n\014protocolName\030\002 \002(\t\022\022\n\nmetho",
-      "dName\030\003 \002(\t\022!\n\010property\030\004 \003(\0132\017.NameStri" +
-      "ngPair\022!\n\tparameter\030\005 \003(\0132\016.NameBytesPai" +
-      "r\"O\n\026ExecCoprocessorRequest\022 \n\006region\030\001 " +
-      "\002(\0132\020.RegionSpecifier\022\023\n\004call\030\002 \002(\0132\005.Ex" +
-      "ec\"8\n\027ExecCoprocessorResponse\022\035\n\005value\030\001" +
-      " \002(\0132\016.NameBytesPair\"N\n\013MultiAction\022\027\n\006m" +
-      "utate\030\001 \001(\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\022" +
-      "\023\n\004exec\030\003 \001(\0132\005.Exec\"P\n\014ActionResult\022\035\n\005" +
-      "value\030\001 \001(\0132\016.NameBytesPair\022!\n\texception" +
-      "\030\002 \001(\0132\016.NameBytesPair\"^\n\014MultiRequest\022 ",
-      "\n\006region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006acti" +
-      "on\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\"." +
-      "\n\rMultiResponse\022\035\n\006result\030\001 \003(\0132\r.Action" +
-      "Result2\221\003\n\rClientService\022 \n\003get\022\013.GetReq" +
-      "uest\032\014.GetResponse\022)\n\006mutate\022\016.MutateReq" +
-      "uest\032\017.MutateResponse\022#\n\004scan\022\014.ScanRequ" +
-      "est\032\r.ScanResponse\022,\n\007lockRow\022\017.LockRowR" +
-      "equest\032\020.LockRowResponse\0222\n\tunlockRow\022\021." +
+ - "UnlockRowRequest\032\022.UnlockRowResponse\022>\n\r" + - "bulkLoadHFile\022\025.BulkLoadHFileRequest\032\026.B", - "ulkLoadHFileResponse\022D\n\017execCoprocessor\022" + - "\027.ExecCoprocessorRequest\032\030.ExecCoprocess" + - "orResponse\022&\n\005multi\022\r.MultiRequest\032\016.Mul" + - "tiResponseBB\n*org.apache.hadoop.hbase.pr" + - "otobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001" + "FileRequest.FamilyPath\022\024\n\014assignSeqNum\030\003" + + " \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004pa" + + "th\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loa" + + "ded\030\001 \002(\010\"\203\001\n\004Exec\022\013\n\003row\030\001 \002(\014\022\024\n\014proto", + "colName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022!\n\010pro" + + "perty\030\004 \003(\0132\017.NameStringPair\022!\n\tparamete" + + "r\030\005 \003(\0132\016.NameBytesPair\"O\n\026ExecCoprocess" + + "orRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif" + + "ier\022\023\n\004call\030\002 \002(\0132\005.Exec\"8\n\027ExecCoproces" + + "sorResponse\022\035\n\005value\030\001 \002(\0132\016.NameBytesPa" + + "ir\"N\n\013MultiAction\022\027\n\006mutate\030\001 \001(\0132\007.Muta" + + "te\022\021\n\003get\030\002 \001(\0132\004.Get\022\023\n\004exec\030\003 \001(\0132\005.Ex" + + "ec\"P\n\014ActionResult\022\035\n\005value\030\001 \001(\0132\016.Name" + + "BytesPair\022!\n\texception\030\002 \001(\0132\016.NameBytes", + "Pair\"^\n\014MultiRequest\022 \n\006region\030\001 \002(\0132\020.R" + + "egionSpecifier\022\034\n\006action\030\002 \003(\0132\014.MultiAc" + + "tion\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiResponse\022\035\n" + + "\006result\030\001 \003(\0132\r.ActionResult2\221\003\n\rClientS" + + "ervice\022 \n\003get\022\013.GetRequest\032\014.GetResponse" + + "\022)\n\006mutate\022\016.MutateRequest\032\017.MutateRespo" + + "nse\022#\n\004scan\022\014.ScanRequest\032\r.ScanResponse" + + "\022,\n\007lockRow\022\017.LockRowRequest\032\020.LockRowRe" + + "sponse\0222\n\tunlockRow\022\021.UnlockRowRequest\032\022" + + ".UnlockRowResponse\022>\n\rbulkLoadHFile\022\025.Bu", + "lkLoadHFileRequest\032\026.BulkLoadHFileRespon" + + "se\022D\n\017execCoprocessor\022\027.ExecCoprocessorR" + + "equest\032\030.ExecCoprocessorResponse\022&\n\005mult" + + "i\022\r.MultiRequest\032\016.MultiResponseBB\n*org." 
+ + "apache.hadoop.hbase.protobuf.generatedB\014" + + "ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -21804,7 +21871,7 @@ public final class ClientProtos { internal_static_BulkLoadHFileRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_BulkLoadHFileRequest_descriptor, - new java.lang.String[] { "Region", "FamilyPath", }, + new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", }, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.Builder.class); internal_static_BulkLoadHFileRequest_FamilyPath_descriptor = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c06e29a3837..d76c282bb92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -564,11 +564,14 @@ public class HRegion implements HeapSize { // , Writable{ HStore store = future.get(); this.stores.put(store.getColumnFamilyName().getBytes(), store); - long storeSeqId = store.getMaxSequenceId(); + // Do not include bulk loaded files when determining seqIdForReplay + long storeSeqIdForReplay = store.getMaxSequenceId(false); maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), - storeSeqId); - if (maxSeqId == -1 || storeSeqId > maxSeqId) { - maxSeqId = storeSeqId; + storeSeqIdForReplay); + // Include bulk loaded files when determining seqIdForAssignment + long storeSeqIdForAssignment = store.getMaxSequenceId(true); + if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) { + maxSeqId = storeSeqIdForAssignment; } long maxStoreMemstoreTS = store.getMaxMemstoreTS(); if (maxStoreMemstoreTS > maxMemstoreTS) { @@ -3314,11 +3317,12 @@ public class HRegion implements HeapSize { // , Writable{ * rows with multiple column families atomically. * * @param familyPaths List of Pair + * @param assignSeqId * @return true if successful, false if failed recoverably * @throws IOException if failed unrecoverably. */ - public boolean bulkLoadHFiles(List> familyPaths) - throws IOException { + public boolean bulkLoadHFiles(List> familyPaths, + boolean assignSeqId) throws IOException { Preconditions.checkNotNull(familyPaths); // we need writeLock for multi-family bulk load startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths)); @@ -3378,7 +3382,7 @@ public class HRegion implements HeapSize { // , Writable{ String path = p.getSecond(); Store store = getStore(familyName); try { - store.bulkLoadHFile(path); + store.bulkLoadHFile(path, assignSeqId ? this.log.obtainSeqNum() : -1); } catch (IOException ioe) { // A failure here can cause an atomicity violation that we currently // cannot recover from since it is likely a failed HDFS operation. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 508fb132a43..d9c56cc91b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3284,7 +3284,7 @@ public class HRegionServer implements ClientProtocol,
       }
       boolean loaded = false;
       if (!bypass) {
-        loaded = region.bulkLoadHFiles(familyPaths);
+        loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum());
       }
       if (region.getCoprocessorHost() != null) {
         loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 4d12e7294a7..b8bb95a25c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -311,8 +311,8 @@ public class HStore extends SchemaConfigured implements Store {
   /**
    * @return The maximum sequence id in all store files.
    */
-  long getMaxSequenceId() {
-    return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
+  long getMaxSequenceId(boolean includeBulkFiles) {
+    return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
   }
 
   @Override
@@ -532,7 +532,7 @@ public class HStore extends SchemaConfigured implements Store {
   }
 
   @Override
-  public void bulkLoadHFile(String srcPathStr) throws IOException {
+  public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
     Path srcPath = new Path(srcPathStr);
 
     // Copy the file if it's on another filesystem
@@ -547,7 +547,8 @@ public class HStore extends SchemaConfigured implements Store {
       srcPath = tmpPath;
     }
 
-    Path dstPath = StoreFile.getRandomFilename(fs, homedir);
+    Path dstPath = StoreFile.getRandomFilename(fs, homedir,
+        (seqNum == -1) ? null : "_SeqId_" + seqNum + "_");
     LOG.debug("Renaming bulk load file " + srcPath + " to " + dstPath);
     StoreFile.rename(fs, srcPath, dstPath);
 
@@ -990,7 +991,7 @@ public class HStore extends SchemaConfigured implements Store {
     }
 
     // Max-sequenceID is the last key in the files we're compacting
-    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
 
     // Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in " @@ -1057,10 +1058,10 @@ public class HStore extends SchemaConfigured implements Store { } filesToCompact = filesToCompact.subList(count - N, count); - maxId = StoreFile.getMaxSequenceIdInList(filesToCompact); + maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true); isMajor = (filesToCompact.size() == storefiles.size()); filesCompacting.addAll(filesToCompact); - Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME); + Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID); } } finally { this.lock.readLock().unlock(); @@ -1275,7 +1276,7 @@ public class HStore extends SchemaConfigured implements Store { filesToCompact, filesCompacting); } filesCompacting.addAll(filesToCompact.getFilesToCompact()); - Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME); + Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID); // major compaction iff all StoreFiles are included boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size()); @@ -1616,7 +1617,7 @@ public class HStore extends SchemaConfigured implements Store { } public ImmutableList sortAndClone(List storeFiles) { - Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME); + Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID); ImmutableList newList = ImmutableList.copyOf(storeFiles); return newList; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 3ef97e73fe6..d391a16df74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -191,8 +191,11 @@ public interface Store extends SchemaAware, HeapSize { /** * This method should only be called from HRegion. It is assumed that the ranges of values in the * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this) + * + * @param srcPathStr + * @param sequenceId sequence Id associated with the HFile */ - public void bulkLoadHFile(String srcPathStr) throws IOException; + public void bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException; // General accessors into the state of the store // TODO abstract some of this out into a metrics class diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index f6d5ea84fcd..d2132a527ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -201,7 +201,7 @@ public class StoreFile extends SchemaConfigured { * this files id. Group 2 the referenced region name, etc. */ private static final Pattern REF_NAME_PARSER = - Pattern.compile("^([0-9a-f]+)(?:\\.(.+))?$"); + Pattern.compile("^([0-9a-f]+(?:_SeqId_[0-9]+_)?)(?:\\.(.+))?$"); // StoreFile.Reader private volatile Reader reader; @@ -390,13 +390,16 @@ public class StoreFile extends SchemaConfigured { * the given list. Store files that were created by a mapreduce * bulk load are ignored, as they do not correspond to any edit * log items. + * @param sfs + * @param includeBulkLoadedFiles * @return 0 if no non-bulk-load files are provided or, this is Store that * does not yet have any store files. 
    */
-  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
+  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
+      boolean includeBulkLoadedFiles) {
     long max = 0;
     for (StoreFile sf : sfs) {
-      if (!sf.isBulkLoadResult()) {
+      if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
         max = Math.max(max, sf.getMaxSequenceId());
       }
     }
@@ -539,6 +542,24 @@ public class StoreFile extends SchemaConfigured {
         }
       }
     }
+
+    if (isBulkLoadResult()){
+      // generate the sequenceId from the fileName
+      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
+      String fileName = this.path.getName();
+      int startPos = fileName.indexOf("SeqId_");
+      if (startPos != -1) {
+        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
+            fileName.indexOf('_', startPos + 6)));
+        // Handle reference files as done above.
+        if (isReference()) {
+          if (Reference.isTopFileRegion(this.reference.getFileRegion())) {
+            this.sequenceid += 1;
+          }
+        }
+      }
+    }
+
     this.reader.setSequenceID(this.sequenceid);
 
     b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
@@ -1718,18 +1739,27 @@ public class StoreFile extends SchemaConfigured {
   */
  abstract static class Comparators {
    /**
-     * Comparator that compares based on the flush time of
-     * the StoreFiles. All bulk loads are placed before all non-
-     * bulk loads, and then all files are sorted by sequence ID.
+     * Comparator that compares based on the Sequence Ids of
+     * the StoreFiles. Bulk loads that did not request a seq ID
+     * are given a seq id of -1; thus, they are placed before all non-
+     * bulk loads and bulk loads with a sequence Id. Among these files,
+     * the bulkLoadTime is used to determine the ordering.
     * If there are ties, the path name is used as a tie-breaker.
     */
-    static final Comparator<StoreFile> FLUSH_TIME =
+    static final Comparator<StoreFile> SEQ_ID =
      Ordering.compound(ImmutableList.of(
-          Ordering.natural().onResultOf(new GetBulkTime()),
          Ordering.natural().onResultOf(new GetSeqId()),
+          Ordering.natural().onResultOf(new GetBulkTime()),
          Ordering.natural().onResultOf(new GetPathName())
      ));

+    private static class GetSeqId implements Function<StoreFile, Long> {
+      @Override
+      public Long apply(StoreFile sf) {
+        return sf.getMaxSequenceId();
+      }
+    }
+
    private static class GetBulkTime implements Function<StoreFile, Long> {
      @Override
      public Long apply(StoreFile sf) {
@@ -1737,13 +1767,7 @@ public class StoreFile extends SchemaConfigured {
        return sf.getBulkLoadTimestamp();
      }
    }
-    private static class GetSeqId implements Function<StoreFile, Long> {
-      @Override
-      public Long apply(StoreFile sf) {
-        if (sf.isBulkLoadResult()) return -1L;
-        return sf.getMaxSequenceId();
-      }
-    }
+
    private static class GetPathName implements Function<StoreFile, String> {
      @Override
      public String apply(StoreFile sf) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 1f0a15f9c7b..3eb99219581 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -1499,7 +1499,7 @@ public class HLog implements Syncable {
   /**
    * Obtain a log sequence number.
    */
-  private long obtainSeqNum() {
+  public long obtainSeqNum() {
     return this.logSeqNum.incrementAndGet();
   }
 
diff --git a/hbase-server/src/main/protobuf/Client.proto b/hbase-server/src/main/protobuf/Client.proto
index 986de49a161..0952d83035a 100644
--- a/hbase-server/src/main/protobuf/Client.proto
+++ b/hbase-server/src/main/protobuf/Client.proto
@@ -247,6 +247,7 @@ message UnlockRowResponse {
 message BulkLoadHFileRequest {
   required RegionSpecifier region = 1;
   repeated FamilyPath familyPath = 2;
+  optional bool assignSeqNum = 3;
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index cb68cf42b40..b1a71997439 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.TreeMap;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,10 +32,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.*;
@@ -155,6 +158,55 @@ public class TestLoadIncrementalHFiles {
     assertEquals(expectedRows, util.countRows(table));
   }
 
+  private void verifyAssignedSequenceNumber(String testName,
+      byte[][][] hfileRanges, boolean nonZero) throws Exception {
+    Path dir = util.getDataTestDir(testName);
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+          + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
+    }
+
+    final byte[] TABLE = Bytes.toBytes("mytable_"+testName);
+
+    HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
+    HTableDescriptor htd = new HTableDescriptor(TABLE);
+    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
+    htd.addFamily(familyDesc);
+    admin.createTable(htd, SPLIT_KEYS);
+
+    HTable table = new HTable(util.getConfiguration(), TABLE);
+    util.waitTableAvailable(TABLE, 30000);
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
+      util.getConfiguration());
+
+    // Do a dummy put to increase the hlog sequence number
+    Put put = new Put(Bytes.toBytes("row"));
+    put.add(FAMILY, QUALIFIER, Bytes.toBytes("value"));
+    table.put(put);
+
+    loader.doBulkLoad(dir, table);
+
+    // Get the store files
+    List<StoreFile> files = util.getHBaseCluster().
+        getRegions(TABLE).get(0).getStore(FAMILY).getStorefiles();
+    for (StoreFile file: files) {
+      // the sequenceId gets initialized during createReader
+      file.createReader();
+
+      if (nonZero)
+        assertTrue(file.getMaxSequenceId() > 0);
+      else
+        assertTrue(file.getMaxSequenceId() == -1);
+    }
+  }
+
   @Test
   public void testSplitStoreFile() throws IOException {
     Path dir = util.getDataTestDir("testSplitHFile");
@@ -220,6 +272,8 @@ public class TestLoadIncrementalHFiles {
         writer.append(kv);
       }
     } finally {
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+        Bytes.toBytes(System.currentTimeMillis()));
       writer.close();
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 477ec511fe7..aec6bb5f6ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -587,7 +587,7 @@ public class TestCompaction extends HBaseTestCase {
     HStore store = (HStore) r.getStore(COLUMN_FAMILY);
 
     List<StoreFile> storeFiles = store.getStorefiles();
-    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles, true);
     Compactor tool = new Compactor(this.conf);
 
     StoreFile.Writer compactedFile =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index b5e43e06bc7..cc8408d5cb7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -150,7 +150,7 @@ public class TestHRegionServerBulkLoad {
             + Bytes.toStringBinary(row));
         byte[] regionName = location.getRegionInfo().getRegionName();
         BulkLoadHFileRequest request =
-          RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName);
+          RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
         server.bulkLoadHFile(null, request);
         return null;
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
index f216e835b50..dd6fa8152ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
@@ -608,8 +608,8 @@ public class TestStoreFile extends HBaseTestCase {
     fs.delete(f, true);
   }
 
-  public void testFlushTimeComparator() {
-    assertOrdering(StoreFile.Comparators.FLUSH_TIME,
+  public void testSeqIdComparator() {
+    assertOrdering(StoreFile.Comparators.SEQ_ID,
         mockStoreFile(true, 1000, -1, "/foo/123"),
         mockStoreFile(true, 1000, -1, "/foo/126"),
         mockStoreFile(true, 2000, -1, "/foo/126"),
@@ -640,13 +640,7 @@ public class TestStoreFile extends HBaseTestCase {
     StoreFile mock = Mockito.mock(StoreFile.class);
     Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
     Mockito.doReturn(bulkTimestamp).when(mock).getBulkLoadTimestamp();
-    if (bulkLoad) {
-      // Bulk load files will throw if you ask for their sequence ID
-      Mockito.doThrow(new IllegalAccessError("bulk load"))
-          .when(mock).getMaxSequenceId();
-    } else {
-      Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
-    }
+    Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
     Mockito.doReturn(new Path(path)).when(mock).getPath();
     String name = "mock storefile, bulkLoad=" + bulkLoad +
       " bulkTimestamp=" + bulkTimestamp +
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 21867c0f735..a700cb75815 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -310,7 +310,7 @@ public class TestWALReplay {
     writer.close();
     List<Pair<byte[], String>> hfs = new ArrayList<Pair<byte[], String>>(1);
     hfs.add(Pair.newPair(family, f.toString()));
-    region.bulkLoadHFiles(hfs);
+    region.bulkLoadHFiles(hfs, true);
     // Add an edit so something in the WAL
     region.put((new Put(row)).add(family, family, family));
     wal.sync();
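Note (illustrative, not part of the patch): the sequence id that HStore embeds in a bulk-loaded file name of the form <random>_SeqId_<id>_ is recovered with the same substring logic StoreFile uses above; the file name in this sketch is a made-up example.

```java
public class BulkLoadSeqIdParse {
  public static void main(String[] args) {
    // Hypothetical bulk-loaded store file name: random prefix + "_SeqId_42_".
    String fileName = "3f2504e04f8911d39a0c0305e82c3301_SeqId_42_";
    int startPos = fileName.indexOf("SeqId_");
    // Take the digits between "SeqId_" and the next underscore.
    long seqId = Long.parseLong(fileName.substring(startPos + 6,
        fileName.indexOf('_', startPos + 6)));
    System.out.println(seqId); // 42
  }
}
```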