diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index a32b26991c7..0d94496ed80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Pair;
 
 import java.io.IOException;
 
@@ -279,4 +280,15 @@ public abstract class BaseRegionObserver implements RegionObserver {
   public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
       HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
   }
+
+  @Override
+  public void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      List<Pair<byte[], String>> familyPaths) throws IOException {
+  }
+
+  @Override
+  public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
+    return hasLoaded;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 30c397cac98..5f90713b38f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Coprocessors implement this interface to observe and mediate client actions
@@ -655,4 +656,28 @@ public interface RegionObserver extends Coprocessor {
    */
   void postWALRestore(final ObserverContext<RegionCoprocessorEnvironment> ctx,
       HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
+
+  /**
+   * Called before bulkLoadHFile. Users can create a StoreFile instance to
+   * access the contents of a HFile.
+   *
+   * @param ctx
+   * @param familyPaths pairs of { CF, HFile path } submitted for bulk load. Adding
+   * or removing from this list will add or remove HFiles to be bulk loaded.
+   * @throws IOException
+   */
+  void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      List<Pair<byte[], String>> familyPaths) throws IOException;
+
+  /**
+   * Called after bulkLoadHFile.
+   *
+   * @param ctx
+   * @param familyPaths pairs of { CF, HFile path } submitted for bulk load
+   * @param hasLoaded whether the bulkLoad was successful
+   * @return the new value of hasLoaded
+   * @throws IOException
+   */
+  boolean postBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a548e4ed6e1..2242d1fb6a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3122,10 +3122,20 @@ public class HRegionServer implements ClientProtocol,
       HRegion region = getRegion(request.getRegion());
       List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
       for (FamilyPath familyPath: request.getFamilyPathList()) {
-        familyPaths.add(new Pair<byte[], String>(
-          familyPath.getFamily().toByteArray(), familyPath.getPath()));
+        familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
+          familyPath.getPath()));
+      }
+      boolean bypass = false;
+      if (region.getCoprocessorHost() != null) {
+        bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
+      }
+      boolean loaded = false;
+      if (!bypass) {
+        loaded = region.bulkLoadHFiles(familyPaths);
+      }
+      if (region.getCoprocessorHost() != null) {
+        loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
       }
-      boolean loaded = region.bulkLoadHFiles(familyPaths);
       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
       builder.setLoaded(loaded);
       return builder.build();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 44db31db394..a550d139174 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
@@ -1267,4 +1268,58 @@ public class RegionCoprocessorHost
       }
     }
   }
+
+  /**
+   * @param familyPaths pairs of { CF, file path } submitted for bulk load
+   * @return true if the default operation should be bypassed
+   * @throws IOException
+   */
+  public boolean preBulkLoadHFile(List<Pair<byte[], String>> familyPaths) throws IOException {
+    boolean bypass = false;
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env: coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          ((RegionObserver)env.getInstance()).preBulkLoadHFile(ctx, familyPaths);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        bypass |= ctx.shouldBypass();
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+
+    return bypass;
+  }
+
+  /**
+   * @param familyPaths pairs of { CF, file path } submitted for bulk load
+   * @param hasLoaded whether load was successful or not
+   * @return the possibly modified value of hasLoaded
+   * @throws IOException
+   */
+  public boolean postBulkLoadHFile(List<Pair<byte[], String>> familyPaths, boolean hasLoaded)
+      throws IOException {
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env: coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          hasLoaded = ((RegionObserver)env.getInstance()).postBulkLoadHFile(ctx,
+            familyPaths, hasLoaded);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+
+    return hasLoaded;
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 53f643be063..253b445a1d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -88,7 +88,7 @@ import com.google.common.collect.Ordering;
  * The reason for this weird pattern where you use a different instance for the
  * writer and a reader is that we write once but read a lot more.
  */
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate("Coprocessor")
 public class StoreFile extends SchemaConfigured {
   static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
 
@@ -233,7 +233,7 @@ public class StoreFile extends SchemaConfigured {
    * @param dataBlockEncoder data block encoding algorithm.
    * @throws IOException When opening the reader fails.
    */
-  StoreFile(final FileSystem fs,
+  public StoreFile(final FileSystem fs,
             final Path p,
             final Configuration conf,
             final CacheConfig cacheConf,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 6cb6640b253..5682d940e23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
  * KeyValueScanner adaptor over the Reader. It also provides hooks into
  * bloom filter things.
  */
-@InterfaceAudience.Private
-class StoreFileScanner implements KeyValueScanner {
+@InterfaceAudience.LimitedPrivate("Coprocessor")
+public class StoreFileScanner implements KeyValueScanner {
   static final Log LOG = LogFactory.getLog(Store.class);
 
   // the reader it comes from:
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index dacb9361cb3..a691bacc436 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -20,6 +20,8 @@
 
 package org.apache.hadoop.hbase.coprocessor;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * A sample region observer that tests the RegionObserver interface.
@@ -85,6 +88,8 @@ public class SimpleRegionObserver extends BaseRegionObserver {
   boolean hadPostScannerClose = false;
   boolean hadPreScannerOpen = false;
   boolean hadPostScannerOpen = false;
+  boolean hadPreBulkLoadHFile = false;
+  boolean hadPostBulkLoadHFile = false;
 
   @Override
   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
@@ -384,6 +389,43 @@ public class SimpleRegionObserver extends BaseRegionObserver {
     return result;
   }
 
+  @Override
+  public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      List<Pair<byte[], String>> familyPaths) throws IOException {
+    RegionCoprocessorEnvironment e = ctx.getEnvironment();
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    if (Arrays.equals(e.getRegion().getTableDesc().getName(),
+        TestRegionObserverInterface.TEST_TABLE)) {
+      assertNotNull(familyPaths);
+      assertEquals(1,familyPaths.size());
+      assertArrayEquals(familyPaths.get(0).getFirst(), TestRegionObserverInterface.A);
+      String familyPath = familyPaths.get(0).getSecond();
+      String familyName = Bytes.toString(TestRegionObserverInterface.A);
+      assertEquals(familyPath.substring(familyPath.length()-familyName.length()-1),"/"+familyName);
+    }
+    hadPreBulkLoadHFile = true;
+  }
+
+  @Override
+  public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
+    RegionCoprocessorEnvironment e = ctx.getEnvironment();
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    if (Arrays.equals(e.getRegion().getTableDesc().getName(),
+        TestRegionObserverInterface.TEST_TABLE)) {
+      assertNotNull(familyPaths);
+      assertEquals(1,familyPaths.size());
+      assertArrayEquals(familyPaths.get(0).getFirst(), TestRegionObserverInterface.A);
+      String familyPath = familyPaths.get(0).getSecond();
+      String familyName = Bytes.toString(TestRegionObserverInterface.A);
+      assertEquals(familyPath.substring(familyPath.length()-familyName.length()-1),"/"+familyName);
+    }
+    hadPostBulkLoadHFile = true;
+    return hasLoaded;
+  }
+
   public boolean hadPreGet() {
     return hadPreGet;
   }
@@ -430,4 +472,12 @@ public class SimpleRegionObserver extends BaseRegionObserver {
   public boolean hadDeleted() {
     return hadPreDeleted && hadPostDeleted;
   }
+
+  public boolean hadPostBulkLoadHFile() {
+    return hadPostBulkLoadHFile;
+  }
+
+  public boolean hadPreBulkLoadHFile() {
+    return hadPreBulkLoadHFile;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 69e88374d3b..91deda489a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -34,6 +34,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -52,6 +54,9 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -447,6 +452,37 @@ public class TestRegionObserverInterface {
     table.close();
   }
 
+  @Test
+  public void bulkLoadHFileTest() throws Exception {
+    String testName = TestRegionObserverInterface.class.getName()+".bulkLoadHFileTest";
+    byte[] tableName = TEST_TABLE;
+    Configuration conf = util.getConfiguration();
+    HTable table = util.createTable(tableName, new byte[][] {A, B, C});
+
+    verifyMethodResult(SimpleRegionObserver.class,
+        new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
+        tableName,
+        new Boolean[] {false, false}
+        );
+
+    FileSystem fs = util.getTestFileSystem();
+    final Path dir = util.getDataTestDir(testName).makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(A));
+
+    createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A);
+
+    //Bulk load
+    new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName));
+
+    verifyMethodResult(SimpleRegionObserver.class,
+        new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
+        tableName,
+        new Boolean[] {true, true}
+        );
+    util.deleteTable(tableName);
+    table.close();
+  }
+
   // check each region whether the coprocessor upcalls are called or not.
   private void verifyMethodResult(Class c, String methodName[],
       byte[] tableName, Object value[]) throws IOException {
@@ -475,6 +511,25 @@ public class TestRegionObserverInterface {
     }
   }
 
+  private static void createHFile(
+      Configuration conf,
+      FileSystem fs, Path path,
+      byte[] family, byte[] qualifier) throws IOException {
+    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
+        .withPath(fs, path)
+        .withComparator(KeyValue.KEY_COMPARATOR)
+        .create();
+    long now = System.currentTimeMillis();
+    try {
+      for (int i =1;i<=9;i++) {
+        KeyValue kv = new KeyValue(Bytes.toBytes(i+""), family, qualifier, now, Bytes.toBytes(i+""));
+        writer.append(kv);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
   private static byte [][] makeN(byte [] base, int n) {
     byte [][] ret = new byte[n][];
     for(int i=0;i<n;i++) {