From 768b4119dae6069d4dea7f13018d89aeb7017ce4 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 20 Nov 2016 09:26:42 -0800 Subject: [PATCH] HBASE-17123 Add postBulkLoadHFile variant that notifies the final paths for the hfiles --- .../access/SecureBulkLoadEndpoint.java | 7 +++- .../hbase/coprocessor/BaseRegionObserver.java | 10 +++++- .../hbase/coprocessor/RegionObserver.java | 33 +++++++++++++++---- .../hadoop/hbase/regionserver/HRegion.java | 10 +++--- .../hbase/regionserver/RSRpcServices.java | 14 ++++++-- .../hadoop/hbase/regionserver/Region.java | 13 ++++---- .../regionserver/RegionCoprocessorHost.java | 6 ++-- .../regionserver/SecureBulkLoadManager.java | 17 ++++++---- .../coprocessor/SimpleRegionObserver.java | 3 +- .../regionserver/TestMobStoreCompaction.java | 4 +-- 10 files changed, 83 insertions(+), 34 deletions(-) diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java index 1849d90cd8a..9bc592fc8fd 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java @@ -19,9 +19,12 @@ package org.apache.hadoop.hbase.security.access; import java.io.IOException; +import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -128,12 +131,14 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request, RpcCallback done) { boolean loaded = false; + Map> map = null; try { SecureBulkLoadManager secureBulkLoadManager = this.env.getRegionServerServices().getSecureBulkLoadManager(); BulkLoadHFileRequest bulkLoadHFileRequest = ConvertSecureBulkLoadHFilesRequest(request); - loaded = secureBulkLoadManager.secureBulkLoadHFiles(this.env.getRegion(), + map = secureBulkLoadManager.secureBulkLoadHFiles(this.env.getRegion(), convert(bulkLoadHFileRequest)); + loaded = map != null && !map.isEmpty(); } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index 9f033c0a0f0..3333e77127e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.NavigableSet; import org.apache.hadoop.fs.FileSystem; @@ -511,7 +512,14 @@ public class BaseRegionObserver implements RegionObserver { @Override public boolean postBulkLoadHFile(ObserverContext ctx, - List> familyPaths, boolean hasLoaded) throws IOException { + List> stagingFamilyPaths, Map> finalPaths, + boolean hasLoaded) throws IOException { + return hasLoaded; + } + + @Override + public boolean postBulkLoadHFile(ObserverContext ctx, + List> stagingFamilyPaths, boolean hasLoaded) throws IOException { return hasLoaded; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 586545c7436..d9a509c5f0c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -21,16 +21,17 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.NavigableSet; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -45,11 +46,12 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.OperationStatus; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; @@ -58,9 +60,9 @@ import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALKey; import com.google.common.collect.ImmutableList; @@ -1386,13 +1388,30 @@ public interface RegionObserver extends Coprocessor { * Called after bulkLoadHFile. * * @param ctx - * @param familyPaths pairs of { CF, HFile path } submitted for bulk load + * @param stagingFamilyPaths pairs of { CF, HFile path } submitted for bulk load + * @param finalPaths Map of CF to List of file paths for the final loaded files * @param hasLoaded whether the bulkLoad was successful * @return the new value of hasLoaded * @throws IOException */ + default boolean postBulkLoadHFile(final ObserverContext ctx, + List> stagingFamilyPaths, Map> finalPaths, + boolean hasLoaded) throws IOException { + return postBulkLoadHFile(ctx, stagingFamilyPaths, hasLoaded); + } + + /** + * Called after bulkLoadHFile. + * + * @param ctx + * @param stagingFamilyPaths pairs of { CF, HFile path } submitted for bulk load + * @param hasLoaded whether the bulkLoad was successful + * @return the new value of hasLoaded + * @throws IOException + * @deprecated Use {@link #postBulkLoadHFile(ObserverContext, List, Map, boolean)} + */ boolean postBulkLoadHFile(final ObserverContext ctx, - List> familyPaths, boolean hasLoaded) throws IOException; + List> stagingFamilyPaths, boolean hasLoaded) throws IOException; /** * Called before creation of Reader for a store file. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c372faa05dc..64156724717 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5470,14 +5470,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, + public Map> bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException { return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false); } @Override - public boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, - BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException { + public Map> bulkLoadHFiles(Collection> familyPaths, + boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException { long seqId = -1; Map> storeFiles = new TreeMap>(Bytes.BYTES_COMPARATOR); Map storeFilesSizes = new HashMap(); @@ -5532,7 +5532,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // problem when validating LOG.warn("There was a recoverable bulk load failure likely due to a" + " split. These (family, HFile) pairs were not loaded: " + list); - return isSuccessful; + return null; } // We need to assign a sequential ID that's in between two memstores in order to preserve @@ -5626,7 +5626,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi closeBulkRegionOperation(); } - return isSuccessful; + return isSuccessful ? storeFiles : null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index cd62115c0df..f3f24b91d22 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ByteBufferedCell; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScannable; @@ -2142,6 +2143,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Region region = getRegion(request.getRegion()); boolean bypass = false; boolean loaded = false; + Map> map = null; if (!request.hasBulkToken()) { // Old style bulk load. This will not be supported in future releases @@ -2155,17 +2157,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler, bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); } if (!bypass) { - loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, + map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, request.getCopyFile()); + if (map != null) { + loaded = true; + } } if (region.getCoprocessorHost() != null) { - loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); + loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded); } } else { // secure bulk load - loaded = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request); + map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request); } BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); + if (map != null) { + loaded = true; + } builder.setLoaded(loaded); return builder.build(); } catch (IOException ie) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 1b106b280a6..09016e81111 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -558,11 +559,11 @@ public interface Region extends ConfigurationObserver { * @param bulkLoadListener Internal hooks enabling massaging/preparation of a * file about to be bulk loaded * @param assignSeqId - * @return true if successful, false if failed recoverably + * @return Map from family to List of store file paths if successful, null if failed recoverably * @throws IOException if failed unrecoverably. */ - boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, - BulkLoadListener bulkLoadListener) throws IOException; + Map> bulkLoadHFiles(Collection> familyPaths, + boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException; /** * Attempts to atomically load a group of hfiles. This is critical for loading @@ -573,11 +574,11 @@ public interface Region extends ConfigurationObserver { * @param bulkLoadListener Internal hooks enabling massaging/preparation of a * file about to be bulk loaded * @param copyFile always copy hfiles if true - * @return true if successful, false if failed recoverably + * @return Map from family to List of store file paths if successful, null if failed recoverably * @throws IOException if failed unrecoverably. */ - boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, - BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException; + Map> bulkLoadHFiles(Collection> familyPaths, + boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException; /////////////////////////////////////////////////////////////////////////// // Coprocessors diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index c7e35984c92..f6388fa4d77 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -1549,18 +1550,19 @@ public class RegionCoprocessorHost /** * @param familyPaths pairs of { CF, file path } submitted for bulk load + * @param map Map of CF to List of file paths for the final loaded files * @param hasLoaded whether load was successful or not * @return the possibly modified value of hasLoaded * @throws IOException */ public boolean postBulkLoadHFile(final List> familyPaths, - boolean hasLoaded) throws IOException { + Map> map, boolean hasLoaded) throws IOException { return execOperationWithResult(hasLoaded, coprocessors.isEmpty() ? null : new RegionOperationWithResult() { @Override public void call(RegionObserver oserver, ObserverContext ctx) throws IOException { - setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult())); + setResult(oserver.postBulkLoadHFile(ctx, familyPaths, map, getResult())); } }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index e84ca40651a..8b44f22b37d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -171,7 +171,7 @@ public class SecureBulkLoadManager { fs.delete(new Path(request.getBulkToken()), true); } - public boolean secureBulkLoadHFiles(final Region region, + public Map> secureBulkLoadHFiles(final Region region, final BulkLoadHFileRequest request) throws IOException { final List> familyPaths = new ArrayList>(request.getFamilyPathCount()); for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) { @@ -200,6 +200,8 @@ public class SecureBulkLoadManager { bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); } boolean loaded = false; + Map> map = null; + if (!bypass) { // Get the target fs (HBase region server fs) delegation token // Since we have checked the permission via 'preBulkLoadHFile', now let's give @@ -217,9 +219,9 @@ public class SecureBulkLoadManager { } } - loaded = ugi.doAs(new PrivilegedAction() { + map = ugi.doAs(new PrivilegedAction>>() { @Override - public Boolean run() { + public Map> run() { FileSystem fs = null; try { fs = FileSystem.get(conf); @@ -237,14 +239,17 @@ public class SecureBulkLoadManager { } catch (Exception e) { LOG.error("Failed to complete bulk load", e); } - return false; + return null; } }); + if (map != null) { + loaded = true; + } } if (region.getCoprocessorHost() != null) { - loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); + region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded); } - return loaded; + return map; } private List getBulkLoadObservers(Region region) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index dfd41a8750b..2ea343aed16 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -635,7 +635,8 @@ public class SimpleRegionObserver extends BaseRegionObserver { @Override public boolean postBulkLoadHFile(ObserverContext ctx, - List> familyPaths, boolean hasLoaded) throws IOException { + List> familyPaths, Map> map, boolean hasLoaded) + throws IOException { RegionCoprocessorEnvironment e = ctx.getEnvironment(); assertNotNull(e); assertNotNull(e.getRegion()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java index 6f7154802b6..2e7735b29ee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java @@ -196,8 +196,8 @@ public class TestMobStoreCompaction { // The following will bulk load the above generated store files and compact, with 600(fileSize) // > 300(threshold) - boolean result = region.bulkLoadHFiles(hfiles, true, null); - assertTrue("Bulkload result:", result); + Map> map = region.bulkLoadHFiles(hfiles, true, null); + assertTrue("Bulkload result:", !map.isEmpty()); assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles()); assertEquals("Before compaction: mob file count", 0, countMobFiles()); assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));