HBASE-6334 TestImprovement for TestHRegion.testWritesWhileGetting
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1360935 13f79535-47bb-0310-9956-ffa450edef68
commit 4c72558f62
parent d6ed913ce5
--- HMaster.java
+++ HMaster.java
@@ -101,6 +101,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.InfoServer;
@@ -285,6 +286,9 @@ Server {
    */
   private ObjectName mxBean = null;
 
+  //should we check the compression codec type at master side, default true, HBASE-6370
+  private final boolean masterCheckCompression;
+
   /**
    * Initializes the HMaster. The steps are as follows:
    * <p>
@@ -352,6 +356,9 @@ Server {
     this.metrics = new MasterMetrics(getServerName().toString());
     // metrics interval: using the same property as region server.
     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
+
+    //should we check the compression codec type at master side, default true, HBASE-6370
+    this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
   }
 
   /**
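Note: the new check is gated by a single boolean property, so it can be disabled on clusters where probing codecs on the master is undesirable. A minimal sketch, assuming only the standard HBaseConfiguration API (the class name here is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class DisableCompressionCheck {
      public static void main(String[] args) {
        // Equivalent to setting the property in the master's hbase-site.xml.
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean("hbase.master.check.compression", false);
        // A master built with this conf skips checkCompression() entirely.
        System.out.println(conf.getBoolean("hbase.master.check.compression", true));
      }
    }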
@@ -1376,6 +1383,7 @@ Server {
 
     HRegionInfo [] newRegions = getHRegionInfos(hTableDescriptor, splitKeys);
     checkInitialized();
+    checkCompression(hTableDescriptor);
     if (cpHost != null) {
       cpHost.preCreateTable(hTableDescriptor, newRegions);
     }
@@ -1389,6 +1397,21 @@ Server {
 
   }
 
+  private void checkCompression(final HTableDescriptor htd)
+  throws IOException {
+    if (!this.masterCheckCompression) return;
+    for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
+      checkCompression(hcd);
+    }
+  }
+
+  private void checkCompression(final HColumnDescriptor hcd)
+  throws IOException {
+    if (!this.masterCheckCompression) return;
+    CompressionTest.testCompression(hcd.getCompression());
+    CompressionTest.testCompression(hcd.getCompactionCompression());
+  }
+
   @Override
   public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
   throws ServiceException {
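Note: CompressionTest.testCompression probes whether a codec can actually round-trip data on the local node and throws IOException if not (for example LZO without its native library). With the methods above, a bad codec fails table creation immediately instead of surfacing later when region servers open regions. A hedged sketch of the failure mode this guards against, with hypothetical table and family names:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.io.hfile.Compression;
    import org.apache.hadoop.hbase.util.CompressionTest;

    public class CompressionCheckSketch {
      public static void main(String[] args) {
        HTableDescriptor htd = new HTableDescriptor("demo");   // hypothetical name
        HColumnDescriptor hcd = new HColumnDescriptor("cf");   // hypothetical name
        hcd.setCompressionType(Compression.Algorithm.LZO);
        htd.addFamily(hcd);
        try {
          // The same probe the master now runs in checkCompression(htd).
          for (HColumnDescriptor cf : htd.getColumnFamilies()) {
            CompressionTest.testCompression(cf.getCompression());
            CompressionTest.testCompression(cf.getCompactionCompression());
          }
        } catch (IOException e) {
          // Without native LZO this is reached at create time.
          System.err.println("codec unusable: " + e.getMessage());
        }
      }
    }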
@@ -1505,6 +1528,7 @@ Server {
 
     try {
       checkInitialized();
+      checkCompression(descriptor);
       if (cpHost != null) {
         if (cpHost.preModifyColumn(tableName, descriptor)) {
           return ModifyColumnResponse.newBuilder().build();
@@ -1626,6 +1650,7 @@ Server {
     HTableDescriptor htd = HTableDescriptor.convert(req.getTableSchema());
     try {
       checkInitialized();
+      checkCompression(htd);
       if (cpHost != null) {
         cpHost.preModifyTable(tableName, htd);
       }
--- TestHRegion.java
+++ TestHRegion.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -3145,21 +3146,19 @@ public class TestHRegion extends HBaseTestCase {
 
   /**
    * Writes very wide records and gets the latest row every time..
-   * Flushes and compacts the region every now and then to keep things
-   * realistic.
+   * Flushes and compacts the region aggressively to catch issues.
    *
    * @throws IOException by flush / scan / compaction
    * @throws InterruptedException when joining threads
    */
   public void testWritesWhileGetting()
-  throws IOException, InterruptedException {
-    byte[] tableName = Bytes.toBytes("testWritesWhileScanning");
+  throws Exception {
+    byte[] tableName = Bytes.toBytes("testWritesWhileGetting");
     int testCount = 100;
     int numRows = 1;
     int numFamilies = 10;
     int numQualifiers = 100;
-    int flushInterval = 10;
-    int compactInterval = 10 * flushInterval;
+    int compactInterval = 100;
     byte[][] families = new byte[numFamilies][];
     for (int i = 0; i < numFamilies; i++) {
       families[i] = Bytes.toBytes("family" + i);
@@ -3170,14 +3169,37 @@ public class TestHRegion extends HBaseTestCase {
     }
 
     String method = "testWritesWhileGetting";
-    this.region = initHRegion(tableName, method, families);
+    Configuration conf = HBaseConfiguration.create();
+    // This test flushes constantly and can cause many files to be created, possibly
+    // extending over the ulimit. Make sure compactions are aggressive in reducing
+    // the number of HFiles created.
+    conf.setInt("hbase.hstore.compaction.min", 1);
+    conf.setInt("hbase.hstore.compaction.max", 1000);
+    this.region = initHRegion(tableName, method, conf, families);
+    PutThread putThread = null;
+    MultithreadedTestUtil.TestContext ctx =
+      new MultithreadedTestUtil.TestContext(HBaseConfiguration.create());
     try {
-      PutThread putThread = new PutThread(numRows, families, qualifiers);
+      putThread = new PutThread(numRows, families, qualifiers);
       putThread.start();
       putThread.waitForFirstPut();
 
-      FlushThread flushThread = new FlushThread();
-      flushThread.start();
+      // Add a thread that flushes as fast as possible
+      ctx.addThread(new RepeatingTestThread(ctx) {
+        private int flushesSinceCompact = 0;
+        private final int maxFlushesSinceCompact = 20;
+        public void doAnAction() throws Exception {
+          if (region.flushcache()) {
+            ++flushesSinceCompact;
+          }
+          // Compact regularly to avoid creating too many files and exceeding the ulimit.
+          if (flushesSinceCompact == maxFlushesSinceCompact) {
+            region.compactStores(false);
+            flushesSinceCompact = 0;
+          }
+        }
+      });
+      ctx.startThreads();
 
       Get get = new Get(Bytes.toBytes("row0"));
       Result result = null;
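Note: MultithreadedTestUtil.RepeatingTestThread invokes doAnAction() in a loop until its TestContext is stopped; the context records any exception the action throws, and ctx.stop() joins the threads and rethrows a recorded failure. That is what lets the flusher above replace the hand-rolled FlushThread. A minimal usage sketch under those assumptions:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.MultithreadedTestUtil;
    import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;

    public class RepeatingThreadSketch {
      public static void main(String[] args) throws Exception {
        MultithreadedTestUtil.TestContext ctx =
            new MultithreadedTestUtil.TestContext(HBaseConfiguration.create());
        ctx.addThread(new RepeatingTestThread(ctx) {
          public void doAnAction() throws Exception {
            // Runs repeatedly until ctx.stop(); throwing here fails the test.
          }
        });
        ctx.startThreads();
        Thread.sleep(1000);  // let the action repeat for a while
        ctx.stop();          // joins the thread and rethrows any recorded failure
      }
    }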
@@ -3187,15 +3209,6 @@ public class TestHRegion extends HBaseTestCase {
       long prevTimestamp = 0L;
       for (int i = 0; i < testCount; i++) {
-
-        if (i != 0 && i % compactInterval == 0) {
-          region.compactStores(true);
-        }
-
-        if (i != 0 && i % flushInterval == 0) {
-          //System.out.println("iteration = " + i);
-          flushThread.flush();
-        }
 
         boolean previousEmpty = result == null || result.isEmpty();
         result = region.get(get, null);
         if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
@@ -3223,25 +3236,24 @@ public class TestHRegion extends HBaseTestCase {
                 ", New KV: " +
                 kv + "(memStoreTS:" + kv.getMemstoreTS() + ")"
               );
-              assertEquals(previousKV.getValue(), thisValue);
+              assertEquals(0, Bytes.compareTo(previousKV.getValue(), thisValue));
             }
           }
           previousKV = kv;
         }
       }
-      putThread.done();
+    } finally {
+      if (putThread != null) putThread.done();
 
       region.flushcache();
 
+      if (putThread != null) {
       putThread.join();
       putThread.checkNoError();
+      }
 
-      flushThread.done();
-      flushThread.join();
-      flushThread.checkNoError();
-    } finally {
+      ctx.stop();
+
       HRegion.closeHRegion(this.region);
       this.region = null;
     }
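Note: the assertion change in the hunk above is a correctness fix, not a style fix. JUnit's assertEquals(Object, Object) falls back on equals(), and Java arrays inherit identity equals from Object, so two byte[] with identical contents are not "equal" to assertEquals. Bytes.compareTo compares contents. A small illustration:

    import org.apache.hadoop.hbase.util.Bytes;

    public class ByteArrayEqualsSketch {
      public static void main(String[] args) {
        byte[] a = Bytes.toBytes("value");
        byte[] b = Bytes.toBytes("value");
        System.out.println(a.equals(b));            // false: identity, not contents
        System.out.println(Bytes.compareTo(a, b));  // 0: contents are equal
        // Hence assertEquals(0, Bytes.compareTo(previousKV.getValue(), thisValue))
        // asserts content equality, which assertEquals on the raw arrays did not.
      }
    }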