HBASE-10184 [Online Schema Change]: Add additional tests for online schema change (Aleksandr Shulman)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1577375 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Kyle Purtell 2014-03-13 23:25:37 +00:00
parent 5f300467b8
commit bb8594380d
4 changed files with 667 additions and 48 deletions

View File

@ -27,6 +27,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -61,14 +62,23 @@ import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtilsForTests;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.ServiceException;
@ -90,6 +100,7 @@ public class TestAdmin {
TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
TEST_UTIL.getConfiguration().setBoolean(
"hbase.master.enabletable.roundrobin", true);
@ -366,24 +377,28 @@ public class TestAdmin {
assertTrue(exceptionThrown);
}
}
/**
* Verify schema modification takes.
*
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testOnlineChangeTableSchema() throws IOException, InterruptedException {
final TableName tableName =
TableName.valueOf("changeTableSchemaOnline");
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration().setBoolean(
"hbase.online.schema.update.enable", true);
HTableDescriptor [] tables = admin.listTables();
@Test(timeout = 300000)
public void testOnlineChangeTableSchema() throws IOException,
InterruptedException {
final TableName tableName = TableName.valueOf("changeTableSchemaOnline");
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration()
.setBoolean("hbase.online.schema.update.enable", true);
HTableDescriptor[] tables = admin.listTables();
int numTables = tables.length;
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY).close();
tables = this.admin.listTables();
assertEquals(numTables + 1, tables.length);
final int EXPECTED_NUM_REGIONS = TEST_UTIL.getHBaseAdmin()
.getTableRegions(tableName).size();
// FIRST, do htabledescriptor changes.
HTableDescriptor htd = this.admin.getTableDescriptor(tableName);
// Make a copy and assert copy is good.
@ -391,7 +406,7 @@ public class TestAdmin {
assertTrue(htd.equals(copy));
// Now amend the copy. Introduce differences.
long newFlushSize = htd.getMemStoreFlushSize() / 2;
if (newFlushSize <=0) {
if (newFlushSize <= 0) {
newFlushSize = HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE / 2;
}
copy.setMemStoreFlushSize(newFlushSize);
@ -418,7 +433,7 @@ public class TestAdmin {
int maxversions = hcd.getMaxVersions();
final int newMaxVersions = maxversions + 1;
hcd.setMaxVersions(newMaxVersions);
final byte [] hcdName = hcd.getName();
final byte[] hcdName = hcd.getName();
expectedException = false;
try {
this.admin.modifyColumn(tableName, hcd);
@ -428,7 +443,10 @@ public class TestAdmin {
assertFalse(expectedException);
modifiedHtd = this.admin.getTableDescriptor(tableName);
HColumnDescriptor modifiedHcd = modifiedHtd.getFamily(hcdName);
assertEquals(newMaxVersions, modifiedHcd.getMaxVersions());
assertEquals(
newMaxVersions,
waitForColumnSchemasToSettle(TEST_UTIL.getMiniHBaseCluster(),
tableName, EXPECTED_NUM_REGIONS).getMaxVersions());
// Try adding a column
assertFalse(this.admin.isTableDisabled(tableName));
@ -454,6 +472,38 @@ public class TestAdmin {
hcd = modifiedHtd.getFamily(xtracol.getName());
assertTrue(hcd == null);
// Modify bloom filter
countOfFamilies = modifiedHtd.getFamilies().size();
assertTrue(countOfFamilies > 0);
hcd = modifiedHtd.getFamilies().iterator().next();
BloomType initialBT = hcd.getBloomFilterType();
BloomType newBloomType = null;
BloomType[] possibleBloomFilters = BloomType.values();
for (BloomType type : possibleBloomFilters) {
if (initialBT == null || !initialBT.equals(type)) {
newBloomType = type;
break;
}
}
hcd.setBloomFilterType(newBloomType);
expectedException = false;
try {
this.admin.modifyColumn(tableName, hcd);
} catch (TableNotDisabledException re) {
expectedException = true;
}
assertFalse(expectedException);
modifiedHtd = this.admin.getTableDescriptor(tableName);
modifiedHcd = modifiedHtd.getFamily(hcdName);
assertEquals(
newBloomType,
waitForColumnSchemasToSettle(TEST_UTIL.getMiniHBaseCluster(),
tableName, EXPECTED_NUM_REGIONS).getBloomFilterType());
// Delete the table
this.admin.disableTable(tableName);
this.admin.deleteTable(tableName);
@ -461,12 +511,12 @@ public class TestAdmin {
assertFalse(this.admin.tableExists(tableName));
}
@Test (timeout=300000)
@Test(timeout = 300000)
public void testShouldFailOnlineSchemaUpdateIfOnlineSchemaIsNotEnabled()
throws Exception {
final byte[] tableName = Bytes.toBytes("changeTableSchemaOnlineFailure");
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration().setBoolean(
"hbase.online.schema.update.enable", false);
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration()
.setBoolean("hbase.online.schema.update.enable", false);
HTableDescriptor[] tables = admin.listTables();
int numTables = tables.length;
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY).close();
@ -480,7 +530,7 @@ public class TestAdmin {
assertTrue(htd.equals(copy));
// Now amend the copy. Introduce differences.
long newFlushSize = htd.getMemStoreFlushSize() / 2;
if (newFlushSize <=0) {
if (newFlushSize <= 0) {
newFlushSize = HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE / 2;
}
copy.setMemStoreFlushSize(newFlushSize);
@ -496,8 +546,79 @@ public class TestAdmin {
assertTrue("Online schema update should not happen.", expectedException);
// Reset the value for the other tests
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration().setBoolean(
"hbase.online.schema.update.enable", true);
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration()
.setBoolean("hbase.online.schema.update.enable", true);
}
@Test
public void testOnlineChangeReplicationScope() throws Exception {
final TableName tableName = TableName.valueOf("changeReplicationTable");
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration()
.setBoolean("hbase.online.schema.update.enable", true);
HTableDescriptor[] tables = admin.listTables();
int numTables = tables.length;
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY).close();
tables = this.admin.listTables();
assertEquals(numTables + 1, tables.length);
final int EXPECTED_NUM_REGIONS = TEST_UTIL.getHBaseAdmin()
.getTableRegions(tableName).size();
HTableDescriptor htd = this.admin.getTableDescriptor(tableName);
// Make a copy and assert copy is good.
HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
HColumnDescriptor originalHCD = htd.getFamily(HConstants.CATALOG_FAMILY);
assertEquals(
"Replication is enabled by default, which should not be the case", 0,
hcd.getScope());
originalHCD.setScope(1);
this.admin.modifyColumn(tableName, originalHCD);
// verify that the replication scope is off (0) by default
HColumnDescriptor[] hcds = this.admin.getTableDescriptor(tableName)
.getColumnFamilies();
assertEquals("Unexpected number of column families returned", 1,
hcds.length);
assertEquals(
1,
waitForColumnSchemasToSettle(TEST_UTIL.getHBaseCluster(), tableName,
EXPECTED_NUM_REGIONS).getScope());
this.admin.disableTable(tableName);
this.admin.deleteTable(tableName);
this.admin.listTables();
assertFalse(this.admin.tableExists(tableName));
}
@Test
public void testOnlineSetTableOwner() throws Exception {
final TableName tableName = TableName.valueOf("changeTableOwnerTable");
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration()
.setBoolean("hbase.online.schema.update.enable", true);
HTableDescriptor[] tables = admin.listTables();
int numTables = tables.length;
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY).close();
tables = this.admin.listTables();
assertEquals(numTables + 1, tables.length);
HTableDescriptor htd = this.admin.getTableDescriptor(tableName);
// Make a copy and assert copy is good.
assertEquals("There is an owner by default, which should not be the case",
null, htd.getOwnerString());
htd.setOwnerString("someUser"); // does this need to be a valid user
admin.modifyTable(tableName, htd);
// verify that the replication scope is off (0) by default
HTableDescriptor modifiedHtd = this.admin.getTableDescriptor(tableName);
assertEquals("Owner was not set", modifiedHtd.getOwnerString(), "someUser");
}
/**
@ -1719,4 +1840,67 @@ public class TestAdmin {
ct.stop();
}
}
public static HColumnDescriptor waitForColumnSchemasToSettle(
MiniHBaseCluster miniCluster, TableName tableName, int totalRegions)
throws InterruptedException {
Thread.sleep(2000); // wait 2s so that at least some of the RSes have the
// info
Set<HColumnDescriptor> descriptorSet = new HashSet<HColumnDescriptor>();
final int MAX_POLLS = 5;
int numRegionsEncountered = 0;
for (int i = 0; i < MAX_POLLS; i++) {
for (JVMClusterUtil.RegionServerThread rst : miniCluster
.getRegionServerThreads()) {
for (HRegion hri : rst.getRegionServer().getOnlineRegions(tableName)) {
numRegionsEncountered++;
HColumnDescriptor hcd = hri.getTableDesc().getColumnFamilies()[0];
descriptorSet.add(hcd);
}
}
if (descriptorSet.size() == 1) {
break;
}
Thread.sleep(2000);
}
if (descriptorSet.size() != 1) {
System.out
.println("FAIL: HColumnDescriptor definition did not settle. Here is the output:");
Iterator<HColumnDescriptor> hcIter = descriptorSet.iterator();
while (hcIter.hasNext()) {
System.out.println("HCD entry: " + hcIter.next());
}
fail("HColumnDescription did not settle as expected.");
}
assertEquals("The number of regions did not match. Expected "
+ totalRegions + " but received " + numRegionsEncountered,
totalRegions, numRegionsEncountered);
return descriptorSet.iterator().next();
}
public static void verifyBloomFilterPropertyOnEachRS(
MiniHBaseCluster miniCluster, TableName tableName, BloomType expectedType)
throws Exception {
for (JVMClusterUtil.RegionServerThread rst : miniCluster
.getRegionServerThreads()) {
for (HRegion hri : rst.getRegionServer().getOnlineRegions(tableName)) {
assertEquals(
"The bloom filter did not match expected value " + expectedType
+ " on RS " + rst.getName() + " region "
+ hri.getRegionNameAsString(), expectedType, hri.getTableDesc()
.getColumnFamilies()[0].getBloomFilterType());
}
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -31,10 +32,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.TestAdmin;
import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Counter;
@ -56,8 +65,9 @@ public class TestRowCounter {
private final static String COL_FAM = "col_fam";
private final static String COL1 = "c1";
private final static String COL2 = "c2";
private final static int TOTAL_ROWS = 10;
private final static int ROWS_WITH_ONE_COL = 2;
private final static int NUM_ONLINE_CHANGES = 4;
private final static int TOTAL_ROWS = 100;
private final static int ROWS_WITH_ONE_COL = 20;
/**
* @throws java.lang.Exception
@ -91,21 +101,43 @@ public class TestRowCounter {
String[] args = new String[] {
TABLE_NAME
};
runRowCount(args, 10);
runRowCount(args, TOTAL_ROWS);
}
/**
* Test a case when the column specified in command line arguments is
* exclusive for few rows.
*
* @throws Exception
*/
@Test
public void testRowCounterWithOnlineSchemaChange() throws Exception {
String[] args = new String[] { TABLE_NAME };
final TableName tableName = TableName.valueOf(TABLE_NAME);
HTableDescriptor htd = TEST_UTIL.getHBaseAdmin().getTableDescriptor(
tableName);
final int INITAL_MAX_VERSIONS = htd.getFamilies().iterator().next()
.getMaxVersions();
final int EXPECTED_NUM_REGIONS = TEST_UTIL.getHBaseAdmin()
.getTableRegions(tableName).size();
runRowCounterWithOnlineSchemaChange(args, TOTAL_ROWS);
final int FINAL_MAX_VERSIONS = TestAdmin.waitForColumnSchemasToSettle(
TEST_UTIL.getMiniHBaseCluster(), tableName, EXPECTED_NUM_REGIONS)
.getMaxVersions();
assertEquals(
"There was a mismatch in the number of online schema modifications that were created",
FINAL_MAX_VERSIONS, INITAL_MAX_VERSIONS + NUM_ONLINE_CHANGES);
}
/**
* Test a case when the column specified in command line arguments is
* exclusive for few rows.
*
* @throws Exception
*/
@Test
public void testRowCounterExclusiveColumn() throws Exception {
String[] args = new String[] {
TABLE_NAME, COL_FAM + ":" + COL1
};
runRowCount(args, 8);
runRowCount(args, TOTAL_ROWS - ROWS_WITH_ONE_COL);
}
/**
@ -119,7 +151,7 @@ public class TestRowCounter {
String[] args = new String[] {
TABLE_NAME, COL_FAM + ":" + COL2
};
runRowCount(args, 10);
runRowCount(args, TOTAL_ROWS);
}
/**
@ -142,10 +174,43 @@ public class TestRowCounter {
assertEquals(expectedCount, counter.getValue());
}
private void runRowCounterWithOnlineSchemaChange(String[] args,
int expectedCount) throws Exception {
GenericOptionsParser opts = new GenericOptionsParser(
TEST_UTIL.getConfiguration(), args);
Configuration conf = opts.getConfiguration();
args = opts.getRemainingArgs();
Job job = RowCounter.createSubmittableJob(conf, args);
// This is where we'd want to start a background operation to make change on
// the table
BackgroundSchemaChangeThread schemaChangeThread = new BackgroundSchemaChangeThread(
TEST_UTIL.getHBaseAdmin(), TableName.valueOf(TABLE_NAME),
NUM_ONLINE_CHANGES);
schemaChangeThread.start();
job.waitForCompletion(true);
String trackingURL = job.getHistoryUrl();
String trackingURL2 = job.getTrackingURL();
System.out.println("Tracking URL is: " + trackingURL2);
schemaChangeThread.join();
// this is where we'd have the thread returning
//might be a timing issue - if it takes too long, then that service is just down. stupid.
//it might also be an issue of asking for the tracking url. that may kill the history server (nope. it's a time thing).
assertTrue(job.isSuccessful());
Counter counter = job.getCounters().findCounter(
RowCounterMapper.Counters.ROWS);
assertEquals(expectedCount, counter.getValue());
}
/**
* Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have
* two columns, Few have one.
*
*
* @param table
* @throws IOException
*/
@ -226,4 +291,89 @@ public class TestRowCounter {
}
public class BackgroundSchemaChangeThread extends Thread {
private int numOnlineChanges;
HBaseAdmin admin;
TableName tableName;
public BackgroundSchemaChangeThread(HBaseAdmin admin, TableName tableName,
int numOnlineChanges) throws IOException {
this.admin = admin;
this.tableName = tableName;
this.numOnlineChanges = numOnlineChanges;
if (admin == null) {
throw new IllegalArgumentException(
"[Test Error]: Provided admin should not be null");
}
}
@Override
public void run() {
final long START_TIME = System.currentTimeMillis();
final int ONLINE_CHANGE_TIMEOUT = 200000;
HTableDescriptor htd = null;
try {
htd = admin.getTableDescriptor(tableName);
} catch (IOException ioe) {
ioe.printStackTrace();
fail("Fail: Issue pulling table descriptor");
}
HColumnDescriptor hcd = null;
assertTrue(htd != null);
final int countOfFamilies = htd.getFamilies().size();
assertTrue(countOfFamilies > 0);
boolean expectedException = false;
int numIterations = 0;
while (numIterations < numOnlineChanges) {
if (System.currentTimeMillis() - START_TIME > ONLINE_CHANGE_TIMEOUT) {
fail("Fail: Timed out reaching before required snapshot count. Only had "
+ numIterations + " updates");
}
hcd = htd.getFamilies().iterator().next();
int maxversions = hcd.getMaxVersions();
int newMaxVersions = maxversions + 1;
System.out.println("Setting max versions on CF to " + newMaxVersions);
hcd.setMaxVersions(newMaxVersions);
final byte[] hcdName = hcd.getName();
expectedException = false;
try {
this.admin.modifyColumn(tableName, hcd);
} catch (TableNotDisabledException re) {
expectedException = true;
} catch (IOException e) {
e.printStackTrace();
fail("Fail: IO Issue while modifying column");
}
assertFalse(expectedException);
try {
int EXPECTED_NUM_REGIONS = TEST_UTIL.getHBaseAdmin().getTableRegions(tableName).size();
assertEquals("The max version count was not updated", newMaxVersions, TestAdmin.waitForColumnSchemasToSettle(TEST_UTIL.getMiniHBaseCluster(), tableName, EXPECTED_NUM_REGIONS).getMaxVersions());
Thread.sleep(2000);
} catch (TableNotFoundException e) {
e.printStackTrace();
fail("Fail: Table not found.");
} catch (IOException e) {
e.printStackTrace();
fail("Fail: IO Issue while modifying column");
} catch (InterruptedException e) {
LOG.warn("Sleep was interrupted. This is unusual, but not grounds for TF");
e.printStackTrace();
}
numIterations++;
}
}
}
}

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
@ -34,12 +35,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
@ -52,6 +56,7 @@ import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TestAdmin;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@ -76,6 +81,7 @@ public class TestEndToEndSplitTransaction {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final Configuration conf = TEST_UTIL.getConfiguration();
@BeforeClass
public static void beforeAllTests() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
@ -177,11 +183,13 @@ public class TestEndToEndSplitTransaction {
/**
* Tests that the client sees meta table changes as atomic during splits
*/
@Test
public void testFromClientSideWhileSplitting() throws Throwable {
LOG.info("Starting testFromClientSideWhileSplitting");
private void runTestFromClientSideWhileSplitting(boolean onlineSchemaChange) throws Throwable {
final String tableName = "testFromClientSideWhileSplitting" + System.currentTimeMillis();
LOG.info("Starting " + tableName);
final TableName TABLENAME =
TableName.valueOf("testFromClientSideWhileSplitting");
TableName.valueOf(tableName);
final byte[] FAMILY = Bytes.toBytes("family");
//SplitTransaction will update the meta table by offlining the parent region, and adding info
@ -189,7 +197,16 @@ public class TestEndToEndSplitTransaction {
HTable table = TEST_UTIL.createTable(TABLENAME, FAMILY);
Stoppable stopper = new StoppableImplementation();
RegionSplitter regionSplitter = new RegionSplitter(table);
RegionSplitter regionSplitter = null;
if (onlineSchemaChange) {
regionSplitter = new RegionSplitterWithSchemaChange(table);
} else {
regionSplitter = new RegionSplitter(table);
}
RegionChecker regionChecker = new RegionChecker(conf, stopper, TABLENAME);
regionChecker.start();
@ -211,6 +228,16 @@ public class TestEndToEndSplitTransaction {
regionChecker.verify();
}
@Test
public void testFromClientSideOnlineSchemaChangeWhileSplitting() throws Throwable {
runTestFromClientSideWhileSplitting(true);
}
@Test
public void testFromClientSideWhileSplitting() throws Throwable {
runTestFromClientSideWhileSplitting(false);
}
static class RegionSplitter extends Thread {
Throwable ex;
HTable table;
@ -285,6 +312,104 @@ public class TestEndToEndSplitTransaction {
}
}
static class RegionSplitterWithSchemaChange extends RegionSplitter {
RegionSplitterWithSchemaChange(HTable table) throws IOException {
super(table);
}
@Override
public void run() {
try {
Random random = new Random();
for (int i = 0; i < 5; i++) {
NavigableMap<HRegionInfo, ServerName> regions = MetaScanner
.allTableRegions(conf, null, tableName, false);
if (regions.size() == 0) {
continue;
}
int regionIndex = random.nextInt(regions.size());
// pick a random region and split it into two
HRegionInfo region = Iterators.get(regions.keySet().iterator(),
regionIndex);
// pick the mid split point
int start = 0, end = Integer.MAX_VALUE;
if (region.getStartKey().length > 0) {
start = Bytes.toInt(region.getStartKey());
}
if (region.getEndKey().length > 0) {
end = Bytes.toInt(region.getEndKey());
}
int mid = start + ((end - start) / 2);
byte[] splitPoint = Bytes.toBytes(mid);
// put some rows to the regions
addData(start);
addData(mid);
flushAndBlockUntilDone(admin, rs, region.getRegionName());
compactAndBlockUntilDone(admin, rs, region.getRegionName());
log("Initiating region split for:" + region.getRegionNameAsString());
try {
admin.split(region.getRegionName(), splitPoint);
for (int j = 0; j < 5; j++) {
HTableDescriptor htd = null;
try {
htd = admin.getTableDescriptor(tableName);
} catch (IOException ioe) {
ioe.printStackTrace();
fail("Issue pulling table descriptor");
}
HColumnDescriptor hcd = null;
assertTrue(htd != null);
final int countOfFamilies = htd.getFamilies().size();
assertTrue(countOfFamilies > 0);
hcd = htd.getColumnFamilies()[0];
boolean expectedException = false;
assertFalse(expectedException);
int initMaxVersions = hcd.getMaxVersions();
int newMaxVersions = initMaxVersions + 1;
hcd.setMaxVersions(newMaxVersions);
admin.modifyColumn(tableName, hcd);
try {
int EXPECTED_NUM_REGIONS = TEST_UTIL.getHBaseAdmin().getTableRegions(tableName).size();
assertEquals("The max version count was not updated", newMaxVersions, TestAdmin.waitForColumnSchemasToSettle(TEST_UTIL.getMiniHBaseCluster(), tableName, EXPECTED_NUM_REGIONS).getMaxVersions());
Thread.sleep(2000);
} catch (TableNotFoundException e) {
e.printStackTrace();
fail("Table not found. Failing.");
} catch (IOException e) {
e.printStackTrace();
fail("IO Issue while modifying column");
} catch (InterruptedException e) {
LOG.warn("Sleep was interrupted. This is unusual, but not grounds for TF");
e.printStackTrace();
}
}
// wait until the split is complete
blockUntilRegionSplit(conf, 50000, region.getRegionName(), true);
} catch (NotServingRegionException ex) {
// ignore
}
}
} catch (Throwable ex) {
this.ex = ex;
}
}
}
/**
* Checks regions using MetaScanner, MetaReader and HTable methods
*/

View File

@ -17,6 +17,11 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -33,10 +38,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
@ -45,6 +53,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TestAdmin;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -57,6 +66,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionReque
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -74,6 +84,8 @@ public class TestHRegionServerBulkLoad {
private final static byte[] QUAL = Bytes.toBytes("qual");
private final static int NUM_CFS = 10;
public static int BLOCKSIZE = 64 * 1024;
public final static int NUM_ROWS = 2048;
public final static int NUM_CF_ITERATIONS = 2;
public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
private final static byte[][] families = new byte[NUM_CFS][];
@ -148,7 +160,7 @@ public class TestHRegionServerBulkLoad {
for (int i = 0; i < NUM_CFS; i++) {
Path hfile = new Path(dir, family(i));
byte[] fam = Bytes.toBytes(family(i));
createHFile(fs, hfile, fam, QUAL, val, 1000);
createHFile(fs, hfile, fam, QUAL, val, NUM_ROWS);
famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
}
@ -195,6 +207,110 @@ public class TestHRegionServerBulkLoad {
}
}
public static class OnlineSchemaChangeMaxVersionsThread extends
RepeatingTestThread {
private HBaseAdmin admin;
private TableName tableName;
private int totalNumIterations;
byte[][] targetFamilies;
public OnlineSchemaChangeMaxVersionsThread(String tableName,
TestContext ctx, byte targetFamilies[][], int totalNumIterations) {
super(ctx);
this.tableName = TableName.valueOf(tableName);
this.targetFamilies = targetFamilies; // this should be validated, but
// it's only a test-facing API, so I
// can live with this
if (totalNumIterations < 1 || totalNumIterations > 20) {
fail("Unreasonable input provided to schema change thread. Please select a value between 1 and 20");
}
this.totalNumIterations = totalNumIterations;
try {
admin = UTIL.getHBaseAdmin();
Assert.assertNotNull(admin);
} catch (IOException e) {
fail("Not able to get a handle on the hbase admin.");
}
}
@Override
public void doAnAction() throws Exception {
final long START_TIME = System.currentTimeMillis();
final int ONLINE_CHANGE_TIMEOUT = 2000000;
HTableDescriptor htd = null;
try {
htd = admin.getTableDescriptor(tableName);
} catch (IOException ioe) {
ioe.printStackTrace();
fail("Issue pulling table descriptor");
}
HColumnDescriptor hcd = null;
assertTrue(htd != null);
final int countOfFamilies = htd.getFamilies().size();
assertTrue(countOfFamilies > 0);
boolean expectedException = false;
int numIterations = 0;
while (numIterations < totalNumIterations) {
htd = admin.getTableDescriptor(tableName);
if (System.currentTimeMillis() - START_TIME > ONLINE_CHANGE_TIMEOUT) {
fail("Timed out reaching before required modify count. Only had "
+ numIterations + " updates");
}
for (byte[] targetFamily : targetFamilies) {
hcd = htd.getFamily(targetFamily);
int maxversions = hcd.getMaxVersions();
System.out.println("NumIterations is: " + numIterations);
System.out.println("DEBUG: Current number of versions for family "
+ Bytes.toString(targetFamily) + " is " + maxversions);
int newMaxVersions = maxversions + 1;
System.out.println("Setting max versions on CF to " + newMaxVersions
+ " on CF " + Bytes.toString(targetFamily));
hcd.setMaxVersions(newMaxVersions);
final byte[] hcdName = hcd.getName();
expectedException = false;
try {
this.admin.modifyColumn(tableName, hcd);
} catch (TableNotDisabledException re) {
expectedException = true;
} catch (IOException e) {
e.printStackTrace();
fail("IO Issue while modifying column");
}
assertFalse(expectedException);
HColumnDescriptor modifiedHcd;
try {
int EXPECTED_NUM_REGIONS = UTIL.getHBaseAdmin().getTableRegions(tableName).size();
assertEquals("The max version count was not updated", newMaxVersions, TestAdmin.waitForColumnSchemasToSettle(UTIL.getMiniHBaseCluster(), tableName, EXPECTED_NUM_REGIONS).getMaxVersions());
Thread.sleep(2000);
} catch (TableNotFoundException e) {
e.printStackTrace();
fail("Table not found. Failing.");
} catch (IOException e) {
e.printStackTrace();
fail("IO Issue while modifying column");
} catch (InterruptedException e) {
System.out
.println("WARN: Sleep was interrupted. This is unusual, but not grounds for TF");
e.printStackTrace();
}
}
numIterations++;
}
}
}
/**
* Thread that does full scans of the table looking for any partially
* completed rows.
@ -262,7 +378,7 @@ public class TestHRegionServerBulkLoad {
try {
LOG.info("Creating table " + table);
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
for (int i = 0; i < 10; i++) {
for (int i = 0; i < NUM_CFS; i++) {
htd.addFamily(new HColumnDescriptor(family(i)));
}
@ -279,23 +395,42 @@ public class TestHRegionServerBulkLoad {
public void testAtomicBulkLoad() throws Exception {
String TABLE_NAME = "atomicBulkLoad";
int millisToRun = 30000;
int millisToRun = 100000;
int numScanners = 50;
UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
UTIL.startMiniCluster(1);
try {
runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners, false);
} finally {
UTIL.shutdownMiniCluster();
}
}
void runAtomicBulkloadTest(String tableName, int millisToRun, int numScanners)
throws Exception {
setupTable(tableName, 10);
@Test
public void testAtomicBulkLoadWithSchemaChange() throws Exception {
String TABLE_NAME = "atomicBulkLoad";
int millisToRun = 100000;
int numScanners = 50;
UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable",
true);
UTIL.startMiniCluster(1);
try {
runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners, true);
} finally {
UTIL.shutdownMiniCluster();
}
}
void runAtomicBulkloadTest(String tableName, int millisToRun,
int numScanners, boolean schemaChange) throws Exception {
setupTable(tableName, NUM_CFS);
TestContext ctx = new TestContext(UTIL.getConfiguration());
HTableDescriptor htd = UTIL.getHBaseAdmin().getTableDescriptor(
TableName.valueOf(tableName));
final int INITIAL_UPDATE_COUNT = htd.getColumnFamilies()[0]
.getMaxVersions();
AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
ctx.addThread(loader);
@ -306,6 +441,12 @@ public class TestHRegionServerBulkLoad {
ctx.addThread(scanner);
}
if (schemaChange) {
OnlineSchemaChangeMaxVersionsThread maxVersionsChangeThread = new OnlineSchemaChangeMaxVersionsThread(
tableName, ctx, families, NUM_CF_ITERATIONS);
ctx.addThread(maxVersionsChangeThread);
}
ctx.startThreads();
ctx.waitFor(millisToRun);
ctx.stop();
@ -319,6 +460,27 @@ public class TestHRegionServerBulkLoad {
LOG.info(" scanned " + scanner.numScans.get());
LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
}
// Verification of data insertion
Assert.assertEquals("Incorrect number of rows found.", NUM_ROWS,
UTIL.countRows(new HTable(UTIL.getConfiguration(), tableName)));
if (schemaChange) {
// Verification of data updated
htd = UTIL.getHBaseAdmin().getTableDescriptor(
TableName.valueOf(tableName));
for (byte[] family : families) {
// verify that at least one pass occurred through the loop
HColumnDescriptor hcd = htd.getFamily(family);
assertTrue(
"The full number of iterations for family "
+ Bytes.toString(family) + " was not done. Expecting at least "
+ (NUM_CF_ITERATIONS + INITIAL_UPDATE_COUNT) + " but received "
+ hcd.getMaxVersions(),
hcd.getMaxVersions() >= (NUM_CF_ITERATIONS + INITIAL_UPDATE_COUNT));
}
}
}
/**
@ -330,7 +492,7 @@ public class TestHRegionServerBulkLoad {
Configuration c = HBaseConfiguration.create();
TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
test.setConf(c);
test.runAtomicBulkloadTest("atomicTableTest", 5 * 60 * 1000, 50);
test.runAtomicBulkloadTest("atomicTableTest", 5 * 60 * 1000, 50, false);
} finally {
System.exit(0); // something hangs (believe it is lru threadpool)
}
@ -339,6 +501,4 @@ public class TestHRegionServerBulkLoad {
private void setConf(Configuration c) {
UTIL = new HBaseTestingUtility(c);
}
}
}