HBASE-10352 Region and RegionServer changes for opening region replicas, and refreshing store files

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1571865 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Enis Soztutar 2014-02-25 23:31:30 +00:00
parent 72355a920a
commit d7d9f8db62
17 changed files with 1181 additions and 71 deletions

View File

@ -1220,6 +1220,19 @@ possible configurations would overwhelm and obscure the important.
<value>org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager</value> <value>org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager</value>
<description>Fully qualified name of class implementing coordinated state manager.</description> <description>Fully qualified name of class implementing coordinated state manager.</description>
</property> </property>
<property>
<name>hbase.regionserver.storefile.refresh.period</name>
<value>0</value>
<description>
The period (in milliseconds) for refreshing the store files for the secondary regions. 0
means this feature is disabled. Secondary regions see new files (from flushes and
compactions) from the primary once the secondary region refreshes the list of files in the
region (there is no notification mechanism). However, too-frequent refreshes might cause
extra Namenode pressure. If the files cannot be refreshed for longer than the HFile TTL
(hbase.master.hfilecleaner.ttl), the requests are rejected. Configuring the HFile TTL to a
larger value is also recommended with this setting.
</description>
</property>
<property> <property>
<name>hbase.http.filter.initializers</name> <name>hbase.http.filter.initializers</name>
<value>org.apache.hadoop.hbase.http.lib.StaticUserWebFilter</value> <value>org.apache.hadoop.hbase.http.lib.StaticUserWebFilter</value>

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@InterfaceAudience.Private @InterfaceAudience.Private
@ -63,10 +64,11 @@ public class ModifyTableHandler extends TableEventHandler {
super.prepareWithTableLock(); super.prepareWithTableLock();
// Check operation is possible on the table in its current state // Check operation is possible on the table in its current state
// Also checks whether the table exists // Also checks whether the table exists
if (masterServices.getAssignmentManager().getZKTable().isEnabledTable(this.htd.getTableName()) if (masterServices.getAssignmentManager().getTableStateManager()
.isTableState(this.htd.getTableName(), ZooKeeperProtos.Table.State.ENABLED)
&& this.htd.getRegionReplication() != getTableDescriptor().getRegionReplication()) { && this.htd.getRegionReplication() != getTableDescriptor().getRegionReplication()) {
throw new IOException("REGION_REPLICATION change is not supported for enabled tables"); throw new IOException("REGION_REPLICATION change is not supported for enabled tables");
} }
} }
@Override @Override

View File

@ -136,6 +136,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -220,7 +221,7 @@ public class HRegion implements HeapSize { // , Writable{
* Its default value is -1L. This default is used as a marker to indicate * Its default value is -1L. This default is used as a marker to indicate
* that the region hasn't opened yet. Once it is opened, it is set to the derived * that the region hasn't opened yet. Once it is opened, it is set to the derived
* {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region. * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
* *
* <p>Control of this sequence is handed off to the WAL/HLog implementation. It is responsible * <p>Control of this sequence is handed off to the WAL/HLog implementation. It is responsible
* for tagging edits with the correct sequence id since it is responsible for getting the * for tagging edits with the correct sequence id since it is responsible for getting the
* edits into the WAL files. It controls updating the sequence id value. DO NOT UPDATE IT * edits into the WAL files. It controls updating the sequence id value. DO NOT UPDATE IT
@ -375,6 +376,9 @@ public class HRegion implements HeapSize { // , Writable{
volatile boolean writesEnabled = true; volatile boolean writesEnabled = true;
// Set if region is read-only // Set if region is read-only
volatile boolean readOnly = false; volatile boolean readOnly = false;
// Whether reads are enabled. This is different from readOnly: readOnly is static for the
// lifetime of the region, while readsEnabled is dynamic.
volatile boolean readsEnabled = true;
/** /**
* Set flags that make this region read-only. * Set flags that make this region read-only.
@ -394,6 +398,10 @@ public class HRegion implements HeapSize { // , Writable{
return this.flushRequested; return this.flushRequested;
} }
/**
 * Updates the dynamic reads-enabled flag held by this write state.
 * @param readsEnabled the new value stored in the {@code readsEnabled} field
 */
void setReadsEnabled(boolean readsEnabled) {
this.readsEnabled = readsEnabled;
}
static final long HEAP_SIZE = ClassSize.align( static final long HEAP_SIZE = ClassSize.align(
ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN); ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
} }
@ -727,7 +735,7 @@ public class HRegion implements HeapSize { // , Writable{
fs.cleanupAnySplitDetritus(); fs.cleanupAnySplitDetritus();
fs.cleanupMergesDir(); fs.cleanupMergesDir();
this.writestate.setReadOnly(this.htableDescriptor.isReadOnly()); this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
this.writestate.flushRequested = false; this.writestate.flushRequested = false;
this.writestate.compacting = 0; this.writestate.compacting = 0;
@ -2156,6 +2164,7 @@ public class HRegion implements HeapSize { // , Writable{
this.nonce = nonce; this.nonce = nonce;
} }
@Override
public Mutation getMutation(int index) { public Mutation getMutation(int index) {
return this.operations[index]; return this.operations[index];
} }
@ -2485,7 +2494,7 @@ public class HRegion implements HeapSize { // , Writable{
// Acquire the latest mvcc number // Acquire the latest mvcc number
// ---------------------------------- // ----------------------------------
w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
// calling the pre CP hook for batch mutation // calling the pre CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) { if (!isInReplay && coprocessorHost != null) {
MiniBatchOperationInProgress<Mutation> miniBatchOp = MiniBatchOperationInProgress<Mutation> miniBatchOp =
@ -2547,7 +2556,7 @@ public class HRegion implements HeapSize { // , Writable{
} }
// txid should always increase, so having the one from the last call is ok. // txid should always increase, so having the one from the last call is ok.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, m.getClusterIds(), this.htableDescriptor.getTableName(), now, m.getClusterIds(),
currentNonceGroup, currentNonce); currentNonceGroup, currentNonce);
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey,
walEdit, getSequenceId(), true, null); walEdit, getSequenceId(), true, null);
@ -2574,7 +2583,7 @@ public class HRegion implements HeapSize { // , Writable{
Mutation mutation = batchOp.getMutation(firstIndex); Mutation mutation = batchOp.getMutation(firstIndex);
if (walEdit.size() > 0) { if (walEdit.size() > 0) {
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce); mutation.getClusterIds(), currentNonceGroup, currentNonce);
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
getSequenceId(), true, memstoreCells); getSequenceId(), true, memstoreCells);
@ -2599,7 +2608,7 @@ public class HRegion implements HeapSize { // , Writable{
if (txid != 0) { if (txid != 0) {
syncOrDefer(txid, durability); syncOrDefer(txid, durability);
} }
doRollBackMemstore = false; doRollBackMemstore = false;
// calling the post CP hook for batch mutation // calling the post CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) { if (!isInReplay && coprocessorHost != null) {
@ -2741,7 +2750,7 @@ public class HRegion implements HeapSize { // , Writable{
if (this.getCoprocessorHost() != null) { if (this.getCoprocessorHost() != null) {
Boolean processed = null; Boolean processed = null;
if (w instanceof Put) { if (w instanceof Put) {
processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family, processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
qualifier, compareOp, comparator, (Put) w); qualifier, compareOp, comparator, (Put) w);
} else if (w instanceof Delete) { } else if (w instanceof Delete) {
processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family, processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
@ -2886,6 +2895,16 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
/**
 * Verifies that this region currently accepts reads.
 * @throws IOException if the {@code readsEnabled} flag of the write state is {@code false}
 */
protected void checkReadsEnabled() throws IOException {
  if (this.writestate.readsEnabled) {
    // Reads are allowed; nothing to reject.
    return;
  }
  throw new IOException("The region's reads are disabled. Cannot serve the request");
}
/**
 * Enables or disables reads on this region by forwarding the flag to the region's write state.
 * While the flag is {@code false}, {@link #checkReadsEnabled()} throws an {@link IOException}.
 * @param readsEnabled {@code true} to allow reads, {@code false} to reject them
 */
public void setReadsEnabled(boolean readsEnabled) {
this.writestate.setReadsEnabled(readsEnabled);
}
/** /**
* Add updates first to the hlog and then add values to memstore. * Add updates first to the hlog and then add values to memstore.
* Warning: Assumption is caller has lock on passed in row. * Warning: Assumption is caller has lock on passed in row.
@ -2944,7 +2963,7 @@ public class HRegion implements HeapSize { // , Writable{
*/ */
private void rollbackMemstore(List<KeyValue> memstoreCells) { private void rollbackMemstore(List<KeyValue> memstoreCells) {
int kvsRolledback = 0; int kvsRolledback = 0;
for (KeyValue kv : memstoreCells) { for (KeyValue kv : memstoreCells) {
byte[] family = kv.getFamily(); byte[] family = kv.getFamily();
Store store = getStore(family); Store store = getStore(family);
@ -4912,7 +4931,7 @@ public class HRegion implements HeapSize { // , Writable{
// 7. Append no sync // 7. Append no sync
if (!walEdit.isEmpty()) { if (!walEdit.isEmpty()) {
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
processor.getClusterIds(), nonceGroup, nonce); processor.getClusterIds(), nonceGroup, nonce);
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdit, getSequenceId(), true, memstoreCells); walKey, walEdit, getSequenceId(), true, memstoreCells);
@ -5183,7 +5202,7 @@ public class HRegion implements HeapSize { // , Writable{
} }
allKVs.addAll(entry.getValue()); allKVs.addAll(entry.getValue());
} }
// Actually write to WAL now // Actually write to WAL now
if (writeToWAL) { if (writeToWAL) {
// Using default cluster id, as this can only happen in the originating // Using default cluster id, as this can only happen in the originating
@ -5200,7 +5219,7 @@ public class HRegion implements HeapSize { // , Writable{
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
} }
size = this.addAndGetGlobalMemstoreSize(size); size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size); flush = isFlushSize(size);
} finally { } finally {
@ -5406,7 +5425,7 @@ public class HRegion implements HeapSize { // , Writable{
size = this.addAndGetGlobalMemstoreSize(size); size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size); flush = isFlushSize(size);
} }
// Actually write to WAL now // Actually write to WAL now
if (walEdits != null && !walEdits.isEmpty()) { if (walEdits != null && !walEdits.isEmpty()) {
if (writeToWAL) { if (writeToWAL) {
@ -5772,10 +5791,11 @@ public class HRegion implements HeapSize { // , Writable{
*/ */
protected void startRegionOperation(Operation op) throws IOException { protected void startRegionOperation(Operation op) throws IOException {
switch (op) { switch (op) {
case INCREMENT: case GET: // read operations
case APPEND:
case GET:
case SCAN: case SCAN:
checkReadsEnabled();
case INCREMENT: // write operations
case APPEND:
case SPLIT_REGION: case SPLIT_REGION:
case MERGE_REGION: case MERGE_REGION:
case PUT: case PUT:
@ -6064,7 +6084,7 @@ public class HRegion implements HeapSize { // , Writable{
/** /**
* Do not change this sequence id. See {@link #sequenceId} comment. * Do not change this sequence id. See {@link #sequenceId} comment.
* @return sequenceId * @return sequenceId
*/ */
@VisibleForTesting @VisibleForTesting
public AtomicLong getSequenceId() { public AtomicLong getSequenceId() {
@ -6175,7 +6195,7 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
} }
/** /**
* Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore * Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore
* the WALEdit append later. * the WALEdit append later.

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
/** /**
* View to an on-disk Region. * View to an on-disk Region.
@ -74,6 +75,8 @@ public class HRegionFileSystem {
private static final String REGION_TEMP_DIR = ".tmp"; private static final String REGION_TEMP_DIR = ".tmp";
private final HRegionInfo regionInfo; private final HRegionInfo regionInfo;
//regionInfo for interacting with FS (getting encodedName, etc)
private final HRegionInfo regionInfoForFs;
private final Configuration conf; private final Configuration conf;
private final Path tableDir; private final Path tableDir;
private final FileSystem fs; private final FileSystem fs;
@ -100,6 +103,7 @@ public class HRegionFileSystem {
this.conf = conf; this.conf = conf;
this.tableDir = tableDir; this.tableDir = tableDir;
this.regionInfo = regionInfo; this.regionInfo = regionInfo;
this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo);
this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number", this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
DEFAULT_HDFS_CLIENT_RETRIES_NUMBER); DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries", this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
@ -123,7 +127,7 @@ public class HRegionFileSystem {
/** @return {@link Path} to the region directory. */ /** @return {@link Path} to the region directory. */
public Path getRegionDir() { public Path getRegionDir() {
return new Path(this.tableDir, this.regionInfo.getEncodedName()); return new Path(this.tableDir, this.regionInfoForFs.getEncodedName());
} }
// =========================================================================== // ===========================================================================
@ -242,6 +246,7 @@ public class HRegionFileSystem {
public boolean hasReferences(final String familyName) throws IOException { public boolean hasReferences(final String familyName) throws IOException {
FileStatus[] files = FSUtils.listStatus(fs, getStoreDir(familyName), FileStatus[] files = FSUtils.listStatus(fs, getStoreDir(familyName),
new PathFilter () { new PathFilter () {
@Override
public boolean accept(Path path) { public boolean accept(Path path) {
return StoreFileInfo.isReference(path); return StoreFileInfo.isReference(path);
} }
@ -288,14 +293,14 @@ public class HRegionFileSystem {
*/ */
public void deleteFamily(final String familyName) throws IOException { public void deleteFamily(final String familyName) throws IOException {
// archive family store files // archive family store files
HFileArchiver.archiveFamily(fs, conf, regionInfo, tableDir, Bytes.toBytes(familyName)); HFileArchiver.archiveFamily(fs, conf, regionInfoForFs, tableDir, Bytes.toBytes(familyName));
// delete the family folder // delete the family folder
Path familyDir = getStoreDir(familyName); Path familyDir = getStoreDir(familyName);
if(fs.exists(familyDir) && !deleteDir(familyDir)) if(fs.exists(familyDir) && !deleteDir(familyDir))
throw new IOException("Could not delete family " + familyName throw new IOException("Could not delete family " + familyName
+ " from FileSystem for region " + regionInfo.getRegionNameAsString() + "(" + " from FileSystem for region " + regionInfoForFs.getRegionNameAsString() + "("
+ regionInfo.getEncodedName() + ")"); + regionInfoForFs.getEncodedName() + ")");
} }
/** /**
@ -405,7 +410,7 @@ public class HRegionFileSystem {
*/ */
public void removeStoreFile(final String familyName, final Path filePath) public void removeStoreFile(final String familyName, final Path filePath)
throws IOException { throws IOException {
HFileArchiver.archiveStoreFile(this.conf, this.fs, this.regionInfo, HFileArchiver.archiveStoreFile(this.conf, this.fs, this.regionInfoForFs,
this.tableDir, Bytes.toBytes(familyName), filePath); this.tableDir, Bytes.toBytes(familyName), filePath);
} }
@ -417,7 +422,7 @@ public class HRegionFileSystem {
*/ */
public void removeStoreFiles(final String familyName, final Collection<StoreFile> storeFiles) public void removeStoreFiles(final String familyName, final Collection<StoreFile> storeFiles)
throws IOException { throws IOException {
HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfo, HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfoForFs,
this.tableDir, Bytes.toBytes(familyName), storeFiles); this.tableDir, Bytes.toBytes(familyName), storeFiles);
} }
@ -602,7 +607,7 @@ public class HRegionFileSystem {
// See REF_NAME_REGEX regex above. The referred-to regions name is // See REF_NAME_REGEX regex above. The referred-to regions name is
// up in the path of the passed in <code>f</code> -- parentdir is family, // up in the path of the passed in <code>f</code> -- parentdir is family,
// then the directory above is the region name. // then the directory above is the region name.
String parentRegionName = regionInfo.getEncodedName(); String parentRegionName = regionInfoForFs.getEncodedName();
// Write reference with same file id only with the other region name as // Write reference with same file id only with the other region name as
// suffix and into the new region location (under same family). // suffix and into the new region location (under same family).
Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName); Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
@ -675,12 +680,12 @@ public class HRegionFileSystem {
Path referenceDir = new Path(new Path(mergedDir, Path referenceDir = new Path(new Path(mergedDir,
mergedRegion.getEncodedName()), familyName); mergedRegion.getEncodedName()), familyName);
// A whole reference to the store file. // A whole reference to the store file.
Reference r = Reference.createTopReference(regionInfo.getStartKey()); Reference r = Reference.createTopReference(regionInfoForFs.getStartKey());
// Add the referred-to regions name as a dot separated suffix. // Add the referred-to regions name as a dot separated suffix.
// See REF_NAME_REGEX regex above. The referred-to regions name is // See REF_NAME_REGEX regex above. The referred-to regions name is
// up in the path of the passed in <code>f</code> -- parentdir is family, // up in the path of the passed in <code>f</code> -- parentdir is family,
// then the directory above is the region name. // then the directory above is the region name.
String mergingRegionName = regionInfo.getEncodedName(); String mergingRegionName = regionInfoForFs.getEncodedName();
// Write reference with same file id only with the other region name as // Write reference with same file id only with the other region name as
// suffix and into the new region location (under same family). // suffix and into the new region location (under same family).
Path p = new Path(referenceDir, f.getPath().getName() + "." Path p = new Path(referenceDir, f.getPath().getName() + "."
@ -770,7 +775,7 @@ public class HRegionFileSystem {
// pb version is much shorter -- we write now w/o the toString version -- so checking length // pb version is much shorter -- we write now w/o the toString version -- so checking length
// only should be sufficient. I don't want to read the file every time to check if it pb // only should be sufficient. I don't want to read the file every time to check if it pb
// serialized. // serialized.
byte[] content = getRegionInfoFileContent(regionInfo); byte[] content = getRegionInfoFileContent(regionInfoForFs);
try { try {
Path regionInfoFile = new Path(getRegionDir(), REGION_INFO_FILE); Path regionInfoFile = new Path(getRegionDir(), REGION_INFO_FILE);
@ -786,7 +791,7 @@ public class HRegionFileSystem {
throw new IOException("Unable to remove existing " + regionInfoFile); throw new IOException("Unable to remove existing " + regionInfoFile);
} }
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
LOG.warn(REGION_INFO_FILE + " file not found for region: " + regionInfo.getEncodedName() + LOG.warn(REGION_INFO_FILE + " file not found for region: " + regionInfoForFs.getEncodedName() +
" on table " + regionInfo.getTable()); " on table " + regionInfo.getTable());
} }
@ -799,7 +804,7 @@ public class HRegionFileSystem {
* @param useTempDir indicate whether or not using the region .tmp dir for a safer file creation. * @param useTempDir indicate whether or not using the region .tmp dir for a safer file creation.
*/ */
private void writeRegionInfoOnFilesystem(boolean useTempDir) throws IOException { private void writeRegionInfoOnFilesystem(boolean useTempDir) throws IOException {
byte[] content = getRegionInfoFileContent(regionInfo); byte[] content = getRegionInfoFileContent(regionInfoForFs);
writeRegionInfoOnFilesystem(content, useTempDir); writeRegionInfoOnFilesystem(content, useTempDir);
} }

View File

@ -363,6 +363,9 @@ public class HRegionServer extends HasThread implements
*/ */
private MovedRegionsCleaner movedRegionsCleaner; private MovedRegionsCleaner movedRegionsCleaner;
// chore for refreshing store files for secondary regions
private StorefileRefresherChore storefileRefresher;
private RegionServerCoprocessorHost rsHost; private RegionServerCoprocessorHost rsHost;
private RegionServerProcedureManagerHost rspmHost; private RegionServerProcedureManagerHost rspmHost;
@ -693,6 +696,12 @@ public class HRegionServer extends HasThread implements
rpcServices.isa.getAddress(), 0)); rpcServices.isa.getAddress(), 0));
this.pauseMonitor = new JvmPauseMonitor(conf); this.pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor.start(); pauseMonitor.start();
int storefileRefreshPeriod = conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
, StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
if (storefileRefreshPeriod > 0) {
this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this);
}
} }
/** /**
@ -832,6 +841,9 @@ public class HRegionServer extends HasThread implements
if (this.nonceManagerChore != null) { if (this.nonceManagerChore != null) {
this.nonceManagerChore.interrupt(); this.nonceManagerChore.interrupt();
} }
if (this.storefileRefresher != null) {
this.storefileRefresher.interrupt();
}
// Stop the snapshot and other procedure handlers, forcefully killing all running tasks // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
rspmHost.stop(this.abortRequested || this.killed); rspmHost.stop(this.abortRequested || this.killed);
@ -1506,6 +1518,10 @@ public class HRegionServer extends HasThread implements
Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), getName() + ".nonceCleaner", Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), getName() + ".nonceCleaner",
uncaughtExceptionHandler); uncaughtExceptionHandler);
} }
if (this.storefileRefresher != null) {
Threads.setDaemonThreadRunning(this.storefileRefresher.getThread(), getName() + ".storefileRefresher",
uncaughtExceptionHandler);
}
// Leases is not a Thread. Internally it runs a daemon thread. If it gets // Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit. // an unhandled exception, it will just exit.
@ -1853,6 +1869,9 @@ public class HRegionServer extends HasThread implements
this.replicationSinkHandler.stopReplicationService(); this.replicationSinkHandler.stopReplicationService();
} }
} }
if (this.storefileRefresher != null) {
Threads.shutdown(this.storefileRefresher.getThread());
}
} }
/** /**

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.io.FileNotFoundException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.Key; import java.security.Key;
@ -27,6 +26,8 @@ import java.security.KeyException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NavigableSet; import java.util.NavigableSet;
@ -93,6 +94,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/** /**
* A Store holds a column family in a Region. Its a memstore and a set of zero * A Store holds a column family in a Region. Its a memstore and a set of zero
@ -487,10 +489,13 @@ public class HStore implements Store {
*/ */
private List<StoreFile> loadStoreFiles() throws IOException { private List<StoreFile> loadStoreFiles() throws IOException {
Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName()); Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
return openStoreFiles(files);
}
private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
if (files == null || files.size() == 0) { if (files == null || files.size() == 0) {
return new ArrayList<StoreFile>(); return new ArrayList<StoreFile>();
} }
// initialize the thread pool for opening store files in parallel.. // initialize the thread pool for opening store files in parallel..
ThreadPoolExecutor storeFileOpenerThreadPool = ThreadPoolExecutor storeFileOpenerThreadPool =
this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" + this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
@ -550,6 +555,60 @@ public class HStore implements Store {
return results; return results;
} }
/**
 * Reconciles the store files this store has open with the files currently listed on the file
 * system for this column family: files that are listed but not yet opened get opened, and
 * files that are opened but no longer listed are dropped from the store file manager.
 * Mainly used by secondary region replicas to keep up to date with the primary region files.
 * @throws IOException propagated from listing the store files or from opening the new ones
 */
@Override
public void refreshStoreFiles() throws IOException {
  // What the store currently has open.
  Collection<StoreFile> openedNow = storeEngine.getStoreFileManager().getStorefiles();
  if (openedNow == null) {
    openedNow = new ArrayList<StoreFile>(0);
  }

  // What the file system currently lists for this family.
  Collection<StoreFileInfo> listedNow = fs.getStoreFiles(getColumnFamilyName());
  if (listedNow == null) {
    listedNow = new ArrayList<StoreFileInfo>(0);
  }

  // Index the opened files by their file info so both sides can be compared as sets.
  HashMap<StoreFileInfo, StoreFile> openedByInfo =
      new HashMap<StoreFileInfo, StoreFile>(openedNow.size());
  for (StoreFile opened : openedNow) {
    openedByInfo.put(opened.getFileInfo(), opened);
  }
  HashSet<StoreFileInfo> listedInfos = new HashSet<StoreFileInfo>(listedNow);

  Set<StoreFileInfo> infosToOpen = Sets.difference(listedInfos, openedByInfo.keySet());
  Set<StoreFileInfo> infosToDrop = Sets.difference(openedByInfo.keySet(), listedInfos);
  if (infosToOpen.isEmpty() && infosToDrop.isEmpty()) {
    // Already in sync with the file system.
    return;
  }

  LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
      + " files to add: " + infosToOpen + " files to remove: " + infosToDrop);

  // Map the vanished file infos back to the StoreFile instances that are currently open.
  Set<StoreFile> storeFilesToDrop = new HashSet<StoreFile>(infosToDrop.size());
  for (StoreFileInfo dropped : infosToDrop) {
    storeFilesToDrop.add(openedByInfo.get(dropped));
  }

  // Open the new files first; the store file manager is only touched afterwards.
  List<StoreFile> newlyOpened = openStoreFiles(infosToOpen);

  // Propagate the file changes to the underlying store file manager (won't throw an exception).
  replaceStoreFiles(storeFilesToDrop, newlyOpened);

  // Advance the memstore read point to be at least the new store files' seqIds so that
  // readers might pick them up. This assumes that the store is not getting any writes
  // (otherwise in-flight transactions might be made visible).
  if (!infosToOpen.isEmpty()) {
    region.getMVCC().advanceMemstoreReadPointIfNeeded(this.getMaxSequenceId());
  }

  // Notify scanners, close file readers, and recompute store size.
  completeCompaction(storeFilesToDrop, false);
}
private StoreFile createStoreFileAndReader(final Path p) throws IOException { private StoreFile createStoreFileAndReader(final Path p) throws IOException {
StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p); StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
return createStoreFileAndReader(info); return createStoreFileAndReader(info);
@ -1099,7 +1158,7 @@ public class HStore implements Store {
writeCompactionWalRecord(filesToCompact, sfs); writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs);
// At this point the store will use new files for all new scanners. // At this point the store will use new files for all new scanners.
completeCompaction(filesToCompact); // Archive old files & update store size. completeCompaction(filesToCompact, true); // Archive old files & update store size.
} finally { } finally {
finishCompactionRequest(cr); finishCompactionRequest(cr);
} }
@ -1153,7 +1212,8 @@ public class HStore implements Store {
this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId()); this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
} }
private void replaceStoreFiles(final Collection<StoreFile> compactedFiles, @VisibleForTesting
void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
final Collection<StoreFile> result) throws IOException { final Collection<StoreFile> result) throws IOException {
this.lock.writeLock().lock(); this.lock.writeLock().lock();
try { try {
@ -1300,7 +1360,7 @@ public class HStore implements Store {
this.getCoprocessorHost().postCompact(this, sf, null); this.getCoprocessorHost().postCompact(this, sf, null);
} }
replaceStoreFiles(filesToCompact, Lists.newArrayList(sf)); replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
completeCompaction(filesToCompact); completeCompaction(filesToCompact, true);
} }
} finally { } finally {
synchronized (filesCompacting) { synchronized (filesCompacting) {
@ -1500,7 +1560,7 @@ public class HStore implements Store {
} }
} }
/* /**
* <p>It works by processing a compaction that's been written to disk. * <p>It works by processing a compaction that's been written to disk.
* *
* <p>It is usually invoked at the end of a compaction, but might also be * <p>It is usually invoked at the end of a compaction, but might also be
@ -1517,6 +1577,28 @@ public class HStore implements Store {
*/ */
@VisibleForTesting @VisibleForTesting
protected void completeCompaction(final Collection<StoreFile> compactedFiles) protected void completeCompaction(final Collection<StoreFile> compactedFiles)
throws IOException {
completeCompaction(compactedFiles, true);
}
/**
* <p>It works by processing a compaction that's been written to disk.
*
* <p>It is usually invoked at the end of a compaction, but might also be
* invoked at HStore startup, if the prior execution died midway through.
*
* <p>Moving the compacted TreeMap into place means:
* <pre>
* 1) Unload all replaced StoreFile, close and collect list to delete.
* 2) Compute new store size
* </pre>
*
* @param compactedFiles list of files that were compacted
* @param removeFiles whether the compacted files should also be removed from the file system
*/
@VisibleForTesting
protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
throws IOException { throws IOException {
try { try {
// Do not delete old store files until we have sent out notification of // Do not delete old store files until we have sent out notification of
@ -1531,7 +1613,9 @@ public class HStore implements Store {
for (StoreFile compactedFile : compactedFiles) { for (StoreFile compactedFile : compactedFiles) {
compactedFile.closeReader(true); compactedFile.closeReader(true);
} }
this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles); if (removeFiles) {
this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
}
} catch (IOException e) { } catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e); e = RemoteExceptionHandler.checkIOException(e);
LOG.error("Failed removing compacted files in " + this + LOG.error("Failed removing compacted files in " + this +

View File

@ -35,13 +35,13 @@ import org.apache.hadoop.hbase.util.ClassSize;
@InterfaceAudience.Private @InterfaceAudience.Private
public class MultiVersionConsistencyControl { public class MultiVersionConsistencyControl {
private static final long NO_WRITE_NUMBER = 0; private static final long NO_WRITE_NUMBER = 0;
private volatile long memstoreRead = 0; private volatile long memstoreRead = 0;
private final Object readWaiters = new Object(); private final Object readWaiters = new Object();
// This is the pending queue of writes. // This is the pending queue of writes.
private final LinkedList<WriteEntry> writeQueue = private final LinkedList<WriteEntry> writeQueue =
new LinkedList<WriteEntry>(); new LinkedList<WriteEntry>();
/** /**
* Default constructor. Initializes the memstoreRead/Write points to 0. * Default constructor. Initializes the memstoreRead/Write points to 0.
*/ */
@ -60,14 +60,14 @@ public class MultiVersionConsistencyControl {
} }
/** /**
* *
* @param initVal The value we used initially and expected it'll be reset later * @param initVal The value we used initially and expected it'll be reset later
* @return WriteEntry instance. * @return WriteEntry instance.
*/ */
WriteEntry beginMemstoreInsert() { WriteEntry beginMemstoreInsert() {
return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER); return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
} }
/** /**
* Get a mvcc write number before an actual one(its log sequence Id) being assigned * Get a mvcc write number before an actual one(its log sequence Id) being assigned
* @param sequenceId * @param sequenceId
@ -83,9 +83,9 @@ public class MultiVersionConsistencyControl {
// changes touch same row key // changes touch same row key
// If for any reason, the bumped value isn't reset due to failure situations, we'll reset // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
// curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all
return sequenceId.incrementAndGet() + 1000000000; return sequenceId.incrementAndGet() + 1000000000;
} }
/** /**
* This function starts a MVCC transaction with current region's log change sequence number. Since * This function starts a MVCC transaction with current region's log change sequence number. Since
* we set change sequence number when flushing current change to WAL(late binding), the flush * we set change sequence number when flushing current change to WAL(late binding), the flush
@ -126,7 +126,7 @@ public class MultiVersionConsistencyControl {
} }
waitForPreviousTransactionsComplete(e); waitForPreviousTransactionsComplete(e);
} }
/** /**
* Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
* end of this call, the global read point is at least as large as the write point of the passed * end of this call, the global read point is at least as large as the write point of the passed
@ -183,6 +183,18 @@ public class MultiVersionConsistencyControl {
return false; return false;
} }
/**
 * Moves the memstore read point forward to {@code seqNum} when the current read point is
 * behind it. A read point that is already at or past {@code seqNum} is left untouched.
 * The check and the assignment both happen while holding the {@code writeQueue} monitor.
 *
 * NOTE(review): nothing is signalled on {@code readWaiters} here -- confirm that no thread
 * can be blocked waiting for the read point to reach a value established by this call.
 *
 * @param seqNum the minimum value the read point should have once this method returns
 */
void advanceMemstoreReadPointIfNeeded(long seqNum) {
  synchronized (writeQueue) {
    if (seqNum <= this.memstoreRead) {
      // already at or beyond the requested point, nothing to advance
      return;
    }
    this.memstoreRead = seqNum;
  }
}
/** /**
* Wait for all previous MVCC transactions complete * Wait for all previous MVCC transactions complete
*/ */
@ -190,7 +202,7 @@ public class MultiVersionConsistencyControl {
WriteEntry w = beginMemstoreInsert(); WriteEntry w = beginMemstoreInsert();
waitForPreviousTransactionsComplete(w); waitForPreviousTransactionsComplete(w);
} }
public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) { public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
boolean interrupted = false; boolean interrupted = false;
WriteEntry w = waitedEntry; WriteEntry w = waitedEntry;

View File

@ -27,10 +27,10 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
@ -357,4 +357,13 @@ public interface Store extends HeapSize, StoreConfigInformation {
* @return Whether this store has too many store files. * @return Whether this store has too many store files.
*/ */
boolean hasTooManyStoreFiles(); boolean hasTooManyStoreFiles();
/**
* Checks the underlying store files, and opens the files that have not
* been opened, and removes the store file readers for store files no longer
* available. Mainly used by secondary region replicas to keep up to date with
* the primary region files.
* @throws IOException
*/
void refreshStoreFiles() throws IOException;
} }

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
* Describe a StoreFile (hfile, reference, link) * Describe a StoreFile (hfile, reference, link)
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class StoreFileInfo { public class StoreFileInfo implements Comparable<StoreFileInfo> {
public static final Log LOG = LogFactory.getLog(StoreFileInfo.class); public static final Log LOG = LogFactory.getLog(StoreFileInfo.class);
/** /**
@ -403,4 +403,27 @@ public class StoreFileInfo {
} }
return FSUtils.computeHDFSBlocksDistribution(fs, status, start, length); return FSUtils.computeHDFSBlocksDistribution(fs, status, start, length);
} }
/**
 * Two instances are equal when {@link #compareTo(StoreFileInfo)} reports them as equal.
 * Anything that is not a {@code StoreFileInfo} (including {@code null}) is never equal.
 */
@Override
public boolean equals(Object that) {
  // instanceof is false for null, so this single guard covers both rejections
  if (!(that instanceof StoreFileInfo)) {
    return false;
  }
  StoreFileInfo other = (StoreFileInfo) that;
  return this.compareTo(other) == 0;
}
/**
 * Orders this instance relative to {@code o} by delegating to the comparison of the two
 * underlying {@code fileStatus} fields.
 * NOTE(review): the resulting order is whatever {@code fileStatus.compareTo} defines
 * (presumably path based) -- confirm before relying on a particular ordering.
 */
@Override
public int compareTo(StoreFileInfo o) {
return this.fileStatus.compareTo(o.fileStatus);
}
/**
 * Returns the hash code of the underlying {@code fileStatus}, the same field that
 * {@link #compareTo(StoreFileInfo)} (and through it {@link #equals(Object)}) is based on.
 */
@Override
public int hashCode() {
return this.fileStatus.hashCode();
}
} }

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
/**
 * A chore for refreshing the store files for secondary regions hosted in the region server.
 *
 * This chore should run periodically with a shorter interval than HFile TTL
 * ("hbase.master.hfilecleaner.ttl", default 5 minutes).
 * It ensures that if we cannot refresh files longer than that amount, the region
 * will stop serving read requests because the referenced files might have been deleted (by the
 * primary region).
 */
public class StorefileRefresherChore extends Chore {

  private static final Log LOG = LogFactory.getLog(StorefileRefresherChore.class);

  /**
   * The period (in milliseconds) for refreshing the store files for the secondary regions.
   */
  static final String REGIONSERVER_STOREFILE_REFRESH_PERIOD
    = "hbase.regionserver.storefile.refresh.period";
  static final int DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD = 0; //disabled by default

  private HRegionServer regionServer;
  private long hfileTtl;
  private int period;

  //ts of last time regions store files are refreshed
  private Map<String, Long> lastRefreshTimes; // encodedName -> long

  /**
   * @param period the refresh period handed to the underlying {@link Chore}
   * @param regionServer the region server whose online regions are refreshed; its configuration
   *   supplies the HFile TTL
   * @param stoppable handed to the underlying {@link Chore}
   * @throws RuntimeException if {@code period} is larger than half of the configured HFile TTL
   */
  public StorefileRefresherChore(int period, HRegionServer regionServer, Stoppable stoppable) {
    super("StorefileRefresherChore", period, stoppable);
    this.period = period;
    this.regionServer = regionServer;
    this.hfileTtl = this.regionServer.getConfiguration().getLong(
      TimeToLiveHFileCleaner.TTL_CONF_KEY, TimeToLiveHFileCleaner.DEFAULT_TTL);
    if (period > hfileTtl / 2) {
      throw new RuntimeException(REGIONSERVER_STOREFILE_REFRESH_PERIOD +
        " should be set smaller than half of " + TimeToLiveHFileCleaner.TTL_CONF_KEY);
    }
    lastRefreshTimes = new HashMap<String, Long>();
  }

  /**
   * Refreshes the store files of every online region whose write state is read only.
   * A successful refresh records the refresh time and (re-)enables reads on the region. A
   * failed refresh disables reads once the region is considered stale by
   * {@link #isRegionStale(String, long)}. Finally, bookkeeping entries of regions that are no
   * longer online are dropped.
   */
  @Override
  protected void chore() {
    for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
      if (!r.writestate.isReadOnly()) {
        // skip checking for this region if it can accept writes
        continue;
      }
      String encodedName = r.getRegionInfo().getEncodedName();
      long time = EnvironmentEdgeManager.currentTimeMillis();
      if (!lastRefreshTimes.containsKey(encodedName)) {
        // first time we see this region: start the staleness clock now
        lastRefreshTimes.put(encodedName, time);
      }
      try {
        for (Store store : r.getStores().values()) {
          // TODO: some stores might see new data from flush, while others do not which
          // MIGHT break atomic edits across column families. We can fix this with setting
          // mvcc read numbers that we know every store has seen
          store.refreshStoreFiles();
        }
      } catch (IOException ex) {
        LOG.warn("Exception while trying to refresh store files for region:" + r.getRegionInfo()
          + ", exception:" + StringUtils.stringifyException(ex));

        // Store files have a TTL in the archive directory. If we fail to refresh for that long,
        // we stop serving reads
        if (isRegionStale(encodedName, time)) {
          r.setReadsEnabled(false); // stop serving reads
        }
        continue;
      }
      lastRefreshTimes.put(encodedName, time);
      r.setReadsEnabled(true); // restart serving reads
    }

    // remove closed regions. Entries are removed through the iterator because removing from
    // the map directly while traversing its key set throws ConcurrentModificationException.
    Iterator<String> trackedNames = lastRefreshTimes.keySet().iterator();
    while (trackedNames.hasNext()) {
      if (regionServer.getFromOnlineRegions(trackedNames.next()) == null) {
        trackedNames.remove();
      }
    }
  }

  /**
   * Tells whether the last recorded refresh of a region is too old.
   * @param encodedName encoded name of a region that has an entry in the refresh bookkeeping
   * @param time the current time in milliseconds
   * @return true if more than (HFile TTL - refresh period) milliseconds have passed between the
   *   last recorded refresh time of the region and {@code time}
   */
  protected boolean isRegionStale(String encodedName, long time) {
    long lastRefreshTime = lastRefreshTimes.get(encodedName);
    return time - lastRefreshTime > hfileTtl - period;
  }
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
/**
 * Server side counterpart of {@link RegionReplicaUtil}.
 */
public class ServerRegionReplicaUtil extends RegionReplicaUtil {

  /**
   * Returns the regionInfo object to use for interacting with the file system.
   * @param regionInfo the region info to translate; may be null
   * @return what {@code RegionReplicaUtil.getRegionInfoForDefaultReplica} yields for
   *   {@code regionInfo}, or null when {@code regionInfo} is null
   */
  public static HRegionInfo getRegionInfoForFs(HRegionInfo regionInfo) {
    return regionInfo == null
        ? null
        : RegionReplicaUtil.getRegionInfoForDefaultReplica(regionInfo);
  }

  /**
   * Returns whether the given region has to be treated as read only, i.e. whether it must not
   * accept writes.
   * @param region the HRegion object
   * @return true if the table descriptor of the region is read only, or if the region info of
   *   the region is not the default replica
   */
  public static boolean isReadOnly(HRegion region) {
    if (region.getTableDesc().isReadOnly()) {
      return true;
    }
    return !isDefaultReplica(region.getRegionInfo());
  }
}

View File

@ -1815,6 +1815,15 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
} }
} }
/**
 * Issues one family delete per integer in {@code [startRow, endRow)}. The row key of each
 * delete is the bytes of the decimal string form of the integer.
 * @param t the table to delete from
 * @param f the column family to delete in each row
 * @param startRow first row number (inclusive)
 * @param endRow last row number (exclusive)
 * @throws IOException if a delete fails
 */
public void deleteNumericRows(final HTable t, final byte[] f, int startRow, int endRow)
    throws IOException {
  int rowNumber = startRow;
  while (rowNumber < endRow) {
    Delete familyDelete = new Delete(Bytes.toBytes(String.valueOf(rowNumber)));
    familyDelete.deleteFamily(f);
    t.delete(familyDelete);
    rowNumber++;
  }
}
/** /**
* Return the number of rows in the given table. * Return the number of rows in the given table.
*/ */

View File

@ -245,7 +245,7 @@ public class TestHRegion {
throws IOException { throws IOException {
super(fs, rootDir, logName, conf); super(fs, rootDir, logName, conf);
} }
void setStoreFlushCtx(StoreFlushContext storeFlushCtx) { void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
this.storeFlushCtx = storeFlushCtx; this.storeFlushCtx = storeFlushCtx;
} }
@ -256,18 +256,18 @@ public class TestHRegion {
super.sync(txid); super.sync(txid);
} }
} }
FileSystem fs = FileSystem.get(CONF); FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + "testMemstoreSnapshotSize"); Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
MyFaultyHLog faultyLog = new MyFaultyHLog(fs, rootDir, "testMemstoreSnapshotSize", CONF); MyFaultyHLog faultyLog = new MyFaultyHLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
HRegion region = initHRegion(tableName, null, null, name.getMethodName(), HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES); CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
Store store = region.getStore(COLUMN_FAMILY_BYTES); Store store = region.getStore(COLUMN_FAMILY_BYTES);
// Get some random bytes. // Get some random bytes.
byte [] value = Bytes.toBytes(name.getMethodName()); byte [] value = Bytes.toBytes(name.getMethodName());
faultyLog.setStoreFlushCtx(store.createFlushContext(12345)); faultyLog.setStoreFlushCtx(store.createFlushContext(12345));
Put put = new Put(value); Put put = new Put(value);
put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
faultyLog.setFailureType(FaultyHLog.FailureType.SYNC); faultyLog.setFailureType(FaultyHLog.FailureType.SYNC);
@ -284,7 +284,7 @@ public class TestHRegion {
assertTrue("flushable size should be zero, but it is " + sz, sz == 0); assertTrue("flushable size should be zero, but it is " + sz, sz == 0);
HRegion.closeHRegion(region); HRegion.closeHRegion(region);
} }
/** /**
* Test we do not lose data if we fail a flush and then close. * Test we do not lose data if we fail a flush and then close.
* Part of HBase-10466. Tests the following from the issue description: * Part of HBase-10466. Tests the following from the issue description:
@ -2244,7 +2244,7 @@ public class TestHRegion {
/** /**
* This method tests https://issues.apache.org/jira/browse/HBASE-2516. * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
* *
* @throws IOException * @throws IOException
*/ */
@Test @Test
@ -2800,7 +2800,7 @@ public class TestHRegion {
/** /**
* Added for HBASE-5416 * Added for HBASE-5416
* *
* Here we test scan optimization when only subset of CFs are used in filter * Here we test scan optimization when only subset of CFs are used in filter
* conditions. * conditions.
*/ */
@ -2869,7 +2869,7 @@ public class TestHRegion {
/** /**
* HBASE-5416 * HBASE-5416
* *
* Test case when scan limits amount of KVs returned on each next() call. * Test case when scan limits amount of KVs returned on each next() call.
*/ */
@Test @Test
@ -2967,7 +2967,7 @@ public class TestHRegion {
// //////////////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////////////////
/** /**
* Splits twice and verifies getting from each of the split regions. * Splits twice and verifies getting from each of the split regions.
* *
* @throws Exception * @throws Exception
*/ */
@Test @Test
@ -3099,7 +3099,7 @@ public class TestHRegion {
* Flushes the cache in a thread while scanning. The tests verify that the * Flushes the cache in a thread while scanning. The tests verify that the
* scan is coherent - e.g. the returned results are always of the same or * scan is coherent - e.g. the returned results are always of the same or
* later update as the previous results. * later update as the previous results.
* *
* @throws IOException * @throws IOException
* scan / compact * scan / compact
* @throws InterruptedException * @throws InterruptedException
@ -3221,7 +3221,7 @@ public class TestHRegion {
/** /**
* Writes very wide records and scans for the latest every time.. Flushes and * Writes very wide records and scans for the latest every time.. Flushes and
* compacts the region every now and then to keep things realistic. * compacts the region every now and then to keep things realistic.
* *
* @throws IOException * @throws IOException
* by flush / scan / compaction * by flush / scan / compaction
* @throws InterruptedException * @throws InterruptedException
@ -3386,7 +3386,7 @@ public class TestHRegion {
/** /**
* Writes very wide records and gets the latest row every time.. Flushes and * Writes very wide records and gets the latest row every time.. Flushes and
* compacts the region aggressivly to catch issues. * compacts the region aggressivly to catch issues.
* *
* @throws IOException * @throws IOException
* by flush / scan / compaction * by flush / scan / compaction
* @throws InterruptedException * @throws InterruptedException
@ -3786,7 +3786,7 @@ public class TestHRegion {
/** /**
* Testcase to check state of region initialization task set to ABORTED or not * Testcase to check state of region initialization task set to ABORTED or not
* if any exceptions during initialization * if any exceptions during initialization
* *
* @throws Exception * @throws Exception
*/ */
@Test @Test
@ -4211,7 +4211,116 @@ public class TestHRegion {
this.region = null; this.region = null;
} }
/**
 * Writes and flushes data through a primary region, then opens a second region info over the
 * same root directory and verifies that the data can be read through it.
 */
@Test
public void testRegionReplicaSecondary() throws IOException {
  final String testName = "testRegionReplicaSecondary";
  Path rootDir = new Path(dir + testName);
  byte[] cq = Bytes.toBytes("cq");
  byte[][] families = { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") };

  HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(testName));
  for (int i = 0; i < families.length; i++) {
    htd.addFamily(new HColumnDescriptor(families[i]));
  }

  // Both region infos share table, key range and region id; only the last constructor
  // argument differs (0 for the primary, 1 for the secondary).
  long regionId = System.currentTimeMillis();
  HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, regionId, 0);
  HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, regionId, 1);

  HRegion primaryRegion = null;
  HRegion secondaryRegion = null;
  try {
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, TEST_UTIL.getConfiguration(), htd);

    // write through the primary and flush
    putData(primaryRegion, 0, 1000, cq, families);
    primaryRegion.flushcache();

    // the secondary is opened only after the flush, then read from
    secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
    verifyData(secondaryRegion, 0, 1000, cq, families);
  } finally {
    if (primaryRegion != null) {
      HRegion.closeHRegion(primaryRegion);
    }
    if (secondaryRegion != null) {
      HRegion.closeHRegion(secondaryRegion);
    }
  }
}
/**
 * Writes and flushes data through a primary region, then opens a second region info over the
 * same root directory and verifies that a put against it is rejected with an IOException.
 */
@Test
public void testRegionReplicaSecondaryIsReadOnly() throws IOException {
  // create a primary region, load some data and flush
  // create a secondary region, and do a put against that

  // Use this test's own name for the root directory and the table so that its on-disk state
  // cannot collide with the one of testRegionReplicaSecondary.
  final String testName = "testRegionReplicaSecondaryIsReadOnly";
  Path rootDir = new Path(dir + testName);

  byte[][] families = new byte[][] {
      Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
  };
  byte[] cq = Bytes.toBytes("cq");
  HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(testName));
  for (byte[] family : families) {
    htd.addFamily(new HColumnDescriptor(family));
  }

  long time = System.currentTimeMillis();
  HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
    HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
    false, time, 0);
  HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
    HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
    false, time, 1);

  HRegion primaryRegion = null, secondaryRegion = null;

  try {
    primaryRegion = HRegion.createHRegion(primaryHri,
      rootDir, TEST_UTIL.getConfiguration(), htd);

    // load some data
    putData(primaryRegion, 0, 1000, cq, families);

    // flush region
    primaryRegion.flushcache();

    // open secondary region
    secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);

    try {
      putData(secondaryRegion, 0, 1000, cq, families);
      // fail() throws an AssertionError, which the catch below does not swallow
      fail("Should have thrown exception");
    } catch (IOException ex) {
      // expected
    }
  } finally {
    if (primaryRegion != null) {
      HRegion.closeHRegion(primaryRegion);
    }
    if (secondaryRegion != null) {
      HRegion.closeHRegion(secondaryRegion);
    }
  }
}
private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
putData(this.region, startRow, numRows, qf, families);
}
private void putData(HRegion region,
int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
for (int i = startRow; i < startRow + numRows; i++) { for (int i = startRow; i < startRow + numRows; i++) {
Put put = new Put(Bytes.toBytes("" + i)); Put put = new Put(Bytes.toBytes("" + i));
put.setDurability(Durability.SKIP_WAL); put.setDurability(Durability.SKIP_WAL);
@ -4254,13 +4363,13 @@ public class TestHRegion {
/* /*
* Assert first value in the passed region is <code>firstValue</code>. * Assert first value in the passed region is <code>firstValue</code>.
* *
* @param r * @param r
* *
* @param fs * @param fs
* *
* @param firstValue * @param firstValue
* *
* @throws IOException * @throws IOException
*/ */
private void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue) private void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)

View File

@ -0,0 +1,310 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.catalog.TestMetaReaderEditor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.ServiceException;
/**
* Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
* cluster. See {@link TestRegionServerNoMaster}.
*/
@Category(MediumTests.class)
public class TestRegionReplicas {
private static final Log LOG = LogFactory.getLog(TestRegionReplicas.class);
private static final int NB_SERVERS = 1;
private static HTable table;
// Encoded with Bytes.toBytes rather than String.getBytes() so that the row key does not
// depend on the platform default charset.
private static final byte[] row = Bytes.toBytes("TestRegionReplicas");
private static HRegionInfo hriPrimary;
private static HRegionInfo hriSecondary;
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
private static final byte[] f = HConstants.CATALOG_FAMILY;
/**
 * One-time setup: starts a mini cluster with {@code NB_SERVERS} servers, creates the test
 * table, records the region info of the region holding {@code row} as the primary, builds a
 * second region info from the primary's fields to act as the secondary, and finally calls
 * {@code TestRegionServerNoMaster.stopMasterAndAssignMeta} so the tests run without a master.
 */
@BeforeClass
public static void before() throws Exception {
HTU.startMiniCluster(NB_SERVERS);
final byte[] tableName = Bytes.toBytes(TestRegionReplicas.class.getSimpleName());
// Create table then get the single region for our new table.
table = HTU.createTable(tableName, f);
hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
// mock a secondary region info to open
// (same table, keys, split flag and region id as the primary; last argument is 1)
hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
// No master
TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
}
/**
 * One-time teardown: closes the shared table and shuts the mini cluster down.
 */
@AfterClass
public static void afterClass() throws Exception {
table.close();
HTU.shutdownMiniCluster();
}
/**
 * Runs after every test and deletes the assignment znode of the primary region via
 * {@code ZKAssign.deleteNodeFailSilent}.
 */
@After
public void after() throws Exception {
// Clean the state if the test failed before cleaning the znode
// It does not manage all bad failures, so if there are multiple failures, only
// the first one should be looked at.
ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
}
/**
 * @return the region server at index 0 of the mini cluster
 */
private HRegionServer getRS() {
return HTU.getMiniHBaseCluster().getRegionServer(0);
}
/**
 * Opens {@code hri} on the test region server: creates the offline znode for the region,
 * sends an open region request directly to the region server's RPC services, asserts that
 * the response carries exactly one opening state and that it is OPENED, and then verifies
 * through {@link #checkRegionIsOpened(String)} that the region is actually available.
 * @param hri the region to open
 */
private void openRegion(HRegionInfo hri) throws Exception {
  ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
  // first version is '0'
  AdminProtos.OpenRegionRequest orr =
      RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null);
  AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
  // assertEquals reports the expected and the actual value when the check fails
  Assert.assertEquals(1, responseOpen.getOpeningStateCount());
  Assert.assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
      responseOpen.getOpeningState(0));
  checkRegionIsOpened(hri.getEncodedName());
}
/**
 * Closes {@code hri} on the test region server: creates the closing znode for the region,
 * sends a close region request directly to the region server's RPC services, asserts that the
 * response reports the region as closed, verifies it through
 * {@link #checkRegionIsClosed(String)} and finally deletes the closed znode.
 * @param hri the region to close
 */
private void closeRegion(HRegionInfo hri) throws Exception {
ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(getRS().getServerName(),
hri.getEncodedName(), true);
AdminProtos.CloseRegionResponse responseClose = getRS().getRSRpcServices().closeRegion(null, crr);
Assert.assertTrue(responseClose.getClosed());
checkRegionIsClosed(hri.getEncodedName());
ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), getRS().getServerName());
}
/**
 * Waits until the region server has no regions in transition, then asserts that the region is
 * available and that {@code ZKAssign.deleteOpenedNode} returns true for it.
 * @param encodedRegionName encoded name of the region expected to be open
 */
private void checkRegionIsOpened(String encodedRegionName) throws Exception {
// polls every millisecond; the loop itself has no upper bound on the waiting time
while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
Thread.sleep(1);
}
Assert.assertTrue(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
Assert.assertTrue(
ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), encodedRegionName, getRS().getServerName()));
}
/**
 * Waits until the region server reports no regions in transition, then asserts that
 * the region is no longer available.
 *
 * @param encodedRegionName encoded name of the region that was just closed
 */
private void checkRegionIsClosed(String encodedRegionName) throws Exception {
  // Poll until nothing is in transition; the enclosing test's timeout bounds the wait.
  for (;;) {
    if (getRS().getRegionsInTransitionInRS().isEmpty()) {
      break;
    }
    Thread.sleep(1);
  }

  try {
    boolean available = getRS().getRegionByEncodedName(encodedRegionName).isAvailable();
    Assert.assertFalse(available);
  } catch (NotServingRegionException expected) {
    // That's how it work: if the region is closed we have an exception.
  }

  // We don't delete the znode here, because there is not always a znode.
}
/**
 * Opens the secondary replica, writes rows through the table and checks they can be
 * counted back, then cleans up the rows and closes the replica.
 */
@Test(timeout = 60000)
public void testOpenRegionReplica() throws Exception {
  final int rowCount = 1000;
  openRegion(hriSecondary);
  try {
    // load some data to primary
    HTU.loadNumericRows(table, f, 0, rowCount);

    // assert that we can read back from primary
    Assert.assertEquals(rowCount, HTU.countRows(table));
  } finally {
    HTU.deleteNumericRows(table, f, 0, rowCount);
    closeRegion(hriSecondary);
  }
}
/**
 * Tests that the meta location is saved for secondary regions.
 *
 * <p>The meta table handle and the opened replica are released in separate
 * {@code finally} blocks so that a failure while closing the table cannot leave
 * the secondary region open for the following tests.
 */
@Test(timeout = 60000)
public void testRegionReplicaUpdatesMetaLocation() throws Exception {
  openRegion(hriSecondary);
  try {
    HTable meta = new HTable(HTU.getConfiguration(), TableName.META_TABLE_NAME);
    try {
      TestMetaReaderEditor.assertMetaLocation(meta, hriPrimary.getRegionName(),
          getRS().getServerName(), -1, 1, false);
    } finally {
      meta.close();
    }
  } finally {
    // Always runs, even when meta.close() above throws.
    closeRegion(hriSecondary);
  }
}
/**
 * Loads rows into the primary, flushes, opens the secondary replica and checks that a
 * row can be read from the replica both directly from the region object and through
 * the region server's RPC service.
 */
@Test(timeout = 60000)
public void testRegionReplicaGets() throws Exception {
  try {
    // load some data to primary
    HTU.loadNumericRows(table, f, 0, 1000);
    // assert that we can read back from primary
    Assert.assertEquals(1000, HTU.countRows(table));
    // flush so that region replica can read
    getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();

    openRegion(hriSecondary);

    // first try directly against region
    HRegion region = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
    assertGet(region, 42, true);

    assertGetRpc(hriSecondary, 42, true);
  } finally {
    // Delete from the same family the rows were loaded into above.
    HTU.deleteNumericRows(table, f, 0, 1000);
    closeRegion(hriSecondary);
  }
}
/**
 * Reads the row keyed by the string form of {@code value} directly from
 * {@code region} and checks the outcome.
 *
 * @param region the region to read from
 * @param value numeric row id; the row key is {@code Bytes.toBytes(String.valueOf(value))}
 * @param expect {@code true} if the row must be present (its value in family {@code f},
 *          null qualifier, must equal the row key); {@code false} if the result must be empty
 * @throws IOException if the read fails
 */
private void assertGet(HRegion region, int value, boolean expect) throws IOException {
  byte[] row = Bytes.toBytes(String.valueOf(value));
  Get get = new Get(row);
  Result result = region.get(get);
  if (expect) {
    Assert.assertArrayEquals(row, result.getValue(f, null));
  } else {
    // The emptiness check must be asserted; calling isEmpty() alone verifies nothing.
    Assert.assertTrue(result.isEmpty());
  }
}
/**
 * Builds a Get request for the row keyed by the string form of {@code value}, sends it
 * to the region server's RPC service object directly (a mock rpc, no network), and
 * checks the outcome.
 *
 * @param info the region (primary or replica) the request is addressed to
 * @param value numeric row id; the row key is {@code Bytes.toBytes(String.valueOf(value))}
 * @param expect {@code true} if the row must be present (its value in family {@code f},
 *          null qualifier, must equal the row key); {@code false} if the result must be empty
 * @throws IOException if converting the request or response fails
 * @throws ServiceException if the RPC service call fails
 */
private void assertGetRpc(HRegionInfo info, int value, boolean expect)
    throws IOException, ServiceException {
  byte[] row = Bytes.toBytes(String.valueOf(value));
  Get get = new Get(row);
  ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
  ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq);
  Result result = ProtobufUtil.toResult(getResp.getResult());
  if (expect) {
    Assert.assertArrayEquals(row, result.getValue(f, null));
  } else {
    // The emptiness check must be asserted; calling isEmpty() alone verifies nothing.
    Assert.assertTrue(result.isEmpty());
  }
}
// Restarts the test fixture: afterClass() closes the table and shuts the mini cluster
// down, then before() is run again. before() is defined outside this view; presumably it
// starts the cluster again with the current HTU configuration -- TODO confirm.
private void restartRegionServer() throws Exception {
afterClass();
before();
}
/**
 * Enables the periodic store file refresh, restarts the fixture, and checks that the
 * secondary replica picks up files produced by flushes and by a compaction on the
 * primary after waiting several refresh periods.
 *
 * <p>Cleanup removes every row range this test loads. Sibling tests use the same table
 * and assert an exact count of 1000 rows, so leftover rows would make them
 * order-dependent.
 */
@Test(timeout = 300000)
public void testRefreshStoreFiles() throws Exception {
  // enable store file refreshing
  final int refreshPeriod = 2000; // 2 sec
  HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
  HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, refreshPeriod);
  // restart the region server so that it starts the refresher chore
  restartRegionServer();

  try {
    LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
    openRegion(hriSecondary);

    //load some data to primary
    LOG.info("Loading data to primary region");
    HTU.loadNumericRows(table, f, 0, 1000);
    // assert that we can read back from primary
    Assert.assertEquals(1000, HTU.countRows(table));
    // flush so that region replica can read
    LOG.info("Flushing primary region");
    getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();

    // ensure that chore is run
    LOG.info("Sleeping for " + (4 * refreshPeriod));
    Threads.sleep(4 * refreshPeriod);

    LOG.info("Checking results from secondary region replica");
    HRegion secondaryRegion = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
    Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());

    assertGet(secondaryRegion, 42, true);
    assertGetRpc(hriSecondary, 42, true);
    assertGetRpc(hriSecondary, 1042, false);

    //load some data to primary
    HTU.loadNumericRows(table, f, 1000, 1100);
    getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();

    HTU.loadNumericRows(table, f, 2000, 2100);
    getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache();

    // ensure that chore is run
    Threads.sleep(4 * refreshPeriod);

    assertGetRpc(hriSecondary, 42, true);
    assertGetRpc(hriSecondary, 1042, true);
    assertGetRpc(hriSecondary, 2042, true);

    // ensure that we see the 3 store files
    Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

    // force compaction
    HTU.compact(table.getName(), true);

    // Keep reading from the replica while waiting for the refresh after the compaction:
    // the rows must stay readable during the whole window.
    long wakeUpTime = System.currentTimeMillis() + 4 * refreshPeriod;
    while (System.currentTimeMillis() < wakeUpTime) {
      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, true);
      assertGetRpc(hriSecondary, 2042, true);
      Threads.sleep(10);
    }

    // ensure that we see the compacted file only
    Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());

  } finally {
    // Remove all three ranges loaded above, from the family they were written to.
    HTU.deleteNumericRows(table, f, 0, 1000);
    HTU.deleteNumericRows(table, f, 1000, 1100);
    HTU.deleteNumericRows(table, f, 2000, 2100);
    closeRegion(hriSecondary);
  }
}
}

View File

@ -44,6 +44,8 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -85,6 +87,11 @@ public class TestRegionServerNoMaster {
hri = table.getRegionLocation(row, false).getRegionInfo(); hri = table.getRegionLocation(row, false).getRegionInfo();
regionName = hri.getRegionName(); regionName = hri.getRegionName();
stopMasterAndAssignMeta(HTU);
}
public static void stopMasterAndAssignMeta(HBaseTestingUtility HTU)
throws NodeExistsException, KeeperException, IOException, InterruptedException {
// No master // No master
HTU.getHBaseCluster().getMaster().stopMaster(); HTU.getHBaseCluster().getMaster().stopMaster();

View File

@ -20,6 +20,10 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException; import java.io.IOException;
import java.lang.ref.SoftReference; import java.lang.ref.SoftReference;
@ -84,6 +88,8 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.google.common.collect.Lists;
/** /**
* Test class for the Store * Test class for the Store
*/ */
@ -137,7 +143,7 @@ public class TestStore {
} }
private void init(String methodName) throws IOException { private void init(String methodName) throws IOException {
init(methodName, HBaseConfiguration.create()); init(methodName, TEST_UTIL.getConfiguration());
} }
private void init(String methodName, Configuration conf) private void init(String methodName, Configuration conf)
@ -194,6 +200,7 @@ public class TestStore {
// Inject our faulty LocalFileSystem // Inject our faulty LocalFileSystem
conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
user.runAs(new PrivilegedExceptionAction<Object>() { user.runAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception { public Object run() throws Exception {
// Make sure it worked (above is sensitive to caching details in hadoop core) // Make sure it worked (above is sensitive to caching details in hadoop core)
FileSystem fs = FileSystem.get(conf); FileSystem fs = FileSystem.get(conf);
@ -330,7 +337,7 @@ public class TestStore {
FileSystem fs = FileSystem.get(conf); FileSystem fs = FileSystem.get(conf);
// Initialize region // Initialize region
init(name.getMethodName(), conf); init(name.getMethodName(), conf);
int storeFileNum = 4; int storeFileNum = 4;
for (int i = 1; i <= storeFileNum; i++) { for (int i = 1; i <= storeFileNum; i++) {
LOG.info("Adding some data for the store file #"+i); LOG.info("Adding some data for the store file #"+i);
@ -350,12 +357,12 @@ public class TestStore {
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
} }
private static long getLowestTimeStampFromFS(FileSystem fs, private static long getLowestTimeStampFromFS(FileSystem fs,
final Collection<StoreFile> candidates) throws IOException { final Collection<StoreFile> candidates) throws IOException {
long minTs = Long.MAX_VALUE; long minTs = Long.MAX_VALUE;
if (candidates.isEmpty()) { if (candidates.isEmpty()) {
return minTs; return minTs;
} }
Path[] p = new Path[candidates.size()]; Path[] p = new Path[candidates.size()];
int i = 0; int i = 0;
@ -363,7 +370,7 @@ public class TestStore {
p[i] = sf.getPath(); p[i] = sf.getPath();
++i; ++i;
} }
FileStatus[] stats = fs.listStatus(p); FileStatus[] stats = fs.listStatus(p);
if (stats == null || stats.length == 0) { if (stats == null || stats.length == 0) {
return minTs; return minTs;
@ -724,6 +731,7 @@ public class TestStore {
conf.setClass("fs.file.impl", FaultyFileSystem.class, conf.setClass("fs.file.impl", FaultyFileSystem.class,
FileSystem.class); FileSystem.class);
user.runAs(new PrivilegedExceptionAction<Object>() { user.runAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception { public Object run() throws Exception {
// Make sure it worked (above is sensitive to caching details in hadoop core) // Make sure it worked (above is sensitive to caching details in hadoop core)
FileSystem fs = FileSystem.get(conf); FileSystem fs = FileSystem.get(conf);
@ -790,6 +798,7 @@ public class TestStore {
overwrite, bufferSize, replication, blockSize, progress), faultPos, fault); overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
} }
@Override
public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
int bufferSize, short replication, long blockSize, Progressable progress) int bufferSize, short replication, long blockSize, Progressable progress)
throws IOException { throws IOException {
@ -968,4 +977,102 @@ public class TestStore {
Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor, Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor,
this.store.storeEngine.getCompactor()); this.store.storeEngine.getCompactor());
} }
/**
 * Writes one more store file into the directory of the store's first store file,
 * without telling the store about it. The file carries only metadata, with a sequence
 * id one above the store's current maximum.
 *
 * @throws IOException if the file cannot be written
 */
private void addStoreFile() throws IOException {
  StoreFile firstFile = this.store.getStorefiles().iterator().next();
  Path storeDir = firstFile.getPath().getParent();
  long nextSeqId = this.store.getMaxSequenceId() + 1;

  Configuration conf = TEST_UTIL.getConfiguration();
  FileSystem fileSystem = FileSystem.get(conf);
  HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();

  StoreFile.WriterBuilder builder =
      new StoreFile.WriterBuilder(conf, new CacheConfig(conf), fileSystem);
  StoreFile.Writer writer = builder
      .withOutputDir(storeDir)
      .withFileContext(context)
      .build();

  writer.appendMetadata(nextSeqId, false);
  writer.close();
  LOG.info("Added store file:" + writer.getPath());
}
/**
 * Removes the store file at position {@code index} (in the iteration order of the
 * store's current file list) through the region file system, without telling the
 * store about it.
 *
 * @param index zero-based position of the file in {@code store.getStorefiles()}
 * @throws IOException if the removal fails
 */
private void archiveStoreFile(int index) throws IOException {
  Iterator<StoreFile> cursor = this.store.getStorefiles().iterator();

  // Walk the iterator up to and including position 'index'.
  StoreFile target = null;
  int position = 0;
  while (position <= index) {
    target = cursor.next();
    position++;
  }

  store.getRegionFileSystem().removeStoreFiles(
      store.getColumnFamilyName(), Lists.newArrayList(target));
}
/**
 * Checks that the store's file count changes only when refreshStoreFiles() is called:
 * files added to or removed from the store directory behind the store's back are not
 * counted until the next refresh.
 */
// NOTE(review): no @Test annotation is visible on this method in this view -- confirm
// that the test runner actually picks it up.
public void testRefreshStoreFiles() throws Exception {
  init(name.getMethodName());
  assertEquals(0, this.store.getStorefilesCount());

  // add some data, flush
  KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[])null);
  this.store.add(kv);
  flush(1);
  assertEquals(1, this.store.getStorefilesCount());

  // add one more file
  addStoreFile();
  assertEquals(1, this.store.getStorefilesCount());
  store.refreshStoreFiles();
  assertEquals(2, this.store.getStorefilesCount());

  // add three more files
  for (int added = 0; added < 3; added++) {
    addStoreFile();
  }
  assertEquals(2, this.store.getStorefilesCount());
  store.refreshStoreFiles();
  assertEquals(5, this.store.getStorefilesCount());

  // remove one file behind the store's back
  archiveStoreFile(0);
  assertEquals(5, this.store.getStorefilesCount());
  store.refreshStoreFiles();
  assertEquals(4, this.store.getStorefilesCount());

  // remove three of the four remaining files
  for (int idx = 0; idx < 3; idx++) {
    archiveStoreFile(idx);
  }
  assertEquals(4, this.store.getStorefilesCount());
  store.refreshStoreFiles();
  assertEquals(1, this.store.getStorefilesCount());

  // remove the last file
  archiveStoreFile(0);
  store.refreshStoreFiles();
  assertEquals(0, this.store.getStorefilesCount());
}
/**
 * Checks that refreshStoreFiles() replaces the store's file list only when the files on
 * disk have changed: the first refresh after a file was added must call
 * replaceStoreFiles once, and a second refresh with nothing changed must not call it
 * again.
 */
// NOTE(review): no @Test annotation is visible on this method in this view -- confirm
// that the test runner actually picks it up.
@SuppressWarnings("unchecked")
public void testRefreshStoreFilesNotChanged() throws IOException {
  init(name.getMethodName());

  assertEquals(0, this.store.getStorefilesCount());

  // add some data, flush
  this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
  flush(1);
  // add one more file
  addStoreFile();

  HStore spiedStore = spy(store);

  // call first time after files changed
  spiedStore.refreshStoreFiles();
  assertEquals(2, this.store.getStorefilesCount());
  verify(spiedStore, times(1)).replaceStoreFiles(any(Collection.class), any(Collection.class));

  // call second time
  spiedStore.refreshStoreFiles();

  // Ensure that replaceStoreFiles is not called if files are not refreshed: the total
  // number of invocations, with any arguments, must still be exactly one. Verifying
  // times(0) against the literal (null, null) arguments would pass even if the method
  // had been called again with real collections.
  verify(spiedStore, times(1)).replaceStoreFiles(any(Collection.class), any(Collection.class));
}
} }

View File

@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Tests {@link StorefileRefresherChore} against a primary region and a replica region
 * that share one table directory, using a region file system that can be switched to
 * fail on demand.
 */
@Category(SmallTests.class)
public class TestStoreFileRefresherChore {

  private HBaseTestingUtility TEST_UTIL;
  private Path testDir;

  @Before
  public void setUp() {
    TEST_UTIL = new HBaseTestingUtility();
    testDir = TEST_UTIL.getDataTestDir("TestStoreFileRefresherChore");
  }

  /**
   * Builds a table descriptor with one column family per entry of {@code families}.
   */
  private HTableDescriptor getTableDesc(TableName tableName, byte[]... families) {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    for (byte[] family : families) {
      HColumnDescriptor hcd = new HColumnDescriptor(family);
      // Max versions is set to Integer.MAX_VALUE for every family.
      hcd.setMaxVersions(Integer.MAX_VALUE);
      htd.addFamily(hcd);
    }
    return htd;
  }

  /**
   * Region file system whose {@link #getStoreFiles(String)} throws an IOException while
   * {@link #fail} is set, and delegates to the real implementation otherwise.
   */
  static class FailingHRegionFileSystem extends HRegionFileSystem {
    boolean fail = false;

    FailingHRegionFileSystem(Configuration conf, FileSystem fs, Path tableDir,
        HRegionInfo regionInfo) {
      super(conf, fs, tableDir, regionInfo);
    }

    @Override
    public Collection<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
      if (fail) {
        throw new IOException("simulating FS failure");
      }
      return super.getStoreFiles(familyName);
    }
  }

  /**
   * Creates and initializes a region for {@code htd} with the given replica id, backed
   * by a {@link FailingHRegionFileSystem} and its own log named {@code "log_" + replicaId}.
   */
  private HRegion initHRegion(HTableDescriptor htd, byte[] startKey, byte[] stopKey,
      int replicaId) throws IOException {
    Configuration conf = TEST_UTIL.getConfiguration();
    Path tableDir = new Path(testDir, htd.getTableName().getNameAsString());

    HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false, 0, replicaId);

    HRegionFileSystem fs =
        new FailingHRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir, info);
    HRegion region = new HRegion(fs, HLogFactory.createHLog(fs.getFileSystem(),
        tableDir, "log_" + replicaId, conf), conf, htd, null);

    region.initialize();

    return region;
  }

  /**
   * Puts rows {@code startRow .. startRow + numRows - 1} (row key is the decimal string
   * of the number) with a null value for {@code qf} in every family, skipping the WAL.
   */
  private void putData(HRegion region, int startRow, int numRows, byte[] qf,
      byte[]... families) throws IOException {
    for (int i = startRow; i < startRow + numRows; i++) {
      Put put = new Put(Bytes.toBytes("" + i));
      put.setDurability(Durability.SKIP_WAL);
      for (byte[] family : families) {
        put.add(family, qf, null);
      }
      region.put(put);
    }
  }

  /**
   * Reads rows {@code startRow .. startRow + numRows - 1} from {@code newReg} and asserts
   * that each row has exactly one cell per family with the expected row, family and
   * qualifier. Fails with an AssertionError when a row is missing.
   */
  private void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf,
      byte[]... families) throws IOException {
    for (int i = startRow; i < startRow + numRows; i++) {
      byte[] row = Bytes.toBytes("" + i);
      Get get = new Get(row);
      for (byte[] family : families) {
        get.addColumn(family, qf);
      }
      Result result = newReg.get(get);
      Cell[] raw = result.rawCells();
      assertEquals(families.length, result.size());
      for (int j = 0; j < families.length; j++) {
        assertTrue(CellUtil.matchingRow(raw[j], row));
        assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
        assertTrue(CellUtil.matchingQualifier(raw[j], qf));
      }
    }
  }

  /**
   * Asserts that {@link #verifyData} fails its assertions for the given rows.
   *
   * <p>The failure is raised outside the try block on purpose: Assert.fail() throws an
   * AssertionError, so calling it inside a block that catches AssertionError would
   * swallow it and the check could never fail.
   */
  private void verifyDataExpectFail(HRegion newReg, int startRow, int numRows, byte[] qf,
      byte[]... families) throws IOException {
    boolean verified;
    try {
      verifyData(newReg, startRow, numRows, qf, families);
      verified = true;
    } catch (AssertionError expected) {
      verified = false;
    }
    if (verified) {
      Assert.fail("should have failed");
    }
  }

  /**
   * Chore whose staleness decision is controlled by the test through {@link #isStale}.
   */
  static class StaleStorefileRefresherChore extends StorefileRefresherChore {
    boolean isStale = false;

    public StaleStorefileRefresherChore(int period, HRegionServer regionServer,
        Stoppable stoppable) {
      super(period, regionServer, stoppable);
    }

    @Override
    protected boolean isRegionStale(String encodedName, long time) {
      return isStale;
    }
  }

  @Test (timeout = 60000)
  public void testIsStale() throws IOException {
    int period = 0;
    byte[][] families = new byte[][] {Bytes.toBytes("cf")};
    byte[] qf = Bytes.toBytes("cq");

    // Mocked region server that only exposes the region list and the configuration.
    HRegionServer regionServer = mock(HRegionServer.class);
    List<HRegion> regions = new ArrayList<HRegion>();
    when(regionServer.getOnlineRegionsLocalContext()).thenReturn(regions);
    when(regionServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());

    HTableDescriptor htd = getTableDesc(TableName.valueOf("testIsStale"), families);
    HRegion primary = initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 0);
    HRegion replica1 = initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 1);
    regions.add(primary);
    regions.add(replica1);

    StaleStorefileRefresherChore chore =
        new StaleStorefileRefresherChore(period, regionServer, new StoppableImplementation());

    // write some data to primary and flush
    putData(primary, 0, 100, qf, families);
    primary.flushcache();
    verifyData(primary, 0, 100, qf, families);

    // the replica has not refreshed yet, so it must not see the flushed rows
    verifyDataExpectFail(replica1, 0, 100, qf, families);

    chore.chore();
    verifyData(replica1, 0, 100, qf, families);

    // simulate an fs failure where we cannot refresh the store files for the replica
    ((FailingHRegionFileSystem)replica1.getRegionFileSystem()).fail = true;

    // write some more data to primary and flush
    putData(primary, 100, 100, qf, families);
    primary.flushcache();
    verifyData(primary, 0, 200, qf, families);

    chore.chore(); // should not throw ex, but we cannot refresh the store files

    verifyData(replica1, 0, 100, qf, families);
    // the second batch must stay invisible because the refresh could not happen
    verifyDataExpectFail(replica1, 100, 100, qf, families);

    chore.isStale = true;
    chore.chore(); //now after this, we cannot read back any value
    try {
      verifyData(replica1, 0, 100, qf, families);
      // Not swallowed: the catch below handles IOException only.
      Assert.fail("should have failed with IOException");
    } catch(IOException ex) {
      // expected
    }
  }
}