diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index a0a91bc4ff2..d6afec055eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -496,6 +496,16 @@ public class BaseRegionObserver implements RegionObserver { List> familyPaths) throws IOException { } + @Override + public void preCommitStoreFile(final ObserverContext ctx, + final byte[] family, final List> pairs) throws IOException { + } + + @Override + public void postCommitStoreFile(final ObserverContext ctx, + final byte[] family, Path srcPath, Path dstPath) throws IOException { + } + @Override public boolean postBulkLoadHFile(ObserverContext ctx, List> familyPaths, boolean hasLoaded) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 7cd1be53e04..ad562314a75 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -1238,6 +1238,29 @@ public interface RegionObserver extends Coprocessor { void preBulkLoadHFile(final ObserverContext ctx, List> familyPaths) throws IOException; + /** + * Called before moving bulk loaded hfile to region directory. + * + * @param ctx + * @param family column family + * @param pairs List of pairs of { HFile location in staging dir, HFile path in region dir } + * Each pair is for the same hfile. + * @throws IOException + */ + void preCommitStoreFile(final ObserverContext ctx, + final byte[] family, final List> pairs) throws IOException; + + /** + * Called after moving bulk loaded hfile to region directory.
+ * + * @param ctx + * @param family column family + * @param srcPath Path to file before the move + * @param dstPath Path to file after the move + * @throws IOException + */ + void postCommitStoreFile(final ObserverContext ctx, + final byte[] family, Path srcPath, Path dstPath) throws IOException; + /** * Called after bulkLoadHFile. * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3930d7138dd..730a7f30f79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5682,37 +5682,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + Map>> familyWithFinalPath = + new TreeMap<>(Bytes.BYTES_COMPARATOR); for (Pair p : familyPaths) { byte[] familyName = p.getFirst(); String path = p.getSecond(); Store store = getStore(familyName); + if (!familyWithFinalPath.containsKey(familyName)) { + familyWithFinalPath.put(familyName, new ArrayList>()); + } + List> lst = familyWithFinalPath.get(familyName); try { String finalPath = path; if (bulkLoadListener != null) { finalPath = bulkLoadListener.prepareBulkLoad(familyName, path); } - Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId); - - // Note the size of the store file - try { - FileSystem fs = commitedStoreFile.getFileSystem(baseConf); - storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile) - .getLen()); - } catch (IOException e) { - LOG.warn("Failed to find the size of hfile " + commitedStoreFile); - storeFilesSizes.put(commitedStoreFile.getName(), 0L); - } - - if(storeFiles.containsKey(familyName)) { - storeFiles.get(familyName).add(commitedStoreFile); - } else { - List storeFileNames = new ArrayList(); - storeFileNames.add(commitedStoreFile); - storeFiles.put(familyName, storeFileNames); - } - if (bulkLoadListener != null) { -
bulkLoadListener.doneBulkLoad(familyName, path); - } + Pair pair = ((HStore)store).preBulkLoadHFile(finalPath, seqId); + lst.add(pair); } catch (IOException ioe) { // A failure here can cause an atomicity violation that we currently // cannot recover from since it is likely a failed HDFS operation. @@ -5732,6 +5718,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + if (this.getCoprocessorHost() != null) { + for (Map.Entry>> entry : familyWithFinalPath.entrySet()) { + this.getCoprocessorHost().preCommitStoreFile(entry.getKey(), entry.getValue()); + } + } + for (Map.Entry>> entry : familyWithFinalPath.entrySet()) { + byte[] familyName = entry.getKey(); + for (Pair p : entry.getValue()) { + String path = p.getFirst().toString(); + Path commitedStoreFile = p.getSecond(); + Store store = getStore(familyName); + try { + store.bulkLoadHFile(familyName, path, commitedStoreFile); + // Note the size of the store file + try { + FileSystem fs = commitedStoreFile.getFileSystem(baseConf); + storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile) + .getLen()); + } catch (IOException e) { + LOG.warn("Failed to find the size of hfile " + commitedStoreFile); + storeFilesSizes.put(commitedStoreFile.getName(), 0L); + } + + if(storeFiles.containsKey(familyName)) { + storeFiles.get(familyName).add(commitedStoreFile); + } else { + List storeFileNames = new ArrayList(); + storeFileNames.add(commitedStoreFile); + storeFiles.put(familyName, storeFileNames); + } + if (bulkLoadListener != null) { + bulkLoadListener.doneBulkLoad(familyName, path); + } + } catch (IOException ioe) { + // A failure here can cause an atomicity violation that we currently + // cannot recover from since it is likely a failed HDFS operation. + + // TODO Need a better story for reverting partial failures due to HDFS. 
+ LOG.error("There was a partial failure due to IO when attempting to" + + " load " + Bytes.toString(familyName) + " : " + p.getSecond(), ioe); + if (bulkLoadListener != null) { + try { + bulkLoadListener.failedBulkLoad(familyName, path); + } catch (Exception ex) { + LOG.error("Error while calling failedBulkLoad for family " + + Bytes.toString(familyName) + " with path " + path, ex); + } + } + throw ioe; + } + } + } + isSuccessful = true; } finally { if (wal != null && !storeFiles.isEmpty()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index b5ffb197a1f..2a37315abfe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver; -import com.google.common.collect.Lists; - import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -54,8 +52,11 @@ import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import com.google.common.collect.Lists; + /** * View to an on-disk Region. * Provides the set of methods necessary to interact with the on-disk region data. 
@@ -396,19 +397,21 @@ public class HRegionFileSystem { * @throws IOException */ public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException { - return commitStoreFile(familyName, buildPath, -1, false); + Path dstPath = preCommitStoreFile(familyName, buildPath, -1, false); + return commitStoreFile(buildPath, dstPath); } /** - * Move the file from a build/temp location to the main family store directory. + * Generate the filename in the main family store directory for moving the file from a build/temp + * location. * @param familyName Family that will gain the file * @param buildPath {@link Path} to the file to commit. * @param seqNum Sequence Number to append to the file name (less then 0 if no sequence number) * @param generateNewName False if you want to keep the buildPath name - * @return The new {@link Path} of the committed file + * @return The new {@link Path} of the to be committed file * @throws IOException */ - private Path commitStoreFile(final String familyName, final Path buildPath, + private Path preCommitStoreFile(final String familyName, final Path buildPath, final long seqNum, final boolean generateNewName) throws IOException { Path storeDir = getStoreDir(familyName); if(!fs.exists(storeDir) && !createDir(storeDir)) @@ -425,6 +428,17 @@ public class HRegionFileSystem { if (LOG.isDebugEnabled()) { LOG.debug("Committing store file " + buildPath + " as " + dstPath); } + return dstPath; + } + + /* + * Moves file from staging dir to region dir + * @param buildPath {@link Path} to the file to commit. + * @param dstPath {@link Path} to the file under region dir + * @return The {@link Path} of the committed file + * @throws IOException + */ + Path commitStoreFile(final Path buildPath, Path dstPath) throws IOException { // buildPath exists, therefore not doing an exists() check. 
if (!rename(buildPath, dstPath)) { throw new IOException("Failed rename of " + buildPath + " to " + dstPath); @@ -432,7 +446,6 @@ public class HRegionFileSystem { return dstPath; } - /** * Moves multiple store files to the relative region's family store directory. * @param storeFiles list of store files divided by family @@ -482,7 +495,7 @@ public class HRegionFileSystem { * @return The destination {@link Path} of the bulk loaded file * @throws IOException */ - Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum) + Pair bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum) throws IOException { // Copy the file if it's on another filesystem FileSystem srcFs = srcPath.getFileSystem(conf); @@ -500,7 +513,7 @@ public class HRegionFileSystem { srcPath = tmpPath; } - return commitStoreFile(familyName, srcPath, seqNum, true); + return new Pair<>(srcPath, preCommitStoreFile(familyName, srcPath, seqNum, true)); } // =========================================================================== diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 6f47b0e9c66..a15bf13db88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; @@ -799,10 +800,21 @@ public class HStore implements Store { } } - @Override - public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException { + public Pair 
preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException { Path srcPath = new Path(srcPathStr); - Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); + return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); + } + + @Override + public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException { + Path srcPath = new Path(srcPathStr); + try { + fs.commitStoreFile(srcPath, dstPath); + } finally { + if (this.getCoprocessorHost() != null) { + this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath); + } + } LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as " + dstPath + " - updating store file list."); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index fc77c2c15f6..ba7beb0993a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2016,11 +2016,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); } boolean loaded = false; - if (!bypass) { - loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null); - } - if (region.getCoprocessorHost() != null) { - loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); + try { + if (!bypass) { + loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null); + } + } finally { + if (region.getCoprocessorHost() != null) { + loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); + } } BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); builder.setLoaded(loaded); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index c564091cd19..8adc3a44b3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -1507,6 +1507,27 @@ public class RegionCoprocessorHost postWALRestore(info, (WALKey)logKey, logEdit); } + public boolean preCommitStoreFile(final byte[] family, final List> pairs) + throws IOException { + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.preCommitStoreFile(ctx, family, pairs); + } + }); + } + public void postCommitStoreFile(final byte[] family, final Path srcPath, final Path dstPath) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext ctx) + throws IOException { + oserver.postCommitStoreFile(ctx, family, srcPath, dstPath); + } + }); + } + /** * @param familyPaths pairs of { CF, file path } submitted for bulk load * @return true if the default operation should be bypassed diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index c78e0e96c87..ccdc5239ba3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -324,10 +324,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * This method should only be called from Region. It is assumed that the ranges of values in the * HFile fit within the stores assigned region. 
(assertBulkLoadHFileOk checks this) * + * @param family the column family * @param srcPathStr - * @param sequenceId sequence Id associated with the HFile + * @param dstPath the destination path of the HFile under the region directory */ - Path bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException; + Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException; // General accessors into the state of the store // TODO abstract some of this out into a metrics class