HADOOP-1574 Concurrent creates of a table named 'X' all succeed

M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTable.java
    (testTable): Add checking of actual exceptions thrown and
    assertions that we are getting right behavior.  Add a test
    that has ten clients concurrently trying to create same table.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
    javadoc edit. Fix debug message that could give impression
    table was found when it wasn't.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java
    Added comment on table nameing.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    (createTable): Refactored. Bulk moved to a private override.
    Changed how check for existance is done.
M rc/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
    (getTableNameFromRegionName): Utility method added.
A src/contrib/hbase/src/java/org/apache/hadoop/hbase/TableExistsException.java
    Added.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@556334 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2007-07-14 20:08:01 +00:00
parent d14fa81891
commit 14dd64dace
6 changed files with 221 additions and 137 deletions

View File

@ -59,3 +59,4 @@ Trunk (unreleased changes)
35. HADOOP-1375 a simple parser for hbase (Edward Yoon via Stack) 35. HADOOP-1375 a simple parser for hbase (Edward Yoon via Stack)
36. HADOOP-1600 Update license in HBase code 36. HADOOP-1600 Update license in HBase code
37. HADOOP-1589 Exception handling in HBase is broken over client server 37. HADOOP-1589 Exception handling in HBase is broken over client server
38. HADOOP-1574 Concurrent creates of a table named 'X' all succeed

View File

@ -220,9 +220,14 @@ public class HClient implements HConstants {
* *
* @param desc table descriptor for table * @param desc table descriptor for table
* *
* @throws RemoteException if exception occurred on remote side of
* connection.
* @throws IllegalArgumentException if the table name is reserved * @throws IllegalArgumentException if the table name is reserved
* @throws MasterNotRunningException if master is not running * @throws MasterNotRunningException if master is not running
* @throws NoServerForRegionException if root region is not being served * @throws NoServerForRegionException if root region is not being served
* @throws TableExistsException if table already exists (If concurrent
* threads, the table may have been created between test-for-existence
* and attempt-at-creation).
* @throws IOException * @throws IOException
*/ */
public synchronized void createTable(HTableDescriptor desc) public synchronized void createTable(HTableDescriptor desc)
@ -247,9 +252,14 @@ public class HClient implements HConstants {
* *
* @param desc table descriptor for table * @param desc table descriptor for table
* *
* @throws RemoteException if exception occurred on remote side of
* connection.
* @throws IllegalArgumentException if the table name is reserved * @throws IllegalArgumentException if the table name is reserved
* @throws MasterNotRunningException if master is not running * @throws MasterNotRunningException if master is not running
* @throws NoServerForRegionException if root region is not being served * @throws NoServerForRegionException if root region is not being served
* @throws TableExistsException if table already exists (If concurrent
* threads, the table may have been created between test-for-existence
* and attempt-at-creation).
* @throws IOException * @throws IOException
*/ */
public synchronized void createTableAsync(HTableDescriptor desc) public synchronized void createTableAsync(HTableDescriptor desc)
@ -266,7 +276,7 @@ public class HClient implements HConstants {
/** /**
* Deletes a table * Deletes a table
* *
* @param tableName - name of table to delete * @param tableName name of table to delete
* @throws IOException * @throws IOException
*/ */
public synchronized void deleteTable(Text tableName) throws IOException { public synchronized void deleteTable(Text tableName) throws IOException {
@ -338,8 +348,8 @@ public class HClient implements HConstants {
/** /**
* Add a column to an existing table * Add a column to an existing table
* *
* @param tableName - name of the table to add column to * @param tableName name of the table to add column to
* @param column - column descriptor of column to be added * @param column column descriptor of column to be added
* @throws IOException * @throws IOException
*/ */
public synchronized void addColumn(Text tableName, HColumnDescriptor column) public synchronized void addColumn(Text tableName, HColumnDescriptor column)
@ -357,8 +367,8 @@ public class HClient implements HConstants {
/** /**
* Delete a column from a table * Delete a column from a table
* *
* @param tableName - name of table * @param tableName name of table
* @param columnName - name of column to be deleted * @param columnName name of column to be deleted
* @throws IOException * @throws IOException
*/ */
public synchronized void deleteColumn(Text tableName, Text columnName) public synchronized void deleteColumn(Text tableName, Text columnName)
@ -376,7 +386,7 @@ public class HClient implements HConstants {
/** /**
* Brings a table on-line (enables it) * Brings a table on-line (enables it)
* *
* @param tableName - name of the table * @param tableName name of the table
* @throws IOException * @throws IOException
*/ */
public synchronized void enableTable(Text tableName) throws IOException { public synchronized void enableTable(Text tableName) throws IOException {
@ -467,7 +477,7 @@ public class HClient implements HConstants {
* Disables a table (takes it off-line) If it is being served, the master * Disables a table (takes it off-line) If it is being served, the master
* will tell the servers to stop serving it. * will tell the servers to stop serving it.
* *
* @param tableName - name of table * @param tableName name of table
* @throws IOException * @throws IOException
*/ */
public synchronized void disableTable(Text tableName) throws IOException { public synchronized void disableTable(Text tableName) throws IOException {
@ -591,8 +601,8 @@ public class HClient implements HConstants {
/** /**
* Loads information so that a table can be manipulated. * Loads information so that a table can be manipulated.
* *
* @param tableName - the table to be located * @param tableName the table to be located
* @throws IOException - if the table can not be located after retrying * @throws IOException if the table can not be located after retrying
*/ */
public synchronized void openTable(Text tableName) throws IOException { public synchronized void openTable(Text tableName) throws IOException {
if(tableName == null || tableName.getLength() == 0) { if(tableName == null || tableName.getLength() == 0) {
@ -851,7 +861,8 @@ public class HClient implements HConstants {
if(!regionInfo.tableDesc.getName().equals(tableName)) { if(!regionInfo.tableDesc.getName().equals(tableName)) {
// We're done // We're done
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Found " + tableName); LOG.debug("Found " + servers.size() + " servers for table " +
tableName);
} }
break; break;
} }
@ -1352,11 +1363,12 @@ public class HClient implements HConstants {
} }
/** /**
* Change a value for the specified column * Change a value for the specified column.
* Runs {@link #abort(long)} if exception thrown.
* *
* @param lockid - lock id returned from startUpdate * @param lockid lock id returned from startUpdate
* @param column - column whose value is being set * @param column column whose value is being set
* @param val - new value for column * @param val new value for column
* @throws IOException * @throws IOException
*/ */
public void put(long lockid, Text column, byte val[]) throws IOException { public void put(long lockid, Text column, byte val[]) throws IOException {

View File

@ -24,10 +24,12 @@ import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.Timer; import java.util.Timer;
@ -49,6 +51,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/** /**
* HMaster is the "master server" for a HBase. * HMaster is the "master server" for a HBase.
* There is only one HMaster for a single HBase deployment. * There is only one HMaster for a single HBase deployment.
@ -231,7 +234,6 @@ public class HMaster implements HConstants, HMasterInterface,
protected void checkAssigned(final HRegionInfo info, protected void checkAssigned(final HRegionInfo info,
final String serverName, final long startCode) { final String serverName, final long startCode) {
// Skip region - if ... // Skip region - if ...
if(info.offLine // offline if(info.offLine // offline
|| killedRegions.contains(info.regionName) // queued for offline || killedRegions.contains(info.regionName) // queued for offline
@ -466,7 +468,6 @@ public class HMaster implements HConstants, HMasterInterface,
try { try {
// Rescan the known meta regions every so often // Rescan the known meta regions every so often
synchronized(metaScannerLock) { // Don't interrupt us while we're working synchronized(metaScannerLock) { // Don't interrupt us while we're working
Vector<MetaRegion> v = new Vector<MetaRegion>(); Vector<MetaRegion> v = new Vector<MetaRegion>();
v.addAll(knownMetaRegions.values()); v.addAll(knownMetaRegions.values());
@ -639,13 +640,11 @@ public class HMaster implements HConstants, HMasterInterface,
this.serverLeases = new Leases( this.serverLeases = new Leases(
conf.getLong("hbase.master.lease.period", 30 * 1000), conf.getLong("hbase.master.lease.period", 30 * 1000),
conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000)); conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
this.server = RPC.getServer(this, address.getBindAddress(), this.server = RPC.getServer(this, address.getBindAddress(),
address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
false, conf); false, conf);
// The rpc-server port can be ephemeral... ensure we have the correct info // The rpc-server port can be ephemeral... ensure we have the correct info
this.address = new HServerAddress(server.getListenerAddress()); this.address = new HServerAddress(server.getListenerAddress());
conf.set(MASTER_ADDRESS, address.toString()); conf.set(MASTER_ADDRESS, address.toString());
@ -847,13 +846,7 @@ public class HMaster implements HConstants, HMasterInterface,
synchronized boolean waitForRootRegionOrClose() { synchronized boolean waitForRootRegionOrClose() {
while (!closed && rootRegionLocation == null) { while (!closed && rootRegionLocation == null) {
try { try {
if (LOG.isDebugEnabled()) {
LOG.debug("Wait for root region (or close)");
}
wait(threadWakeFrequency); wait(threadWakeFrequency);
if (LOG.isDebugEnabled()) {
LOG.debug("Wake from wait for root region (or close)");
}
} catch(InterruptedException e) { } catch(InterruptedException e) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Wake from wait for root region (or close) (IE)"); LOG.debug("Wake from wait for root region (or close) (IE)");
@ -1154,11 +1147,9 @@ public class HMaster implements HConstants, HMasterInterface,
int counter = 0; int counter = 0;
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
for (Text curRegionName: unassignedRegions.keySet()) { for (Text curRegionName: unassignedRegions.keySet()) {
HRegionInfo regionInfo = unassignedRegions.get(curRegionName); HRegionInfo regionInfo = unassignedRegions.get(curRegionName);
long assignedTime = assignAttempts.get(curRegionName); long assignedTime = assignAttempts.get(curRegionName);
if (now - assignedTime > maxRegionOpenTime) { if (now - assignedTime > maxRegionOpenTime) {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("assigning region " + regionInfo.regionName + " to server " LOG.debug("assigning region " + regionInfo.regionName + " to server "
@ -1757,7 +1748,8 @@ public class HMaster implements HConstants, HMasterInterface,
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
public void createTable(HTableDescriptor desc) throws IOException { public void createTable(HTableDescriptor desc)
throws IOException {
if (!isMasterRunning()) { if (!isMasterRunning()) {
throw new MasterNotRunningException(); throw new MasterNotRunningException();
} }
@ -1765,61 +1757,13 @@ public class HMaster implements HConstants, HMasterInterface,
for(int tries = 0; tries < numRetries; tries++) { for(int tries = 0; tries < numRetries; tries++) {
try { try {
// We can not access any meta region if they have not already been assigned // We can not access meta regions if they have not already been
// and scanned. // assigned and scanned. If we timeout waiting, just shutdown.
if (metaScanner.waitForMetaScanOrClose()) { if (metaScanner.waitForMetaScanOrClose()) {
return; // We're shutting down. Forget it. return;
} }
createTable(newRegion);
// 1. Check to see if table already exists
MetaRegion m = (knownMetaRegions.containsKey(newRegion.regionName))?
knownMetaRegions.get(newRegion.regionName):
knownMetaRegions.get(
knownMetaRegions.headMap(newRegion.regionName).lastKey());
Text metaRegionName = m.regionName;
HRegionInterface server = client.getHRegionConnection(m.server);
byte [] infoBytes =
server.get(metaRegionName, desc.getName(), COL_REGIONINFO);
if (infoBytes != null && infoBytes.length != 0) {
DataInputBuffer inbuf = new DataInputBuffer();
inbuf.reset(infoBytes, infoBytes.length);
HRegionInfo info = new HRegionInfo();
info.readFields(inbuf);
if (info.tableDesc.getName().compareTo(desc.getName()) == 0) {
throw new IOException("table already exists");
}
}
// 2. Create the HRegion
HRegion r = HRegion.createHRegion(newRegion.regionId, desc, this.dir,
this.conf);
// 3. Insert into meta
HRegionInfo info = r.getRegionInfo();
Text regionName = r.getRegionName();
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(byteValue);
info.write(s);
long clientId = rand.nextLong();
long lockid = server.startUpdate(metaRegionName, clientId, regionName);
server.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
byteValue.toByteArray());
server.commit(metaRegionName, clientId, lockid,
System.currentTimeMillis());
// 4. Close the new region to flush it to disk
r.close();
// 5. Get it assigned to a server
unassignedRegions.put(regionName, info);
assignAttempts.put(regionName, Long.valueOf(0L));
break; break;
} catch (IOException e) { } catch (IOException e) {
if(tries == numRetries - 1) { if(tries == numRetries - 1) {
if (e instanceof RemoteException) { if (e instanceof RemoteException) {
@ -1835,6 +1779,81 @@ public class HMaster implements HConstants, HMasterInterface,
} }
} }
/*
* Set of tables currently in creation. Access needs to be synchronized.
*/
private Set<Text> tableInCreation = new HashSet<Text>();
private void createTable(final HRegionInfo newRegion) throws IOException {
Text tableName = newRegion.tableDesc.getName();
synchronized (tableInCreation) {
if (tableInCreation.contains(tableName)) {
throw new TableExistsException("Table " + tableName + " in process "
+ "of being created");
}
tableInCreation.add(tableName);
}
try {
// 1. Check to see if table already exists. Get meta region where
// table would sit should it exist. Open scanner on it. If a region
// for the table we want to create already exists, then table already
// created. Throw already-exists exception.
MetaRegion m = (knownMetaRegions.containsKey(newRegion.regionName))?
knownMetaRegions.get(newRegion.regionName):
knownMetaRegions.get(knownMetaRegions.
headMap(newRegion.getTableDesc().getName()).lastKey());
Text metaRegionName = m.regionName;
HRegionInterface connection = client.getHRegionConnection(m.server);
long scannerid = connection.openScanner(metaRegionName,
new Text[] { COL_REGIONINFO }, tableName, System.currentTimeMillis(),
null);
try {
KeyedData[] data = connection.next(scannerid);
// Test data and that the row for the data is for our table. If
// table does not exist, scanner will return row after where our table
// would be inserted if it exists so look for exact match on table
// name.
if (data != null && data.length > 0 &&
HRegionInfo.getTableNameFromRegionName(data[0].getKey().getRow()).
equals(tableName)) {
// Then a region for this table already exists. Ergo table exists.
throw new TableExistsException(tableName.toString());
}
} finally {
connection.close(scannerid);
}
// 2. Create the HRegion
HRegion r = HRegion.createHRegion(newRegion.regionId, newRegion.
getTableDesc(), this.dir, this.conf);
// 3. Insert into meta
HRegionInfo info = r.getRegionInfo();
Text regionName = r.getRegionName();
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(byteValue);
info.write(s);
long clientId = rand.nextLong();
long lockid = connection.
startUpdate(metaRegionName, clientId, regionName);
connection.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
byteValue.toByteArray());
connection.commit(metaRegionName, clientId, lockid,
System.currentTimeMillis());
// 4. Close the new region to flush it to disk
r.close();
// 5. Get it assigned to a server
unassignedRegions.put(regionName, info);
assignAttempts.put(regionName, Long.valueOf(0L));
} finally {
synchronized (tableInCreation) {
tableInCreation.remove(newRegion.getTableDesc().getName());
}
}
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@ -1869,15 +1888,15 @@ public class HMaster implements HConstants, HMasterInterface,
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
public HServerAddress findRootRegion() { public void disableTable(Text tableName) throws IOException {
return rootRegionLocation; new ChangeTableState(tableName, false).process();
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
public void disableTable(Text tableName) throws IOException { public HServerAddress findRootRegion() {
new ChangeTableState(tableName, false).process(); return rootRegionLocation;
} }
// Helper classes for HMasterInterface // Helper classes for HMasterInterface

View File

@ -40,6 +40,7 @@ public class HRegionInfo implements WritableComparable {
Text endKey; Text endKey;
boolean offLine; boolean offLine;
HTableDescriptor tableDesc; HTableDescriptor tableDesc;
public static final char DELIMITER = '_';
/** Default constructor - creates empty object */ /** Default constructor - creates empty object */
public HRegionInfo() { public HRegionInfo() {
@ -92,8 +93,8 @@ public class HRegionInfo implements WritableComparable {
this.endKey.set(endKey); this.endKey.set(endKey);
} }
this.regionName = new Text(tableDesc.getName() + "_" + this.regionName = new Text(tableDesc.getName().toString() + DELIMITER +
(startKey == null ? "" : startKey.toString()) + "_" + (startKey == null ? "" : startKey.toString()) + DELIMITER +
regionId); regionId);
this.offLine = false; this.offLine = false;
@ -165,6 +166,30 @@ public class HRegionInfo implements WritableComparable {
return regionName; return regionName;
} }
/**
* Extracts table name prefix from a region name.
* Presumes region names are ASCII characters only.
* @param regionName A region name.
* @return The table prefix of a region name.
*/
public static Text getTableNameFromRegionName(final Text regionName) {
int index = -1;
byte [] bytes = regionName.getBytes();
for (int i = 0; i < bytes.length; i++) {
if (((char) bytes[i]) == DELIMITER) {
index = i;
break;
}
}
if (index == -1) {
throw new IllegalArgumentException(regionName.toString() + " does not " +
"contain " + DELIMITER + " character");
}
byte [] tableName = new byte[index];
System.arraycopy(bytes, 0, tableName, 0, index);
return new Text(tableName);
}
/** /**
* @return the startKey * @return the startKey
*/ */

View File

@ -39,11 +39,12 @@ public class HTableDescriptor implements WritableComparable {
Text name; Text name;
TreeMap<Text, HColumnDescriptor> families; TreeMap<Text, HColumnDescriptor> families;
/** /*
* Legal table names can only contain 'word characters': * Legal table names can only contain 'word characters':
* i.e. <code>[a-zA-Z_0-9]</code>. * i.e. <code>[a-zA-Z_0-9]</code>.
* * Lets be restrictive until a reason to be otherwise. One reason to limit
* Let's be restrictive until a reason to be otherwise. * characters in table name is to ensure table regions as entries in META
* regions can be found (See HADOOP-1581 'HBASE: Un-openable tablename bug').
*/ */
private static final Pattern LEGAL_TABLE_NAME = private static final Pattern LEGAL_TABLE_NAME =
Pattern.compile("[\\w-]+"); Pattern.compile("[\\w-]+");

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase; package org.apache.hadoop.hbase;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/** Tests table creation restrictions*/ /** Tests table creation restrictions*/
public class TestTable extends HBaseClusterTestCase { public class TestTable extends HBaseClusterTestCase {
@ -27,55 +28,80 @@ public class TestTable extends HBaseClusterTestCase {
super(true); super(true);
} }
public void testTable() { public void testTable() throws IOException {
HClient client = new HClient(conf); final HClient client = new HClient(conf);
String msg = null;
try { try {
client.createTable(HGlobals.rootTableDesc); client.createTable(HGlobals.rootTableDesc);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// Expected - ignore it msg = e.toString();
} catch(Exception e) {
System.err.println("Unexpected exception");
e.printStackTrace();
fail();
} }
assertTrue("Unexcepted exception message " + msg, msg != null &&
msg.startsWith(IllegalArgumentException.class.getName()) &&
msg.contains(HGlobals.rootTableDesc.getName().toString()));
msg = null;
try { try {
client.createTable(HGlobals.metaTableDesc); client.createTable(HGlobals.metaTableDesc);
} catch(IllegalArgumentException e) { } catch(IllegalArgumentException e) {
// Expected - ignore it msg = e.toString();
} catch(Exception e) {
System.err.println("Unexpected exception");
e.printStackTrace();
fail();
} }
assertTrue("Unexcepted exception message " + msg, msg != null &&
msg.startsWith(IllegalArgumentException.class.getName()) &&
msg.contains(HGlobals.metaTableDesc.getName().toString()));
HTableDescriptor desc = new HTableDescriptor("test"); // Try doing a duplicate database create.
msg = null;
HTableDescriptor desc = new HTableDescriptor(getName());
desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString())); desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString()));
client.createTable(desc);
try { try {
client.createTable(desc); client.createTable(desc);
} catch (TableExistsException e) {
} catch(Exception e) { msg = e.getMessage();
System.err.println("Unexpected exception");
e.printStackTrace();
fail();
} }
assertTrue("Unexpected exception message " + msg, msg != null &&
msg.contains(getName()));
// Now try and do concurrent creation with a bunch of threads.
final HTableDescriptor threadDesc =
new HTableDescriptor("threaded-" + getName());
threadDesc.addFamily(new HColumnDescriptor(HConstants.
COLUMN_FAMILY.toString()));
int count = 10;
Thread [] threads = new Thread [count];
final AtomicInteger successes = new AtomicInteger(0);
final AtomicInteger failures = new AtomicInteger(0);
for (int i = 0; i < count; i++) {
threads[i] = new Thread(Integer.toString(i)) {
@Override
public void run() {
try { try {
client.createTable(desc); client.createTable(threadDesc);
successes.incrementAndGet();
} catch (TableExistsException e) {
failures.incrementAndGet();
} catch (IOException e) { } catch (IOException e) {
// Expected. Ignore it. // ignore.
}
} catch(Exception e) { }
System.err.println("Unexpected exception"); };
e.printStackTrace(); }
fail(); for (int i = 0; i < count; i++) {
threads[i].start();
}
for (int i = 0; i < count; i++) {
while(threads[i].isAlive()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// continue
} }
} }
} }
// All threads are now dead. Count up how many tables were created and
// how many failed w/ appropriate exception.
assertTrue(successes.get() == 1);
assertTrue(failures.get() == (count - 1));
}
}