HBASE-10859 Use HFileLink in opening region files from secondaries
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1585768 13f79535-47bb-0310-9956-ffa450edef68
parent ad05de172f
commit 7d247524b3
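The core of the change: when a region replica other than the default (primary) one opens its stores, it no longer builds a StoreFileInfo straight from the primary's file path; it wraps the file in an HFileLink so the read keeps working even after a compaction moves the file to the archive. A condensed, hedged sketch of the helper this commit adds to ServerRegionReplicaUtil follows — the holder class name is made up here so the sketch stands alone, and the method body mirrors the hunk further down:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;

// Hypothetical holder class, only so the sketch compiles on its own.
public final class SecondaryStoreFileSketch {

  /** Sketch of the ServerRegionReplicaUtil.getStoreFileInfo() helper introduced by this patch. */
  public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs,
      HRegionInfo regionInfo, HRegionInfo regionInfoForFs, String familyName, FileStatus status)
      throws IOException {
    // Primary replica: the file really lives under this region, so open it from its own path.
    if (regionInfo.equals(regionInfoForFs)) {
      return new StoreFileInfo(conf, fs, status);
    }
    // Secondary replica: build an HFileLink naming the primary's table/region/family/file.
    // The link path does not exist on disk; it lets the reader fall back to the archive
    // location if the primary compacts the file away while the secondary is still reading.
    HFileLink link = new HFileLink(conf,
        HFileLink.createPath(regionInfoForFs.getTable(), regionInfoForFs.getEncodedName(),
            familyName, status.getPath().getName()));
    return new StoreFileInfo(conf, fs, status, link);
  }

  private SecondaryStoreFileSketch() {
  }
}

The same HFileLink machinery gives StoreFileInfo a second location to try when the file disappears mid-read, which is what the retry loops added to StoreFileInfo in the hunks below are for.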
@@ -720,25 +720,29 @@ public class HRegion implements HeapSize { // , Writable{
     status.setStatus("Writing region info on filesystem");
     fs.checkRegionInfoOnFilesystem();
 
-    // Remove temporary data left over from old regions
-    status.setStatus("Cleaning up temporary data from old regions");
-    fs.cleanupTempDir();
-
     // Initialize all the HStores
     status.setStatus("Initializing all the Stores");
     long maxSeqId = initializeRegionStores(reporter, status);
 
-    status.setStatus("Cleaning up detritus from prior splits");
-    // Get rid of any splits or merges that were lost in-progress. Clean out
-    // these directories here on open. We may be opening a region that was
-    // being split but we crashed in the middle of it all.
-    fs.cleanupAnySplitDetritus();
-    fs.cleanupMergesDir();
-
     this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
     this.writestate.flushRequested = false;
     this.writestate.compacting = 0;
 
+    if (this.writestate.writesEnabled) {
+      // Remove temporary data left over from old regions
+      status.setStatus("Cleaning up temporary data from old regions");
+      fs.cleanupTempDir();
+    }
+
+    if (this.writestate.writesEnabled) {
+      status.setStatus("Cleaning up detritus from prior splits");
+      // Get rid of any splits or merges that were lost in-progress. Clean out
+      // these directories here on open. We may be opening a region that was
+      // being split but we crashed in the middle of it all.
+      fs.cleanupAnySplitDetritus();
+      fs.cleanupMergesDir();
+    }
+
     // Initialize split policy
     this.splitPolicy = RegionSplitPolicy.create(this, conf);
 
@@ -832,9 +836,11 @@ public class HRegion implements HeapSize { // , Writable{
         }
       }
     }
-    // Recover any edits if available.
-    maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
-        this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+    if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
+      // Recover any edits if available.
+      maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
+          this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+    }
     maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
     mvcc.initialize(maxSeqId);
     return maxSeqId;
@@ -205,8 +205,10 @@ public class HRegionFileSystem {
         LOG.warn("Invalid StoreFile: " + status.getPath());
         continue;
       }
+      StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
+        regionInfoForFs, familyName, status);
+      storeFiles.add(info);
 
-      storeFiles.add(new StoreFileInfo(this.conf, this.fs, status));
     }
     return storeFiles;
   }
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.util.FSUtils;
 
@@ -132,6 +132,22 @@ public class StoreFileInfo implements Comparable<StoreFileInfo> {
     }
   }
 
+  /**
+   * Create a Store File Info from an HFileLink
+   * @param conf the {@link Configuration} to use
+   * @param fs The current file system to use.
+   * @param fileStatus The {@link FileStatus} of the file
+   */
+  public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus,
+      final HFileLink link)
+      throws IOException {
+    this.conf = conf;
+    this.fileStatus = fileStatus;
+    // HFileLink
+    this.reference = null;
+    this.link = link;
+  }
+
   /**
    * Sets the region coprocessor env.
    * @param coprocessorHost
@@ -193,11 +209,8 @@ public class StoreFileInfo implements Comparable<StoreFileInfo> {
       status = fileStatus;
     }
     long length = status.getLen();
-    if (this.reference != null) {
-      hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(fs, reference, status);
-    } else {
-      hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
-    }
+    hdfsBlocksDistribution = computeHDFSBlocksDistribution(fs);
+
     StoreFile.Reader reader = null;
     if (this.coprocessorHost != null) {
       reader = this.coprocessorHost.preStoreFileReaderOpen(fs, this.getPath(), in, length,
@@ -223,6 +236,27 @@ public class StoreFileInfo implements Comparable<StoreFileInfo> {
    */
   public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs)
       throws IOException {
+
+    // guard agains the case where we get the FileStatus from link, but by the time we
+    // call compute the file is moved again
+    if (this.link != null) {
+      FileNotFoundException exToThrow = null;
+      for (int i = 0; i < this.link.getLocations().length; i++) {
+        try {
+          return computeHDFSBlocksDistributionInternal(fs);
+        } catch (FileNotFoundException ex) {
+          // try the other location
+          exToThrow = ex;
+        }
+      }
+      throw exToThrow;
+    } else {
+      return computeHDFSBlocksDistributionInternal(fs);
+    }
+  }
+
+  private HDFSBlocksDistribution computeHDFSBlocksDistributionInternal(final FileSystem fs)
+      throws IOException {
     FileStatus status = getReferencedFileStatus(fs);
     if (this.reference != null) {
       return computeRefFileHDFSBlockDistribution(fs, reference, status);
@@ -240,8 +274,17 @@ public class StoreFileInfo implements Comparable<StoreFileInfo> {
     FileStatus status;
     if (this.reference != null) {
       if (this.link != null) {
-        // HFileLink Reference
-        status = link.getFileStatus(fs);
+        FileNotFoundException exToThrow = null;
+        for (int i = 0; i < this.link.getLocations().length; i++) {
+          // HFileLink Reference
+          try {
+            return link.getFileStatus(fs);
+          } catch (FileNotFoundException ex) {
+            // try the other location
+            exToThrow = ex;
+          }
+        }
+        throw exToThrow;
       } else {
         // HFile Reference
         Path referencePath = getReferredToFile(this.getPath());
@@ -249,8 +292,17 @@ public class StoreFileInfo implements Comparable<StoreFileInfo> {
       }
     } else {
       if (this.link != null) {
-        // HFileLink
-        status = link.getFileStatus(fs);
+        FileNotFoundException exToThrow = null;
+        for (int i = 0; i < this.link.getLocations().length; i++) {
+          // HFileLink
+          try {
+            return link.getFileStatus(fs);
+          } catch (FileNotFoundException ex) {
+            // try the other location
+            exToThrow = ex;
+          }
+        }
+        throw exToThrow;
       } else {
         status = this.fileStatus;
       }
@@ -18,9 +18,16 @@
 
 package org.apache.hadoop.hbase.util;
 
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 
 /**
  * Similar to {@link RegionReplicaUtil} but for the server side
@@ -48,5 +55,39 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
       || !isDefaultReplica(region.getRegionInfo());
   }
 
+  /**
+   * Returns whether to replay the recovered edits to flush the results.
+   * Currently secondary region replicas do not replay the edits, since it would
+   * cause flushes which might affect the primary region. Primary regions even opened
+   * in read only mode should replay the edits.
+   * @param region the HRegion object
+   * @return whether recovered edits should be replayed.
+   */
+  public static boolean shouldReplayRecoveredEdits(HRegion region) {
+    return isDefaultReplica(region.getRegionInfo());
+  }
+
+  /**
+   * Returns a StoreFileInfo from the given FileStatus. Secondary replicas refer to the
+   * files of the primary region, so an HFileLink is used to construct the StoreFileInfo. This
+   * way ensures that the secondary will be able to continue reading the store files even if
+   * they are moved to archive after compaction
+   * @throws IOException
+   */
+  public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs,
+      HRegionInfo regionInfo, HRegionInfo regionInfoForFs, String familyName, FileStatus status)
+      throws IOException {
+
+    // if this is a primary region, just return the StoreFileInfo constructed from path
+    if (regionInfo.equals(regionInfoForFs)) {
+      return new StoreFileInfo(conf, fs, status);
+    }
+
+    // else create a store file link. The link file does not exists on filesystem though.
+    HFileLink link = new HFileLink(conf,
+      HFileLink.createPath(regionInfoForFs.getTable(), regionInfoForFs.getEncodedName()
+        , familyName, status.getPath().getName()));
+    return new StoreFileInfo(conf, fs, status, link);
+  }
+
 }
@@ -231,14 +231,7 @@ public class TestReplicasClient {
   }
 
   private void flushRegion(HRegionInfo regionInfo) throws IOException {
-    for (RegionServerThread rst : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
-      HRegion region = rst.getRegionServer().getRegionByEncodedName(regionInfo.getEncodedName());
-      if (region != null) {
-        region.flushcache();
-        return;
-      }
-    }
-    throw new IOException("Region to flush cannot be found");
+    TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
   }
 
   @Test
@@ -4216,6 +4216,7 @@ public class TestHRegion {
     // create a primary region, load some data and flush
     // create a secondary region, and do a get against that
     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
+    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
 
     byte[][] families = new byte[][] {
         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
@@ -4265,6 +4266,7 @@ public class TestHRegion {
     // create a primary region, load some data and flush
     // create a secondary region, and do a put against that
     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
+    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
 
     byte[][] families = new byte[][] {
         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
@@ -4312,7 +4314,60 @@ public class TestHRegion {
         HRegion.closeHRegion(secondaryRegion);
       }
     }
+  }
 
+  @Test
+  public void testCompactionFromPrimary() throws IOException {
+    Path rootDir = new Path(dir + "testRegionReplicaSecondary");
+    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
+
+    byte[][] families = new byte[][] {
+        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
+    };
+    byte[] cq = Bytes.toBytes("cq");
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary"));
+    for (byte[] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+
+    long time = System.currentTimeMillis();
+    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+      false, time, 0);
+    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+      false, time, 1);
+
+    HRegion primaryRegion = null, secondaryRegion = null;
+
+    try {
+      primaryRegion = HRegion.createHRegion(primaryHri,
+        rootDir, TEST_UTIL.getConfiguration(), htd);
+
+      // load some data
+      putData(primaryRegion, 0, 1000, cq, families);
+
+      // flush region
+      primaryRegion.flushcache();
+
+      // open secondary region
+      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
+
+      // move the file of the primary region to the archive, simulating a compaction
+      Collection<StoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
+      primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
+      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem().getStoreFiles(families[0]);
+      Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0);
+
+      verifyData(secondaryRegion, 0, 1000, cq, families);
+    } finally {
+      if (primaryRegion != null) {
+        HRegion.closeHRegion(primaryRegion);
+      }
+      if (secondaryRegion != null) {
+        HRegion.closeHRegion(secondaryRegion);
+      }
+    }
   }
 
   private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
@@ -19,6 +19,12 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.catalog.TestMetaReaderEditor;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
@ -39,6 +46,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -307,4 +315,126 @@ public class TestRegionReplicas {
|
||||||
closeRegion(hriSecondary);
|
closeRegion(hriSecondary);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testFlushAndCompactionsInPrimary() throws Exception {
|
||||||
|
|
||||||
|
long runtime = 30 * 1000;
|
||||||
|
// enable store file refreshing
|
||||||
|
final int refreshPeriod = 100; // 100ms refresh is a lot
|
||||||
|
HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
|
||||||
|
HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, refreshPeriod);
|
||||||
|
// restart the region server so that it starts the refresher chore
|
||||||
|
restartRegionServer();
|
||||||
|
final int startKey = 0, endKey = 1000;
|
||||||
|
|
||||||
|
try {
|
||||||
|
openRegion(hriSecondary);
|
||||||
|
|
||||||
|
//load some data to primary so that reader won't fail
|
||||||
|
HTU.loadNumericRows(table, f, startKey, endKey);
|
||||||
|
TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
|
||||||
|
// ensure that chore is run
|
||||||
|
Threads.sleep(2 * refreshPeriod);
|
||||||
|
|
||||||
|
final AtomicBoolean running = new AtomicBoolean(true);
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
|
||||||
|
for (int i=0; i < exceptions.length; i++) {
|
||||||
|
exceptions[i] = new AtomicReference<Exception>();
|
||||||
|
}
|
||||||
|
|
||||||
|
Runnable writer = new Runnable() {
|
||||||
|
int key = startKey;
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
while (running.get()) {
|
||||||
|
byte[] data = Bytes.toBytes(String.valueOf(key));
|
||||||
|
Put put = new Put(data);
|
||||||
|
put.add(f, null, data);
|
||||||
|
table.put(put);
|
||||||
|
key++;
|
||||||
|
if (key == endKey) key = startKey;
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.warn(ex);
|
||||||
|
exceptions[0].compareAndSet(null, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Runnable flusherCompactor = new Runnable() {
|
||||||
|
Random random = new Random();
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
while (running.get()) {
|
||||||
|
// flush or compact
|
||||||
|
if (random.nextBoolean()) {
|
||||||
|
TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
|
||||||
|
} else {
|
||||||
|
HTU.compact(table.getName(), random.nextBoolean());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.warn(ex);
|
||||||
|
exceptions[1].compareAndSet(null, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Runnable reader = new Runnable() {
|
||||||
|
Random random = new Random();
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
while (running.get()) {
|
||||||
|
// whether to do a close and open
|
||||||
|
if (random.nextInt(10) == 0) {
|
||||||
|
try {
|
||||||
|
closeRegion(hriSecondary);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.warn("Failed closing the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
|
||||||
|
exceptions[2].compareAndSet(null, ex);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
openRegion(hriSecondary);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.warn("Failed opening the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
|
||||||
|
exceptions[2].compareAndSet(null, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int key = random.nextInt(endKey - startKey) + startKey;
|
||||||
|
assertGetRpc(hriSecondary, key, true);
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.warn("Failed getting the value in the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
|
||||||
|
exceptions[2].compareAndSet(null, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
LOG.info("Starting writer and reader");
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(3);
|
||||||
|
executor.submit(writer);
|
||||||
|
executor.submit(flusherCompactor);
|
||||||
|
executor.submit(reader);
|
||||||
|
|
||||||
|
// wait for threads
|
||||||
|
Threads.sleep(runtime);
|
||||||
|
running.set(false);
|
||||||
|
executor.shutdown();
|
||||||
|
executor.awaitTermination(30, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
for (AtomicReference<Exception> exRef : exceptions) {
|
||||||
|
Assert.assertNull(exRef.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
|
||||||
|
closeRegion(hriSecondary);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -119,6 +120,18 @@ public class TestRegionServerNoMaster {
     }
   }
 
+  /** Flush the given region in the mini cluster. Since no master, we cannot use HBaseAdmin.flush() */
+  public static void flushRegion(HBaseTestingUtility HTU, HRegionInfo regionInfo) throws IOException {
+    for (RegionServerThread rst : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
+      HRegion region = rst.getRegionServer().getRegionByEncodedName(regionInfo.getEncodedName());
+      if (region != null) {
+        region.flushcache();
+        return;
+      }
+    }
+    throw new IOException("Region to flush cannot be found");
+  }
+
   @AfterClass
   public static void afterClass() throws Exception {
     table.close();
@@ -1005,6 +1005,7 @@ public class TestStore {
     store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
   }
 
+  @Test
   public void testRefreshStoreFiles() throws Exception {
     init(name.getMethodName());
 
@@ -1051,6 +1052,7 @@ public class TestStore {
   }
 
   @SuppressWarnings("unchecked")
+  @Test
   public void testRefreshStoreFilesNotChanged() throws IOException {
     init(name.getMethodName());
 
@@ -1075,4 +1077,4 @@ public class TestStore {
     //ensure that replaceStoreFiles is not called if files are not refreshed
     verify(spiedStore, times(0)).replaceStoreFiles(null, null);
   }
 }
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
 import org.junit.Assert;
 import org.junit.Before;
@@ -63,6 +64,7 @@ public class TestStoreFileRefresherChore {
   public void setUp() {
     TEST_UTIL = new HBaseTestingUtility();
     testDir = TEST_UTIL.getDataTestDir("TestStoreFileRefresherChore");
+    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
   }
 
   private HTableDescriptor getTableDesc(TableName tableName, byte[]... families) {
@@ -93,7 +95,7 @@ public class TestStoreFileRefresherChore {
 
   private HRegion initHRegion(HTableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId) throws IOException {
     Configuration conf = TEST_UTIL.getConfiguration();
-    Path tableDir = new Path(testDir, htd.getTableName().getNameAsString());
+    Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());
 
     HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false, 0, replicaId);
 