HBASE-17292 Add observer notification before bulk loaded hfile is moved - addendum properly handles copied hfile

This commit is contained in:
tedyu 2016-12-16 15:31:30 -08:00
parent c3ce02d592
commit 49b0bab504
3 changed files with 18 additions and 16 deletions

View File

@ -5630,8 +5630,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (bulkLoadListener != null) { if (bulkLoadListener != null) {
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile); finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile);
} }
Path commitedStoreFile = store.preBulkLoadHFile(finalPath, seqId); Pair<Path, Path> pair = store.preBulkLoadHFile(finalPath, seqId);
lst.add(new Pair<Path, Path>(new Path(finalPath), commitedStoreFile)); lst.add(pair);
} catch (IOException ioe) { } catch (IOException ioe) {
// A failure here can cause an atomicity violation that we currently // A failure here can cause an atomicity violation that we currently
// cannot recover from since it is likely a failed HDFS operation. // cannot recover from since it is likely a failed HDFS operation.

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import com.google.common.collect.Lists;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
@ -54,8 +52,11 @@ import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import com.google.common.collect.Lists;
/** /**
* View to an on-disk Region. * View to an on-disk Region.
* Provides the set of methods necessary to interact with the on-disk region data. * Provides the set of methods necessary to interact with the on-disk region data.
@ -494,7 +495,7 @@ public class HRegionFileSystem {
* @return The destination {@link Path} of the bulk loaded file * @return The destination {@link Path} of the bulk loaded file
* @throws IOException * @throws IOException
*/ */
Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum) Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
throws IOException { throws IOException {
// Copy the file if it's on another filesystem // Copy the file if it's on another filesystem
FileSystem srcFs = srcPath.getFileSystem(conf); FileSystem srcFs = srcPath.getFileSystem(conf);
@ -512,7 +513,7 @@ public class HRegionFileSystem {
srcPath = tmpPath; srcPath = tmpPath;
} }
return preCommitStoreFile(familyName, srcPath, seqNum, true); return new Pair<>(srcPath, preCommitStoreFile(familyName, srcPath, seqNum, true));
} }
// =========================================================================== // ===========================================================================

View File

@ -18,13 +18,6 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -76,8 +69,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@ -88,14 +79,24 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/** /**
* A Store holds a column family in a Region. Its a memstore and a set of zero * A Store holds a column family in a Region. Its a memstore and a set of zero
* or more StoreFiles, which stretch backwards over time. * or more StoreFiles, which stretch backwards over time.
@ -780,7 +781,7 @@ public class HStore implements Store {
* @param srcPathStr * @param srcPathStr
* @param seqNum sequence Id associated with the HFile * @param seqNum sequence Id associated with the HFile
*/ */
public Path preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException { public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
Path srcPath = new Path(srcPathStr); Path srcPath = new Path(srcPathStr);
return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
} }