HBASE-17292 Add observer notification before bulk loaded hfile is moved to region directory
This commit is contained in:
parent
b9689808eb
commit
0b69f59133
|
@ -496,6 +496,16 @@ public class BaseRegionObserver implements RegionObserver {
|
||||||
List<Pair<byte[], String>> familyPaths) throws IOException {
|
List<Pair<byte[], String>> familyPaths) throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
|
||||||
|
final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
|
||||||
|
final byte[] family, Path srcPath, Path dstPath) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
|
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
|
||||||
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
|
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
|
||||||
|
|
|
@ -1238,6 +1238,29 @@ public interface RegionObserver extends Coprocessor {
|
||||||
void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
|
void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
|
||||||
List<Pair<byte[], String>> familyPaths) throws IOException;
|
List<Pair<byte[], String>> familyPaths) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called before moving bulk loaded hfile to region directory.
|
||||||
|
*
|
||||||
|
* @param ctx
|
||||||
|
* @param family column family
|
||||||
|
* @param pairs List of pairs of { HFile location in staging dir, HFile path in region dir }
|
||||||
|
* Each pair are for the same hfile.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
|
||||||
|
final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called after moving bulk loaded hfile to region directory.
|
||||||
|
*
|
||||||
|
* @param ctx
|
||||||
|
* @param family column family
|
||||||
|
* @param srcPath Path to file before the move
|
||||||
|
* @param dstPath Path to file after the move
|
||||||
|
*/
|
||||||
|
void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
|
||||||
|
final byte[] family, Path srcPath, Path dstPath) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called after bulkLoadHFile.
|
* Called after bulkLoadHFile.
|
||||||
*
|
*
|
||||||
|
|
|
@ -5682,17 +5682,55 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Map<byte[], List<Pair<Path, Path>>> familyWithFinalPath =
|
||||||
|
new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
for (Pair<byte[], String> p : familyPaths) {
|
for (Pair<byte[], String> p : familyPaths) {
|
||||||
byte[] familyName = p.getFirst();
|
byte[] familyName = p.getFirst();
|
||||||
String path = p.getSecond();
|
String path = p.getSecond();
|
||||||
Store store = getStore(familyName);
|
Store store = getStore(familyName);
|
||||||
|
if (!familyWithFinalPath.containsKey(familyName)) {
|
||||||
|
familyWithFinalPath.put(familyName, new ArrayList<Pair<Path, Path>>());
|
||||||
|
}
|
||||||
|
List<Pair<Path, Path>> lst = familyWithFinalPath.get(familyName);
|
||||||
try {
|
try {
|
||||||
String finalPath = path;
|
String finalPath = path;
|
||||||
if (bulkLoadListener != null) {
|
if (bulkLoadListener != null) {
|
||||||
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
|
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
|
||||||
}
|
}
|
||||||
Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
|
Pair<Path, Path> pair = ((HStore)store).preBulkLoadHFile(finalPath, seqId);
|
||||||
|
lst.add(pair);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// A failure here can cause an atomicity violation that we currently
|
||||||
|
// cannot recover from since it is likely a failed HDFS operation.
|
||||||
|
|
||||||
|
// TODO Need a better story for reverting partial failures due to HDFS.
|
||||||
|
LOG.error("There was a partial failure due to IO when attempting to" +
|
||||||
|
" load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
|
||||||
|
if (bulkLoadListener != null) {
|
||||||
|
try {
|
||||||
|
bulkLoadListener.failedBulkLoad(familyName, path);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error("Error while calling failedBulkLoad for family " +
|
||||||
|
Bytes.toString(familyName) + " with path " + path, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.getCoprocessorHost() != null) {
|
||||||
|
for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
|
||||||
|
this.getCoprocessorHost().preCommitStoreFile(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
|
||||||
|
byte[] familyName = entry.getKey();
|
||||||
|
for (Pair<Path, Path> p : entry.getValue()) {
|
||||||
|
String path = p.getFirst().toString();
|
||||||
|
Path commitedStoreFile = p.getSecond();
|
||||||
|
Store store = getStore(familyName);
|
||||||
|
try {
|
||||||
|
store.bulkLoadHFile(familyName, path, commitedStoreFile);
|
||||||
// Note the size of the store file
|
// Note the size of the store file
|
||||||
try {
|
try {
|
||||||
FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
|
FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
|
||||||
|
@ -5719,7 +5757,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
|
|
||||||
// TODO Need a better story for reverting partial failures due to HDFS.
|
// TODO Need a better story for reverting partial failures due to HDFS.
|
||||||
LOG.error("There was a partial failure due to IO when attempting to" +
|
LOG.error("There was a partial failure due to IO when attempting to" +
|
||||||
" load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
|
" load " + Bytes.toString(familyName) + " : " + p.getSecond(), ioe);
|
||||||
if (bulkLoadListener != null) {
|
if (bulkLoadListener != null) {
|
||||||
try {
|
try {
|
||||||
bulkLoadListener.failedBulkLoad(familyName, path);
|
bulkLoadListener.failedBulkLoad(familyName, path);
|
||||||
|
@ -5731,6 +5769,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
throw ioe;
|
throw ioe;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
isSuccessful = true;
|
isSuccessful = true;
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -19,8 +19,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
|
@ -54,8 +52,11 @@ import org.apache.hadoop.hbase.io.Reference;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.FSHDFSUtils;
|
import org.apache.hadoop.hbase.util.FSHDFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* View to an on-disk Region.
|
* View to an on-disk Region.
|
||||||
* Provides the set of methods necessary to interact with the on-disk region data.
|
* Provides the set of methods necessary to interact with the on-disk region data.
|
||||||
|
@ -396,19 +397,21 @@ public class HRegionFileSystem {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException {
|
public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException {
|
||||||
return commitStoreFile(familyName, buildPath, -1, false);
|
Path dstPath = preCommitStoreFile(familyName, buildPath, -1, false);
|
||||||
|
return commitStoreFile(buildPath, dstPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move the file from a build/temp location to the main family store directory.
|
* Generate the filename in the main family store directory for moving the file from a build/temp
|
||||||
|
* location.
|
||||||
* @param familyName Family that will gain the file
|
* @param familyName Family that will gain the file
|
||||||
* @param buildPath {@link Path} to the file to commit.
|
* @param buildPath {@link Path} to the file to commit.
|
||||||
* @param seqNum Sequence Number to append to the file name (less then 0 if no sequence number)
|
* @param seqNum Sequence Number to append to the file name (less then 0 if no sequence number)
|
||||||
* @param generateNewName False if you want to keep the buildPath name
|
* @param generateNewName False if you want to keep the buildPath name
|
||||||
* @return The new {@link Path} of the committed file
|
* @return The new {@link Path} of the to be committed file
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private Path commitStoreFile(final String familyName, final Path buildPath,
|
private Path preCommitStoreFile(final String familyName, final Path buildPath,
|
||||||
final long seqNum, final boolean generateNewName) throws IOException {
|
final long seqNum, final boolean generateNewName) throws IOException {
|
||||||
Path storeDir = getStoreDir(familyName);
|
Path storeDir = getStoreDir(familyName);
|
||||||
if(!fs.exists(storeDir) && !createDir(storeDir))
|
if(!fs.exists(storeDir) && !createDir(storeDir))
|
||||||
|
@ -425,6 +428,17 @@ public class HRegionFileSystem {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Committing store file " + buildPath + " as " + dstPath);
|
LOG.debug("Committing store file " + buildPath + " as " + dstPath);
|
||||||
}
|
}
|
||||||
|
return dstPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Moves file from staging dir to region dir
|
||||||
|
* @param buildPath {@link Path} to the file to commit.
|
||||||
|
* @param dstPath {@link Path} to the file under region dir
|
||||||
|
* @return The {@link Path} of the committed file
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
Path commitStoreFile(final Path buildPath, Path dstPath) throws IOException {
|
||||||
// buildPath exists, therefore not doing an exists() check.
|
// buildPath exists, therefore not doing an exists() check.
|
||||||
if (!rename(buildPath, dstPath)) {
|
if (!rename(buildPath, dstPath)) {
|
||||||
throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
|
throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
|
||||||
|
@ -432,7 +446,6 @@ public class HRegionFileSystem {
|
||||||
return dstPath;
|
return dstPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Moves multiple store files to the relative region's family store directory.
|
* Moves multiple store files to the relative region's family store directory.
|
||||||
* @param storeFiles list of store files divided by family
|
* @param storeFiles list of store files divided by family
|
||||||
|
@ -482,7 +495,7 @@ public class HRegionFileSystem {
|
||||||
* @return The destination {@link Path} of the bulk loaded file
|
* @return The destination {@link Path} of the bulk loaded file
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
|
Pair<Path, Path> bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// Copy the file if it's on another filesystem
|
// Copy the file if it's on another filesystem
|
||||||
FileSystem srcFs = srcPath.getFileSystem(conf);
|
FileSystem srcFs = srcPath.getFileSystem(conf);
|
||||||
|
@ -500,7 +513,7 @@ public class HRegionFileSystem {
|
||||||
srcPath = tmpPath;
|
srcPath = tmpPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
return commitStoreFile(familyName, srcPath, seqNum, true);
|
return new Pair<>(srcPath, preCommitStoreFile(familyName, srcPath, seqNum, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===========================================================================
|
// ===========================================================================
|
||||||
|
|
|
@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||||
import org.apache.hadoop.hbase.util.ClassSize;
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
|
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
|
||||||
|
@ -799,10 +800,21 @@ public class HStore implements Store {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
|
||||||
public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
|
|
||||||
Path srcPath = new Path(srcPathStr);
|
Path srcPath = new Path(srcPathStr);
|
||||||
Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
|
return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
|
||||||
|
Path srcPath = new Path(srcPathStr);
|
||||||
|
try {
|
||||||
|
fs.commitStoreFile(srcPath, dstPath);
|
||||||
|
} finally {
|
||||||
|
if (this.getCoprocessorHost() != null) {
|
||||||
|
this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
|
LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
|
||||||
+ dstPath + " - updating store file list.");
|
+ dstPath + " - updating store file list.");
|
||||||
|
|
|
@ -2016,12 +2016,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
|
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
|
||||||
}
|
}
|
||||||
boolean loaded = false;
|
boolean loaded = false;
|
||||||
|
try {
|
||||||
if (!bypass) {
|
if (!bypass) {
|
||||||
loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
|
loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
if (region.getCoprocessorHost() != null) {
|
if (region.getCoprocessorHost() != null) {
|
||||||
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
|
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
|
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
|
||||||
builder.setLoaded(loaded);
|
builder.setLoaded(loaded);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
|
|
|
@ -1507,6 +1507,27 @@ public class RegionCoprocessorHost
|
||||||
postWALRestore(info, (WALKey)logKey, logEdit);
|
postWALRestore(info, (WALKey)logKey, logEdit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
|
||||||
|
throws IOException {
|
||||||
|
return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
|
||||||
|
@Override
|
||||||
|
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||||
|
throws IOException {
|
||||||
|
oserver.preCommitStoreFile(ctx, family, pairs);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
public void postCommitStoreFile(final byte[] family, final Path srcPath, final Path dstPath)
|
||||||
|
throws IOException {
|
||||||
|
execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
|
||||||
|
@Override
|
||||||
|
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||||
|
throws IOException {
|
||||||
|
oserver.postCommitStoreFile(ctx, family, srcPath, dstPath);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param familyPaths pairs of { CF, file path } submitted for bulk load
|
* @param familyPaths pairs of { CF, file path } submitted for bulk load
|
||||||
* @return true if the default operation should be bypassed
|
* @return true if the default operation should be bypassed
|
||||||
|
|
|
@ -324,10 +324,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
|
||||||
* This method should only be called from Region. It is assumed that the ranges of values in the
|
* This method should only be called from Region. It is assumed that the ranges of values in the
|
||||||
* HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
|
* HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
|
||||||
*
|
*
|
||||||
|
* @param family the column family
|
||||||
* @param srcPathStr
|
* @param srcPathStr
|
||||||
* @param sequenceId sequence Id associated with the HFile
|
* @param dstPath
|
||||||
*/
|
*/
|
||||||
Path bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException;
|
Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException;
|
||||||
|
|
||||||
// General accessors into the state of the store
|
// General accessors into the state of the store
|
||||||
// TODO abstract some of this out into a metrics class
|
// TODO abstract some of this out into a metrics class
|
||||||
|
|
Loading…
Reference in New Issue