HBASE-10184 [Online Schema Change]: Add additional tests for online schema change (Aleksandr Shulman) REVERTED patch and addendum patch
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1577770 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2ea3307765
commit
2242036c01
|
@ -27,7 +27,6 @@ import static org.junit.Assert.fail;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -62,23 +61,14 @@ import org.apache.hadoop.hbase.executor.EventHandler;
|
||||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLogUtilsForTests;
|
import org.apache.hadoop.hbase.regionserver.wal.HLogUtilsForTests;
|
||||||
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
|
import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
import org.junit.After;
|
import org.junit.*;
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
@ -100,7 +90,6 @@ public class TestAdmin {
|
||||||
TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
|
TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
|
||||||
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
|
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
|
||||||
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
|
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
|
||||||
TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
|
||||||
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
|
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
|
||||||
TEST_UTIL.getConfiguration().setBoolean(
|
TEST_UTIL.getConfiguration().setBoolean(
|
||||||
"hbase.master.enabletable.roundrobin", true);
|
"hbase.master.enabletable.roundrobin", true);
|
||||||
|
@ -380,25 +369,21 @@ public class TestAdmin {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify schema modification takes.
|
* Verify schema modification takes.
|
||||||
*
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
@Test (timeout=300000)
|
@Test (timeout=300000)
|
||||||
public void testOnlineChangeTableSchema() throws IOException,
|
public void testOnlineChangeTableSchema() throws IOException, InterruptedException {
|
||||||
InterruptedException {
|
final TableName tableName =
|
||||||
final TableName tableName = TableName.valueOf("changeTableSchemaOnline");
|
TableName.valueOf("changeTableSchemaOnline");
|
||||||
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration()
|
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration().setBoolean(
|
||||||
.setBoolean("hbase.online.schema.update.enable", true);
|
"hbase.online.schema.update.enable", true);
|
||||||
HTableDescriptor [] tables = admin.listTables();
|
HTableDescriptor [] tables = admin.listTables();
|
||||||
int numTables = tables.length;
|
int numTables = tables.length;
|
||||||
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY).close();
|
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY).close();
|
||||||
tables = this.admin.listTables();
|
tables = this.admin.listTables();
|
||||||
assertEquals(numTables + 1, tables.length);
|
assertEquals(numTables + 1, tables.length);
|
||||||
|
|
||||||
final int EXPECTED_NUM_REGIONS = TEST_UTIL.getHBaseAdmin()
|
|
||||||
.getTableRegions(tableName).size();
|
|
||||||
|
|
||||||
// FIRST, do htabledescriptor changes.
|
// FIRST, do htabledescriptor changes.
|
||||||
HTableDescriptor htd = this.admin.getTableDescriptor(tableName);
|
HTableDescriptor htd = this.admin.getTableDescriptor(tableName);
|
||||||
// Make a copy and assert copy is good.
|
// Make a copy and assert copy is good.
|
||||||
|
@ -443,10 +428,7 @@ public class TestAdmin {
|
||||||
assertFalse(expectedException);
|
assertFalse(expectedException);
|
||||||
modifiedHtd = this.admin.getTableDescriptor(tableName);
|
modifiedHtd = this.admin.getTableDescriptor(tableName);
|
||||||
HColumnDescriptor modifiedHcd = modifiedHtd.getFamily(hcdName);
|
HColumnDescriptor modifiedHcd = modifiedHtd.getFamily(hcdName);
|
||||||
assertEquals(
|
assertEquals(newMaxVersions, modifiedHcd.getMaxVersions());
|
||||||
newMaxVersions,
|
|
||||||
waitForColumnSchemasToSettle(TEST_UTIL.getMiniHBaseCluster(),
|
|
||||||
tableName, EXPECTED_NUM_REGIONS).getMaxVersions());
|
|
||||||
|
|
||||||
// Try adding a column
|
// Try adding a column
|
||||||
assertFalse(this.admin.isTableDisabled(tableName));
|
assertFalse(this.admin.isTableDisabled(tableName));
|
||||||
|
@ -472,38 +454,6 @@ public class TestAdmin {
|
||||||
hcd = modifiedHtd.getFamily(xtracol.getName());
|
hcd = modifiedHtd.getFamily(xtracol.getName());
|
||||||
assertTrue(hcd == null);
|
assertTrue(hcd == null);
|
||||||
|
|
||||||
// Modify bloom filter
|
|
||||||
countOfFamilies = modifiedHtd.getFamilies().size();
|
|
||||||
assertTrue(countOfFamilies > 0);
|
|
||||||
hcd = modifiedHtd.getFamilies().iterator().next();
|
|
||||||
BloomType initialBT = hcd.getBloomFilterType();
|
|
||||||
BloomType newBloomType = null;
|
|
||||||
BloomType[] possibleBloomFilters = BloomType.values();
|
|
||||||
for (BloomType type : possibleBloomFilters) {
|
|
||||||
|
|
||||||
if (initialBT == null || !initialBT.equals(type)) {
|
|
||||||
|
|
||||||
newBloomType = type;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
hcd.setBloomFilterType(newBloomType);
|
|
||||||
expectedException = false;
|
|
||||||
|
|
||||||
try {
|
|
||||||
this.admin.modifyColumn(tableName, hcd);
|
|
||||||
} catch (TableNotDisabledException re) {
|
|
||||||
expectedException = true;
|
|
||||||
}
|
|
||||||
assertFalse(expectedException);
|
|
||||||
modifiedHtd = this.admin.getTableDescriptor(tableName);
|
|
||||||
modifiedHcd = modifiedHtd.getFamily(hcdName);
|
|
||||||
assertEquals(
|
|
||||||
newBloomType,
|
|
||||||
waitForColumnSchemasToSettle(TEST_UTIL.getMiniHBaseCluster(),
|
|
||||||
tableName, EXPECTED_NUM_REGIONS).getBloomFilterType());
|
|
||||||
|
|
||||||
// Delete the table
|
// Delete the table
|
||||||
this.admin.disableTable(tableName);
|
this.admin.disableTable(tableName);
|
||||||
this.admin.deleteTable(tableName);
|
this.admin.deleteTable(tableName);
|
||||||
|
@ -515,8 +465,8 @@ public class TestAdmin {
|
||||||
public void testShouldFailOnlineSchemaUpdateIfOnlineSchemaIsNotEnabled()
|
public void testShouldFailOnlineSchemaUpdateIfOnlineSchemaIsNotEnabled()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
final byte[] tableName = Bytes.toBytes("changeTableSchemaOnlineFailure");
|
final byte[] tableName = Bytes.toBytes("changeTableSchemaOnlineFailure");
|
||||||
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration()
|
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration().setBoolean(
|
||||||
.setBoolean("hbase.online.schema.update.enable", false);
|
"hbase.online.schema.update.enable", false);
|
||||||
HTableDescriptor[] tables = admin.listTables();
|
HTableDescriptor[] tables = admin.listTables();
|
||||||
int numTables = tables.length;
|
int numTables = tables.length;
|
||||||
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY).close();
|
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY).close();
|
||||||
|
@ -546,79 +496,8 @@ public class TestAdmin {
|
||||||
assertTrue("Online schema update should not happen.", expectedException);
|
assertTrue("Online schema update should not happen.", expectedException);
|
||||||
|
|
||||||
// Reset the value for the other tests
|
// Reset the value for the other tests
|
||||||
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration()
|
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration().setBoolean(
|
||||||
.setBoolean("hbase.online.schema.update.enable", true);
|
"hbase.online.schema.update.enable", true);
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testOnlineChangeReplicationScope() throws Exception {
|
|
||||||
|
|
||||||
final TableName tableName = TableName.valueOf("changeReplicationTable");
|
|
||||||
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration()
|
|
||||||
.setBoolean("hbase.online.schema.update.enable", true);
|
|
||||||
HTableDescriptor[] tables = admin.listTables();
|
|
||||||
int numTables = tables.length;
|
|
||||||
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY).close();
|
|
||||||
tables = this.admin.listTables();
|
|
||||||
assertEquals(numTables + 1, tables.length);
|
|
||||||
|
|
||||||
final int EXPECTED_NUM_REGIONS = TEST_UTIL.getHBaseAdmin()
|
|
||||||
.getTableRegions(tableName).size();
|
|
||||||
|
|
||||||
HTableDescriptor htd = this.admin.getTableDescriptor(tableName);
|
|
||||||
// Make a copy and assert copy is good.
|
|
||||||
HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
|
|
||||||
|
|
||||||
HColumnDescriptor originalHCD = htd.getFamily(HConstants.CATALOG_FAMILY);
|
|
||||||
assertEquals(
|
|
||||||
"Replication is enabled by default, which should not be the case", 0,
|
|
||||||
hcd.getScope());
|
|
||||||
originalHCD.setScope(1);
|
|
||||||
this.admin.modifyColumn(tableName, originalHCD);
|
|
||||||
|
|
||||||
// verify that the replication scope is off (0) by default
|
|
||||||
|
|
||||||
HColumnDescriptor[] hcds = this.admin.getTableDescriptor(tableName)
|
|
||||||
.getColumnFamilies();
|
|
||||||
assertEquals("Unexpected number of column families returned", 1,
|
|
||||||
hcds.length);
|
|
||||||
|
|
||||||
assertEquals(
|
|
||||||
1,
|
|
||||||
waitForColumnSchemasToSettle(TEST_UTIL.getHBaseCluster(), tableName,
|
|
||||||
EXPECTED_NUM_REGIONS).getScope());
|
|
||||||
|
|
||||||
this.admin.disableTable(tableName);
|
|
||||||
this.admin.deleteTable(tableName);
|
|
||||||
this.admin.listTables();
|
|
||||||
assertFalse(this.admin.tableExists(tableName));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testOnlineSetTableOwner() throws Exception {
|
|
||||||
|
|
||||||
final TableName tableName = TableName.valueOf("changeTableOwnerTable");
|
|
||||||
TEST_UTIL.getMiniHBaseCluster().getMaster().getConfiguration()
|
|
||||||
.setBoolean("hbase.online.schema.update.enable", true);
|
|
||||||
HTableDescriptor[] tables = admin.listTables();
|
|
||||||
int numTables = tables.length;
|
|
||||||
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY).close();
|
|
||||||
tables = this.admin.listTables();
|
|
||||||
assertEquals(numTables + 1, tables.length);
|
|
||||||
|
|
||||||
HTableDescriptor htd = this.admin.getTableDescriptor(tableName);
|
|
||||||
// Make a copy and assert copy is good.
|
|
||||||
|
|
||||||
assertEquals("There is an owner by default, which should not be the case",
|
|
||||||
null, htd.getOwnerString());
|
|
||||||
|
|
||||||
htd.setOwnerString("someUser"); // does this need to be a valid user
|
|
||||||
admin.modifyTable(tableName, htd);
|
|
||||||
|
|
||||||
// verify that the replication scope is off (0) by default
|
|
||||||
|
|
||||||
HTableDescriptor modifiedHtd = this.admin.getTableDescriptor(tableName);
|
|
||||||
assertEquals("Owner was not set", modifiedHtd.getOwnerString(), "someUser");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1840,67 +1719,4 @@ public class TestAdmin {
|
||||||
ct.stop();
|
ct.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static HColumnDescriptor waitForColumnSchemasToSettle(
|
|
||||||
MiniHBaseCluster miniCluster, TableName tableName, int totalRegions)
|
|
||||||
throws InterruptedException {
|
|
||||||
|
|
||||||
Thread.sleep(2000); // wait 2s so that at least some of the RSes have the
|
|
||||||
// info
|
|
||||||
|
|
||||||
Set<HColumnDescriptor> descriptorSet = new HashSet<HColumnDescriptor>();
|
|
||||||
|
|
||||||
final int MAX_POLLS = 5;
|
|
||||||
int numRegionsEncountered = 0;
|
|
||||||
for (int i = 0; i < MAX_POLLS; i++) {
|
|
||||||
for (JVMClusterUtil.RegionServerThread rst : miniCluster
|
|
||||||
.getRegionServerThreads()) {
|
|
||||||
|
|
||||||
for (HRegion hri : rst.getRegionServer().getOnlineRegions(tableName)) {
|
|
||||||
numRegionsEncountered++;
|
|
||||||
HColumnDescriptor hcd = hri.getTableDesc().getColumnFamilies()[0];
|
|
||||||
descriptorSet.add(hcd);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (descriptorSet.size() == 1) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Thread.sleep(2000);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (descriptorSet.size() != 1) {
|
|
||||||
System.out
|
|
||||||
.println("FAIL: HColumnDescriptor definition did not settle. Here is the output:");
|
|
||||||
Iterator<HColumnDescriptor> hcIter = descriptorSet.iterator();
|
|
||||||
while (hcIter.hasNext()) {
|
|
||||||
System.out.println("HCD entry: " + hcIter.next());
|
|
||||||
}
|
|
||||||
fail("HColumnDescription did not settle as expected.");
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals("The number of regions did not match. Expected "
|
|
||||||
+ totalRegions + " but received " + numRegionsEncountered,
|
|
||||||
totalRegions, numRegionsEncountered);
|
|
||||||
|
|
||||||
return descriptorSet.iterator().next();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void verifyBloomFilterPropertyOnEachRS(
|
|
||||||
MiniHBaseCluster miniCluster, TableName tableName, BloomType expectedType)
|
|
||||||
throws Exception {
|
|
||||||
|
|
||||||
for (JVMClusterUtil.RegionServerThread rst : miniCluster
|
|
||||||
.getRegionServerThreads()) {
|
|
||||||
|
|
||||||
for (HRegion hri : rst.getRegionServer().getOnlineRegions(tableName)) {
|
|
||||||
|
|
||||||
assertEquals(
|
|
||||||
"The bloom filter did not match expected value " + expectedType
|
|
||||||
+ " on RS " + rst.getName() + " region "
|
|
||||||
+ hri.getRegionNameAsString(), expectedType, hri.getTableDesc()
|
|
||||||
.getColumnFamilies()[0].getBloomFilterType());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@ -32,18 +31,10 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.MediumTests;
|
import org.apache.hadoop.hbase.MediumTests;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.TestAdmin;
|
|
||||||
import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
|
import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
|
||||||
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
|
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
|
||||||
import org.apache.hadoop.mapreduce.Counter;
|
import org.apache.hadoop.mapreduce.Counter;
|
||||||
|
@ -65,9 +56,8 @@ public class TestRowCounter {
|
||||||
private final static String COL_FAM = "col_fam";
|
private final static String COL_FAM = "col_fam";
|
||||||
private final static String COL1 = "c1";
|
private final static String COL1 = "c1";
|
||||||
private final static String COL2 = "c2";
|
private final static String COL2 = "c2";
|
||||||
private final static int NUM_ONLINE_CHANGES = 4;
|
private final static int TOTAL_ROWS = 10;
|
||||||
private final static int TOTAL_ROWS = 100;
|
private final static int ROWS_WITH_ONE_COL = 2;
|
||||||
private final static int ROWS_WITH_ONE_COL = 20;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws java.lang.Exception
|
* @throws java.lang.Exception
|
||||||
|
@ -101,29 +91,7 @@ public class TestRowCounter {
|
||||||
String[] args = new String[] {
|
String[] args = new String[] {
|
||||||
TABLE_NAME
|
TABLE_NAME
|
||||||
};
|
};
|
||||||
runRowCount(args, TOTAL_ROWS);
|
runRowCount(args, 10);
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRowCounterWithOnlineSchemaChange() throws Exception {
|
|
||||||
|
|
||||||
String[] args = new String[] { TABLE_NAME };
|
|
||||||
final TableName tableName = TableName.valueOf(TABLE_NAME);
|
|
||||||
HTableDescriptor htd = TEST_UTIL.getHBaseAdmin().getTableDescriptor(
|
|
||||||
tableName);
|
|
||||||
final int INITAL_MAX_VERSIONS = htd.getFamilies().iterator().next()
|
|
||||||
.getMaxVersions();
|
|
||||||
final int EXPECTED_NUM_REGIONS = TEST_UTIL.getHBaseAdmin()
|
|
||||||
.getTableRegions(tableName).size();
|
|
||||||
|
|
||||||
runRowCounterWithOnlineSchemaChange(args, TOTAL_ROWS);
|
|
||||||
final int FINAL_MAX_VERSIONS = TestAdmin.waitForColumnSchemasToSettle(
|
|
||||||
TEST_UTIL.getMiniHBaseCluster(), tableName, EXPECTED_NUM_REGIONS)
|
|
||||||
.getMaxVersions();
|
|
||||||
assertEquals(
|
|
||||||
"There was a mismatch in the number of online schema modifications that were created",
|
|
||||||
FINAL_MAX_VERSIONS, INITAL_MAX_VERSIONS + NUM_ONLINE_CHANGES);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -137,7 +105,7 @@ public class TestRowCounter {
|
||||||
String[] args = new String[] {
|
String[] args = new String[] {
|
||||||
TABLE_NAME, COL_FAM + ":" + COL1
|
TABLE_NAME, COL_FAM + ":" + COL1
|
||||||
};
|
};
|
||||||
runRowCount(args, TOTAL_ROWS - ROWS_WITH_ONE_COL);
|
runRowCount(args, 8);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -151,7 +119,7 @@ public class TestRowCounter {
|
||||||
String[] args = new String[] {
|
String[] args = new String[] {
|
||||||
TABLE_NAME, COL_FAM + ":" + COL2
|
TABLE_NAME, COL_FAM + ":" + COL2
|
||||||
};
|
};
|
||||||
runRowCount(args, TOTAL_ROWS);
|
runRowCount(args, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -174,38 +142,6 @@ public class TestRowCounter {
|
||||||
assertEquals(expectedCount, counter.getValue());
|
assertEquals(expectedCount, counter.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runRowCounterWithOnlineSchemaChange(String[] args,
|
|
||||||
int expectedCount) throws Exception {
|
|
||||||
|
|
||||||
GenericOptionsParser opts = new GenericOptionsParser(
|
|
||||||
TEST_UTIL.getConfiguration(), args);
|
|
||||||
Configuration conf = opts.getConfiguration();
|
|
||||||
args = opts.getRemainingArgs();
|
|
||||||
Job job = RowCounter.createSubmittableJob(conf, args);
|
|
||||||
|
|
||||||
// This is where we'd want to start a background operation to make change on
|
|
||||||
// the table
|
|
||||||
|
|
||||||
BackgroundSchemaChangeThread schemaChangeThread = new BackgroundSchemaChangeThread(
|
|
||||||
TEST_UTIL.getHBaseAdmin(), TableName.valueOf(TABLE_NAME),
|
|
||||||
NUM_ONLINE_CHANGES);
|
|
||||||
schemaChangeThread.start();
|
|
||||||
|
|
||||||
job.waitForCompletion(true);
|
|
||||||
String trackingURL2 = job.getTrackingURL();
|
|
||||||
System.out.println("Tracking URL is: " + trackingURL2);
|
|
||||||
schemaChangeThread.join();
|
|
||||||
// this is where we'd have the thread returning
|
|
||||||
|
|
||||||
//might be a timing issue - if it takes too long, then that service is just down. stupid.
|
|
||||||
//it might also be an issue of asking for the tracking url. that may kill the history server (nope. it's a time thing).
|
|
||||||
|
|
||||||
assertTrue(job.isSuccessful());
|
|
||||||
Counter counter = job.getCounters().findCounter(
|
|
||||||
RowCounterMapper.Counters.ROWS);
|
|
||||||
assertEquals(expectedCount, counter.getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have
|
* Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have
|
||||||
* two columns, Few have one.
|
* two columns, Few have one.
|
||||||
|
@ -290,89 +226,4 @@ public class TestRowCounter {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public class BackgroundSchemaChangeThread extends Thread {
|
|
||||||
private int numOnlineChanges;
|
|
||||||
HBaseAdmin admin;
|
|
||||||
TableName tableName;
|
|
||||||
|
|
||||||
public BackgroundSchemaChangeThread(HBaseAdmin admin, TableName tableName,
|
|
||||||
int numOnlineChanges) throws IOException {
|
|
||||||
this.admin = admin;
|
|
||||||
this.tableName = tableName;
|
|
||||||
this.numOnlineChanges = numOnlineChanges;
|
|
||||||
|
|
||||||
if (admin == null) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"[Test Error]: Provided admin should not be null");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
final long START_TIME = System.currentTimeMillis();
|
|
||||||
final int ONLINE_CHANGE_TIMEOUT = 200000;
|
|
||||||
|
|
||||||
HTableDescriptor htd = null;
|
|
||||||
try {
|
|
||||||
htd = admin.getTableDescriptor(tableName);
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
|
|
||||||
ioe.printStackTrace();
|
|
||||||
fail("Fail: Issue pulling table descriptor");
|
|
||||||
}
|
|
||||||
|
|
||||||
HColumnDescriptor hcd = null;
|
|
||||||
assertTrue(htd != null);
|
|
||||||
final int countOfFamilies = htd.getFamilies().size();
|
|
||||||
assertTrue(countOfFamilies > 0);
|
|
||||||
boolean expectedException = false;
|
|
||||||
|
|
||||||
int numIterations = 0;
|
|
||||||
|
|
||||||
|
|
||||||
while (numIterations < numOnlineChanges) {
|
|
||||||
|
|
||||||
if (System.currentTimeMillis() - START_TIME > ONLINE_CHANGE_TIMEOUT) {
|
|
||||||
fail("Fail: Timed out reaching before required snapshot count. Only had "
|
|
||||||
+ numIterations + " updates");
|
|
||||||
}
|
|
||||||
|
|
||||||
hcd = htd.getFamilies().iterator().next();
|
|
||||||
int maxversions = hcd.getMaxVersions();
|
|
||||||
int newMaxVersions = maxversions + 1;
|
|
||||||
System.out.println("Setting max versions on CF to " + newMaxVersions);
|
|
||||||
|
|
||||||
hcd.setMaxVersions(newMaxVersions);
|
|
||||||
final byte[] hcdName = hcd.getName();
|
|
||||||
expectedException = false;
|
|
||||||
|
|
||||||
try {
|
|
||||||
this.admin.modifyColumn(tableName, hcd);
|
|
||||||
} catch (TableNotDisabledException re) {
|
|
||||||
expectedException = true;
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
fail("Fail: IO Issue while modifying column");
|
|
||||||
}
|
|
||||||
assertFalse(expectedException);
|
|
||||||
|
|
||||||
try {
|
|
||||||
int EXPECTED_NUM_REGIONS = TEST_UTIL.getHBaseAdmin().getTableRegions(tableName).size();
|
|
||||||
assertEquals("The max version count was not updated", newMaxVersions, TestAdmin.waitForColumnSchemasToSettle(TEST_UTIL.getMiniHBaseCluster(), tableName, EXPECTED_NUM_REGIONS).getMaxVersions());
|
|
||||||
Thread.sleep(2000);
|
|
||||||
} catch (TableNotFoundException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
fail("Fail: Table not found.");
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
fail("Fail: IO Issue while modifying column");
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.warn("Sleep was interrupted. This is unusual, but not grounds for TF");
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
|
|
||||||
numIterations++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -35,15 +34,12 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Chore;
|
import org.apache.hadoop.hbase.Chore;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.LargeTests;
|
import org.apache.hadoop.hbase.LargeTests;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
|
||||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.Stoppable;
|
import org.apache.hadoop.hbase.Stoppable;
|
||||||
|
@ -56,7 +52,6 @@ import org.apache.hadoop.hbase.client.MetaScanner;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.TestAdmin;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||||
|
@ -81,7 +76,6 @@ public class TestEndToEndSplitTransaction {
|
||||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
private static final Configuration conf = TEST_UTIL.getConfiguration();
|
private static final Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void beforeAllTests() throws Exception {
|
public static void beforeAllTests() throws Exception {
|
||||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
|
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
|
||||||
|
@ -183,13 +177,11 @@ public class TestEndToEndSplitTransaction {
|
||||||
/**
|
/**
|
||||||
* Tests that the client sees meta table changes as atomic during splits
|
* Tests that the client sees meta table changes as atomic during splits
|
||||||
*/
|
*/
|
||||||
|
@Test
|
||||||
private void runTestFromClientSideWhileSplitting(boolean onlineSchemaChange) throws Throwable {
|
public void testFromClientSideWhileSplitting() throws Throwable {
|
||||||
|
LOG.info("Starting testFromClientSideWhileSplitting");
|
||||||
final String tableName = "testFromClientSideWhileSplitting" + System.currentTimeMillis();
|
|
||||||
LOG.info("Starting " + tableName);
|
|
||||||
final TableName TABLENAME =
|
final TableName TABLENAME =
|
||||||
TableName.valueOf(tableName);
|
TableName.valueOf("testFromClientSideWhileSplitting");
|
||||||
final byte[] FAMILY = Bytes.toBytes("family");
|
final byte[] FAMILY = Bytes.toBytes("family");
|
||||||
|
|
||||||
//SplitTransaction will update the meta table by offlining the parent region, and adding info
|
//SplitTransaction will update the meta table by offlining the parent region, and adding info
|
||||||
|
@ -197,16 +189,7 @@ public class TestEndToEndSplitTransaction {
|
||||||
HTable table = TEST_UTIL.createTable(TABLENAME, FAMILY);
|
HTable table = TEST_UTIL.createTable(TABLENAME, FAMILY);
|
||||||
|
|
||||||
Stoppable stopper = new StoppableImplementation();
|
Stoppable stopper = new StoppableImplementation();
|
||||||
|
RegionSplitter regionSplitter = new RegionSplitter(table);
|
||||||
RegionSplitter regionSplitter = null;
|
|
||||||
if (onlineSchemaChange) {
|
|
||||||
|
|
||||||
regionSplitter = new RegionSplitterWithSchemaChange(table);
|
|
||||||
} else {
|
|
||||||
|
|
||||||
regionSplitter = new RegionSplitter(table);
|
|
||||||
}
|
|
||||||
|
|
||||||
RegionChecker regionChecker = new RegionChecker(conf, stopper, TABLENAME);
|
RegionChecker regionChecker = new RegionChecker(conf, stopper, TABLENAME);
|
||||||
|
|
||||||
regionChecker.start();
|
regionChecker.start();
|
||||||
|
@ -228,16 +211,6 @@ public class TestEndToEndSplitTransaction {
|
||||||
regionChecker.verify();
|
regionChecker.verify();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFromClientSideOnlineSchemaChangeWhileSplitting() throws Throwable {
|
|
||||||
runTestFromClientSideWhileSplitting(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFromClientSideWhileSplitting() throws Throwable {
|
|
||||||
runTestFromClientSideWhileSplitting(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
static class RegionSplitter extends Thread {
|
static class RegionSplitter extends Thread {
|
||||||
Throwable ex;
|
Throwable ex;
|
||||||
HTable table;
|
HTable table;
|
||||||
|
@ -312,104 +285,6 @@ public class TestEndToEndSplitTransaction {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class RegionSplitterWithSchemaChange extends RegionSplitter {
|
|
||||||
|
|
||||||
RegionSplitterWithSchemaChange(HTable table) throws IOException {
|
|
||||||
super(table);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
|
|
||||||
try {
|
|
||||||
Random random = new Random();
|
|
||||||
for (int i = 0; i < 5; i++) {
|
|
||||||
NavigableMap<HRegionInfo, ServerName> regions = MetaScanner
|
|
||||||
.allTableRegions(conf, null, tableName, false);
|
|
||||||
if (regions.size() == 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
int regionIndex = random.nextInt(regions.size());
|
|
||||||
|
|
||||||
// pick a random region and split it into two
|
|
||||||
HRegionInfo region = Iterators.get(regions.keySet().iterator(),
|
|
||||||
regionIndex);
|
|
||||||
|
|
||||||
// pick the mid split point
|
|
||||||
int start = 0, end = Integer.MAX_VALUE;
|
|
||||||
if (region.getStartKey().length > 0) {
|
|
||||||
start = Bytes.toInt(region.getStartKey());
|
|
||||||
}
|
|
||||||
if (region.getEndKey().length > 0) {
|
|
||||||
end = Bytes.toInt(region.getEndKey());
|
|
||||||
}
|
|
||||||
int mid = start + ((end - start) / 2);
|
|
||||||
byte[] splitPoint = Bytes.toBytes(mid);
|
|
||||||
|
|
||||||
// put some rows to the regions
|
|
||||||
addData(start);
|
|
||||||
addData(mid);
|
|
||||||
|
|
||||||
flushAndBlockUntilDone(admin, rs, region.getRegionName());
|
|
||||||
compactAndBlockUntilDone(admin, rs, region.getRegionName());
|
|
||||||
|
|
||||||
log("Initiating region split for:" + region.getRegionNameAsString());
|
|
||||||
try {
|
|
||||||
admin.split(region.getRegionName(), splitPoint);
|
|
||||||
|
|
||||||
for (int j = 0; j < 5; j++) {
|
|
||||||
HTableDescriptor htd = null;
|
|
||||||
try {
|
|
||||||
htd = admin.getTableDescriptor(tableName);
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
|
|
||||||
ioe.printStackTrace();
|
|
||||||
fail("Issue pulling table descriptor");
|
|
||||||
}
|
|
||||||
|
|
||||||
HColumnDescriptor hcd = null;
|
|
||||||
assertTrue(htd != null);
|
|
||||||
final int countOfFamilies = htd.getFamilies().size();
|
|
||||||
assertTrue(countOfFamilies > 0);
|
|
||||||
hcd = htd.getColumnFamilies()[0];
|
|
||||||
boolean expectedException = false;
|
|
||||||
assertFalse(expectedException);
|
|
||||||
|
|
||||||
int initMaxVersions = hcd.getMaxVersions();
|
|
||||||
int newMaxVersions = initMaxVersions + 1;
|
|
||||||
hcd.setMaxVersions(newMaxVersions);
|
|
||||||
admin.modifyColumn(tableName, hcd);
|
|
||||||
|
|
||||||
try {
|
|
||||||
|
|
||||||
int EXPECTED_NUM_REGIONS = TEST_UTIL.getHBaseAdmin().getTableRegions(tableName).size();
|
|
||||||
assertEquals("The max version count was not updated", newMaxVersions, TestAdmin.waitForColumnSchemasToSettle(TEST_UTIL.getMiniHBaseCluster(), tableName, EXPECTED_NUM_REGIONS).getMaxVersions());
|
|
||||||
Thread.sleep(2000);
|
|
||||||
} catch (TableNotFoundException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
fail("Table not found. Failing.");
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
fail("IO Issue while modifying column");
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.warn("Sleep was interrupted. This is unusual, but not grounds for TF");
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// wait until the split is complete
|
|
||||||
blockUntilRegionSplit(conf, 50000, region.getRegionName(), true);
|
|
||||||
|
|
||||||
} catch (NotServingRegionException ex) {
|
|
||||||
// ignore
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Throwable ex) {
|
|
||||||
this.ex = ex;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks regions using MetaScanner, MetaReader and HTable methods
|
* Checks regions using MetaScanner, MetaReader and HTable methods
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -17,11 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -38,13 +33,10 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.LargeTests;
|
import org.apache.hadoop.hbase.LargeTests;
|
||||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
|
||||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
|
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
|
||||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
|
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
|
||||||
import org.apache.hadoop.hbase.TableExistsException;
|
import org.apache.hadoop.hbase.TableExistsException;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|
||||||
import org.apache.hadoop.hbase.client.HConnection;
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.RegionServerCallable;
|
import org.apache.hadoop.hbase.client.RegionServerCallable;
|
||||||
|
@ -53,7 +45,6 @@ import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
|
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
|
||||||
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.TestAdmin;
|
|
||||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||||
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
|
@ -66,7 +57,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionReque
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@ -84,8 +74,6 @@ public class TestHRegionServerBulkLoad {
|
||||||
private final static byte[] QUAL = Bytes.toBytes("qual");
|
private final static byte[] QUAL = Bytes.toBytes("qual");
|
||||||
private final static int NUM_CFS = 10;
|
private final static int NUM_CFS = 10;
|
||||||
public static int BLOCKSIZE = 64 * 1024;
|
public static int BLOCKSIZE = 64 * 1024;
|
||||||
public final static int NUM_ROWS = 2048;
|
|
||||||
public final static int NUM_CF_ITERATIONS = 2;
|
|
||||||
public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
|
public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
|
||||||
|
|
||||||
private final static byte[][] families = new byte[NUM_CFS][];
|
private final static byte[][] families = new byte[NUM_CFS][];
|
||||||
|
@ -160,7 +148,7 @@ public class TestHRegionServerBulkLoad {
|
||||||
for (int i = 0; i < NUM_CFS; i++) {
|
for (int i = 0; i < NUM_CFS; i++) {
|
||||||
Path hfile = new Path(dir, family(i));
|
Path hfile = new Path(dir, family(i));
|
||||||
byte[] fam = Bytes.toBytes(family(i));
|
byte[] fam = Bytes.toBytes(family(i));
|
||||||
createHFile(fs, hfile, fam, QUAL, val, NUM_ROWS);
|
createHFile(fs, hfile, fam, QUAL, val, 1000);
|
||||||
famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
|
famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -207,110 +195,6 @@ public class TestHRegionServerBulkLoad {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class OnlineSchemaChangeMaxVersionsThread extends
|
|
||||||
RepeatingTestThread {
|
|
||||||
|
|
||||||
private HBaseAdmin admin;
|
|
||||||
private TableName tableName;
|
|
||||||
private int totalNumIterations;
|
|
||||||
byte[][] targetFamilies;
|
|
||||||
|
|
||||||
public OnlineSchemaChangeMaxVersionsThread(String tableName,
|
|
||||||
TestContext ctx, byte targetFamilies[][], int totalNumIterations) {
|
|
||||||
super(ctx);
|
|
||||||
|
|
||||||
this.tableName = TableName.valueOf(tableName);
|
|
||||||
this.targetFamilies = targetFamilies; // this should be validated, but
|
|
||||||
// it's only a test-facing API, so I
|
|
||||||
// can live with this
|
|
||||||
if (totalNumIterations < 1 || totalNumIterations > 20) {
|
|
||||||
fail("Unreasonable input provided to schema change thread. Please select a value between 1 and 20");
|
|
||||||
}
|
|
||||||
this.totalNumIterations = totalNumIterations;
|
|
||||||
try {
|
|
||||||
admin = UTIL.getHBaseAdmin();
|
|
||||||
Assert.assertNotNull(admin);
|
|
||||||
} catch (IOException e) {
|
|
||||||
fail("Not able to get a handle on the hbase admin.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void doAnAction() throws Exception {
|
|
||||||
|
|
||||||
final long START_TIME = System.currentTimeMillis();
|
|
||||||
final int ONLINE_CHANGE_TIMEOUT = 2000000;
|
|
||||||
HTableDescriptor htd = null;
|
|
||||||
try {
|
|
||||||
htd = admin.getTableDescriptor(tableName);
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
|
|
||||||
ioe.printStackTrace();
|
|
||||||
fail("Issue pulling table descriptor");
|
|
||||||
}
|
|
||||||
|
|
||||||
HColumnDescriptor hcd = null;
|
|
||||||
assertTrue(htd != null);
|
|
||||||
final int countOfFamilies = htd.getFamilies().size();
|
|
||||||
assertTrue(countOfFamilies > 0);
|
|
||||||
boolean expectedException = false;
|
|
||||||
|
|
||||||
int numIterations = 0;
|
|
||||||
|
|
||||||
while (numIterations < totalNumIterations) {
|
|
||||||
|
|
||||||
htd = admin.getTableDescriptor(tableName);
|
|
||||||
if (System.currentTimeMillis() - START_TIME > ONLINE_CHANGE_TIMEOUT) {
|
|
||||||
fail("Timed out reaching before required modify count. Only had "
|
|
||||||
+ numIterations + " updates");
|
|
||||||
}
|
|
||||||
|
|
||||||
for (byte[] targetFamily : targetFamilies) {
|
|
||||||
|
|
||||||
hcd = htd.getFamily(targetFamily);
|
|
||||||
int maxversions = hcd.getMaxVersions();
|
|
||||||
System.out.println("NumIterations is: " + numIterations);
|
|
||||||
System.out.println("DEBUG: Current number of versions for family "
|
|
||||||
+ Bytes.toString(targetFamily) + " is " + maxversions);
|
|
||||||
int newMaxVersions = maxversions + 1;
|
|
||||||
System.out.println("Setting max versions on CF to " + newMaxVersions
|
|
||||||
+ " on CF " + Bytes.toString(targetFamily));
|
|
||||||
|
|
||||||
hcd.setMaxVersions(newMaxVersions);
|
|
||||||
final byte[] hcdName = hcd.getName();
|
|
||||||
expectedException = false;
|
|
||||||
try {
|
|
||||||
this.admin.modifyColumn(tableName, hcd);
|
|
||||||
} catch (TableNotDisabledException re) {
|
|
||||||
expectedException = true;
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
fail("IO Issue while modifying column");
|
|
||||||
}
|
|
||||||
assertFalse(expectedException);
|
|
||||||
HColumnDescriptor modifiedHcd;
|
|
||||||
try {
|
|
||||||
int EXPECTED_NUM_REGIONS = UTIL.getHBaseAdmin().getTableRegions(tableName).size();
|
|
||||||
assertEquals("The max version count was not updated", newMaxVersions, TestAdmin.waitForColumnSchemasToSettle(UTIL.getMiniHBaseCluster(), tableName, EXPECTED_NUM_REGIONS).getMaxVersions());
|
|
||||||
|
|
||||||
Thread.sleep(2000);
|
|
||||||
} catch (TableNotFoundException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
fail("Table not found. Failing.");
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
fail("IO Issue while modifying column");
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
System.out
|
|
||||||
.println("WARN: Sleep was interrupted. This is unusual, but not grounds for TF");
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
numIterations++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thread that does full scans of the table looking for any partially
|
* Thread that does full scans of the table looking for any partially
|
||||||
* completed rows.
|
* completed rows.
|
||||||
|
@ -378,7 +262,7 @@ public class TestHRegionServerBulkLoad {
|
||||||
try {
|
try {
|
||||||
LOG.info("Creating table " + table);
|
LOG.info("Creating table " + table);
|
||||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
|
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
|
||||||
for (int i = 0; i < NUM_CFS; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
htd.addFamily(new HColumnDescriptor(family(i)));
|
htd.addFamily(new HColumnDescriptor(family(i)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -395,42 +279,23 @@ public class TestHRegionServerBulkLoad {
|
||||||
public void testAtomicBulkLoad() throws Exception {
|
public void testAtomicBulkLoad() throws Exception {
|
||||||
String TABLE_NAME = "atomicBulkLoad";
|
String TABLE_NAME = "atomicBulkLoad";
|
||||||
|
|
||||||
int millisToRun = 100000;
|
int millisToRun = 30000;
|
||||||
int numScanners = 50;
|
int numScanners = 50;
|
||||||
UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
|
|
||||||
UTIL.startMiniCluster(1);
|
UTIL.startMiniCluster(1);
|
||||||
try {
|
try {
|
||||||
runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners, false);
|
runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
|
||||||
} finally {
|
} finally {
|
||||||
UTIL.shutdownMiniCluster();
|
UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
void runAtomicBulkloadTest(String tableName, int millisToRun, int numScanners)
|
||||||
public void testAtomicBulkLoadWithSchemaChange() throws Exception {
|
throws Exception {
|
||||||
String TABLE_NAME = "atomicBulkLoad";
|
setupTable(tableName, 10);
|
||||||
|
|
||||||
int millisToRun = 100000;
|
|
||||||
int numScanners = 50;
|
|
||||||
UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable",
|
|
||||||
true);
|
|
||||||
UTIL.startMiniCluster(1);
|
|
||||||
try {
|
|
||||||
runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners, true);
|
|
||||||
} finally {
|
|
||||||
UTIL.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void runAtomicBulkloadTest(String tableName, int millisToRun,
|
|
||||||
int numScanners, boolean schemaChange) throws Exception {
|
|
||||||
setupTable(tableName, NUM_CFS);
|
|
||||||
|
|
||||||
TestContext ctx = new TestContext(UTIL.getConfiguration());
|
TestContext ctx = new TestContext(UTIL.getConfiguration());
|
||||||
HTableDescriptor htd = UTIL.getHBaseAdmin().getTableDescriptor(
|
|
||||||
TableName.valueOf(tableName));
|
|
||||||
final int INITIAL_UPDATE_COUNT = htd.getColumnFamilies()[0]
|
|
||||||
.getMaxVersions();
|
|
||||||
AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
|
AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
|
||||||
ctx.addThread(loader);
|
ctx.addThread(loader);
|
||||||
|
|
||||||
|
@ -441,12 +306,6 @@ public class TestHRegionServerBulkLoad {
|
||||||
ctx.addThread(scanner);
|
ctx.addThread(scanner);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (schemaChange) {
|
|
||||||
OnlineSchemaChangeMaxVersionsThread maxVersionsChangeThread = new OnlineSchemaChangeMaxVersionsThread(
|
|
||||||
tableName, ctx, families, NUM_CF_ITERATIONS);
|
|
||||||
ctx.addThread(maxVersionsChangeThread);
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx.startThreads();
|
ctx.startThreads();
|
||||||
ctx.waitFor(millisToRun);
|
ctx.waitFor(millisToRun);
|
||||||
ctx.stop();
|
ctx.stop();
|
||||||
|
@ -460,27 +319,6 @@ public class TestHRegionServerBulkLoad {
|
||||||
LOG.info(" scanned " + scanner.numScans.get());
|
LOG.info(" scanned " + scanner.numScans.get());
|
||||||
LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
|
LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verification of data insertion
|
|
||||||
Assert.assertEquals("Incorrect number of rows found.", NUM_ROWS,
|
|
||||||
UTIL.countRows(new HTable(UTIL.getConfiguration(), tableName)));
|
|
||||||
|
|
||||||
if (schemaChange) {
|
|
||||||
// Verification of data updated
|
|
||||||
htd = UTIL.getHBaseAdmin().getTableDescriptor(
|
|
||||||
TableName.valueOf(tableName));
|
|
||||||
for (byte[] family : families) {
|
|
||||||
|
|
||||||
// verify that at least one pass occurred through the loop
|
|
||||||
HColumnDescriptor hcd = htd.getFamily(family);
|
|
||||||
assertTrue(
|
|
||||||
"The full number of iterations for family "
|
|
||||||
+ Bytes.toString(family) + " was not done. Expecting at least "
|
|
||||||
+ (NUM_CF_ITERATIONS + INITIAL_UPDATE_COUNT) + " but received "
|
|
||||||
+ hcd.getMaxVersions(),
|
|
||||||
hcd.getMaxVersions() >= (NUM_CF_ITERATIONS + INITIAL_UPDATE_COUNT));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -492,7 +330,7 @@ public class TestHRegionServerBulkLoad {
|
||||||
Configuration c = HBaseConfiguration.create();
|
Configuration c = HBaseConfiguration.create();
|
||||||
TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
|
TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
|
||||||
test.setConf(c);
|
test.setConf(c);
|
||||||
test.runAtomicBulkloadTest("atomicTableTest", 5 * 60 * 1000, 50, false);
|
test.runAtomicBulkloadTest("atomicTableTest", 5 * 60 * 1000, 50);
|
||||||
} finally {
|
} finally {
|
||||||
System.exit(0); // something hangs (believe it is lru threadpool)
|
System.exit(0); // something hangs (believe it is lru threadpool)
|
||||||
}
|
}
|
||||||
|
@ -501,4 +339,6 @@ public class TestHRegionServerBulkLoad {
|
||||||
private void setConf(Configuration c) {
|
private void setConf(Configuration c) {
|
||||||
UTIL = new HBaseTestingUtility(c);
|
UTIL = new HBaseTestingUtility(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue