From 49b0bab50436433feefd842a223e0656e86df247 Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 16 Dec 2016 15:31:30 -0800 Subject: [PATCH] HBASE-17292 Add observer notification before bulk loaded hfile is moved - addendum properly handles copied hfile --- .../hadoop/hbase/regionserver/HRegion.java | 4 ++-- .../hbase/regionserver/HRegionFileSystem.java | 9 ++++---- .../hadoop/hbase/regionserver/HStore.java | 21 ++++++++++--------- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2e3fc45fe5e..b664a4a392a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5630,8 +5630,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (bulkLoadListener != null) { finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile); } - Path commitedStoreFile = store.preBulkLoadHFile(finalPath, seqId); - lst.add(new Pair(new Path(finalPath), commitedStoreFile)); + Pair pair = store.preBulkLoadHFile(finalPath, seqId); + lst.add(pair); } catch (IOException ioe) { // A failure here can cause an atomicity violation that we currently // cannot recover from since it is likely a failed HDFS operation. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 0cfa0d6c7ae..aae85aadbae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver; -import com.google.common.collect.Lists; - import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -54,8 +52,11 @@ import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import com.google.common.collect.Lists; + /** * View to an on-disk Region. * Provides the set of methods necessary to interact with the on-disk region data. @@ -494,7 +495,7 @@ public class HRegionFileSystem { * @return The destination {@link Path} of the bulk loaded file * @throws IOException */ - Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum) + Pair bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum) throws IOException { // Copy the file if it's on another filesystem FileSystem srcFs = srcPath.getFileSystem(conf); @@ -512,7 +513,7 @@ public class HRegionFileSystem { srcPath = tmpPath; } - return preCommitStoreFile(familyName, srcPath, seqNum, true); + return new Pair<>(srcPath, preCommitStoreFile(familyName, srcPath, seqNum, true)); } // =========================================================================== diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index f8e91cf00ee..007c28b0ea0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -18,13 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import java.io.IOException; import java.io.InterruptedIOException; import java.net.InetSocketAddress; @@ -76,8 +69,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -88,14 +79,24 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + /** * A Store holds a column family in a Region. Its a memstore and a set of zero * or more StoreFiles, which stretch backwards over time. @@ -780,7 +781,7 @@ public class HStore implements Store { * @param srcPathStr * @param seqNum sequence Id associated with the HFile */ - public Path preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException { + public Pair preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException { Path srcPath = new Path(srcPathStr); return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); }