HBASE-9244 Add CP hooks around StoreFileReader creation

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1524949 13f79535-47bb-0310-9956-ffa450edef68
anoopsamjohn 2013-09-20 08:51:37 +00:00
parent 0541a8da5c
commit 1bd014c106
6 changed files with 201 additions and 5 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java View File

@@ -22,6 +22,8 @@ import java.util.NavigableSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -38,6 +40,10 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -46,6 +52,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -455,4 +462,18 @@ public abstract class BaseRegionObserver implements RegionObserver {
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
return hasLoaded;
}
@Override
public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException {
return reader;
}
@Override
public Reader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException {
return reader;
}
}

hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java View File

@@ -21,6 +21,8 @@ import java.util.NavigableSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -36,6 +38,10 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -1002,4 +1008,47 @@ public interface RegionObserver extends Coprocessor {
*/
boolean postBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException;
/**
 * Called before creation of the Reader for a store file.
 * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
 * effect in this hook.
 *
 * @param ctx the environment provided by the region server
 * @param fs filesystem to read from
 * @param p path to the file
 * @param in the {@link FSDataInputStreamWrapper} for the file
 * @param size full size of the file
 * @param cacheConf the cache configuration to use
 * @param preferredEncodingInCache the data block encoding preferred in the block cache
 * @param r original reference file. This will be non-null only when reading a split file.
 * @param reader the base reader, if not {@code null}, from the previous RegionObserver in the chain
 * @return a Reader instance to use instead of the base reader if overriding
 * default behavior, null otherwise
 * @throws IOException
 */
StoreFile.Reader preStoreFileReaderOpen(final ObserverContext<RegionCoprocessorEnvironment> ctx,
final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, long size,
final CacheConfig cacheConf, final DataBlockEncoding preferredEncodingInCache,
final Reference r, StoreFile.Reader reader) throws IOException;
/**
 * Called after the creation of the Reader for a store file.
 *
 * @param ctx the environment provided by the region server
 * @param fs filesystem to read from
 * @param p path to the file
 * @param in the {@link FSDataInputStreamWrapper} for the file
 * @param size full size of the file
 * @param cacheConf the cache configuration to use
 * @param preferredEncodingInCache the data block encoding preferred in the block cache
 * @param r original reference file. This will be non-null only when reading a split file.
 * @param reader the base reader instance
 * @return the reader to use
 * @throws IOException
 */
StoreFile.Reader postStoreFileReaderOpen(final ObserverContext<RegionCoprocessorEnvironment> ctx,
final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, long size,
final CacheConfig cacheConf, final DataBlockEncoding preferredEncodingInCache,
final Reference r, StoreFile.Reader reader) throws IOException;
}
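
To make the hook contract concrete, here is a minimal sketch (not part of this commit) of an observer that supplies its own reader; MyStoreFileReader is a hypothetical StoreFile.Reader subclass, not an HBase class:

public class CustomReaderObserver extends BaseRegionObserver {
  @Override
  public StoreFile.Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
      FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
      DataBlockEncoding preferredEncodingInCache, Reference r, StoreFile.Reader reader)
      throws IOException {
    // Keep a reader already supplied by an earlier observer in the chain.
    if (reader != null) {
      return reader;
    }
    // Returning a non-null reader suppresses the default StoreFile.Reader /
    // HalfStoreFileReader creation; MyStoreFileReader is assumed, for illustration.
    return new MyStoreFileReader(fs, p, in, size, cacheConf, preferredEncodingInCache);
  }
}

Returning null here (the BaseRegionObserver default) leaves reader creation to the region server; postStoreFileReaderOpen then sees whichever reader was finally created.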

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java View File

@@ -474,7 +474,9 @@ public class HStore implements Store {
}
  private StoreFile createStoreFileAndReader(final Path p, final HFileDataBlockEncoder encoder) throws IOException {
-   StoreFile storeFile = new StoreFile(this.getFileSystem(), p, this.conf, this.cacheConf,
+   StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
+   info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
+   StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
      this.family.getBloomFilterType(), encoder);
    storeFile.createReader();
    return storeFile;

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java View File

@@ -33,6 +33,7 @@ import org.apache.commons.collections.map.ReferenceMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
@@ -56,7 +57,11 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -1636,4 +1641,71 @@ public class RegionCoprocessorHost
return hasLoaded;
}
/**
 * @param fs filesystem to read from
 * @param p path to the file
 * @param in the {@link FSDataInputStreamWrapper} for the file
 * @param size full size of the file
 * @param cacheConf the cache configuration to use
 * @param preferredEncodingInCache the data block encoding preferred in the block cache
 * @param r original reference file. This will be non-null only when reading a split file.
 * @return a Reader instance to use instead of the base reader if overriding
 * default behavior, null otherwise
 * @throws IOException
 */
public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
final FSDataInputStreamWrapper in, long size, final CacheConfig cacheConf,
final DataBlockEncoding preferredEncodingInCache, final Reference r) throws IOException {
StoreFile.Reader reader = null;
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
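// Note (editorial comment, not in the commit): each observer receives the
// reader returned by the previous one, so later coprocessors in the chain can
// wrap or replace earlier results; a null end result means no observer
// overrode default reader creation.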
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
reader = ((RegionObserver) env.getInstance()).preStoreFileReaderOpen(ctx, fs, p, in,
size, cacheConf, preferredEncodingInCache, r, reader);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
return reader;
}
/**
 * @param fs filesystem to read from
 * @param p path to the file
 * @param in the {@link FSDataInputStreamWrapper} for the file
 * @param size full size of the file
 * @param cacheConf the cache configuration to use
 * @param preferredEncodingInCache the data block encoding preferred in the block cache
 * @param r original reference file. This will be non-null only when reading a split file.
 * @param reader the base reader instance
 * @return the reader to use
 * @throws IOException
 */
public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
final FSDataInputStreamWrapper in, long size, final CacheConfig cacheConf,
final DataBlockEncoding preferredEncodingInCache, final Reference r, StoreFile.Reader reader)
throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
reader = ((RegionObserver) env.getInstance()).postStoreFileReaderOpen(ctx, fs, p, in,
size, cacheConf, preferredEncodingInCache, r, reader);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
return reader;
}
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java View File

@@ -79,6 +79,8 @@ public class StoreFileInfo {
// FileSystem information for the file.
private final FileStatus fileStatus;
private RegionCoprocessorHost coprocessorHost;
/**
* Create a Store File Info
* @param conf the {@link Configuration} to use
@@ -126,6 +128,14 @@ public class StoreFileInfo {
}
}
/**
 * Sets the region coprocessor host.
 * @param coprocessorHost the coprocessor host of the region this store file belongs to
 */
public void setRegionCoprocessorHost(RegionCoprocessorHost coprocessorHost) {
this.coprocessorHost = coprocessorHost;
}
/*
* @return the Reference object associated to this StoreFileInfo.
* null if the StoreFile is not a reference.
@@ -182,12 +192,27 @@
    long length = status.getLen();
    if (this.reference != null) {
      hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(fs, reference, status);
-     return new HalfStoreFileReader(
-         fs, this.getPath(), in, length, cacheConf, reference, dataBlockEncoding);
    } else {
      hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
-     return new StoreFile.Reader(fs, this.getPath(), in, length, cacheConf, dataBlockEncoding);
    }
+   StoreFile.Reader reader = null;
+   if (this.coprocessorHost != null) {
+     reader = this.coprocessorHost.preStoreFileReaderOpen(fs, this.getPath(), in, length,
+       cacheConf, dataBlockEncoding, reference);
+   }
+   if (reader == null) {
+     if (this.reference != null) {
+       reader = new HalfStoreFileReader(fs, this.getPath(), in, length, cacheConf, reference,
+         dataBlockEncoding);
+     } else {
+       reader = new StoreFile.Reader(fs, this.getPath(), in, length, cacheConf, dataBlockEncoding);
+     }
+   }
+   if (this.coprocessorHost != null) {
+     reader = this.coprocessorHost.postStoreFileReaderOpen(fs, this.getPath(), in, length,
+       cacheConf, dataBlockEncoding, reference, reader);
+   }
+   return reader;
  }
/**

hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java View File

@@ -34,6 +34,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -47,6 +49,10 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -56,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
@@ -110,7 +117,8 @@ public class SimpleRegionObserver extends BaseRegionObserver {
final AtomicInteger ctPostWALRestore = new AtomicInteger(0);
final AtomicInteger ctPreSplitBeforePONR = new AtomicInteger(0);
final AtomicInteger ctPreSplitAfterPONR = new AtomicInteger(0);
final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0);
final AtomicInteger ctPostStoreFileReaderOpen = new AtomicInteger(0);
final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
@@ -544,6 +552,21 @@ public class SimpleRegionObserver extends BaseRegionObserver {
ctPostWALRestore.incrementAndGet();
}
@Override
public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException {
ctPreStoreFileReaderOpen.incrementAndGet();
return null;
}
@Override
public Reader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException {
ctPostStoreFileReaderOpen.incrementAndGet();
return reader;
}
public boolean hadPreGet() {
return ctPreGet.get() > 0;
@@ -725,4 +748,8 @@ public class SimpleRegionObserver extends BaseRegionObserver {
public int getCtPostWALRestore() {
return ctPostWALRestore.get();
}
public boolean wasStoreFileReaderOpenCalled() {
return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0;
}
}
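
As a usage sketch (not part of this commit), a test in the style of the existing coprocessor tests could load this observer and assert that both hooks fired once a store file is written; the table, family, and method names below are illustrative, and assertTrue is JUnit's:

public void testStoreFileReaderHooks() throws Exception {
  Configuration conf = HBaseConfiguration.create();
  conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      SimpleRegionObserver.class.getName());
  HBaseTestingUtility util = new HBaseTestingUtility(conf);
  util.startMiniCluster();
  try {
    // A put followed by a flush writes a store file; opening its reader
    // fires preStoreFileReaderOpen and postStoreFileReaderOpen.
    HTable table = util.createTable(Bytes.toBytes("testtable"), Bytes.toBytes("f"));
    Put put = new Put(Bytes.toBytes("row"));
    put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    table.put(put);
    util.flush();
    HRegion region = util.getHBaseCluster().getRegions(Bytes.toBytes("testtable")).get(0);
    SimpleRegionObserver observer = (SimpleRegionObserver) region.getCoprocessorHost()
        .findCoprocessor(SimpleRegionObserver.class.getName());
    assertTrue(observer.wasStoreFileReaderOpenCalled());
  } finally {
    util.shutdownMiniCluster();
  }
}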