HBASE-19311 Promote TestAcidGuarantees to LargeTests and start mini cluster once to make it faster

parent 8f0f820f22
commit 9c29a6211b
AbstractHBaseTool.java

@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership. The ASF

@@ -231,6 +231,14 @@ public abstract class AbstractHBaseTool implements Tool, Configurable {
     }
   }
 
+  public long getOptionAsLong(CommandLine cmd, String opt, int defaultValue) {
+    if (cmd.hasOption(opt)) {
+      return Long.parseLong(cmd.getOptionValue(opt));
+    } else {
+      return defaultValue;
+    }
+  }
+
   public double getOptionAsDouble(CommandLine cmd, String opt, double defaultValue) {
     if (cmd.hasOption(opt)) {
       return Double.parseDouble(cmd.getOptionValue(opt));
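Note: the new getOptionAsLong helper follows the same pattern as the existing
getOptionAsInt and getOptionAsDouble helpers around it: return the parsed value when
the flag is present, otherwise the supplied default. A minimal sketch of how a
subclass would use it (SleepTool and its "sleepSec" option are hypothetical, purely
for illustration; the overridden methods are the standard AbstractHBaseTool hooks
used by the new tool later in this commit):

  public class SleepTool extends AbstractHBaseTool {
    private long sleepSec;

    @Override
    protected void addOptions() {
      addOptWithArg("sleepSec", "seconds to sleep before exiting");
    }

    @Override
    protected void processOptions(CommandLine cmd) {
      // New helper: parses -sleepSec as a long, falling back to 10 when absent.
      sleepSec = getOptionAsLong(cmd, "sleepSec", 10);
    }

    @Override
    protected int doWork() throws Exception {
      Thread.sleep(sleepSec * 1000);
      return 0;
    }
  }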
IntegrationTestAcidGuarantees.java

@@ -17,26 +17,30 @@
  */
 package org.apache.hadoop.hbase;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILY_A;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILY_B;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILY_C;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.TABLE_NAME;
+
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
-import org.apache.hadoop.hbase.regionserver.MemStoreCompactor;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.util.Set;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
 
 /**
  * This Integration Test verifies acid guarantees across column families by frequently writing
  * values to rows with multiple column families and concurrently reading entire rows that expect all
  * column families.
- *
  * <p>
  * Sample usage:
+ *
  * <pre>
  * hbase org.apache.hadoop.hbase.IntegrationTestAcidGuarantees -Dmillis=10000 -DnumWriters=50
  * -DnumGetters=2 -DnumScanners=2 -DnumUniqueRows=5
@@ -47,19 +51,11 @@ public class IntegrationTestAcidGuarantees extends IntegrationTestBase {
   private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
 
   // The unit test version.
-  TestAcidGuarantees tag;
+  AcidGuaranteesTestTool tool;
 
   @Override
   public int runTestFromCommandLine() throws Exception {
-    Configuration c = getConf();
-    int millis = c.getInt("millis", 5000);
-    int numWriters = c.getInt("numWriters", 50);
-    int numGetters = c.getInt("numGetters", 2);
-    int numScanners = c.getInt("numScanners", 2);
-    int numUniqueRows = c.getInt("numUniqueRows", 3);
-    boolean useMob = c.getBoolean("useMob",false);
-    tag.runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true, useMob);
-    return 0;
+    return tool.run(new String[0]);
   }
 
   @Override
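Note on the hunk above: runTestFromCommandLine() now delegates to tool.run(new
String[0]) with an empty argument vector, so the workload parameters come from the
defaults wired into AcidGuaranteesTestTool.processOptions further down (5000 ms, 50
writers, 2 getters, 2 scanners, 3 unique rows) rather than from -D Configuration
properties as before. A hedged sketch of how a caller could still pass explicit
values through the tool's command-line flags (the flag names are the options
registered in the new tool's addOptions()):

  // Roughly equivalent to the old -Dmillis=10000 -DnumWriters=50 style, via CLI flags.
  int ret = tool.run(new String[] { "-millis", "10000", "-numWriters", "50",
      "-numGetters", "2", "-numScanners", "2", "-numUniqueRows", "5" });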
@@ -68,50 +64,50 @@ public class IntegrationTestAcidGuarantees extends IntegrationTestBase {
     util = getTestingUtil(getConf());
     util.initializeCluster(SERVER_COUNT);
     conf = getConf();
-    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
+    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128 * 1024));
     // prevent aggressive region split
     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
       ConstantSizeRegionSplitPolicy.class.getName());
-    this.setConf(util.getConfiguration());
-
-    // replace the HBaseTestingUtility in the unit test with the integration test's
-    // IntegrationTestingUtility
-    tag = new TestAcidGuarantees(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
-    tag.setHBaseTestingUtil(util);
+    tool = new AcidGuaranteesTestTool();
+    tool.setConf(getConf());
   }
 
   @Override
   public TableName getTablename() {
-    return TestAcidGuarantees.TABLE_NAME;
+    return TABLE_NAME;
   }
 
   @Override
   protected Set<String> getColumnFamilies() {
-    return Sets.newHashSet(Bytes.toString(TestAcidGuarantees.FAMILY_A),
-      Bytes.toString(TestAcidGuarantees.FAMILY_B),
-      Bytes.toString(TestAcidGuarantees.FAMILY_C));
+    return Sets.newHashSet(Bytes.toString(FAMILY_A), Bytes.toString(FAMILY_B),
+      Bytes.toString(FAMILY_C));
+  }
+
+  private void runTestAtomicity(long millisToRun, int numWriters, int numGetters, int numScanners,
+      int numUniqueRows) throws Exception {
+    tool.run(new String[] { "-millis", String.valueOf(millisToRun), "-numWriters",
+      String.valueOf(numWriters), "-numGetters", String.valueOf(numGetters), "-numScanners",
+      String.valueOf(numScanners), "-numUniqueRows", String.valueOf(numUniqueRows) });
   }
 
   // ***** Actual integration tests
 
   @Test
   public void testGetAtomicity() throws Exception {
-    tag.runTestAtomicity(20000, 4, 4, 0, 3);
+    runTestAtomicity(20000, 4, 4, 0, 3);
   }
 
   @Test
   public void testScanAtomicity() throws Exception {
-    tag.runTestAtomicity(20000, 3, 0, 2, 3);
+    runTestAtomicity(20000, 3, 0, 2, 3);
   }
 
   @Test
   public void testMixedAtomicity() throws Exception {
-    tag.runTestAtomicity(20000, 4, 2, 2, 3);
+    runTestAtomicity(20000, 4, 2, 2, 3);
   }
 
 
   // **** Command line hook
 
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     IntegrationTestingUtility.setUseDistributedCluster(conf);
AcidGuaranteesTestTool.java (new file)

@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
+/**
+ * A test tool that uses multiple threads to read and write multifamily rows into a table, verifying
+ * that reads never see partially-complete writes
+ */
+@InterfaceAudience.Private
+public class AcidGuaranteesTestTool extends AbstractHBaseTool {
+
+  private static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
+
+  public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
+  public static final byte[] FAMILY_A = Bytes.toBytes("A");
+  public static final byte[] FAMILY_B = Bytes.toBytes("B");
+  public static final byte[] FAMILY_C = Bytes.toBytes("C");
+  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
+
+  public static final byte[][] FAMILIES = new byte[][] { FAMILY_A, FAMILY_B, FAMILY_C };
+
+  public static int NUM_COLS_TO_CHECK = 50;
+
+  private ExecutorService sharedPool;
+
+  private long millisToRun;
+  private int numWriters;
+  private int numGetters;
+  private int numScanners;
+  private int numUniqueRows;
+  private boolean crazyFlush;
+  private boolean useMob;
+
+  private ExecutorService createThreadPool() {
+    int maxThreads = 256;
+    int coreThreads = 128;
+
+    long keepAliveTime = 60;
+    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(
+        maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
+
+    ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
+        TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(toString() + "-shared"));
+    tpe.allowCoreThreadTimeOut(true);
+    return tpe;
+  }
+
+  @Override
+  protected void addOptions() {
+    addOptWithArg("millis", "time limit in milliseconds");
+    addOptWithArg("numWriters", "number of write threads");
+    addOptWithArg("numGetters", "number of get threads");
+    addOptWithArg("numScanners", "number of scan threads");
+    addOptWithArg("numUniqueRows", "number of unique rows to test");
+    addOptNoArg("crazyFlush",
+      "if specified we will flush continuously otherwise will flush every minute");
+    addOptNoArg("useMob", "if specified we will enable mob on the first column family");
+  }
+
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    millisToRun = getOptionAsLong(cmd, "millis", 5000);
+    numWriters = getOptionAsInt(cmd, "numWriters", 50);
+    numGetters = getOptionAsInt(cmd, "numGetters", 2);
+    numScanners = getOptionAsInt(cmd, "numScanners", 2);
+    numUniqueRows = getOptionAsInt(cmd, "numUniqueRows", 3);
+    crazyFlush = cmd.hasOption("crazyFlush");
+    useMob = cmd.hasOption("useMob");
+  }
+
+  @Override
+  protected int doWork() throws Exception {
+    sharedPool = createThreadPool();
+    try (Connection conn = ConnectionFactory.createConnection(getConf())) {
+      runTestAtomicity(conn.getAdmin());
+    } finally {
+      sharedPool.shutdown();
+    }
+    return 0;
+  }
+
+  /**
+   * Thread that does random full-row writes into a table.
+   */
+  public static class AtomicityWriter extends RepeatingTestThread {
+    Random rand = new Random();
+    byte data[] = new byte[10];
+    byte[][] targetRows;
+    byte[][] targetFamilies;
+    Connection connection;
+    Table table;
+    AtomicLong numWritten = new AtomicLong();
+
+    public AtomicityWriter(TestContext ctx, byte[][] targetRows, byte[][] targetFamilies,
+        ExecutorService pool) throws IOException {
+      super(ctx);
+      this.targetRows = targetRows;
+      this.targetFamilies = targetFamilies;
+      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
+      table = connection.getTable(TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      // Pick a random row to write into
+      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
+      Put p = new Put(targetRow);
+      rand.nextBytes(data);
+
+      for (byte[] family : targetFamilies) {
+        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+          byte qualifier[] = Bytes.toBytes("col" + i);
+          p.addColumn(family, qualifier, data);
+        }
+      }
+      table.put(p);
+      numWritten.getAndIncrement();
+    }
+
+    @Override
+    public void workDone() throws IOException {
+      try {
+        table.close();
+      } finally {
+        connection.close();
+      }
+    }
+  }
+
+  /**
+   * Thread that does single-row reads in a table, looking for partially completed rows.
+   */
+  public static class AtomicGetReader extends RepeatingTestThread {
+    byte[] targetRow;
+    byte[][] targetFamilies;
+    Connection connection;
+    Table table;
+    int numVerified = 0;
+    AtomicLong numRead = new AtomicLong();
+
+    public AtomicGetReader(TestContext ctx, byte[] targetRow, byte[][] targetFamilies,
+        ExecutorService pool) throws IOException {
+      super(ctx);
+      this.targetRow = targetRow;
+      this.targetFamilies = targetFamilies;
+      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
+      table = connection.getTable(TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      Get g = new Get(targetRow);
+      Result res = table.get(g);
+      byte[] gotValue = null;
+      if (res.getRow() == null) {
+        // Trying to verify but we didn't find the row - the writing
+        // thread probably just hasn't started writing yet, so we can
+        // ignore this action
+        return;
+      }
+
+      for (byte[] family : targetFamilies) {
+        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+          byte qualifier[] = Bytes.toBytes("col" + i);
+          byte thisValue[] = res.getValue(family, qualifier);
+          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+            gotFailure(gotValue, res);
+          }
+          numVerified++;
+          gotValue = thisValue;
+        }
+      }
+      numRead.getAndIncrement();
+    }
+
+    @Override
+    public void workDone() throws IOException {
+      try {
+        table.close();
+      } finally {
+        connection.close();
+      }
+    }
+
+    private void gotFailure(byte[] expected, Result res) {
+      StringBuilder msg = new StringBuilder();
+      msg.append("Failed after ").append(numVerified).append("!");
+      msg.append("Expected=").append(Bytes.toStringBinary(expected));
+      msg.append("Got:\n");
+      for (Cell kv : res.listCells()) {
+        msg.append(kv.toString());
+        msg.append(" val= ");
+        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
+        msg.append("\n");
+      }
+      throw new RuntimeException(msg.toString());
+    }
+  }
+
+  /**
+   * Thread that does full scans of the table looking for any partially completed rows.
+   */
+  public static class AtomicScanReader extends RepeatingTestThread {
+    byte[][] targetFamilies;
+    Table table;
+    Connection connection;
+    AtomicLong numScans = new AtomicLong();
+    AtomicLong numRowsScanned = new AtomicLong();
+
+    public AtomicScanReader(TestContext ctx, byte[][] targetFamilies, ExecutorService pool)
+        throws IOException {
+      super(ctx);
+      this.targetFamilies = targetFamilies;
+      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
+      table = connection.getTable(TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      Scan s = new Scan();
+      for (byte[] family : targetFamilies) {
+        s.addFamily(family);
+      }
+      ResultScanner scanner = table.getScanner(s);
+
+      for (Result res : scanner) {
+        byte[] gotValue = null;
+
+        for (byte[] family : targetFamilies) {
+          for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+            byte qualifier[] = Bytes.toBytes("col" + i);
+            byte thisValue[] = res.getValue(family, qualifier);
+            if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+              gotFailure(gotValue, res);
+            }
+            gotValue = thisValue;
+          }
+        }
+        numRowsScanned.getAndIncrement();
+      }
+      numScans.getAndIncrement();
+    }
+
+    @Override
+    public void workDone() throws IOException {
+      try {
+        table.close();
+      } finally {
+        connection.close();
+      }
+    }
+
+    private void gotFailure(byte[] expected, Result res) {
+      StringBuilder msg = new StringBuilder();
+      msg.append("Failed after ").append(numRowsScanned).append("!");
+      msg.append("Expected=").append(Bytes.toStringBinary(expected));
+      msg.append("Got:\n");
+      for (Cell kv : res.listCells()) {
+        msg.append(kv.toString());
+        msg.append(" val= ");
+        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
+        msg.append("\n");
+      }
+      throw new RuntimeException(msg.toString());
+    }
+  }
+
+  private void createTableIfMissing(Admin admin, boolean useMob) throws IOException {
+    if (!admin.tableExists(TABLE_NAME)) {
+      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
+      Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
+          .forEachOrdered(builder::addColumnFamily);
+      admin.createTable(builder.build());
+    }
+    ColumnFamilyDescriptor cfd = admin.getDescriptor(TABLE_NAME).getColumnFamilies()[0];
+    if (cfd.isMobEnabled() != useMob) {
+      admin.modifyColumnFamily(TABLE_NAME, ColumnFamilyDescriptorBuilder.newBuilder(cfd)
+          .setMobEnabled(useMob).setMobThreshold(4).build());
+    }
+  }
+
+  private void runTestAtomicity(Admin admin) throws Exception {
+    createTableIfMissing(admin, useMob);
+    TestContext ctx = new TestContext(conf);
+
+    byte rows[][] = new byte[numUniqueRows][];
+    for (int i = 0; i < numUniqueRows; i++) {
+      rows[i] = Bytes.toBytes("test_row_" + i);
+    }
+
+    List<AtomicityWriter> writers = Lists.newArrayList();
+    for (int i = 0; i < numWriters; i++) {
+      AtomicityWriter writer = new AtomicityWriter(ctx, rows, FAMILIES, sharedPool);
+      writers.add(writer);
+      ctx.addThread(writer);
+    }
+    // Add a flusher
+    ctx.addThread(new RepeatingTestThread(ctx) {
+      public void doAnAction() throws Exception {
+        try {
+          admin.flush(TABLE_NAME);
+        } catch (IOException ioe) {
+          LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
+        }
+        // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
+        // we would flush as often as possible. On a running cluster, this isn't practical:
+        // (1) we will cause a lot of load due to all the flushing and compacting
+        // (2) we cannot change the flushing/compacting related Configuration options to try to
+        // alleviate this
+        // (3) it is an unrealistic workload, since no one would actually flush that often.
+        // Therefore, let's flush every minute to have more flushes than usual, but not overload
+        // the running cluster.
+        if (!crazyFlush) {
+          Thread.sleep(60000);
+        }
+      }
+    });
+
+    List<AtomicGetReader> getters = Lists.newArrayList();
+    for (int i = 0; i < numGetters; i++) {
+      AtomicGetReader getter =
+          new AtomicGetReader(ctx, rows[i % numUniqueRows], FAMILIES, sharedPool);
+      getters.add(getter);
+      ctx.addThread(getter);
+    }
+
+    List<AtomicScanReader> scanners = Lists.newArrayList();
+    for (int i = 0; i < numScanners; i++) {
+      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, sharedPool);
+      scanners.add(scanner);
+      ctx.addThread(scanner);
+    }
+
+    ctx.startThreads();
+    ctx.waitFor(millisToRun);
+    ctx.stop();
+
+    LOG.info("Finished test. Writers:");
+    for (AtomicityWriter writer : writers) {
+      LOG.info(" wrote " + writer.numWritten.get());
+    }
+    LOG.info("Readers:");
+    for (AtomicGetReader reader : getters) {
+      LOG.info(" read " + reader.numRead.get());
+    }
+    LOG.info("Scanners:");
+    for (AtomicScanReader scanner : scanners) {
+      LOG.info(" scanned " + scanner.numScans.get());
+      LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
+    }
+  }
+
+  public static void main(String[] args) {
+    Configuration c = HBaseConfiguration.create();
+    int status;
+    try {
+      AcidGuaranteesTestTool test = new AcidGuaranteesTestTool();
+      status = ToolRunner.run(c, test, args);
+    } catch (Exception e) {
+      LOG.error("Exiting due to error", e);
+      status = -1;
+    }
+    System.exit(status);
+  }
+}
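Because the new tool extends AbstractHBaseTool (a Hadoop Tool), it can also be
launched programmatically, not just via the hbase script or its own main(). A minimal
sketch, assuming an HBase client configuration on the classpath; the class name
AcidGuaranteesRunner is made up for illustration:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.AcidGuaranteesTestTool;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.util.ToolRunner;

  public class AcidGuaranteesRunner {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Run a 10 second mixed workload; the flags match the options in addOptions().
      int exit = ToolRunner.run(conf, new AcidGuaranteesTestTool(),
          new String[] { "-millis", "10000", "-numWriters", "5", "-numGetters", "2",
              "-numScanners", "2", "-numUniqueRows", "3", "-crazyFlush" });
      System.exit(exit);
    }
  }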
TestAcidGuarantees.java

@@ -18,434 +18,103 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.FAMILIES;
+import static org.apache.hadoop.hbase.AcidGuaranteesTestTool.TABLE_NAME;
+
+import java.util.List;
+import java.util.stream.Stream;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 
 /**
- * Test case that uses multiple threads to read and write multifamily rows
- * into a table, verifying that reads never see partially-complete writes.
- *
- * This can run as a junit test, or with a main() function which runs against
- * a real cluster (eg for testing with failures, region movement, etc)
+ * Test case that uses multiple threads to read and write multifamily rows into a table, verifying
+ * that reads never see partially-complete writes. This can run as a junit test, or with a main()
+ * function which runs against a real cluster (eg for testing with failures, region movement, etc)
  */
-@Category({FlakeyTests.class, MediumTests.class})
+@Category({ FlakeyTests.class, LargeTests.class })
 @RunWith(Parameterized.class)
-public class TestAcidGuarantees implements Tool {
-  @Parameterized.Parameters
+public class TestAcidGuarantees {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @Parameterized.Parameters(name = "{index}: compType={0}")
   public static Object[] data() {
     return new Object[] { "NONE", "BASIC", "EAGER" };
   }
-  protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
-  public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
-  public static final byte [] FAMILY_A = Bytes.toBytes("A");
-  public static final byte [] FAMILY_B = Bytes.toBytes("B");
-  public static final byte [] FAMILY_C = Bytes.toBytes("C");
-  public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
 
-  public static final byte[][] FAMILIES = new byte[][] {
-    FAMILY_A, FAMILY_B, FAMILY_C };
+  @Parameter
+  public String compType;
 
-  private HBaseTestingUtility util;
+  private AcidGuaranteesTestTool tool = new AcidGuaranteesTestTool();
 
-  public static int NUM_COLS_TO_CHECK = 50;
-
-  // when run as main
-  private Configuration conf;
-  private ExecutorService sharedPool = null;
-
-  private void createTableIfMissing(boolean useMob)
-    throws IOException {
-    try {
-      util.createTable(TABLE_NAME, FAMILIES);
-    } catch (TableExistsException tee) {
-    }
-
-    if (useMob) {
-      HTableDescriptor htd = new HTableDescriptor(util.getAdmin().getTableDescriptor(TABLE_NAME));
-      HColumnDescriptor hcd = htd.getColumnFamilies()[0];
-      // force mob enabled such that all data is mob data
-      hcd.setMobEnabled(true);
-      hcd.setMobThreshold(4);
-      util.getAdmin().modifyColumnFamily(TABLE_NAME, hcd);
-    }
-  }
-
-  public TestAcidGuarantees(String compType) {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
     // Set small flush size for minicluster so we exercise reseeking scanners
-    Configuration conf = HBaseConfiguration.create();
+    Configuration conf = UTIL.getConfiguration();
     conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128 * 1024));
     // prevent aggressive region split
     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
       ConstantSizeRegionSplitPolicy.class.getName());
     conf.setInt("hfile.format.version", 3); // for mob tests
-    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, compType);
-    if(MemoryCompactionPolicy.valueOf(compType) == MemoryCompactionPolicy.EAGER) {
-      conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
-      conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.9);
-    }
-    util = new HBaseTestingUtility(conf);
-    sharedPool = createThreadPool();
+    UTIL.startMiniCluster(1);
   }
 
-  public void setHBaseTestingUtil(HBaseTestingUtility util) {
-    this.util = util;
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
   }
 
-  private ExecutorService createThreadPool() {
-    int maxThreads = 256;
-    int coreThreads = 128;
-
-    long keepAliveTime = 60;
-    BlockingQueue<Runnable> workQueue =
-      new LinkedBlockingQueue<Runnable>(maxThreads *
-        HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
-
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
-        coreThreads,
-        maxThreads,
-        keepAliveTime,
-        TimeUnit.SECONDS,
-        workQueue,
-        Threads.newDaemonThreadFactory(toString() + "-shared"));
-    tpe.allowCoreThreadTimeOut(true);
-    return tpe;
-  }
-
-  public ExecutorService getSharedThreadPool() {
-    return sharedPool;
-  }
-
-  /**
-   * Thread that does random full-row writes into a table.
-   */
-  public static class AtomicityWriter extends RepeatingTestThread {
-    Random rand = new Random();
-    byte data[] = new byte[10];
-    byte targetRows[][];
-    byte targetFamilies[][];
-    Connection connection;
-    Table table;
-    AtomicLong numWritten = new AtomicLong();
-
-    public AtomicityWriter(TestContext ctx, byte targetRows[][],
-        byte targetFamilies[][], ExecutorService pool) throws IOException {
-      super(ctx);
-      this.targetRows = targetRows;
-      this.targetFamilies = targetFamilies;
-      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
-      table = connection.getTable(TABLE_NAME);
-    }
-    public void doAnAction() throws Exception {
-      // Pick a random row to write into
-      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
-      Put p = new Put(targetRow);
-      rand.nextBytes(data);
-
-      for (byte[] family : targetFamilies) {
-        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
-          byte qualifier[] = Bytes.toBytes("col" + i);
-          p.addColumn(family, qualifier, data);
-        }
-      }
-      table.put(p);
-      numWritten.getAndIncrement();
-    }
-
-    @Override
-    public void workDone() throws IOException {
-      try {
-        table.close();
-      } finally {
-        connection.close();
-      }
-    }
-  }
-
-  /**
-   * Thread that does single-row reads in a table, looking for partially
-   * completed rows.
-   */
-  public static class AtomicGetReader extends RepeatingTestThread {
-    byte targetRow[];
-    byte targetFamilies[][];
-    Connection connection;
-    Table table;
-    int numVerified = 0;
-    AtomicLong numRead = new AtomicLong();
-
-    public AtomicGetReader(TestContext ctx, byte targetRow[],
-        byte targetFamilies[][], ExecutorService pool) throws IOException {
-      super(ctx);
-      this.targetRow = targetRow;
-      this.targetFamilies = targetFamilies;
-      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
-      table = connection.getTable(TABLE_NAME);
-    }
-
-    public void doAnAction() throws Exception {
-      Get g = new Get(targetRow);
-      Result res = table.get(g);
-      byte[] gotValue = null;
-      if (res.getRow() == null) {
-        // Trying to verify but we didn't find the row - the writing
-        // thread probably just hasn't started writing yet, so we can
-        // ignore this action
-        return;
-      }
-
-      for (byte[] family : targetFamilies) {
-        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
-          byte qualifier[] = Bytes.toBytes("col" + i);
-          byte thisValue[] = res.getValue(family, qualifier);
-          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
-            gotFailure(gotValue, res);
-          }
-          numVerified++;
-          gotValue = thisValue;
-        }
-      }
-      numRead.getAndIncrement();
-    }
-
-    @Override
-    public void workDone() throws IOException {
-      try {
-        table.close();
-      } finally {
-        connection.close();
-      }
-    }
-
-    private void gotFailure(byte[] expected, Result res) {
-      StringBuilder msg = new StringBuilder();
-      msg.append("Failed after ").append(numVerified).append("!");
-      msg.append("Expected=").append(Bytes.toStringBinary(expected));
-      msg.append("Got:\n");
-      for (Cell kv : res.listCells()) {
-        msg.append(kv.toString());
-        msg.append(" val= ");
-        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
-        msg.append("\n");
-      }
-      throw new RuntimeException(msg.toString());
-    }
-  }
-
-  /**
-   * Thread that does full scans of the table looking for any partially completed
-   * rows.
-   */
-  public static class AtomicScanReader extends RepeatingTestThread {
-    byte targetFamilies[][];
-    Table table;
-    Connection connection;
-    AtomicLong numScans = new AtomicLong();
-    AtomicLong numRowsScanned = new AtomicLong();
-
-    public AtomicScanReader(TestContext ctx,
-        byte targetFamilies[][], ExecutorService pool) throws IOException {
-      super(ctx);
-      this.targetFamilies = targetFamilies;
-      connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
-      table = connection.getTable(TABLE_NAME);
-    }
-
-    public void doAnAction() throws Exception {
-      Scan s = new Scan();
-      for (byte[] family : targetFamilies) {
-        s.addFamily(family);
-      }
-      ResultScanner scanner = table.getScanner(s);
-
-      for (Result res : scanner) {
-        byte[] gotValue = null;
-
-        for (byte[] family : targetFamilies) {
-          for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
-            byte qualifier[] = Bytes.toBytes("col" + i);
-            byte thisValue[] = res.getValue(family, qualifier);
-            if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
-              gotFailure(gotValue, res);
-            }
-            gotValue = thisValue;
-          }
-        }
-        numRowsScanned.getAndIncrement();
-      }
-      numScans.getAndIncrement();
-    }
-
-    @Override
-    public void workDone() throws IOException {
-      try {
-        table.close();
-      } finally {
-        connection.close();
-      }
-    }
-
-    private void gotFailure(byte[] expected, Result res) {
-      StringBuilder msg = new StringBuilder();
-      msg.append("Failed after ").append(numRowsScanned).append("!");
-      msg.append("Expected=").append(Bytes.toStringBinary(expected));
-      msg.append("Got:\n");
-      for (Cell kv : res.listCells()) {
-        msg.append(kv.toString());
-        msg.append(" val= ");
-        msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
-        msg.append("\n");
-      }
-      throw new RuntimeException(msg.toString());
-    }
-  }
-
-  public void runTestAtomicity(long millisToRun,
-      int numWriters,
-      int numGetters,
-      int numScanners,
-      int numUniqueRows) throws Exception {
-    runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
-  }
-
-  public void runTestAtomicity(long millisToRun,
-      int numWriters,
-      int numGetters,
-      int numScanners,
-      int numUniqueRows,
-      final boolean systemTest) throws Exception {
-    runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, systemTest,
-      false);
-  }
-
-  public void runTestAtomicity(long millisToRun,
-      int numWriters,
-      int numGetters,
-      int numScanners,
-      int numUniqueRows,
-      final boolean systemTest,
-      final boolean useMob) throws Exception {
-
-    createTableIfMissing(useMob);
-    // set the max threads to avoid java.lang.OutOfMemoryError: unable to create new native thread
-    util.getConfiguration().setInt("hbase.hconnection.threads.max", 40);
-    TestContext ctx = new TestContext(util.getConfiguration());
-
-    byte rows[][] = new byte[numUniqueRows][];
-    for (int i = 0; i < numUniqueRows; i++) {
-      rows[i] = Bytes.toBytes("test_row_" + i);
-    }
-
-    List<AtomicityWriter> writers = Lists.newArrayList();
-    for (int i = 0; i < numWriters; i++) {
-      AtomicityWriter writer = new AtomicityWriter(
-          ctx, rows, FAMILIES, getSharedThreadPool());
-      writers.add(writer);
-      ctx.addThread(writer);
-    }
-    // Add a flusher
-    ctx.addThread(new RepeatingTestThread(ctx) {
-      Admin admin = util.getAdmin();
-      public void doAnAction() throws Exception {
-        try {
-          admin.flush(TABLE_NAME);
-        } catch(IOException ioe) {
-          LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
-        }
-        // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
-        // we would flush as often as possible. On a running cluster, this isn't practical:
-        // (1) we will cause a lot of load due to all the flushing and compacting
-        // (2) we cannot change the flushing/compacting related Configuration options to try to
-        // alleviate this
-        // (3) it is an unrealistic workload, since no one would actually flush that often.
-        // Therefore, let's flush every minute to have more flushes than usual, but not overload
-        // the running cluster.
-        if (systemTest) Thread.sleep(60000);
-      }
-    });
-
-    List<AtomicGetReader> getters = Lists.newArrayList();
-    for (int i = 0; i < numGetters; i++) {
-      AtomicGetReader getter = new AtomicGetReader(
-          ctx, rows[i % numUniqueRows], FAMILIES, getSharedThreadPool());
-      getters.add(getter);
-      ctx.addThread(getter);
-    }
-
-    List<AtomicScanReader> scanners = Lists.newArrayList();
-    for (int i = 0; i < numScanners; i++) {
-      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, getSharedThreadPool());
-      scanners.add(scanner);
-      ctx.addThread(scanner);
-    }
-
-    ctx.startThreads();
-    ctx.waitFor(millisToRun);
-    ctx.stop();
-
-    LOG.info("Finished test. Writers:");
-    for (AtomicityWriter writer : writers) {
-      LOG.info(" wrote " + writer.numWritten.get());
-    }
-    LOG.info("Readers:");
-    for (AtomicGetReader reader : getters) {
-      LOG.info(" read " + reader.numRead.get());
-    }
-    LOG.info("Scanners:");
-    for (AtomicScanReader scanner : scanners) {
-      LOG.info(" scanned " + scanner.numScans.get());
-      LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
-    }
-  }
 
   @Before
   public void setUp() throws Exception {
-    util.startMiniCluster(1);
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+        .setValue(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, compType);
+    if (MemoryCompactionPolicy.valueOf(compType) == MemoryCompactionPolicy.EAGER) {
+      builder.setValue(MemStoreLAB.USEMSLAB_KEY, "false");
+      builder.setValue(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, "0.9");
+    }
+    Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
+        .forEachOrdered(builder::addColumnFamily);
+    UTIL.getAdmin().createTable(builder.build());
+    tool.setConf(UTIL.getConfiguration());
   }
 
   @After
   public void tearDown() throws Exception {
-    util.shutdownMiniCluster();
+    UTIL.deleteTable(TABLE_NAME);
+  }
+
+  private void runTestAtomicity(long millisToRun, int numWriters, int numGetters, int numScanners,
+      int numUniqueRows) throws Exception {
+    runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
+  }
+
+  private void runTestAtomicity(long millisToRun, int numWriters, int numGetters, int numScanners,
+      int numUniqueRows, boolean useMob) throws Exception {
+    List<String> args = Lists.newArrayList("-millis", String.valueOf(millisToRun), "-numWriters",
+      String.valueOf(numWriters), "-numGetters", String.valueOf(numGetters), "-numScanners",
+      String.valueOf(numScanners), "-numUniqueRows", String.valueOf(numUniqueRows), "-crazyFlush");
+    if (useMob) {
+      args.add("-useMob");
+    }
+    tool.run(args.toArray(new String[0]));
   }
 
   @Test
@@ -465,67 +134,16 @@ public class TestAcidGuarantees implements Tool {
 
   @Test
   public void testMobGetAtomicity() throws Exception {
-    boolean systemTest = false;
-    boolean useMob = true;
-    runTestAtomicity(20000, 5, 5, 0, 3, systemTest, useMob);
+    runTestAtomicity(20000, 5, 5, 0, 3, true);
   }
 
   @Test
   public void testMobScanAtomicity() throws Exception {
-    boolean systemTest = false;
-    boolean useMob = true;
-    runTestAtomicity(20000, 5, 0, 5, 3, systemTest, useMob);
+    runTestAtomicity(20000, 5, 0, 5, 3, true);
   }
 
   @Test
   public void testMobMixedAtomicity() throws Exception {
-    boolean systemTest = false;
-    boolean useMob = true;
-    runTestAtomicity(20000, 5, 2, 2, 3, systemTest, useMob);
+    runTestAtomicity(20000, 5, 2, 2, 3, true);
   }
 
-  ////////////////////////////////////////////////////////////////////////////
-  // Tool interface
-  ////////////////////////////////////////////////////////////////////////////
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration c) {
-    this.conf = c;
-    this.util = new HBaseTestingUtility(c);
-  }
-
-  @Override
-  public int run(String[] arg0) throws Exception {
-    Configuration c = getConf();
-    int millis = c.getInt("millis", 5000);
-    int numWriters = c.getInt("numWriters", 50);
-    int numGetters = c.getInt("numGetters", 2);
-    int numScanners = c.getInt("numScanners", 2);
-    int numUniqueRows = c.getInt("numUniqueRows", 3);
-    boolean useMob = c.getBoolean("useMob",false);
-    assert useMob && c.getInt("hfile.format.version", 2) == 3 : "Mob runs must use hfile v3";
-    runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true, useMob);
-    return 0;
-  }
-
-  public static void main(String args[]) throws Exception {
-    Configuration c = HBaseConfiguration.create();
-    int status;
-    try {
-      TestAcidGuarantees test = new TestAcidGuarantees(CompactingMemStore
-          .COMPACTING_MEMSTORE_TYPE_DEFAULT);
-      status = ToolRunner.run(c, test, args);
-    } catch (Exception e) {
-      LOG.error("Exiting due to error", e);
-      status = -1;
-    }
-    System.exit(status);
-  }
-
-
 }
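The net effect of this file's changes is the speedup named in the commit title: the
mini cluster now starts once per test class in @BeforeClass and stops once in
@AfterClass, while @Before/@After only create and delete the table between test
methods. Under JUnit 4's Parameterized runner, @BeforeClass and @AfterClass still run
only once for the whole parameter set, so all three compType variants share one
cluster. A schematic of the lifecycle (method names shortened; not a literal excerpt
of the test above):

  // once per class, shared by all parameters and test methods
  @BeforeClass
  public static void startCluster() throws Exception {
    UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void stopCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  // once per test method: a cheap table create/delete instead of a cluster restart
  @Before
  public void createTable() throws Exception {
    UTIL.getAdmin().createTable(/* descriptor built per compType */ null);
  }

  @After
  public void dropTable() throws Exception {
    UTIL.deleteTable(TABLE_NAME);
  }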