HBASE-6224. Add pre and post coprocessor hooks for bulk load (Francis Liu)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1353057 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Kyle Purtell 2012-06-23 00:54:11 +00:00
parent 0f558bb126
commit 22ca8dc565
8 changed files with 214 additions and 7 deletions

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Pair;
import java.io.IOException; import java.io.IOException;
@ -279,4 +280,15 @@ public abstract class BaseRegionObserver implements RegionObserver {
public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
} }
/**
 * Default no-op implementation of the pre-bulk-load hook. Subclasses override
 * this to inspect or modify the list of HFiles before they are loaded.
 *
 * @param ctx the observer context for the region coprocessor environment
 * @param familyPaths pairs of { CF, HFile path } submitted for bulk load
 * @throws IOException never thrown by this default implementation
 */
@Override
public void preBulkLoadHFile(
    final ObserverContext<RegionCoprocessorEnvironment> ctx,
    List<Pair<byte[], String>> familyPaths)
    throws IOException {
  // Intentionally empty: the base observer does not intervene in bulk loads.
}
/**
 * Default implementation of the post-bulk-load hook. It leaves the outcome of
 * the bulk load untouched.
 *
 * @param ctx the observer context for the region coprocessor environment
 * @param familyPaths pairs of { CF, HFile path } submitted for bulk load
 * @param hasLoaded the bulk load result handed to this observer
 * @return {@code hasLoaded}, unchanged
 * @throws IOException never thrown by this default implementation
 */
@Override
public boolean postBulkLoadHFile(
    ObserverContext<RegionCoprocessorEnvironment> ctx,
    List<Pair<byte[], String>> familyPaths,
    boolean hasLoaded)
    throws IOException {
  // Pass the incoming result straight through.
  return hasLoaded;
}
} }

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.util.Pair;
/** /**
* Coprocessors implement this interface to observe and mediate client actions * Coprocessors implement this interface to observe and mediate client actions
@ -655,4 +656,28 @@ public interface RegionObserver extends Coprocessor {
*/ */
void postWALRestore(final ObserverContext<RegionCoprocessorEnvironment> ctx, void postWALRestore(final ObserverContext<RegionCoprocessorEnvironment> ctx,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException; HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
/**
 * Called before bulkLoadHFile. Users can create a StoreFile instance to
 * access the contents of a HFile.
 * <p>
 * The region server invokes this hook with the list it is about to hand to
 * the region, so changes made to the list here are visible to the load itself.
 * If the context reports a bypass after this hook returns, the region server
 * skips the default bulk load; the post hook is still invoked afterwards.
 *
 * @param ctx the observer context giving access to the region coprocessor
 * environment
 * @param familyPaths pairs of { CF, HFile path } submitted for bulk load. Adding
 * or removing from this list will add or remove HFiles to be bulk loaded.
 * @throws IOException if the observer cannot complete the hook
 */
void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths) throws IOException;
/**
 * Called after bulkLoadHFile.
 * <p>
 * The value returned here replaces {@code hasLoaded}: it is handed to the next
 * observer in the chain and, after the last observer, reported back to the
 * caller of the bulk load.
 *
 * @param ctx the observer context giving access to the region coprocessor
 * environment
 * @param familyPaths pairs of { CF, HFile path } submitted for bulk load
 * @param hasLoaded whether the bulkLoad was successful; {@code false} when the
 * default bulk load was bypassed by a pre hook
 * @return the new value of hasLoaded
 * @throws IOException if the observer cannot complete the hook
 */
boolean postBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException;
} }

View File

@ -3122,10 +3122,20 @@ public class HRegionServer implements ClientProtocol,
HRegion region = getRegion(request.getRegion()); HRegion region = getRegion(request.getRegion());
List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>(); List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
for (FamilyPath familyPath: request.getFamilyPathList()) { for (FamilyPath familyPath: request.getFamilyPathList()) {
familyPaths.add(new Pair<byte[], String>( familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
familyPath.getFamily().toByteArray(), familyPath.getPath())); familyPath.getPath()));
}
boolean bypass = false;
if (region.getCoprocessorHost() != null) {
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
}
boolean loaded = false;
if (!bypass) {
loaded = region.bulkLoadHFiles(familyPaths);
}
if (region.getCoprocessorHost() != null) {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
} }
boolean loaded = region.bulkLoadHFiles(familyPaths);
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
builder.setLoaded(loaded); builder.setLoaded(loaded);
return builder.build(); return builder.build();

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import java.io.IOException; import java.io.IOException;
@ -1267,4 +1268,58 @@ public class RegionCoprocessorHost
} }
} }
} }
/**
 * Runs the {@code preBulkLoadHFile} hook of every loaded coprocessor that is a
 * {@link RegionObserver}, in order, stopping early when an observer marks the
 * chain as complete.
 *
 * @param familyPaths pairs of { CF, file path } submitted for bulk load
 * @return true if the default operation should be bypassed, i.e. at least one
 * invoked observer requested a bypass
 * @throws IOException if the failure of an observer is propagated to the caller
 */
public boolean preBulkLoadHFile(List<Pair<byte[], String>> familyPaths) throws IOException {
  ObserverContext<RegionCoprocessorEnvironment> ctx = null;
  boolean skipDefault = false;
  for (RegionEnvironment env : coprocessors) {
    // Only region observers take part in this hook.
    if (!(env.getInstance() instanceof RegionObserver)) {
      continue;
    }
    ctx = ObserverContext.createAndPrepare(env, ctx);
    try {
      RegionObserver observer = (RegionObserver) env.getInstance();
      observer.preBulkLoadHFile(ctx, familyPaths);
    } catch (Throwable e) {
      handleCoprocessorThrowable(env, e);
    }
    // shouldBypass() is queried for every observer, never short-circuited.
    if (ctx.shouldBypass()) {
      skipDefault = true;
    }
    if (ctx.shouldComplete()) {
      break;
    }
  }
  return skipDefault;
}
/**
 * Runs the {@code postBulkLoadHFile} hook of every loaded coprocessor that is a
 * {@link RegionObserver}, threading the load result through the chain: each
 * observer receives the value returned by the previous one.
 *
 * @param familyPaths pairs of { CF, file path } submitted for bulk load
 * @param hasLoaded whether load was successful or not
 * @return the possibly modified value of hasLoaded
 * @throws IOException if the failure of an observer is propagated to the caller
 */
public boolean postBulkLoadHFile(List<Pair<byte[], String>> familyPaths, boolean hasLoaded)
    throws IOException {
  boolean result = hasLoaded;
  ObserverContext<RegionCoprocessorEnvironment> ctx = null;
  for (RegionEnvironment env : coprocessors) {
    // Only region observers take part in this hook.
    if (!(env.getInstance() instanceof RegionObserver)) {
      continue;
    }
    ctx = ObserverContext.createAndPrepare(env, ctx);
    try {
      RegionObserver observer = (RegionObserver) env.getInstance();
      result = observer.postBulkLoadHFile(ctx, familyPaths, result);
    } catch (Throwable e) {
      handleCoprocessorThrowable(env, e);
    }
    if (ctx.shouldComplete()) {
      break;
    }
  }
  return result;
}
} }

View File

@ -88,7 +88,7 @@ import com.google.common.collect.Ordering;
* The reason for this weird pattern where you use a different instance for the * The reason for this weird pattern where you use a different instance for the
* writer and a reader is that we write once but read a lot more. * writer and a reader is that we write once but read a lot more.
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate("Coprocessor")
public class StoreFile extends SchemaConfigured { public class StoreFile extends SchemaConfigured {
static final Log LOG = LogFactory.getLog(StoreFile.class.getName()); static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
@ -233,7 +233,7 @@ public class StoreFile extends SchemaConfigured {
* @param dataBlockEncoder data block encoding algorithm. * @param dataBlockEncoder data block encoding algorithm.
* @throws IOException When opening the reader fails. * @throws IOException When opening the reader fails.
*/ */
StoreFile(final FileSystem fs, public StoreFile(final FileSystem fs,
final Path p, final Path p,
final Configuration conf, final Configuration conf,
final CacheConfig cacheConf, final CacheConfig cacheConf,

View File

@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
* KeyValueScanner adaptor over the Reader. It also provides hooks into * KeyValueScanner adaptor over the Reader. It also provides hooks into
* bloom filter things. * bloom filter things.
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate("Coprocessor")
class StoreFileScanner implements KeyValueScanner { public class StoreFileScanner implements KeyValueScanner {
static final Log LOG = LogFactory.getLog(Store.class); static final Log LOG = LogFactory.getLog(Store.class);
// the reader it comes from: // the reader it comes from:

View File

@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.coprocessor; package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
/** /**
* A sample region observer that tests the RegionObserver interface. * A sample region observer that tests the RegionObserver interface.
@ -85,6 +88,8 @@ public class SimpleRegionObserver extends BaseRegionObserver {
boolean hadPostScannerClose = false; boolean hadPostScannerClose = false;
boolean hadPreScannerOpen = false; boolean hadPreScannerOpen = false;
boolean hadPostScannerOpen = false; boolean hadPostScannerOpen = false;
boolean hadPreBulkLoadHFile = false;
boolean hadPostBulkLoadHFile = false;
@Override @Override
public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) { public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
@ -384,6 +389,43 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return result; return result;
} }
/**
 * Records that the pre-bulk-load hook fired. For regions of the test table it
 * also checks that exactly one { CF, HFile path } pair was submitted, that the
 * family is {@code TestRegionObserverInterface.A}, and that the path ends with
 * {@code "/" + family name}.
 *
 * @param ctx the observer context; its environment and region must be non-null
 * @param familyPaths pairs of { CF, HFile path } submitted for bulk load
 * @throws IOException declared by the overridden hook
 */
@Override
public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
    List<Pair<byte[], String>> familyPaths) throws IOException {
  RegionCoprocessorEnvironment e = ctx.getEnvironment();
  assertNotNull(e);
  assertNotNull(e.getRegion());
  if (Arrays.equals(e.getRegion().getTableDesc().getName(),
      TestRegionObserverInterface.TEST_TABLE)) {
    assertNotNull(familyPaths);
    assertEquals(1, familyPaths.size());
    // JUnit expects (expected, actual); the reverse order yields misleading failure messages.
    assertArrayEquals(TestRegionObserverInterface.A, familyPaths.get(0).getFirst());
    String familyPath = familyPaths.get(0).getSecond();
    String familyName = Bytes.toString(TestRegionObserverInterface.A);
    // endsWith fails the assertion on a too-short path instead of throwing
    // StringIndexOutOfBoundsException as substring would.
    assertTrue("HFile path does not end with /" + familyName + ": " + familyPath,
        familyPath.endsWith("/" + familyName));
  }
  hadPreBulkLoadHFile = true;
}
/**
 * Records that the post-bulk-load hook fired. For regions of the test table it
 * also checks that exactly one { CF, HFile path } pair was submitted, that the
 * family is {@code TestRegionObserverInterface.A}, and that the path ends with
 * {@code "/" + family name}.
 *
 * @param ctx the observer context; its environment and region must be non-null
 * @param familyPaths pairs of { CF, HFile path } submitted for bulk load
 * @param hasLoaded the bulk load result handed to this observer
 * @return {@code hasLoaded}, unchanged
 * @throws IOException declared by the overridden hook
 */
@Override
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
    List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
  RegionCoprocessorEnvironment e = ctx.getEnvironment();
  assertNotNull(e);
  assertNotNull(e.getRegion());
  if (Arrays.equals(e.getRegion().getTableDesc().getName(),
      TestRegionObserverInterface.TEST_TABLE)) {
    assertNotNull(familyPaths);
    assertEquals(1, familyPaths.size());
    // JUnit expects (expected, actual); the reverse order yields misleading failure messages.
    assertArrayEquals(TestRegionObserverInterface.A, familyPaths.get(0).getFirst());
    String familyPath = familyPaths.get(0).getSecond();
    String familyName = Bytes.toString(TestRegionObserverInterface.A);
    // endsWith fails the assertion on a too-short path instead of throwing
    // StringIndexOutOfBoundsException as substring would.
    assertTrue("HFile path does not end with /" + familyName + ": " + familyPath,
        familyPath.endsWith("/" + familyName));
  }
  hadPostBulkLoadHFile = true;
  return hasLoaded;
}
public boolean hadPreGet() { public boolean hadPreGet() {
return hadPreGet; return hadPreGet;
} }
@ -430,4 +472,12 @@ public class SimpleRegionObserver extends BaseRegionObserver {
public boolean hadDeleted() { public boolean hadDeleted() {
return hadPreDeleted && hadPostDeleted; return hadPreDeleted && hadPostDeleted;
} }
/**
 * @return true once {@code postBulkLoadHFile} has been invoked on this observer
 */
public boolean hadPostBulkLoadHFile() {
  return this.hadPostBulkLoadHFile;
}
/**
 * @return true once {@code preBulkLoadHFile} has been invoked on this observer
 */
public boolean hadPreBulkLoadHFile() {
  return this.hadPreBulkLoadHFile;
}
} }

View File

@ -34,6 +34,8 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
@ -52,6 +54,9 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
@ -447,6 +452,37 @@ public class TestRegionObserverInterface {
table.close(); table.close();
} }
/**
 * Checks that a bulk load triggers both the pre and post bulk-load observer
 * hooks: neither flag is set on a freshly created table, and both are set
 * after one HFile for family A has been bulk loaded.
 *
 * @throws Exception if table setup, HFile creation, the bulk load or the
 * verification fails
 */
@Test
public void bulkLoadHFileTest() throws Exception {
  String testName = TestRegionObserverInterface.class.getName() + ".bulkLoadHFileTest";
  byte[] tableName = TEST_TABLE;
  Configuration conf = util.getConfiguration();
  HTable table = util.createTable(tableName, new byte[][] {A, B, C});
  try {
    verifyMethodResult(SimpleRegionObserver.class,
        new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
        tableName,
        new Boolean[] {false, false}
    );

    FileSystem fs = util.getTestFileSystem();
    final Path dir = util.getDataTestDir(testName).makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(A));

    createHFile(conf, fs, new Path(familyDir, Bytes.toString(A)), A, A);

    // Bulk load through the table handle this test already owns, rather than
    // opening a second HTable that is never closed.
    new LoadIncrementalHFiles(conf).doBulkLoad(dir, table);

    verifyMethodResult(SimpleRegionObserver.class,
        new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
        tableName,
        new Boolean[] {true, true}
    );
  } finally {
    // Always clean up so a failed assertion does not leave the table behind.
    try {
      util.deleteTable(tableName);
    } finally {
      table.close();
    }
  }
}
// check each region whether the coprocessor upcalls are called or not. // check each region whether the coprocessor upcalls are called or not.
private void verifyMethodResult(Class c, String methodName[], byte[] tableName, private void verifyMethodResult(Class c, String methodName[], byte[] tableName,
Object value[]) throws IOException { Object value[]) throws IOException {
@ -475,6 +511,25 @@ public class TestRegionObserverInterface {
} }
} }
/**
 * Writes a small HFile at {@code path} holding nine KeyValues. Row and value
 * are both the decimal strings "1" through "9"; every cell uses the given
 * family and qualifier and shares one timestamp taken when writing starts.
 *
 * @param conf configuration used to obtain the writer factory and cache config
 * @param fs file system on which the HFile is created
 * @param path location of the HFile to write
 * @param family column family of every written cell
 * @param qualifier column qualifier of every written cell
 * @throws IOException if creating, appending to or closing the writer fails
 */
private static void createHFile(
    Configuration conf,
    FileSystem fs, Path path,
    byte[] family, byte[] qualifier) throws IOException {
  CacheConfig cacheConf = new CacheConfig(conf);
  HFile.Writer out = HFile.getWriterFactory(conf, cacheConf)
      .withPath(fs, path)
      .withComparator(KeyValue.KEY_COMPARATOR)
      .create();
  long timestamp = System.currentTimeMillis();
  try {
    for (int row = 1; row < 10; row++) {
      // The same text serves as row key and as cell value.
      String text = row + "";
      out.append(new KeyValue(Bytes.toBytes(text), family, qualifier, timestamp,
          Bytes.toBytes(text)));
    }
  } finally {
    out.close();
  }
}
private static byte [][] makeN(byte [] base, int n) { private static byte [][] makeN(byte [] base, int n) {
byte [][] ret = new byte[n][]; byte [][] ret = new byte[n][];
for(int i=0;i<n;i++) { for(int i=0;i<n;i++) {