HBASE-7546 Obtain a table read lock on region split operations
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1459102 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
15a59be843
commit
5fce119685
@ -149,7 +149,7 @@ public abstract class TableLockManager {
|
||||
* A null implementation
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
static class NullTableLockManager extends TableLockManager {
|
||||
public static class NullTableLockManager extends TableLockManager {
|
||||
static class NullTableLock implements TableLock {
|
||||
@Override
|
||||
public void acquire() throws IOException {
|
||||
|
@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.ipc.RpcClientEngine;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
@ -437,6 +438,9 @@ public class HRegionServer implements ClientProtocol,
|
||||
/** Handle all the snapshot requests to this server */
|
||||
RegionServerSnapshotManager snapshotManager;
|
||||
|
||||
// Table level lock manager for locking for region operations
|
||||
private TableLockManager tableLockManager;
|
||||
|
||||
/**
|
||||
* Starts a HRegionServer at the default location
|
||||
*
|
||||
@ -634,6 +638,8 @@ public class HRegionServer implements ClientProtocol,
|
||||
} catch (KeeperException e) {
|
||||
this.abort("Failed to reach zk cluster when creating snapshot handler.");
|
||||
}
|
||||
this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
|
||||
new ServerName(isa.getHostName(), isa.getPort(), startcode));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1130,6 +1136,11 @@ public class HRegionServer implements ClientProtocol,
|
||||
return regionServerAccounting;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableLockManager getTableLockManager() {
|
||||
return tableLockManager;
|
||||
}
|
||||
|
||||
/*
|
||||
* @param r Region to get RegionLoad for.
|
||||
*
|
||||
|
@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
@ -59,6 +60,11 @@ public interface RegionServerServices extends OnlineRegions {
|
||||
*/
|
||||
public RegionServerAccounting getRegionServerAccounting();
|
||||
|
||||
/**
|
||||
* @return RegionServer's instance of {@link TableLockManager}
|
||||
*/
|
||||
public TableLockManager getTableLockManager();
|
||||
|
||||
/**
|
||||
* Tasks to perform after region open to complete deploy of region on
|
||||
* regionserver
|
||||
|
@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
@ -38,6 +39,7 @@ class SplitRequest implements Runnable {
|
||||
private final HRegion parent;
|
||||
private final byte[] midKey;
|
||||
private final HRegionServer server;
|
||||
private TableLock tableLock;
|
||||
|
||||
SplitRequest(HRegion region, byte[] midKey, HRegionServer hrs) {
|
||||
Preconditions.checkNotNull(hrs);
|
||||
@ -61,6 +63,18 @@ class SplitRequest implements Runnable {
|
||||
try {
|
||||
final long startTime = System.currentTimeMillis();
|
||||
SplitTransaction st = new SplitTransaction(parent, midKey);
|
||||
|
||||
//acquire a shared read lock on the table, so that table schema modifications
|
||||
//do not happen concurrently
|
||||
tableLock = server.getTableLockManager().readLock(parent.getTableDesc().getName()
|
||||
, "SPLIT_REGION:" + parent.getRegionNameAsString());
|
||||
try {
|
||||
tableLock.acquire();
|
||||
} catch (IOException ex) {
|
||||
tableLock = null;
|
||||
throw ex;
|
||||
}
|
||||
|
||||
// If prepare does not return true, for some reason -- logged inside in
|
||||
// the prepare call -- we are not ready to split just now. Just return.
|
||||
if (!st.prepare()) return;
|
||||
@ -109,6 +123,18 @@ class SplitRequest implements Runnable {
|
||||
RemoteExceptionHandler.checkIOException(io));
|
||||
}
|
||||
}
|
||||
releaseTableLock();
|
||||
}
|
||||
}
|
||||
|
||||
protected void releaseTableLock() {
|
||||
if (this.tableLock != null) {
|
||||
try {
|
||||
this.tableLock.release();
|
||||
} catch (IOException ex) {
|
||||
LOG.warn("Could not release the table lock", ex);
|
||||
//TODO: if we get here, and not abort RS, this lock will never be released
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
|
||||
@ -148,7 +149,7 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
|
||||
|
||||
/**
|
||||
* @param sn Name of this mock regionserver
|
||||
* @throws IOException
|
||||
* @throws IOException
|
||||
* @throws org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException
|
||||
*/
|
||||
MockRegionServer(final Configuration conf, final ServerName sn)
|
||||
@ -290,6 +291,10 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
|
||||
return null;
|
||||
}
|
||||
|
||||
public TableLockManager getTableLockManager() {
|
||||
return new NullTableLockManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postOpenDeployTasks(HRegion r, CatalogTracker ct)
|
||||
throws KeeperException, IOException {
|
||||
|
@ -21,9 +21,12 @@ package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
@ -34,10 +37,12 @@ import java.util.concurrent.Future;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Chore;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.LargeTests;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
|
||||
import org.apache.hadoop.hbase.exceptions.TableNotDisabledException;
|
||||
@ -45,7 +50,10 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.LoadTestTool;
|
||||
import org.apache.hadoop.hbase.util.StoppableImplementation;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
@ -56,7 +64,7 @@ import org.junit.experimental.categories.Category;
|
||||
/**
|
||||
* Tests the default table lock manager
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
@Category(LargeTests.class)
|
||||
public class TestTableLockManager {
|
||||
|
||||
private static final Log LOG =
|
||||
@ -291,4 +299,103 @@ public class TestTableLockManager {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
|
||||
@Test(timeout = 600000)
|
||||
public void testTableReadLock() throws Exception {
|
||||
// test plan: write some data to the table. Continuously alter the table and
|
||||
// force splits
|
||||
// concurrently until we have 10 regions. verify the data just in case.
|
||||
// Every region should contain the same table descriptor
|
||||
// This is not an exact test
|
||||
prepareMiniCluster();
|
||||
LoadTestTool loadTool = new LoadTestTool();
|
||||
loadTool.setConf(TEST_UTIL.getConfiguration());
|
||||
int numKeys = 10000;
|
||||
final byte[] tableName = Bytes.toBytes("testTableReadLock");
|
||||
final HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
|
||||
final HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
final byte[] family = Bytes.toBytes("test_cf");
|
||||
desc.addFamily(new HColumnDescriptor(family));
|
||||
admin.createTable(desc); // create with one region
|
||||
|
||||
// write some data, not much
|
||||
int ret = loadTool.run(new String[] { "-tn", Bytes.toString(tableName), "-write",
|
||||
String.format("%d:%d:%d", 1, 10, 10), "-num_keys", String.valueOf(numKeys), "-skip_init" });
|
||||
if (0 != ret) {
|
||||
String errorMsg = "Load failed with error code " + ret;
|
||||
LOG.error(errorMsg);
|
||||
fail(errorMsg);
|
||||
}
|
||||
|
||||
int familyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
|
||||
StoppableImplementation stopper = new StoppableImplementation();
|
||||
|
||||
//alter table every 10 sec
|
||||
Chore alterThread = new Chore("Alter Chore", 10000, stopper) {
|
||||
@Override
|
||||
protected void chore() {
|
||||
Random random = new Random();
|
||||
try {
|
||||
HTableDescriptor htd = admin.getTableDescriptor(tableName);
|
||||
String val = String.valueOf(random.nextInt());
|
||||
htd.getFamily(family).setValue(val, val);
|
||||
desc.getFamily(family).setValue(val, val); // save it for later
|
||||
// control
|
||||
admin.modifyTable(tableName, htd);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Caught exception", ex);
|
||||
fail(ex.getMessage());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
//split table every 5 sec
|
||||
Chore splitThread = new Chore("Split thread", 5000, stopper) {
|
||||
@Override
|
||||
public void chore() {
|
||||
try {
|
||||
Random random = new Random();
|
||||
List<HRegionInfo> regions = admin.getTableRegions(tableName);
|
||||
byte[] regionName = regions.get(random.nextInt(regions.size())).getRegionName();
|
||||
admin.flush(regionName);
|
||||
admin.compact(regionName);
|
||||
admin.split(regionName);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Caught exception", ex);
|
||||
fail(ex.getMessage());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
alterThread.start();
|
||||
splitThread.start();
|
||||
while (true) {
|
||||
List<HRegionInfo> regions = admin.getTableRegions(tableName);
|
||||
LOG.info(String.format("Table #regions: %d regions: %s:", regions.size(), regions));
|
||||
assertEquals(admin.getTableDescriptor(tableName), desc);
|
||||
for (HRegion region : TEST_UTIL.getMiniHBaseCluster().getRegions(tableName)) {
|
||||
assertEquals(desc, region.getTableDesc());
|
||||
}
|
||||
if (regions.size() >= 10) {
|
||||
break;
|
||||
}
|
||||
Threads.sleep(1000);
|
||||
}
|
||||
stopper.stop("test finished");
|
||||
|
||||
int newFamilyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
|
||||
LOG.info(String.format("Altered the table %d times", newFamilyValues - familyValues));
|
||||
assertTrue(newFamilyValues > familyValues); // at least one alter went
|
||||
// through
|
||||
|
||||
ret = loadTool.run(new String[] { "-tn", Bytes.toString(tableName), "-read", "100:10",
|
||||
"-num_keys", String.valueOf(numKeys), "-skip_init" });
|
||||
if (0 != ret) {
|
||||
String errorMsg = "Verify failed with error code " + ret;
|
||||
LOG.error(errorMsg);
|
||||
fail(errorMsg);
|
||||
}
|
||||
|
||||
admin.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
|
||||
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushRequester;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
@ -130,6 +132,11 @@ public class MockRegionServerServices implements RegionServerServices {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableLockManager getTableLockManager() {
|
||||
return new NullTableLockManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerName getServerName() {
|
||||
return this.serverName;
|
||||
|
Loading…
x
Reference in New Issue
Block a user