From 9c29a6211b7272a91aa520d8ac4343e148a6a0ca Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 21 Nov 2017 21:18:54 +0800 Subject: [PATCH] HBASE-19311 Promote TestAcidGuarantees to LargeTests and start mini cluster once to make it faster --- .../hadoop/hbase/util/AbstractHBaseTool.java | 10 +- .../hbase/IntegrationTestAcidGuarantees.java | 60 +-- .../hadoop/hbase/AcidGuaranteesTestTool.java | 415 ++++++++++++++ .../hadoop/hbase/TestAcidGuarantees.java | 508 +++--------------- 4 files changed, 515 insertions(+), 478 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/AcidGuaranteesTestTool.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java index b808d3e4f76..e301c1fffdf 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with this * work for additional information regarding copyright ownership. The ASF @@ -231,6 +231,14 @@ public abstract class AbstractHBaseTool implements Tool, Configurable { } } + public long getOptionAsLong(CommandLine cmd, String opt, int defaultValue) { + if (cmd.hasOption(opt)) { + return Long.parseLong(cmd.getOptionValue(opt)); + } else { + return defaultValue; + } + } + public double getOptionAsDouble(CommandLine cmd, String opt, double defaultValue) { if (cmd.hasOption(opt)) { return Double.parseDouble(cmd.getOptionValue(opt)); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java index e1c17a4daa0..3c1e6ad17cc 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java @@ -17,26 +17,30 @@ */ package org.apache.hadoop.hbase; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets; +import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILY_A; +import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILY_B; +import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILY_C; +import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.TABLE_NAME; + +import java.util.Set; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.regionserver.CompactingMemStore; import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; -import org.apache.hadoop.hbase.regionserver.MemStoreCompactor; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.ToolRunner; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.util.Set; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets; /** * This Integration Test verifies acid guarantees across column families by frequently writing * values to rows with multiple column families and concurrently reading entire rows that expect all * column families. - * *

* Sample usage: + * *

  * hbase org.apache.hadoop.hbase.IntegrationTestAcidGuarantees -Dmillis=10000 -DnumWriters=50
  * -DnumGetters=2 -DnumScanners=2 -DnumUniqueRows=5
@@ -47,19 +51,11 @@ public class IntegrationTestAcidGuarantees extends IntegrationTestBase {
   private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
 
   // The unit test version.
-  TestAcidGuarantees tag;
+  AcidGuaranteesTestTool tool;
 
   @Override
   public int runTestFromCommandLine() throws Exception {
-    Configuration c = getConf();
-    int millis = c.getInt("millis", 5000);
-    int numWriters = c.getInt("numWriters", 50);
-    int numGetters = c.getInt("numGetters", 2);
-    int numScanners = c.getInt("numScanners", 2);
-    int numUniqueRows = c.getInt("numUniqueRows", 3);
-    boolean useMob = c.getBoolean("useMob",false);
-    tag.runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true, useMob);
-    return 0;
+    return tool.run(new String[0]);
   }
 
   @Override
@@ -68,50 +64,50 @@ public class IntegrationTestAcidGuarantees extends IntegrationTestBase {
     util = getTestingUtil(getConf());
     util.initializeCluster(SERVER_COUNT);
     conf = getConf();
-    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
+    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128 * 1024));
     // prevent aggressive region split
     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
-            ConstantSizeRegionSplitPolicy.class.getName());
-    this.setConf(util.getConfiguration());
+      ConstantSizeRegionSplitPolicy.class.getName());
 
-    // replace the HBaseTestingUtility in the unit test with the integration test's
-    // IntegrationTestingUtility
-    tag = new TestAcidGuarantees(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
-    tag.setHBaseTestingUtil(util);
+    tool = new AcidGuaranteesTestTool();
+    tool.setConf(getConf());
   }
 
   @Override
   public TableName getTablename() {
-    return TestAcidGuarantees.TABLE_NAME;
+    return TABLE_NAME;
   }
 
   @Override
   protected Set getColumnFamilies() {
-    return Sets.newHashSet(Bytes.toString(TestAcidGuarantees.FAMILY_A),
-            Bytes.toString(TestAcidGuarantees.FAMILY_B),
-            Bytes.toString(TestAcidGuarantees.FAMILY_C));
+    return Sets.newHashSet(Bytes.toString(FAMILY_A), Bytes.toString(FAMILY_B),
+      Bytes.toString(FAMILY_C));
+  }
+
+  private void runTestAtomicity(long millisToRun, int numWriters, int numGetters, int numScanners,
+      int numUniqueRows) throws Exception {
+    tool.run(new String[] { "-millis", String.valueOf(millisToRun), "-numWriters",
+        String.valueOf(numWriters), "-numGetters", String.valueOf(numGetters), "-numScanners",
+        String.valueOf(numScanners), "-numUniqueRows", String.valueOf(numUniqueRows) });
   }
 
   // ***** Actual integration tests
-
   @Test
   public void testGetAtomicity() throws Exception {
-    tag.runTestAtomicity(20000, 4, 4, 0, 3);
+    runTestAtomicity(20000, 4, 4, 0, 3);
   }
 
   @Test
   public void testScanAtomicity() throws Exception {
-    tag.runTestAtomicity(20000, 3, 0, 2, 3);
+    runTestAtomicity(20000, 3, 0, 2, 3);
   }
 
   @Test
   public void testMixedAtomicity() throws Exception {
-    tag.runTestAtomicity(20000, 4, 2, 2, 3);
+    runTestAtomicity(20000, 4, 2, 2, 3);
   }
 
-
   // **** Command line hook
-
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     IntegrationTestingUtility.setUseDistributedCluster(conf);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/AcidGuaranteesTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/AcidGuaranteesTestTool.java
new file mode 100644
index 00000000000..5e00e8ca26f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/AcidGuaranteesTestTool.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
+/**
+ * A test tool that uses multiple threads to read and write multifamily rows into a table, verifying
+ * that reads never see partially-complete writes
+ */
+@InterfaceAudience.Private
+public class AcidGuaranteesTestTool extends AbstractHBaseTool {
+
+  private static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
+
+  public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
+  public static final byte[] FAMILY_A = Bytes.toBytes("A");
+  public static final byte[] FAMILY_B = Bytes.toBytes("B");
+  public static final byte[] FAMILY_C = Bytes.toBytes("C");
+  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
+
+  public static final byte[][] FAMILIES = new byte[][] { FAMILY_A, FAMILY_B, FAMILY_C };
+
+  public static int NUM_COLS_TO_CHECK = 50;
+
+  private ExecutorService sharedPool;
+
+  private long millisToRun;
+  private int numWriters;
+  private int numGetters;
+  private int numScanners;
+  private int numUniqueRows;
+  private boolean crazyFlush;
+  private boolean useMob;
+
+  private ExecutorService createThreadPool() {
+    int maxThreads = 256;
+    int coreThreads = 128;
+
+    long keepAliveTime = 60;
+    BlockingQueue workQueue = new LinkedBlockingQueue(
+        maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
+
+    ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
+        TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(toString() + "-shared"));
+    tpe.allowCoreThreadTimeOut(true);
+    return tpe;
+  }
+
+  @Override
+  protected void addOptions() {
+    addOptWithArg("millis", "time limit in milliseconds");
+    addOptWithArg("numWriters", "number of write threads");
+    addOptWithArg("numGetters", "number of get threads");
+    addOptWithArg("numScanners", "number of scan threads");
+    addOptWithArg("numUniqueRows", "number of unique rows to test");
+    addOptNoArg("crazyFlush",
+      "if specified we will flush continuously otherwise will flush every minute");
+    addOptNoArg("useMob", "if specified we will enable mob on the first column family");
+  }
+
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    millisToRun = getOptionAsLong(cmd, "millis", 5000);
+    numWriters = getOptionAsInt(cmd, "numWriters", 50);
+    numGetters = getOptionAsInt(cmd, "numGetters", 2);
+    numScanners = getOptionAsInt(cmd, "numScanners", 2);
+    numUniqueRows = getOptionAsInt(cmd, "numUniqueRows", 3);
+    crazyFlush = cmd.hasOption("crazyFlush");
+    useMob = cmd.hasOption("useMob");
+  }
+
+  @Override
+  protected int doWork() throws Exception {
+    sharedPool = createThreadPool();
+    try (Connection conn = ConnectionFactory.createConnection(getConf())) {
+      runTestAtomicity(conn.getAdmin());
+    } finally {
+      sharedPool.shutdown();
+    }
+    return 0;
+  }
+
+  /**
+   * Thread that does random full-row writes into a table.
+   */
+  public static class AtomicityWriter extends RepeatingTestThread {
+    Random rand = new Random();
+    byte data[] = new byte[10];
+    byte[][] targetRows;
+    byte[][] targetFamilies;
+    Connection connection;
+    Table table;
+    AtomicLong numWritten = new AtomicLong();
+
+    public AtomicityWriter(TestContext ctx, byte[][] targetRows, byte[][] targetFamilies,
+        ExecutorService pool) throws IOException {
+      super(ctx);
+      this.targetRows = targetRows;
+      this.targetFamilies = targetFamilies;
+      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
+      table = connection.getTable(TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      // Pick a random row to write into
+      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
+      Put p = new Put(targetRow);
+      rand.nextBytes(data);
+
+      for (byte[] family : targetFamilies) {
+        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+          byte qualifier[] = Bytes.toBytes("col" + i);
+          p.addColumn(family, qualifier, data);
+        }
+      }
+      table.put(p);
+      numWritten.getAndIncrement();
+    }
+
+    @Override
+    public void workDone() throws IOException {
+      try {
+        table.close();
+      } finally {
+        connection.close();
+      }
+    }
+  }
+
+  /**
+   * Thread that does single-row reads in a table, looking for partially completed rows.
+   */
+  public static class AtomicGetReader extends RepeatingTestThread {
+    byte[] targetRow;
+    byte[][] targetFamilies;
+    Connection connection;
+    Table table;
+    int numVerified = 0;
+    AtomicLong numRead = new AtomicLong();
+
+    public AtomicGetReader(TestContext ctx, byte[] targetRow, byte[][] targetFamilies,
+        ExecutorService pool) throws IOException {
+      super(ctx);
+      this.targetRow = targetRow;
+      this.targetFamilies = targetFamilies;
+      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
+      table = connection.getTable(TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      Get g = new Get(targetRow);
+      Result res = table.get(g);
+      byte[] gotValue = null;
+      if (res.getRow() == null) {
+        // Trying to verify but we didn't find the row - the writing
+        // thread probably just hasn't started writing yet, so we can
+        // ignore this action
+        return;
+      }
+
+      for (byte[] family : targetFamilies) {
+        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+          byte qualifier[] = Bytes.toBytes("col" + i);
+          byte thisValue[] = res.getValue(family, qualifier);
+          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+            gotFailure(gotValue, res);
+          }
+          numVerified++;
+          gotValue = thisValue;
+        }
+      }
+      numRead.getAndIncrement();
+    }
+
+    @Override
+    public void workDone() throws IOException {
+      try {
+        table.close();
+      } finally {
+        connection.close();
+      }
+    }
+
+    private void gotFailure(byte[] expected, Result res) {
+      StringBuilder msg = new StringBuilder();
+      msg.append("Failed after ").append(numVerified).append("!");
+      msg.append("Expected=").append(Bytes.toStringBinary(expected));
+      msg.append("Got:\n");
+      for (Cell kv : res.listCells()) {
+        msg.append(kv.toString());
+        msg.append(" val= ");
+        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
+        msg.append("\n");
+      }
+      throw new RuntimeException(msg.toString());
+    }
+  }
+
+  /**
+   * Thread that does full scans of the table looking for any partially completed rows.
+   */
+  public static class AtomicScanReader extends RepeatingTestThread {
+    byte[][] targetFamilies;
+    Table table;
+    Connection connection;
+    AtomicLong numScans = new AtomicLong();
+    AtomicLong numRowsScanned = new AtomicLong();
+
+    public AtomicScanReader(TestContext ctx, byte[][] targetFamilies, ExecutorService pool)
+        throws IOException {
+      super(ctx);
+      this.targetFamilies = targetFamilies;
+      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
+      table = connection.getTable(TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      Scan s = new Scan();
+      for (byte[] family : targetFamilies) {
+        s.addFamily(family);
+      }
+      ResultScanner scanner = table.getScanner(s);
+
+      for (Result res : scanner) {
+        byte[] gotValue = null;
+
+        for (byte[] family : targetFamilies) {
+          for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+            byte qualifier[] = Bytes.toBytes("col" + i);
+            byte thisValue[] = res.getValue(family, qualifier);
+            if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+              gotFailure(gotValue, res);
+            }
+            gotValue = thisValue;
+          }
+        }
+        numRowsScanned.getAndIncrement();
+      }
+      numScans.getAndIncrement();
+    }
+
+    @Override
+    public void workDone() throws IOException {
+      try {
+        table.close();
+      } finally {
+        connection.close();
+      }
+    }
+
+    private void gotFailure(byte[] expected, Result res) {
+      StringBuilder msg = new StringBuilder();
+      msg.append("Failed after ").append(numRowsScanned).append("!");
+      msg.append("Expected=").append(Bytes.toStringBinary(expected));
+      msg.append("Got:\n");
+      for (Cell kv : res.listCells()) {
+        msg.append(kv.toString());
+        msg.append(" val= ");
+        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
+        msg.append("\n");
+      }
+      throw new RuntimeException(msg.toString());
+    }
+  }
+
+  private void createTableIfMissing(Admin admin, boolean useMob) throws IOException {
+    if (!admin.tableExists(TABLE_NAME)) {
+      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
+      Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
+          .forEachOrdered(builder::addColumnFamily);
+      admin.createTable(builder.build());
+    }
+    ColumnFamilyDescriptor cfd = admin.getDescriptor(TABLE_NAME).getColumnFamilies()[0];
+    if (cfd.isMobEnabled() != useMob) {
+      admin.modifyColumnFamily(TABLE_NAME, ColumnFamilyDescriptorBuilder.newBuilder(cfd)
+          .setMobEnabled(useMob).setMobThreshold(4).build());
+    }
+  }
+
+  private void runTestAtomicity(Admin admin) throws Exception {
+    createTableIfMissing(admin, useMob);
+    TestContext ctx = new TestContext(conf);
+
+    byte rows[][] = new byte[numUniqueRows][];
+    for (int i = 0; i < numUniqueRows; i++) {
+      rows[i] = Bytes.toBytes("test_row_" + i);
+    }
+
+    List writers = Lists.newArrayList();
+    for (int i = 0; i < numWriters; i++) {
+      AtomicityWriter writer = new AtomicityWriter(ctx, rows, FAMILIES, sharedPool);
+      writers.add(writer);
+      ctx.addThread(writer);
+    }
+    // Add a flusher
+    ctx.addThread(new RepeatingTestThread(ctx) {
+      public void doAnAction() throws Exception {
+        try {
+          admin.flush(TABLE_NAME);
+        } catch (IOException ioe) {
+          LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
+        }
+        // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
+        // we would flush as often as possible. On a running cluster, this isn't practical:
+        // (1) we will cause a lot of load due to all the flushing and compacting
+        // (2) we cannot change the flushing/compacting related Configuration options to try to
+        // alleviate this
+        // (3) it is an unrealistic workload, since no one would actually flush that often.
+        // Therefore, let's flush every minute to have more flushes than usual, but not overload
+        // the running cluster.
+        if (!crazyFlush) {
+          Thread.sleep(60000);
+        }
+      }
+    });
+
+    List getters = Lists.newArrayList();
+    for (int i = 0; i < numGetters; i++) {
+      AtomicGetReader getter =
+          new AtomicGetReader(ctx, rows[i % numUniqueRows], FAMILIES, sharedPool);
+      getters.add(getter);
+      ctx.addThread(getter);
+    }
+
+    List scanners = Lists.newArrayList();
+    for (int i = 0; i < numScanners; i++) {
+      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, sharedPool);
+      scanners.add(scanner);
+      ctx.addThread(scanner);
+    }
+
+    ctx.startThreads();
+    ctx.waitFor(millisToRun);
+    ctx.stop();
+
+    LOG.info("Finished test. Writers:");
+    for (AtomicityWriter writer : writers) {
+      LOG.info("  wrote " + writer.numWritten.get());
+    }
+    LOG.info("Readers:");
+    for (AtomicGetReader reader : getters) {
+      LOG.info("  read " + reader.numRead.get());
+    }
+    LOG.info("Scanners:");
+    for (AtomicScanReader scanner : scanners) {
+      LOG.info("  scanned " + scanner.numScans.get());
+      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
+    }
+  }
+
+  public static void main(String[] args) {
+    Configuration c = HBaseConfiguration.create();
+    int status;
+    try {
+      AcidGuaranteesTestTool test = new AcidGuaranteesTestTool();
+      status = ToolRunner.run(c, test, args);
+    } catch (Exception e) {
+      LOG.error("Exiting due to error", e);
+      status = -1;
+    }
+    System.exit(status);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
index 9e845ad0927..e42794eefa8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
@@ -18,434 +18,103 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILIES;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.TABLE_NAME;
+
+import java.util.List;
+import java.util.stream.Stream;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 
 /**
- * Test case that uses multiple threads to read and write multifamily rows
- * into a table, verifying that reads never see partially-complete writes.
- *
- * This can run as a junit test, or with a main() function which runs against
- * a real cluster (eg for testing with failures, region movement, etc)
+ * Test case that uses multiple threads to read and write multifamily rows into a table, verifying
+ * that reads never see partially-complete writes. This can run as a junit test, or with a main()
+ * function which runs against a real cluster (eg for testing with failures, region movement, etc)
  */
-@Category({FlakeyTests.class, MediumTests.class})
+@Category({ FlakeyTests.class, LargeTests.class })
 @RunWith(Parameterized.class)
-public class TestAcidGuarantees implements Tool {
-  @Parameterized.Parameters
+public class TestAcidGuarantees {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @Parameterized.Parameters(name = "{index}: compType={0}")
   public static Object[] data() {
     return new Object[] { "NONE", "BASIC", "EAGER" };
   }
-  protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
-  public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
-  public static final byte [] FAMILY_A = Bytes.toBytes("A");
-  public static final byte [] FAMILY_B = Bytes.toBytes("B");
-  public static final byte [] FAMILY_C = Bytes.toBytes("C");
-  public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
 
-  public static final byte[][] FAMILIES = new byte[][] {
-    FAMILY_A, FAMILY_B, FAMILY_C };
+  @Parameter
+  public String compType;
 
-  private HBaseTestingUtility util;
+  private AcidGuaranteesTestTool tool = new AcidGuaranteesTestTool();
 
-  public static int NUM_COLS_TO_CHECK = 50;
-
-  // when run as main
-  private Configuration conf;
-  private ExecutorService sharedPool = null;
-
-  private void createTableIfMissing(boolean useMob)
-    throws IOException {
-    try {
-      util.createTable(TABLE_NAME, FAMILIES);
-    } catch (TableExistsException tee) {
-    }
-
-    if (useMob) {
-      HTableDescriptor htd = new HTableDescriptor(util.getAdmin().getTableDescriptor(TABLE_NAME));
-      HColumnDescriptor hcd =  htd.getColumnFamilies()[0];
-      // force mob enabled such that all data is mob data
-      hcd.setMobEnabled(true);
-      hcd.setMobThreshold(4);
-      util.getAdmin().modifyColumnFamily(TABLE_NAME, hcd);
-    }
-  }
-
-  public TestAcidGuarantees(String compType) {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
     // Set small flush size for minicluster so we exercise reseeking scanners
-    Configuration conf = HBaseConfiguration.create();
+    Configuration conf = UTIL.getConfiguration();
     conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128 * 1024));
     // prevent aggressive region split
     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
-        ConstantSizeRegionSplitPolicy.class.getName());
+      ConstantSizeRegionSplitPolicy.class.getName());
     conf.setInt("hfile.format.version", 3); // for mob tests
-    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, compType);
-    if(MemoryCompactionPolicy.valueOf(compType) == MemoryCompactionPolicy.EAGER) {
-      conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
-      conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.9);
-    }
-    util = new HBaseTestingUtility(conf);
-    sharedPool = createThreadPool();
+    UTIL.startMiniCluster(1);
   }
 
-  public void setHBaseTestingUtil(HBaseTestingUtility util) {
-    this.util = util;
-  }
-
-  private ExecutorService createThreadPool() {
-
-    int maxThreads = 256;
-    int coreThreads = 128;
-
-    long keepAliveTime = 60;
-    BlockingQueue workQueue =
-          new LinkedBlockingQueue(maxThreads *
-              HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
-
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
-        coreThreads,
-        maxThreads,
-        keepAliveTime,
-        TimeUnit.SECONDS,
-        workQueue,
-        Threads.newDaemonThreadFactory(toString() + "-shared"));
-    tpe.allowCoreThreadTimeOut(true);
-    return tpe;
-  }
-
-  public ExecutorService getSharedThreadPool() {
-    return sharedPool;
-  }
-
-  /**
-   * Thread that does random full-row writes into a table.
-   */
-  public static class AtomicityWriter extends RepeatingTestThread {
-    Random rand = new Random();
-    byte data[] = new byte[10];
-    byte targetRows[][];
-    byte targetFamilies[][];
-    Connection connection;
-    Table table;
-    AtomicLong numWritten = new AtomicLong();
-
-    public AtomicityWriter(TestContext ctx, byte targetRows[][],
-                           byte targetFamilies[][], ExecutorService pool) throws IOException {
-      super(ctx);
-      this.targetRows = targetRows;
-      this.targetFamilies = targetFamilies;
-      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
-      table = connection.getTable(TABLE_NAME);
-    }
-    public void doAnAction() throws Exception {
-      // Pick a random row to write into
-      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
-      Put p = new Put(targetRow);
-      rand.nextBytes(data);
-
-      for (byte[] family : targetFamilies) {
-        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
-          byte qualifier[] = Bytes.toBytes("col" + i);
-          p.addColumn(family, qualifier, data);
-        }
-      }
-      table.put(p);
-      numWritten.getAndIncrement();
-    }
-
-    @Override
-    public void workDone() throws IOException {
-      try {
-        table.close();
-      } finally {
-        connection.close();
-      }
-    }
-  }
-
-  /**
-   * Thread that does single-row reads in a table, looking for partially
-   * completed rows.
-   */
-  public static class AtomicGetReader extends RepeatingTestThread {
-    byte targetRow[];
-    byte targetFamilies[][];
-    Connection connection;
-    Table table;
-    int numVerified = 0;
-    AtomicLong numRead = new AtomicLong();
-
-    public AtomicGetReader(TestContext ctx, byte targetRow[],
-                           byte targetFamilies[][], ExecutorService pool) throws IOException {
-      super(ctx);
-      this.targetRow = targetRow;
-      this.targetFamilies = targetFamilies;
-      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
-      table = connection.getTable(TABLE_NAME);
-    }
-
-    public void doAnAction() throws Exception {
-      Get g = new Get(targetRow);
-      Result res = table.get(g);
-      byte[] gotValue = null;
-      if (res.getRow() == null) {
-        // Trying to verify but we didn't find the row - the writing
-        // thread probably just hasn't started writing yet, so we can
-        // ignore this action
-        return;
-      }
-
-      for (byte[] family : targetFamilies) {
-        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
-          byte qualifier[] = Bytes.toBytes("col" + i);
-          byte thisValue[] = res.getValue(family, qualifier);
-          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
-            gotFailure(gotValue, res);
-          }
-          numVerified++;
-          gotValue = thisValue;
-        }
-      }
-      numRead.getAndIncrement();
-    }
-
-    @Override
-    public void workDone() throws IOException {
-      try {
-        table.close();
-      } finally {
-        connection.close();
-      }
-    }
-
-    private void gotFailure(byte[] expected, Result res) {
-      StringBuilder msg = new StringBuilder();
-      msg.append("Failed after ").append(numVerified).append("!");
-      msg.append("Expected=").append(Bytes.toStringBinary(expected));
-      msg.append("Got:\n");
-      for (Cell kv : res.listCells()) {
-        msg.append(kv.toString());
-        msg.append(" val= ");
-        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
-        msg.append("\n");
-      }
-      throw new RuntimeException(msg.toString());
-    }
-  }
-
-  /**
-   * Thread that does full scans of the table looking for any partially completed
-   * rows.
-   */
-  public static class AtomicScanReader extends RepeatingTestThread {
-    byte targetFamilies[][];
-    Table table;
-    Connection connection;
-    AtomicLong numScans = new AtomicLong();
-    AtomicLong numRowsScanned = new AtomicLong();
-
-    public AtomicScanReader(TestContext ctx,
-                           byte targetFamilies[][], ExecutorService pool) throws IOException {
-      super(ctx);
-      this.targetFamilies = targetFamilies;
-      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
-      table = connection.getTable(TABLE_NAME);
-    }
-
-    public void doAnAction() throws Exception {
-      Scan s = new Scan();
-      for (byte[] family : targetFamilies) {
-        s.addFamily(family);
-      }
-      ResultScanner scanner = table.getScanner(s);
-
-      for (Result res : scanner) {
-        byte[] gotValue = null;
-
-        for (byte[] family : targetFamilies) {
-          for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
-            byte qualifier[] = Bytes.toBytes("col" + i);
-            byte thisValue[] = res.getValue(family, qualifier);
-            if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
-              gotFailure(gotValue, res);
-            }
-            gotValue = thisValue;
-          }
-        }
-        numRowsScanned.getAndIncrement();
-      }
-      numScans.getAndIncrement();
-    }
-
-    @Override
-    public void workDone() throws IOException {
-      try {
-        table.close();
-      } finally {
-        connection.close();
-      }
-    }
-
-    private void gotFailure(byte[] expected, Result res) {
-      StringBuilder msg = new StringBuilder();
-      msg.append("Failed after ").append(numRowsScanned).append("!");
-      msg.append("Expected=").append(Bytes.toStringBinary(expected));
-      msg.append("Got:\n");
-      for (Cell kv : res.listCells()) {
-        msg.append(kv.toString());
-        msg.append(" val= ");
-        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
-        msg.append("\n");
-      }
-      throw new RuntimeException(msg.toString());
-    }
-  }
-
-  public void runTestAtomicity(long millisToRun,
-      int numWriters,
-      int numGetters,
-      int numScanners,
-      int numUniqueRows) throws Exception {
-    runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
-  }
-
-  public void runTestAtomicity(long millisToRun,
-      int numWriters,
-      int numGetters,
-      int numScanners,
-      int numUniqueRows,
-      final boolean systemTest) throws Exception {
-    runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, systemTest,
-            false);
-  }
-
-  public void runTestAtomicity(long millisToRun,
-    int numWriters,
-    int numGetters,
-    int numScanners,
-    int numUniqueRows,
-    final boolean systemTest,
-    final boolean useMob) throws Exception {
-
-    createTableIfMissing(useMob);
-    // set the max threads to avoid java.lang.OutOfMemoryError: unable to create new native thread
-    util.getConfiguration().setInt("hbase.hconnection.threads.max", 40);
-    TestContext ctx = new TestContext(util.getConfiguration());
-
-    byte rows[][] = new byte[numUniqueRows][];
-    for (int i = 0; i < numUniqueRows; i++) {
-      rows[i] = Bytes.toBytes("test_row_" + i);
-    }
-
-    List writers = Lists.newArrayList();
-    for (int i = 0; i < numWriters; i++) {
-      AtomicityWriter writer = new AtomicityWriter(
-          ctx, rows, FAMILIES, getSharedThreadPool());
-      writers.add(writer);
-      ctx.addThread(writer);
-    }
-    // Add a flusher
-    ctx.addThread(new RepeatingTestThread(ctx) {
-      Admin admin = util.getAdmin();
-      public void doAnAction() throws Exception {
-        try {
-          admin.flush(TABLE_NAME);
-        } catch(IOException ioe) {
-          LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
-        }
-        // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
-        // we would flush as often as possible.  On a running cluster, this isn't practical:
-        // (1) we will cause a lot of load due to all the flushing and compacting
-        // (2) we cannot change the flushing/compacting related Configuration options to try to
-        // alleviate this
-        // (3) it is an unrealistic workload, since no one would actually flush that often.
-        // Therefore, let's flush every minute to have more flushes than usual, but not overload
-        // the running cluster.
-        if (systemTest) Thread.sleep(60000);
-      }
-    });
-
-    List getters = Lists.newArrayList();
-    for (int i = 0; i < numGetters; i++) {
-      AtomicGetReader getter = new AtomicGetReader(
-          ctx, rows[i % numUniqueRows], FAMILIES, getSharedThreadPool());
-      getters.add(getter);
-      ctx.addThread(getter);
-    }
-
-    List scanners = Lists.newArrayList();
-    for (int i = 0; i < numScanners; i++) {
-      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, getSharedThreadPool());
-      scanners.add(scanner);
-      ctx.addThread(scanner);
-    }
-
-    ctx.startThreads();
-    ctx.waitFor(millisToRun);
-    ctx.stop();
-
-    LOG.info("Finished test. Writers:");
-    for (AtomicityWriter writer : writers) {
-      LOG.info("  wrote " + writer.numWritten.get());
-    }
-    LOG.info("Readers:");
-    for (AtomicGetReader reader : getters) {
-      LOG.info("  read " + reader.numRead.get());
-    }
-    LOG.info("Scanners:");
-    for (AtomicScanReader scanner : scanners) {
-      LOG.info("  scanned " + scanner.numScans.get());
-      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
-    }
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
   }
 
   @Before
   public void setUp() throws Exception {
-    util.startMiniCluster(1);
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+        .setValue(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, compType);
+    if (MemoryCompactionPolicy.valueOf(compType) == MemoryCompactionPolicy.EAGER) {
+      builder.setValue(MemStoreLAB.USEMSLAB_KEY, "false");
+      builder.setValue(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, "0.9");
+    }
+    Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
+        .forEachOrdered(builder::addColumnFamily);
+    UTIL.getAdmin().createTable(builder.build());
+    tool.setConf(UTIL.getConfiguration());
   }
 
   @After
   public void tearDown() throws Exception {
-    util.shutdownMiniCluster();
+    UTIL.deleteTable(TABLE_NAME);
+  }
+
+  private void runTestAtomicity(long millisToRun, int numWriters, int numGetters, int numScanners,
+      int numUniqueRows) throws Exception {
+    runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
+  }
+
+  private void runTestAtomicity(long millisToRun, int numWriters, int numGetters, int numScanners,
+      int numUniqueRows, boolean useMob) throws Exception {
+    List args = Lists.newArrayList("-millis", String.valueOf(millisToRun), "-numWriters",
+      String.valueOf(numWriters), "-numGetters", String.valueOf(numGetters), "-numScanners",
+      String.valueOf(numScanners), "-numUniqueRows", String.valueOf(numUniqueRows), "-crazyFlush");
+    if (useMob) {
+      args.add("-useMob");
+    }
+    tool.run(args.toArray(new String[0]));
   }
 
   @Test
@@ -465,67 +134,16 @@ public class TestAcidGuarantees implements Tool {
 
   @Test
   public void testMobGetAtomicity() throws Exception {
-    boolean systemTest = false;
-    boolean useMob = true;
-    runTestAtomicity(20000, 5, 5, 0, 3, systemTest, useMob);
+    runTestAtomicity(20000, 5, 5, 0, 3, true);
   }
 
   @Test
   public void testMobScanAtomicity() throws Exception {
-    boolean systemTest = false;
-    boolean useMob = true;
-    runTestAtomicity(20000, 5, 0, 5, 3, systemTest, useMob);
+    runTestAtomicity(20000, 5, 0, 5, 3, true);
   }
 
   @Test
   public void testMobMixedAtomicity() throws Exception {
-    boolean systemTest = false;
-    boolean useMob = true;
-    runTestAtomicity(20000, 5, 2, 2, 3, systemTest, useMob);
+    runTestAtomicity(20000, 5, 2, 2, 3, true);
   }
-
-  ////////////////////////////////////////////////////////////////////////////
-  // Tool interface
-  ////////////////////////////////////////////////////////////////////////////
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration c) {
-    this.conf = c;
-    this.util = new HBaseTestingUtility(c);
-  }
-
-  @Override
-  public int run(String[] arg0) throws Exception {
-    Configuration c = getConf();
-    int millis = c.getInt("millis", 5000);
-    int numWriters = c.getInt("numWriters", 50);
-    int numGetters = c.getInt("numGetters", 2);
-    int numScanners = c.getInt("numScanners", 2);
-    int numUniqueRows = c.getInt("numUniqueRows", 3);
-    boolean useMob = c.getBoolean("useMob",false);
-    assert useMob && c.getInt("hfile.format.version", 2) == 3 : "Mob runs must use hfile v3";
-    runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true, useMob);
-    return 0;
-  }
-
-  public static void main(String args[]) throws Exception {
-    Configuration c = HBaseConfiguration.create();
-    int status;
-    try {
-      TestAcidGuarantees test = new TestAcidGuarantees(CompactingMemStore
-          .COMPACTING_MEMSTORE_TYPE_DEFAULT);
-      status = ToolRunner.run(c, test, args);
-    } catch (Exception e) {
-      LOG.error("Exiting due to error", e);
-      status = -1;
-    }
-    System.exit(status);
-  }
-
-
 }
-