diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index d80beb34d9e..43723f8c301 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1220,6 +1220,19 @@ possible configurations would overwhelm and obscure the important. org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager Fully qualified name of class implementing coordinated state manager. + + hbase.regionserver.storefile.refresh.period + 0 + + The period (in milliseconds) for refreshing the store files for the secondary regions. 0 + means this feature is disabled. Secondary regions sees new files (from flushes and + compactions) from primary once the secondary region refreshes the list of files in the + region (there is no notification mechanism). But too frequent refreshes might cause + extra Namenode pressure. If the files cannot be refreshed for longer than HFile TTL + (hbase.master.hfilecleaner.ttl) the requests are rejected. Configuring HFile TTL to a larger + value is also recommended with this setting. + + hbase.http.filter.initializers org.apache.hadoop.hbase.http.lib.StaticUserWebFilter diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java index 4b7038f1a12..f0af406e9da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.Bytes; @InterfaceAudience.Private @@ -63,10 +64,11 @@ public class ModifyTableHandler extends TableEventHandler { super.prepareWithTableLock(); // Check operation is possible on the table in its current state // Also checks whether the table exists - if (masterServices.getAssignmentManager().getZKTable().isEnabledTable(this.htd.getTableName()) + if (masterServices.getAssignmentManager().getTableStateManager() + .isTableState(this.htd.getTableName(), ZooKeeperProtos.Table.State.ENABLED) && this.htd.getRegionReplication() != getTableDescriptor().getRegionReplication()) { throw new IOException("REGION_REPLICATION change is not supported for enabled tables"); - } + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b89d886f3a0..cc1be80a5ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -136,6 +136,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.util.StringUtils; @@ -220,7 +221,7 @@ public class HRegion implements HeapSize { // , Writable{ * Its default value is -1L. This default is used as a marker to indicate * that the region hasn't opened yet. Once it is opened, it is set to the derived * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region. - * + * *

Control of this sequence is handed off to the WAL/HLog implementation. It is responsible * for tagging edits with the correct sequence id since it is responsible for getting the * edits into the WAL files. It controls updating the sequence id value. DO NOT UPDATE IT @@ -375,6 +376,9 @@ public class HRegion implements HeapSize { // , Writable{ volatile boolean writesEnabled = true; // Set if region is read-only volatile boolean readOnly = false; + // whether the reads are enabled. This is different than readOnly, because readOnly is + // static in the lifetime of the region, while readsEnabled is dynamic + volatile boolean readsEnabled = true; /** * Set flags that make this region read-only. @@ -394,6 +398,10 @@ public class HRegion implements HeapSize { // , Writable{ return this.flushRequested; } + void setReadsEnabled(boolean readsEnabled) { + this.readsEnabled = readsEnabled; + } + static final long HEAP_SIZE = ClassSize.align( ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN); } @@ -727,7 +735,7 @@ public class HRegion implements HeapSize { // , Writable{ fs.cleanupAnySplitDetritus(); fs.cleanupMergesDir(); - this.writestate.setReadOnly(this.htableDescriptor.isReadOnly()); + this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this)); this.writestate.flushRequested = false; this.writestate.compacting = 0; @@ -2156,6 +2164,7 @@ public class HRegion implements HeapSize { // , Writable{ this.nonce = nonce; } + @Override public Mutation getMutation(int index) { return this.operations[index]; } @@ -2485,7 +2494,7 @@ public class HRegion implements HeapSize { // , Writable{ // Acquire the latest mvcc number // ---------------------------------- w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); - + // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { MiniBatchOperationInProgress miniBatchOp = @@ -2547,7 +2556,7 @@ public class HRegion implements HeapSize { // , Writable{ } // txid should always increase, so having the one from the last call is ok. walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, m.getClusterIds(), + this.htableDescriptor.getTableName(), now, m.getClusterIds(), currentNonceGroup, currentNonce); txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, getSequenceId(), true, null); @@ -2574,7 +2583,7 @@ public class HRegion implements HeapSize { // , Writable{ Mutation mutation = batchOp.getMutation(firstIndex); if (walEdit.size() > 0) { walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, + this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, mutation.getClusterIds(), currentNonceGroup, currentNonce); txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, getSequenceId(), true, memstoreCells); @@ -2599,7 +2608,7 @@ public class HRegion implements HeapSize { // , Writable{ if (txid != 0) { syncOrDefer(txid, durability); } - + doRollBackMemstore = false; // calling the post CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { @@ -2741,7 +2750,7 @@ public class HRegion implements HeapSize { // , Writable{ if (this.getCoprocessorHost() != null) { Boolean processed = null; if (w instanceof Put) { - processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family, + processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family, qualifier, compareOp, comparator, (Put) w); } else if (w instanceof Delete) { processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family, @@ -2886,6 +2895,16 @@ public class HRegion implements HeapSize { // , Writable{ } } + protected void checkReadsEnabled() throws IOException { + if (!this.writestate.readsEnabled) { + throw new IOException ("The region's reads are disabled. Cannot serve the request"); + } + } + + public void setReadsEnabled(boolean readsEnabled) { + this.writestate.setReadsEnabled(readsEnabled); + } + /** * Add updates first to the hlog and then add values to memstore. * Warning: Assumption is caller has lock on passed in row. @@ -2944,7 +2963,7 @@ public class HRegion implements HeapSize { // , Writable{ */ private void rollbackMemstore(List memstoreCells) { int kvsRolledback = 0; - + for (KeyValue kv : memstoreCells) { byte[] family = kv.getFamily(); Store store = getStore(family); @@ -4912,7 +4931,7 @@ public class HRegion implements HeapSize { // , Writable{ // 7. Append no sync if (!walEdit.isEmpty()) { walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, + this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, processor.getClusterIds(), nonceGroup, nonce); txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, getSequenceId(), true, memstoreCells); @@ -5183,7 +5202,7 @@ public class HRegion implements HeapSize { // , Writable{ } allKVs.addAll(entry.getValue()); } - + // Actually write to WAL now if (writeToWAL) { // Using default cluster id, as this can only happen in the originating @@ -5200,7 +5219,7 @@ public class HRegion implements HeapSize { // , Writable{ // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); } - + size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } finally { @@ -5406,7 +5425,7 @@ public class HRegion implements HeapSize { // , Writable{ size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } - + // Actually write to WAL now if (walEdits != null && !walEdits.isEmpty()) { if (writeToWAL) { @@ -5772,10 +5791,11 @@ public class HRegion implements HeapSize { // , Writable{ */ protected void startRegionOperation(Operation op) throws IOException { switch (op) { - case INCREMENT: - case APPEND: - case GET: + case GET: // read operations case SCAN: + checkReadsEnabled(); + case INCREMENT: // write operations + case APPEND: case SPLIT_REGION: case MERGE_REGION: case PUT: @@ -6064,7 +6084,7 @@ public class HRegion implements HeapSize { // , Writable{ /** * Do not change this sequence id. See {@link #sequenceId} comment. - * @return sequenceId + * @return sequenceId */ @VisibleForTesting public AtomicLong getSequenceId() { @@ -6175,7 +6195,7 @@ public class HRegion implements HeapSize { // , Writable{ } } } - + /** * Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore * the WALEdit append later. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 1bc7edd7a50..42199697851 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; /** * View to an on-disk Region. @@ -74,6 +75,8 @@ public class HRegionFileSystem { private static final String REGION_TEMP_DIR = ".tmp"; private final HRegionInfo regionInfo; + //regionInfo for interacting with FS (getting encodedName, etc) + private final HRegionInfo regionInfoForFs; private final Configuration conf; private final Path tableDir; private final FileSystem fs; @@ -100,6 +103,7 @@ public class HRegionFileSystem { this.conf = conf; this.tableDir = tableDir; this.regionInfo = regionInfo; + this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo); this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number", DEFAULT_HDFS_CLIENT_RETRIES_NUMBER); this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries", @@ -123,7 +127,7 @@ public class HRegionFileSystem { /** @return {@link Path} to the region directory. */ public Path getRegionDir() { - return new Path(this.tableDir, this.regionInfo.getEncodedName()); + return new Path(this.tableDir, this.regionInfoForFs.getEncodedName()); } // =========================================================================== @@ -242,6 +246,7 @@ public class HRegionFileSystem { public boolean hasReferences(final String familyName) throws IOException { FileStatus[] files = FSUtils.listStatus(fs, getStoreDir(familyName), new PathFilter () { + @Override public boolean accept(Path path) { return StoreFileInfo.isReference(path); } @@ -288,14 +293,14 @@ public class HRegionFileSystem { */ public void deleteFamily(final String familyName) throws IOException { // archive family store files - HFileArchiver.archiveFamily(fs, conf, regionInfo, tableDir, Bytes.toBytes(familyName)); + HFileArchiver.archiveFamily(fs, conf, regionInfoForFs, tableDir, Bytes.toBytes(familyName)); // delete the family folder Path familyDir = getStoreDir(familyName); if(fs.exists(familyDir) && !deleteDir(familyDir)) throw new IOException("Could not delete family " + familyName - + " from FileSystem for region " + regionInfo.getRegionNameAsString() + "(" - + regionInfo.getEncodedName() + ")"); + + " from FileSystem for region " + regionInfoForFs.getRegionNameAsString() + "(" + + regionInfoForFs.getEncodedName() + ")"); } /** @@ -405,7 +410,7 @@ public class HRegionFileSystem { */ public void removeStoreFile(final String familyName, final Path filePath) throws IOException { - HFileArchiver.archiveStoreFile(this.conf, this.fs, this.regionInfo, + HFileArchiver.archiveStoreFile(this.conf, this.fs, this.regionInfoForFs, this.tableDir, Bytes.toBytes(familyName), filePath); } @@ -417,7 +422,7 @@ public class HRegionFileSystem { */ public void removeStoreFiles(final String familyName, final Collection storeFiles) throws IOException { - HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfo, + HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfoForFs, this.tableDir, Bytes.toBytes(familyName), storeFiles); } @@ -602,7 +607,7 @@ public class HRegionFileSystem { // See REF_NAME_REGEX regex above. The referred-to regions name is // up in the path of the passed in f -- parentdir is family, // then the directory above is the region name. - String parentRegionName = regionInfo.getEncodedName(); + String parentRegionName = regionInfoForFs.getEncodedName(); // Write reference with same file id only with the other region name as // suffix and into the new region location (under same family). Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName); @@ -675,12 +680,12 @@ public class HRegionFileSystem { Path referenceDir = new Path(new Path(mergedDir, mergedRegion.getEncodedName()), familyName); // A whole reference to the store file. - Reference r = Reference.createTopReference(regionInfo.getStartKey()); + Reference r = Reference.createTopReference(regionInfoForFs.getStartKey()); // Add the referred-to regions name as a dot separated suffix. // See REF_NAME_REGEX regex above. The referred-to regions name is // up in the path of the passed in f -- parentdir is family, // then the directory above is the region name. - String mergingRegionName = regionInfo.getEncodedName(); + String mergingRegionName = regionInfoForFs.getEncodedName(); // Write reference with same file id only with the other region name as // suffix and into the new region location (under same family). Path p = new Path(referenceDir, f.getPath().getName() + "." @@ -770,7 +775,7 @@ public class HRegionFileSystem { // pb version is much shorter -- we write now w/o the toString version -- so checking length // only should be sufficient. I don't want to read the file every time to check if it pb // serialized. - byte[] content = getRegionInfoFileContent(regionInfo); + byte[] content = getRegionInfoFileContent(regionInfoForFs); try { Path regionInfoFile = new Path(getRegionDir(), REGION_INFO_FILE); @@ -786,7 +791,7 @@ public class HRegionFileSystem { throw new IOException("Unable to remove existing " + regionInfoFile); } } catch (FileNotFoundException e) { - LOG.warn(REGION_INFO_FILE + " file not found for region: " + regionInfo.getEncodedName() + + LOG.warn(REGION_INFO_FILE + " file not found for region: " + regionInfoForFs.getEncodedName() + " on table " + regionInfo.getTable()); } @@ -799,7 +804,7 @@ public class HRegionFileSystem { * @param useTempDir indicate whether or not using the region .tmp dir for a safer file creation. */ private void writeRegionInfoOnFilesystem(boolean useTempDir) throws IOException { - byte[] content = getRegionInfoFileContent(regionInfo); + byte[] content = getRegionInfoFileContent(regionInfoForFs); writeRegionInfoOnFilesystem(content, useTempDir); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d8f16962c7e..2cdd136d79c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -363,6 +363,9 @@ public class HRegionServer extends HasThread implements */ private MovedRegionsCleaner movedRegionsCleaner; + // chore for refreshing store files for secondary regions + private StorefileRefresherChore storefileRefresher; + private RegionServerCoprocessorHost rsHost; private RegionServerProcedureManagerHost rspmHost; @@ -693,6 +696,12 @@ public class HRegionServer extends HasThread implements rpcServices.isa.getAddress(), 0)); this.pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor.start(); + + int storefileRefreshPeriod = conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD + , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); + if (storefileRefreshPeriod > 0) { + this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this); + } } /** @@ -832,6 +841,9 @@ public class HRegionServer extends HasThread implements if (this.nonceManagerChore != null) { this.nonceManagerChore.interrupt(); } + if (this.storefileRefresher != null) { + this.storefileRefresher.interrupt(); + } // Stop the snapshot and other procedure handlers, forcefully killing all running tasks rspmHost.stop(this.abortRequested || this.killed); @@ -1506,6 +1518,10 @@ public class HRegionServer extends HasThread implements Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), getName() + ".nonceCleaner", uncaughtExceptionHandler); } + if (this.storefileRefresher != null) { + Threads.setDaemonThreadRunning(this.storefileRefresher.getThread(), getName() + ".storefileRefresher", + uncaughtExceptionHandler); + } // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. @@ -1853,6 +1869,9 @@ public class HRegionServer extends HasThread implements this.replicationSinkHandler.stopReplicationService(); } } + if (this.storefileRefresher != null) { + Threads.shutdown(this.storefileRefresher.getThread()); + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 221824404a5..280c8ace44e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.io.FileNotFoundException; import java.io.InterruptedIOException; import java.net.InetSocketAddress; import java.security.Key; @@ -27,6 +26,8 @@ import java.security.KeyException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; @@ -93,6 +94,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** * A Store holds a column family in a Region. Its a memstore and a set of zero @@ -487,10 +489,13 @@ public class HStore implements Store { */ private List loadStoreFiles() throws IOException { Collection files = fs.getStoreFiles(getColumnFamilyName()); + return openStoreFiles(files); + } + + private List openStoreFiles(Collection files) throws IOException { if (files == null || files.size() == 0) { return new ArrayList(); } - // initialize the thread pool for opening store files in parallel.. ThreadPoolExecutor storeFileOpenerThreadPool = this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" + @@ -550,6 +555,60 @@ public class HStore implements Store { return results; } + /** + * Checks the underlying store files, and opens the files that have not + * been opened, and removes the store file readers for store files no longer + * available. Mainly used by secondary region replicas to keep up to date with + * the primary region files. + * @throws IOException + */ + @Override + public void refreshStoreFiles() throws IOException { + StoreFileManager sfm = storeEngine.getStoreFileManager(); + Collection currentFiles = sfm.getStorefiles(); + if (currentFiles == null) currentFiles = new ArrayList(0); + + Collection newFiles = fs.getStoreFiles(getColumnFamilyName()); + if (newFiles == null) newFiles = new ArrayList(0); + + HashMap currentFilesSet = new HashMap(currentFiles.size()); + for (StoreFile sf : currentFiles) { + currentFilesSet.put(sf.getFileInfo(), sf); + } + HashSet newFilesSet = new HashSet(newFiles); + + Set toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet()); + Set toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet); + + if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) { + return; + } + + LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString() + + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles); + + Set toBeRemovedStoreFiles = new HashSet(toBeRemovedFiles.size()); + for (StoreFileInfo sfi : toBeRemovedFiles) { + toBeRemovedStoreFiles.add(currentFilesSet.get(sfi)); + } + + // try to open the files + List openedFiles = openStoreFiles(toBeAddedFiles); + + // propogate the file changes to the underlying store file manager + replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception + + // Advance the memstore read point to be at least the new store files seqIds so that + // readers might pick it up. This assumes that the store is not getting any writes (otherwise + // in-flight transactions might be made visible) + if (!toBeAddedFiles.isEmpty()) { + region.getMVCC().advanceMemstoreReadPointIfNeeded(this.getMaxSequenceId()); + } + + // notify scanners, close file readers, and recompute store size + completeCompaction(toBeRemovedStoreFiles, false); + } + private StoreFile createStoreFileAndReader(final Path p) throws IOException { StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p); return createStoreFileAndReader(info); @@ -1099,7 +1158,7 @@ public class HStore implements Store { writeCompactionWalRecord(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs); // At this point the store will use new files for all new scanners. - completeCompaction(filesToCompact); // Archive old files & update store size. + completeCompaction(filesToCompact, true); // Archive old files & update store size. } finally { finishCompactionRequest(cr); } @@ -1153,7 +1212,8 @@ public class HStore implements Store { this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId()); } - private void replaceStoreFiles(final Collection compactedFiles, + @VisibleForTesting + void replaceStoreFiles(final Collection compactedFiles, final Collection result) throws IOException { this.lock.writeLock().lock(); try { @@ -1300,7 +1360,7 @@ public class HStore implements Store { this.getCoprocessorHost().postCompact(this, sf, null); } replaceStoreFiles(filesToCompact, Lists.newArrayList(sf)); - completeCompaction(filesToCompact); + completeCompaction(filesToCompact, true); } } finally { synchronized (filesCompacting) { @@ -1500,7 +1560,7 @@ public class HStore implements Store { } } - /* + /** *

It works by processing a compaction that's been written to disk. * *

It is usually invoked at the end of a compaction, but might also be @@ -1517,6 +1577,28 @@ public class HStore implements Store { */ @VisibleForTesting protected void completeCompaction(final Collection compactedFiles) + throws IOException { + completeCompaction(compactedFiles, true); + } + + + /** + *

It works by processing a compaction that's been written to disk. + * + *

It is usually invoked at the end of a compaction, but might also be + * invoked at HStore startup, if the prior execution died midway through. + * + *

Moving the compacted TreeMap into place means: + *

+   * 1) Unload all replaced StoreFile, close and collect list to delete.
+   * 2) Compute new store size
+   * 
+ * + * @param compactedFiles list of files that were compacted + * @param newFile StoreFile that is the result of the compaction + */ + @VisibleForTesting + protected void completeCompaction(final Collection compactedFiles, boolean removeFiles) throws IOException { try { // Do not delete old store files until we have sent out notification of @@ -1531,7 +1613,9 @@ public class HStore implements Store { for (StoreFile compactedFile : compactedFiles) { compactedFile.closeReader(true); } - this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles); + if (removeFiles) { + this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles); + } } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); LOG.error("Failed removing compacted files in " + this + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index 2e508f58e82..2d247e9c6f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -35,13 +35,13 @@ import org.apache.hadoop.hbase.util.ClassSize; @InterfaceAudience.Private public class MultiVersionConsistencyControl { private static final long NO_WRITE_NUMBER = 0; - private volatile long memstoreRead = 0; + private volatile long memstoreRead = 0; private final Object readWaiters = new Object(); // This is the pending queue of writes. private final LinkedList writeQueue = new LinkedList(); - + /** * Default constructor. Initializes the memstoreRead/Write points to 0. */ @@ -60,14 +60,14 @@ public class MultiVersionConsistencyControl { } /** - * + * * @param initVal The value we used initially and expected it'll be reset later * @return WriteEntry instance. */ WriteEntry beginMemstoreInsert() { return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER); } - + /** * Get a mvcc write number before an actual one(its log sequence Id) being assigned * @param sequenceId @@ -83,9 +83,9 @@ public class MultiVersionConsistencyControl { // changes touch same row key // If for any reason, the bumped value isn't reset due to failure situations, we'll reset // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all - return sequenceId.incrementAndGet() + 1000000000; + return sequenceId.incrementAndGet() + 1000000000; } - + /** * This function starts a MVCC transaction with current region's log change sequence number. Since * we set change sequence number when flushing current change to WAL(late binding), the flush @@ -126,7 +126,7 @@ public class MultiVersionConsistencyControl { } waitForPreviousTransactionsComplete(e); } - + /** * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the * end of this call, the global read point is at least as large as the write point of the passed @@ -183,6 +183,18 @@ public class MultiVersionConsistencyControl { return false; } + /** + * Advances the current read point to be given seqNum if it is smaller than + * that. + */ + void advanceMemstoreReadPointIfNeeded(long seqNum) { + synchronized (writeQueue) { + if (this.memstoreRead < seqNum) { + memstoreRead = seqNum; + } + } + } + /** * Wait for all previous MVCC transactions complete */ @@ -190,7 +202,7 @@ public class MultiVersionConsistencyControl { WriteEntry w = beginMemstoreInsert(); waitForPreviousTransactionsComplete(w); } - + public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) { boolean interrupted = false; WriteEntry w = waitedEntry; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index fd73f2fcf8b..280a1b81733 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -27,10 +27,10 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.compress.Compression; @@ -357,4 +357,13 @@ public interface Store extends HeapSize, StoreConfigInformation { * @return Whether this store has too many store files. */ boolean hasTooManyStoreFiles(); + + /** + * Checks the underlying store files, and opens the files that have not + * been opened, and removes the store file readers for store files no longer + * available. Mainly used by secondary region replicas to keep up to date with + * the primary region files. + * @throws IOException + */ + void refreshStoreFiles() throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java index 28e7b1b0cbd..28ccf5ae35a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.FSUtils; * Describe a StoreFile (hfile, reference, link) */ @InterfaceAudience.Private -public class StoreFileInfo { +public class StoreFileInfo implements Comparable { public static final Log LOG = LogFactory.getLog(StoreFileInfo.class); /** @@ -403,4 +403,27 @@ public class StoreFileInfo { } return FSUtils.computeHDFSBlocksDistribution(fs, status, start, length); } + + @Override + public boolean equals(Object that) { + if (that == null) { + return false; + } + + if (that instanceof StoreFileInfo) { + return this.compareTo((StoreFileInfo)that) == 0; + } + + return false; + }; + + @Override + public int compareTo(StoreFileInfo o) { + return this.fileStatus.compareTo(o.fileStatus); + } + + @Override + public int hashCode() { + return this.fileStatus.hashCode(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java new file mode 100644 index 00000000000..aeec21f4f23 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.StringUtils; + +/** + * A chore for refreshing the store files for secondary regions hosted in the region server. + * + * This chore should run periodically with a shorter interval than HFile TTL + * ("hbase.master.hfilecleaner.ttl", default 5 minutes). + * It ensures that if we cannot refresh files longer than that amount, the region + * will stop serving read requests because the referenced files might have been deleted (by the + * primary region). + */ +public class StorefileRefresherChore extends Chore { + + private static final Log LOG = LogFactory.getLog(StorefileRefresherChore.class); + + /** + * The period (in milliseconds) for refreshing the store files for the secondary regions. + */ + static final String REGIONSERVER_STOREFILE_REFRESH_PERIOD + = "hbase.regionserver.storefile.refresh.period"; + static final int DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD = 0; //disabled by default + + private HRegionServer regionServer; + private long hfileTtl; + private int period; + + //ts of last time regions store files are refreshed + private Map lastRefreshTimes; // encodedName -> long + + public StorefileRefresherChore(int period, HRegionServer regionServer, Stoppable stoppable) { + super("StorefileRefresherChore", period, stoppable); + this.period = period; + this.regionServer = regionServer; + this.hfileTtl = this.regionServer.getConfiguration().getLong( + TimeToLiveHFileCleaner.TTL_CONF_KEY, TimeToLiveHFileCleaner.DEFAULT_TTL); + if (period > hfileTtl / 2) { + throw new RuntimeException(REGIONSERVER_STOREFILE_REFRESH_PERIOD + + " should be set smaller than half of " + TimeToLiveHFileCleaner.TTL_CONF_KEY); + } + lastRefreshTimes = new HashMap(); + } + + @Override + protected void chore() { + for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { + if (!r.writestate.isReadOnly()) { + // skip checking for this region if it can accept writes + continue; + } + String encodedName = r.getRegionInfo().getEncodedName(); + long time = EnvironmentEdgeManager.currentTimeMillis(); + if (!lastRefreshTimes.containsKey(encodedName)) { + lastRefreshTimes.put(encodedName, time); + } + try { + for (Store store : r.getStores().values()) { + // TODO: some stores might see new data from flush, while others do not which + // MIGHT break atomic edits across column families. We can fix this with setting + // mvcc read numbers that we know every store has seen + store.refreshStoreFiles(); + } + } catch (IOException ex) { + LOG.warn("Exception while trying to refresh store files for region:" + r.getRegionInfo() + + ", exception:" + StringUtils.stringifyException(ex)); + + // Store files have a TTL in the archive directory. If we fail to refresh for that long, we stop serving reads + if (isRegionStale(encodedName, time)) { + r.setReadsEnabled(false); // stop serving reads + } + continue; + } + lastRefreshTimes.put(encodedName, time); + r.setReadsEnabled(true); // restart serving reads + } + + // remove closed regions + for (String encodedName : lastRefreshTimes.keySet()) { + if (regionServer.getFromOnlineRegions(encodedName) == null) { + lastRefreshTimes.remove(encodedName); + } + } + } + + protected boolean isRegionStale(String encodedName, long time) { + long lastRefreshTime = lastRefreshTimes.get(encodedName); + return time - lastRefreshTime > hfileTtl - period; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java new file mode 100644 index 00000000000..69708db7587 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.regionserver.HRegion; + +/** + * Similar to {@link RegionReplicaUtil} but for the server side + */ +public class ServerRegionReplicaUtil extends RegionReplicaUtil { + + /** + * Returns the regionInfo object to use for interacting with the file system. + * @return An HRegionInfo object to interact with the filesystem + */ + public static HRegionInfo getRegionInfoForFs(HRegionInfo regionInfo) { + if (regionInfo == null) { + return null; + } + return RegionReplicaUtil.getRegionInfoForDefaultReplica(regionInfo); + } + + /** + * Returns whether this region replica can accept writes. + * @param region the HRegion object + * @return whether the replica is read only + */ + public static boolean isReadOnly(HRegion region) { + return region.getTableDesc().isReadOnly() + || !isDefaultReplica(region.getRegionInfo()); + } + + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 38242948acd..e56b8d09aca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1815,6 +1815,15 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } } + public void deleteNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException { + for (int i = startRow; i < endRow; i++) { + byte[] data = Bytes.toBytes(String.valueOf(i)); + Delete delete = new Delete(data); + delete.deleteFamily(f); + t.delete(delete); + } + } + /** * Return the number of rows in the given table. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index a0538302cc4..8e836bb4a50 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -245,7 +245,7 @@ public class TestHRegion { throws IOException { super(fs, rootDir, logName, conf); } - + void setStoreFlushCtx(StoreFlushContext storeFlushCtx) { this.storeFlushCtx = storeFlushCtx; } @@ -256,18 +256,18 @@ public class TestHRegion { super.sync(txid); } } - + FileSystem fs = FileSystem.get(CONF); Path rootDir = new Path(dir + "testMemstoreSnapshotSize"); MyFaultyHLog faultyLog = new MyFaultyHLog(fs, rootDir, "testMemstoreSnapshotSize", CONF); HRegion region = initHRegion(tableName, null, null, name.getMethodName(), CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES); - + Store store = region.getStore(COLUMN_FAMILY_BYTES); // Get some random bytes. byte [] value = Bytes.toBytes(name.getMethodName()); faultyLog.setStoreFlushCtx(store.createFlushContext(12345)); - + Put put = new Put(value); put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); faultyLog.setFailureType(FaultyHLog.FailureType.SYNC); @@ -284,7 +284,7 @@ public class TestHRegion { assertTrue("flushable size should be zero, but it is " + sz, sz == 0); HRegion.closeHRegion(region); } - + /** * Test we do not lose data if we fail a flush and then close. * Part of HBase-10466. Tests the following from the issue description: @@ -2244,7 +2244,7 @@ public class TestHRegion { /** * This method tests https://issues.apache.org/jira/browse/HBASE-2516. - * + * * @throws IOException */ @Test @@ -2800,7 +2800,7 @@ public class TestHRegion { /** * Added for HBASE-5416 - * + * * Here we test scan optimization when only subset of CFs are used in filter * conditions. */ @@ -2869,7 +2869,7 @@ public class TestHRegion { /** * HBASE-5416 - * + * * Test case when scan limits amount of KVs returned on each next() call. */ @Test @@ -2967,7 +2967,7 @@ public class TestHRegion { // //////////////////////////////////////////////////////////////////////////// /** * Splits twice and verifies getting from each of the split regions. - * + * * @throws Exception */ @Test @@ -3099,7 +3099,7 @@ public class TestHRegion { * Flushes the cache in a thread while scanning. The tests verify that the * scan is coherent - e.g. the returned results are always of the same or * later update as the previous results. - * + * * @throws IOException * scan / compact * @throws InterruptedException @@ -3221,7 +3221,7 @@ public class TestHRegion { /** * Writes very wide records and scans for the latest every time.. Flushes and * compacts the region every now and then to keep things realistic. - * + * * @throws IOException * by flush / scan / compaction * @throws InterruptedException @@ -3386,7 +3386,7 @@ public class TestHRegion { /** * Writes very wide records and gets the latest row every time.. Flushes and * compacts the region aggressivly to catch issues. - * + * * @throws IOException * by flush / scan / compaction * @throws InterruptedException @@ -3786,7 +3786,7 @@ public class TestHRegion { /** * Testcase to check state of region initialization task set to ABORTED or not * if any exceptions during initialization - * + * * @throws Exception */ @Test @@ -4211,7 +4211,116 @@ public class TestHRegion { this.region = null; } + @Test + public void testRegionReplicaSecondary() throws IOException { + // create a primary region, load some data and flush + // create a secondary region, and do a get against that + Path rootDir = new Path(dir + "testRegionReplicaSecondary"); + + byte[][] families = new byte[][] { + Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") + }; + byte[] cq = Bytes.toBytes("cq"); + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary")); + for (byte[] family : families) { + htd.addFamily(new HColumnDescriptor(family)); + } + + long time = System.currentTimeMillis(); + HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, + false, time, 0); + HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, + false, time, 1); + + HRegion primaryRegion = null, secondaryRegion = null; + + try { + primaryRegion = HRegion.createHRegion(primaryHri, + rootDir, TEST_UTIL.getConfiguration(), htd); + + // load some data + putData(primaryRegion, 0, 1000, cq, families); + + // flush region + primaryRegion.flushcache(); + + // open secondary region + secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF); + + verifyData(secondaryRegion, 0, 1000, cq, families); + } finally { + if (primaryRegion != null) { + HRegion.closeHRegion(primaryRegion); + } + if (secondaryRegion != null) { + HRegion.closeHRegion(secondaryRegion); + } + } + } + + @Test + public void testRegionReplicaSecondaryIsReadOnly() throws IOException { + // create a primary region, load some data and flush + // create a secondary region, and do a put against that + Path rootDir = new Path(dir + "testRegionReplicaSecondary"); + + byte[][] families = new byte[][] { + Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") + }; + byte[] cq = Bytes.toBytes("cq"); + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary")); + for (byte[] family : families) { + htd.addFamily(new HColumnDescriptor(family)); + } + + long time = System.currentTimeMillis(); + HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, + false, time, 0); + HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, + false, time, 1); + + HRegion primaryRegion = null, secondaryRegion = null; + + try { + primaryRegion = HRegion.createHRegion(primaryHri, + rootDir, TEST_UTIL.getConfiguration(), htd); + + // load some data + putData(primaryRegion, 0, 1000, cq, families); + + // flush region + primaryRegion.flushcache(); + + // open secondary region + secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF); + + try { + putData(secondaryRegion, 0, 1000, cq, families); + fail("Should have thrown exception"); + } catch (IOException ex) { + // expected + } + } finally { + if (primaryRegion != null) { + HRegion.closeHRegion(primaryRegion); + } + if (secondaryRegion != null) { + HRegion.closeHRegion(secondaryRegion); + } + } + + } + private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { + putData(this.region, startRow, numRows, qf, families); + } + + private void putData(HRegion region, + int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { for (int i = startRow; i < startRow + numRows; i++) { Put put = new Put(Bytes.toBytes("" + i)); put.setDurability(Durability.SKIP_WAL); @@ -4254,13 +4363,13 @@ public class TestHRegion { /* * Assert first value in the passed region is firstValue. - * + * * @param r - * + * * @param fs - * + * * @param firstValue - * + * * @throws IOException */ private void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java new file mode 100644 index 00000000000..cc2235fe19e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.catalog.TestMetaReaderEditor; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.protobuf.ServiceException; + +/** + * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole + * cluster. See {@link TestRegionServerNoMaster}. + */ +@Category(MediumTests.class) +public class TestRegionReplicas { + private static final Log LOG = LogFactory.getLog(TestRegionReplicas.class); + + private static final int NB_SERVERS = 1; + private static HTable table; + private static final byte[] row = "TestRegionReplicas".getBytes(); + + private static HRegionInfo hriPrimary; + private static HRegionInfo hriSecondary; + + private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + private static final byte[] f = HConstants.CATALOG_FAMILY; + + @BeforeClass + public static void before() throws Exception { + HTU.startMiniCluster(NB_SERVERS); + final byte[] tableName = Bytes.toBytes(TestRegionReplicas.class.getSimpleName()); + + // Create table then get the single region for our new table. + table = HTU.createTable(tableName, f); + + hriPrimary = table.getRegionLocation(row, false).getRegionInfo(); + + // mock a secondary region info to open + hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(), + hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1); + + // No master + TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU); + } + + @AfterClass + public static void afterClass() throws Exception { + table.close(); + HTU.shutdownMiniCluster(); + } + + @After + public void after() throws Exception { + // Clean the state if the test failed before cleaning the znode + // It does not manage all bad failures, so if there are multiple failures, only + // the first one should be looked at. + ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary); + } + + private HRegionServer getRS() { + return HTU.getMiniHBaseCluster().getRegionServer(0); + } + + private void openRegion(HRegionInfo hri) throws Exception { + ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName()); + // first version is '0' + AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null); + AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr); + Assert.assertTrue(responseOpen.getOpeningStateCount() == 1); + Assert.assertTrue(responseOpen.getOpeningState(0). + equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED)); + checkRegionIsOpened(hri.getEncodedName()); + } + + private void closeRegion(HRegionInfo hri) throws Exception { + ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName()); + + AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(getRS().getServerName(), + hri.getEncodedName(), true); + AdminProtos.CloseRegionResponse responseClose = getRS().getRSRpcServices().closeRegion(null, crr); + Assert.assertTrue(responseClose.getClosed()); + + checkRegionIsClosed(hri.getEncodedName()); + + ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), getRS().getServerName()); + } + + private void checkRegionIsOpened(String encodedRegionName) throws Exception { + + while (!getRS().getRegionsInTransitionInRS().isEmpty()) { + Thread.sleep(1); + } + + Assert.assertTrue(getRS().getRegionByEncodedName(encodedRegionName).isAvailable()); + + Assert.assertTrue( + ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), encodedRegionName, getRS().getServerName())); + } + + + private void checkRegionIsClosed(String encodedRegionName) throws Exception { + + while (!getRS().getRegionsInTransitionInRS().isEmpty()) { + Thread.sleep(1); + } + + try { + Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable()); + } catch (NotServingRegionException expected) { + // That's how it work: if the region is closed we have an exception. + } + + // We don't delete the znode here, because there is not always a znode. + } + + @Test(timeout = 60000) + public void testOpenRegionReplica() throws Exception { + openRegion(hriSecondary); + try { + //load some data to primary + HTU.loadNumericRows(table, f, 0, 1000); + + // assert that we can read back from primary + Assert.assertEquals(1000, HTU.countRows(table)); + } finally { + HTU.deleteNumericRows(table, f, 0, 1000); + closeRegion(hriSecondary); + } + } + + /** Tests that the meta location is saved for secondary regions */ + @Test(timeout = 60000) + public void testRegionReplicaUpdatesMetaLocation() throws Exception { + openRegion(hriSecondary); + HTable meta = null; + try { + meta = new HTable(HTU.getConfiguration(), TableName.META_TABLE_NAME); + TestMetaReaderEditor.assertMetaLocation(meta, hriPrimary.getRegionName() + , getRS().getServerName(), -1, 1, false); + } finally { + if (meta != null ) meta.close(); + closeRegion(hriSecondary); + } + } + + @Test(timeout = 60000) + public void testRegionReplicaGets() throws Exception { + try { + //load some data to primary + HTU.loadNumericRows(table, f, 0, 1000); + // assert that we can read back from primary + Assert.assertEquals(1000, HTU.countRows(table)); + // flush so that region replica can read + getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache(); + + openRegion(hriSecondary); + + // first try directly against region + HRegion region = getRS().getFromOnlineRegions(hriSecondary.getEncodedName()); + assertGet(region, 42, true); + + assertGetRpc(hriSecondary, 42, true); + } finally { + HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); + closeRegion(hriSecondary); + } + } + + private void assertGet(HRegion region, int value, boolean expect) throws IOException { + byte[] row = Bytes.toBytes(String.valueOf(value)); + Get get = new Get(row); + Result result = region.get(get); + if (expect) { + Assert.assertArrayEquals(row, result.getValue(f, null)); + } else { + result.isEmpty(); + } + } + + // build a mock rpc + private void assertGetRpc(HRegionInfo info, int value, boolean expect) throws IOException, ServiceException { + byte[] row = Bytes.toBytes(String.valueOf(value)); + Get get = new Get(row); + ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get); + ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq); + Result result = ProtobufUtil.toResult(getResp.getResult()); + if (expect) { + Assert.assertArrayEquals(row, result.getValue(f, null)); + } else { + result.isEmpty(); + } + } + + private void restartRegionServer() throws Exception { + afterClass(); + before(); + } + + @Test(timeout = 300000) + public void testRefreshStoreFiles() throws Exception { + // enable store file refreshing + final int refreshPeriod = 2000; // 2 sec + HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100); + HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, refreshPeriod); + // restart the region server so that it starts the refresher chore + restartRegionServer(); + + try { + LOG.info("Opening the secondary region " + hriSecondary.getEncodedName()); + openRegion(hriSecondary); + + //load some data to primary + LOG.info("Loading data to primary region"); + HTU.loadNumericRows(table, f, 0, 1000); + // assert that we can read back from primary + Assert.assertEquals(1000, HTU.countRows(table)); + // flush so that region replica can read + LOG.info("Flushing primary region"); + getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache(); + + // ensure that chore is run + LOG.info("Sleeping for " + (4 * refreshPeriod)); + Threads.sleep(4 * refreshPeriod); + + LOG.info("Checking results from secondary region replica"); + HRegion secondaryRegion = getRS().getFromOnlineRegions(hriSecondary.getEncodedName()); + Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount()); + + assertGet(secondaryRegion, 42, true); + assertGetRpc(hriSecondary, 42, true); + assertGetRpc(hriSecondary, 1042, false); + + //load some data to primary + HTU.loadNumericRows(table, f, 1000, 1100); + getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache(); + + HTU.loadNumericRows(table, f, 2000, 2100); + getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache(); + + // ensure that chore is run + Threads.sleep(4 * refreshPeriod); + + assertGetRpc(hriSecondary, 42, true); + assertGetRpc(hriSecondary, 1042, true); + assertGetRpc(hriSecondary, 2042, true); + + // ensure that we are see the 3 store files + Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount()); + + // force compaction + HTU.compact(table.getName(), true); + + long wakeUpTime = System.currentTimeMillis() + 4 * refreshPeriod; + while (System.currentTimeMillis() < wakeUpTime) { + assertGetRpc(hriSecondary, 42, true); + assertGetRpc(hriSecondary, 1042, true); + assertGetRpc(hriSecondary, 2042, true); + Threads.sleep(10); + } + + // ensure that we see the compacted file only + Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount()); + + } finally { + HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); + closeRegion(hriSecondary); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java index 7cd7ed16119..cc87eb32cd8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java @@ -44,6 +44,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NodeExistsException; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -85,6 +87,11 @@ public class TestRegionServerNoMaster { hri = table.getRegionLocation(row, false).getRegionInfo(); regionName = hri.getRegionName(); + stopMasterAndAssignMeta(HTU); + } + + public static void stopMasterAndAssignMeta(HBaseTestingUtility HTU) + throws NodeExistsException, KeeperException, IOException, InterruptedException { // No master HTU.getHBaseCluster().getMaster().stopMaster(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 6929222201e..9d31e7db044 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -20,6 +20,10 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.lang.ref.SoftReference; @@ -84,6 +88,8 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.mockito.Mockito; +import com.google.common.collect.Lists; + /** * Test class for the Store */ @@ -137,7 +143,7 @@ public class TestStore { } private void init(String methodName) throws IOException { - init(methodName, HBaseConfiguration.create()); + init(methodName, TEST_UTIL.getConfiguration()); } private void init(String methodName, Configuration conf) @@ -194,6 +200,7 @@ public class TestStore { // Inject our faulty LocalFileSystem conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); user.runAs(new PrivilegedExceptionAction() { + @Override public Object run() throws Exception { // Make sure it worked (above is sensitive to caching details in hadoop core) FileSystem fs = FileSystem.get(conf); @@ -330,7 +337,7 @@ public class TestStore { FileSystem fs = FileSystem.get(conf); // Initialize region init(name.getMethodName(), conf); - + int storeFileNum = 4; for (int i = 1; i <= storeFileNum; i++) { LOG.info("Adding some data for the store file #"+i); @@ -350,12 +357,12 @@ public class TestStore { lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); } - - private static long getLowestTimeStampFromFS(FileSystem fs, + + private static long getLowestTimeStampFromFS(FileSystem fs, final Collection candidates) throws IOException { long minTs = Long.MAX_VALUE; if (candidates.isEmpty()) { - return minTs; + return minTs; } Path[] p = new Path[candidates.size()]; int i = 0; @@ -363,7 +370,7 @@ public class TestStore { p[i] = sf.getPath(); ++i; } - + FileStatus[] stats = fs.listStatus(p); if (stats == null || stats.length == 0) { return minTs; @@ -724,6 +731,7 @@ public class TestStore { conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); user.runAs(new PrivilegedExceptionAction() { + @Override public Object run() throws Exception { // Make sure it worked (above is sensitive to caching details in hadoop core) FileSystem fs = FileSystem.get(conf); @@ -790,6 +798,7 @@ public class TestStore { overwrite, bufferSize, replication, blockSize, progress), faultPos, fault); } + @Override public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { @@ -968,4 +977,102 @@ public class TestStore { Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor()); } + + private void addStoreFile() throws IOException { + StoreFile f = this.store.getStorefiles().iterator().next(); + Path storedir = f.getPath().getParent(); + long seqid = this.store.getMaxSequenceId(); + Configuration c = TEST_UTIL.getConfiguration(); + FileSystem fs = FileSystem.get(c); + HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); + StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c), + fs) + .withOutputDir(storedir) + .withFileContext(fileContext) + .build(); + w.appendMetadata(seqid + 1, false); + w.close(); + LOG.info("Added store file:" + w.getPath()); + } + + private void archiveStoreFile(int index) throws IOException { + Collection files = this.store.getStorefiles(); + StoreFile sf = null; + Iterator it = files.iterator(); + for (int i = 0; i <= index; i++) { + sf = it.next(); + } + store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf)); + } + + public void testRefreshStoreFiles() throws Exception { + init(name.getMethodName()); + + assertEquals(0, this.store.getStorefilesCount()); + + // add some data, flush + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); + flush(1); + assertEquals(1, this.store.getStorefilesCount()); + + // add one more file + addStoreFile(); + + assertEquals(1, this.store.getStorefilesCount()); + store.refreshStoreFiles(); + assertEquals(2, this.store.getStorefilesCount()); + + // add three more files + addStoreFile(); + addStoreFile(); + addStoreFile(); + + assertEquals(2, this.store.getStorefilesCount()); + store.refreshStoreFiles(); + assertEquals(5, this.store.getStorefilesCount()); + + archiveStoreFile(0); + + assertEquals(5, this.store.getStorefilesCount()); + store.refreshStoreFiles(); + assertEquals(4, this.store.getStorefilesCount()); + + archiveStoreFile(0); + archiveStoreFile(1); + archiveStoreFile(2); + + assertEquals(4, this.store.getStorefilesCount()); + store.refreshStoreFiles(); + assertEquals(1, this.store.getStorefilesCount()); + + archiveStoreFile(0); + store.refreshStoreFiles(); + assertEquals(0, this.store.getStorefilesCount()); + } + + @SuppressWarnings("unchecked") + public void testRefreshStoreFilesNotChanged() throws IOException { + init(name.getMethodName()); + + assertEquals(0, this.store.getStorefilesCount()); + + // add some data, flush + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); + flush(1); + // add one more file + addStoreFile(); + + HStore spiedStore = spy(store); + + // call first time after files changed + spiedStore.refreshStoreFiles(); + assertEquals(2, this.store.getStorefilesCount()); + verify(spiedStore, times(1)).replaceStoreFiles(any(Collection.class), any(Collection.class)); + + // call second time + spiedStore.refreshStoreFiles(); + + //ensure that replaceStoreFiles is not called if files are not refreshed + verify(spiedStore, times(0)).replaceStoreFiles(null, null); + } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java new file mode 100644 index 00000000000..e7cda8f925e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.StoppableImplementation; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestStoreFileRefresherChore { + + private HBaseTestingUtility TEST_UTIL; + private Path testDir; + + @Before + public void setUp() { + TEST_UTIL = new HBaseTestingUtility(); + testDir = TEST_UTIL.getDataTestDir("TestStoreFileRefresherChore"); + } + + private HTableDescriptor getTableDesc(TableName tableName, byte[]... families) { + HTableDescriptor htd = new HTableDescriptor(tableName); + for (byte[] family : families) { + HColumnDescriptor hcd = new HColumnDescriptor(family); + // Set default to be three versions. + hcd.setMaxVersions(Integer.MAX_VALUE); + htd.addFamily(hcd); + } + return htd; + } + + static class FailingHRegionFileSystem extends HRegionFileSystem { + boolean fail = false; + FailingHRegionFileSystem(Configuration conf, FileSystem fs, Path tableDir, HRegionInfo regionInfo) { + super(conf, fs, tableDir, regionInfo); + } + + @Override + public Collection getStoreFiles(String familyName) throws IOException { + if (fail) { + throw new IOException("simulating FS failure"); + } + return super.getStoreFiles(familyName); + } + } + + private HRegion initHRegion(HTableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId) throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + Path tableDir = new Path(testDir, htd.getTableName().getNameAsString()); + + HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false, 0, replicaId); + + HRegionFileSystem fs = new FailingHRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir, info); + HRegion region = new HRegion(fs, HLogFactory.createHLog(fs.getFileSystem(), + tableDir, "log_" + replicaId, conf), conf, htd, null); + + region.initialize(); + + return region; + } + + private void putData(HRegion region, int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { + for (int i = startRow; i < startRow + numRows; i++) { + Put put = new Put(Bytes.toBytes("" + i)); + put.setDurability(Durability.SKIP_WAL); + for (byte[] family : families) { + put.add(family, qf, null); + } + region.put(put); + } + } + + private void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families) + throws IOException { + for (int i = startRow; i < startRow + numRows; i++) { + byte[] row = Bytes.toBytes("" + i); + Get get = new Get(row); + for (byte[] family : families) { + get.addColumn(family, qf); + } + Result result = newReg.get(get); + Cell[] raw = result.rawCells(); + assertEquals(families.length, result.size()); + for (int j = 0; j < families.length; j++) { + assertTrue(CellUtil.matchingRow(raw[j], row)); + assertTrue(CellUtil.matchingFamily(raw[j], families[j])); + assertTrue(CellUtil.matchingQualifier(raw[j], qf)); + } + } + } + + static class StaleStorefileRefresherChore extends StorefileRefresherChore { + boolean isStale = false; + public StaleStorefileRefresherChore(int period, HRegionServer regionServer, + Stoppable stoppable) { + super(period, regionServer, stoppable); + } + @Override + protected boolean isRegionStale(String encodedName, long time) { + return isStale; + } + } + + @Test (timeout = 60000) + public void testIsStale() throws IOException { + int period = 0; + byte[][] families = new byte[][] {Bytes.toBytes("cf")}; + byte[] qf = Bytes.toBytes("cq"); + + HRegionServer regionServer = mock(HRegionServer.class); + List regions = new ArrayList(); + when(regionServer.getOnlineRegionsLocalContext()).thenReturn(regions); + when(regionServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration()); + + HTableDescriptor htd = getTableDesc(TableName.valueOf("testIsStale"), families); + HRegion primary = initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 0); + HRegion replica1 = initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 1); + regions.add(primary); + regions.add(replica1); + + StaleStorefileRefresherChore chore = new StaleStorefileRefresherChore(period, regionServer, new StoppableImplementation()); + + // write some data to primary and flush + putData(primary, 0, 100, qf, families); + primary.flushcache(); + verifyData(primary, 0, 100, qf, families); + + try { + verifyData(replica1, 0, 100, qf, families); + Assert.fail("should have failed"); + } catch(AssertionError ex) { + // expected + } + chore.chore(); + verifyData(replica1, 0, 100, qf, families); + + // simulate an fs failure where we cannot refresh the store files for the replica + ((FailingHRegionFileSystem)replica1.getRegionFileSystem()).fail = true; + + // write some more data to primary and flush + putData(primary, 100, 100, qf, families); + primary.flushcache(); + verifyData(primary, 0, 200, qf, families); + + chore.chore(); // should not throw ex, but we cannot refresh the store files + + verifyData(replica1, 0, 100, qf, families); + try { + verifyData(replica1, 100, 100, qf, families); + Assert.fail("should have failed"); + } catch(AssertionError ex) { + // expected + } + + chore.isStale = true; + chore.chore(); //now after this, we cannot read back any value + try { + verifyData(replica1, 0, 100, qf, families); + Assert.fail("should have failed with IOException"); + } catch(IOException ex) { + // expected + } + } +}