diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index af6caba4b47..a97a4337f77 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -101,6 +101,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.InfoServer; @@ -285,6 +286,9 @@ Server { */ private ObjectName mxBean = null; + //should we check the compression codec type at master side, default true, HBASE-6370 + private final boolean masterCheckCompression; + /** * Initializes the HMaster. The steps are as follows: *
@@ -352,6 +356,9 @@ Server { this.metrics = new MasterMetrics(getServerName().toString()); // metrics interval: using the same property as region server. this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000); + + //should we check the compression codec type at master side, default true, HBASE-6370 + this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true); } /** @@ -1376,6 +1383,7 @@ Server { HRegionInfo [] newRegions = getHRegionInfos(hTableDescriptor, splitKeys); checkInitialized(); + checkCompression(hTableDescriptor); if (cpHost != null) { cpHost.preCreateTable(hTableDescriptor, newRegions); } @@ -1389,6 +1397,21 @@ Server { } + private void checkCompression(final HTableDescriptor htd) + throws IOException { + if (!this.masterCheckCompression) return; + for (HColumnDescriptor hcd : htd.getColumnFamilies()) { + checkCompression(hcd); + } + } + + private void checkCompression(final HColumnDescriptor hcd) + throws IOException { + if (!this.masterCheckCompression) return; + CompressionTest.testCompression(hcd.getCompression()); + CompressionTest.testCompression(hcd.getCompactionCompression()); + } + @Override public CreateTableResponse createTable(RpcController controller, CreateTableRequest req) throws ServiceException { @@ -1505,6 +1528,7 @@ Server { try { checkInitialized(); + checkCompression(descriptor); if (cpHost != null) { if (cpHost.preModifyColumn(tableName, descriptor)) { return ModifyColumnResponse.newBuilder().build(); @@ -1626,6 +1650,7 @@ Server { HTableDescriptor htd = HTableDescriptor.convert(req.getTableSchema()); try { checkInitialized(); + checkCompression(htd); if (cpHost != null) { cpHost.preModifyTable(tableName, htd); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index a4b7a8e4e21..b2a3243e92c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; +import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -3145,21 +3146,19 @@ public class TestHRegion extends HBaseTestCase { /** * Writes very wide records and gets the latest row every time.. - * Flushes and compacts the region every now and then to keep things - * realistic. + * Flushes and compacts the region aggressivly to catch issues. * * @throws IOException by flush / scan / compaction * @throws InterruptedException when joining threads */ public void testWritesWhileGetting() - throws IOException, InterruptedException { - byte[] tableName = Bytes.toBytes("testWritesWhileScanning"); + throws Exception { + byte[] tableName = Bytes.toBytes("testWritesWhileGetting"); int testCount = 100; int numRows = 1; int numFamilies = 10; int numQualifiers = 100; - int flushInterval = 10; - int compactInterval = 10 * flushInterval; + int compactInterval = 100; byte[][] families = new byte[numFamilies][]; for (int i = 0; i < numFamilies; i++) { families[i] = Bytes.toBytes("family" + i); @@ -3170,14 +3169,37 @@ public class TestHRegion extends HBaseTestCase { } String method = "testWritesWhileGetting"; - this.region = initHRegion(tableName, method, families); + Configuration conf = HBaseConfiguration.create(); + // This test flushes constantly and can cause many files to be created, possibly + // extending over the ulimit. Make sure compactions are aggressive in reducing + // the number of HFiles created. + conf.setInt("hbase.hstore.compaction.min", 1); + conf.setInt("hbase.hstore.compaction.max", 1000); + this.region = initHRegion(tableName, method, conf, families); + PutThread putThread = null; + MultithreadedTestUtil.TestContext ctx = + new MultithreadedTestUtil.TestContext(HBaseConfiguration.create()); try { - PutThread putThread = new PutThread(numRows, families, qualifiers); + putThread = new PutThread(numRows, families, qualifiers); putThread.start(); putThread.waitForFirstPut(); - FlushThread flushThread = new FlushThread(); - flushThread.start(); + // Add a thread that flushes as fast as possible + ctx.addThread(new RepeatingTestThread(ctx) { + private int flushesSinceCompact = 0; + private final int maxFlushesSinceCompact = 20; + public void doAnAction() throws Exception { + if (region.flushcache()) { + ++flushesSinceCompact; + } + // Compact regularly to avoid creating too many files and exceeding the ulimit. + if (flushesSinceCompact == maxFlushesSinceCompact) { + region.compactStores(false); + flushesSinceCompact = 0; + } + } + }); + ctx.startThreads(); Get get = new Get(Bytes.toBytes("row0")); Result result = null; @@ -3187,15 +3209,6 @@ public class TestHRegion extends HBaseTestCase { long prevTimestamp = 0L; for (int i = 0; i < testCount; i++) { - if (i != 0 && i % compactInterval == 0) { - region.compactStores(true); - } - - if (i != 0 && i % flushInterval == 0) { - //System.out.println("iteration = " + i); - flushThread.flush(); - } - boolean previousEmpty = result == null || result.isEmpty(); result = region.get(get, null); if (!result.isEmpty() || !previousEmpty || i > compactInterval) { @@ -3223,25 +3236,24 @@ public class TestHRegion extends HBaseTestCase { ", New KV: " + kv + "(memStoreTS:" + kv.getMemstoreTS() + ")" ); - assertEquals(previousKV.getValue(), thisValue); + assertEquals(0, Bytes.compareTo(previousKV.getValue(), thisValue)); } } previousKV = kv; } } } - - putThread.done(); + } finally { + if (putThread != null) putThread.done(); region.flushcache(); - putThread.join(); - putThread.checkNoError(); + if (putThread != null) { + putThread.join(); + putThread.checkNoError(); + } - flushThread.done(); - flushThread.join(); - flushThread.checkNoError(); - } finally { + ctx.stop(); HRegion.closeHRegion(this.region); this.region = null; }