Hadoop-1384

HBase omnibus patch. Contributions by Vuk Ercegovac and Michael Stack.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@539243 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-05-18 03:22:54 +00:00
parent 3a70f2f97b
commit a6b7be60d6
25 changed files with 1578 additions and 1280 deletions

View File

@ -183,10 +183,10 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
abstract boolean getNext(int i) throws IOException;
/** Mechanism used by concrete implementation to shut down a particular scanner */
abstract void closeSubScanner(int i) throws IOException;
abstract void closeSubScanner(int i);
/** Mechanism used to shut down the whole scan */
public abstract void close() throws IOException;
public abstract void close();
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HInternalScannerInterface#isWildcardScanner()

View File

@ -288,17 +288,35 @@ public class HClient implements HConstants {
throw new IOException("Timed out trying to locate root region");
}
// Verify that this server still serves the root region
HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation);
if(rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName) != null) {
try {
rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName);
this.tableServers = new TreeMap<Text, TableInfo>();
this.tableServers.put(EMPTY_START_ROW,
new TableInfo(HGlobals.rootRegionInfo, rootRegionLocation));
this.tablesToServers.put(ROOT_TABLE_NAME, this.tableServers);
break;
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
// Don't bother sleeping. We've run out of retries.
break;
}
// Sleep and retry finding root region.
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Root region location changed. Sleeping.");
}
Thread.sleep(this.clientTimeout);
if (LOG.isDebugEnabled()) {
LOG.debug("Wake. Retry finding root region.");
}
} catch(InterruptedException iex) {
}
}
rootRegionLocation = null;
}
@ -453,7 +471,7 @@ public class HClient implements HConstants {
* Right now, it only exists as part of the META table's region info.
*/
public synchronized HTableDescriptor[] listTables()
throws IOException {
throws IOException {
TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
TreeMap<Text, TableInfo> metaTables =
@ -523,24 +541,84 @@ public class HClient implements HConstants {
return this.tableServers.get(serverKey);
}
private synchronized void findRegion(TableInfo info) throws IOException {
// Wipe out everything we know about this table
this.tablesToServers.remove(info.regionInfo.tableDesc.getName());
this.tableServers.clear();
// Reload information for the whole table
findTableInMeta(info.regionInfo.tableDesc.getName());
if(this.tableServers.get(info.regionInfo.startKey) == null ) {
throw new IOException("region " + info.regionInfo.regionName + " does not exist");
}
}
/** Get a single value for the specified row and column */
public byte[] get(Text row, Text column) throws IOException {
TableInfo info = getTableInfo(row);
return getHRegionConnection(info.serverAddress).get(
info.regionInfo.regionName, row, column).get();
TableInfo info = null;
BytesWritable value = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getTableInfo(row);
try {
value = getHRegionConnection(info.serverAddress).get(
info.regionInfo.regionName, row, column);
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
// No more tries
throw e;
}
findRegion(info);
info = null;
}
}
if(value != null) {
byte[] bytes = new byte[value.getSize()];
System.arraycopy(value.get(), 0, bytes, 0, bytes.length);
return bytes;
}
return null;
}
/** Get the specified number of versions of the specified row and column */
public byte[][] get(Text row, Text column, int numVersions) throws IOException {
TableInfo info = getTableInfo(row);
BytesWritable[] values = getHRegionConnection(info.serverAddress).get(
info.regionInfo.regionName, row, column, numVersions);
TableInfo info = null;
BytesWritable[] values = null;
ArrayList<byte[]> bytes = new ArrayList<byte[]>();
for(int i = 0 ; i < values.length; i++) {
bytes.add(values[i].get());
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getTableInfo(row);
try {
values = getHRegionConnection(info.serverAddress).get(
info.regionInfo.regionName, row, column, numVersions);
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
// No more tries
throw e;
}
findRegion(info);
info = null;
}
}
return bytes.toArray(new byte[values.length][]);
if(values != null) {
ArrayList<byte[]> bytes = new ArrayList<byte[]>();
for(int i = 0 ; i < values.length; i++) {
byte[] value = new byte[values[i].getSize()];
System.arraycopy(values[i].get(), 0, value, 0, value.length);
bytes.add(value);
}
return bytes.toArray(new byte[values.length][]);
}
return null;
}
/**
@ -548,22 +626,61 @@ public class HClient implements HConstants {
* the specified timestamp.
*/
public byte[][] get(Text row, Text column, long timestamp, int numVersions) throws IOException {
TableInfo info = getTableInfo(row);
BytesWritable[] values = getHRegionConnection(info.serverAddress).get(
info.regionInfo.regionName, row, column, timestamp, numVersions);
TableInfo info = null;
BytesWritable[] values = null;
ArrayList<byte[]> bytes = new ArrayList<byte[]>();
for(int i = 0 ; i < values.length; i++) {
bytes.add(values[i].get());
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getTableInfo(row);
try {
values = getHRegionConnection(info.serverAddress).get(
info.regionInfo.regionName, row, column, timestamp, numVersions);
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
// No more tries
throw e;
}
findRegion(info);
info = null;
}
}
return bytes.toArray(new byte[values.length][]);
if(values != null) {
ArrayList<byte[]> bytes = new ArrayList<byte[]>();
for(int i = 0 ; i < values.length; i++) {
byte[] value = new byte[values[i].getSize()];
System.arraycopy(values[i].get(), 0, value, 0, value.length);
bytes.add(value);
}
return bytes.toArray(new byte[values.length][]);
}
return null;
}
/** Get all the data for the specified row */
public LabelledData[] getRow(Text row) throws IOException {
TableInfo info = getTableInfo(row);
return getHRegionConnection(info.serverAddress).getRow(
info.regionInfo.regionName, row);
TableInfo info = null;
LabelledData[] value = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getTableInfo(row);
try {
value = getHRegionConnection(info.serverAddress).getRow(
info.regionInfo.regionName, row);
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
// No more tries
throw e;
}
findRegion(info);
info = null;
}
}
return value;
}
/**
@ -579,19 +696,34 @@ public class HClient implements HConstants {
/** Start an atomic row insertion or update */
public long startUpdate(Text row) throws IOException {
TableInfo info = getTableInfo(row);
long lockid;
try {
this.currentServer = getHRegionConnection(info.serverAddress);
this.currentRegion = info.regionInfo.regionName;
this.clientid = rand.nextLong();
lockid = currentServer.startUpdate(this.currentRegion, this.clientid, row);
TableInfo info = null;
long lockid = -1L;
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getTableInfo(row);
try {
this.currentServer = getHRegionConnection(info.serverAddress);
this.currentRegion = info.regionInfo.regionName;
this.clientid = rand.nextLong();
lockid = currentServer.startUpdate(this.currentRegion, this.clientid, row);
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
// No more tries
throw e;
}
findRegion(info);
info = null;
} catch(IOException e) {
this.currentServer = null;
this.currentRegion = null;
throw e;
}
} catch(IOException e) {
this.currentServer = null;
this.currentRegion = null;
throw e;
}
return lockid;
}
@ -666,11 +798,7 @@ public class HClient implements HConstants {
private HRegionInterface server;
private long scannerId;
public ClientScanner(Text[] columns, Text startRow) throws IOException {
this.columns = columns;
this.startRow = startRow;
this.closed = false;
private void loadRegions() {
Text firstServer = null;
if(this.startRow == null || this.startRow.getLength() == 0) {
firstServer = tableServers.firstKey();
@ -682,8 +810,15 @@ public class HClient implements HConstants {
firstServer = tableServers.headMap(startRow).lastKey();
}
Collection<TableInfo> info = tableServers.tailMap(firstServer).values();
this.regions = info.toArray(new TableInfo[info.size()]);
}
public ClientScanner(Text[] columns, Text startRow) throws IOException {
this.columns = columns;
this.startRow = startRow;
this.closed = false;
loadRegions();
this.currentRegion = -1;
this.server = null;
this.scannerId = -1L;
@ -706,9 +841,25 @@ public class HClient implements HConstants {
}
try {
this.server = getHRegionConnection(this.regions[currentRegion].serverAddress);
this.scannerId = this.server.openScanner(
this.regions[currentRegion].regionInfo.regionName, this.columns,
this.startRow);
for(int tries = 0; tries < numRetries; tries++) {
TableInfo info = this.regions[currentRegion];
try {
this.scannerId = this.server.openScanner(info.regionInfo.regionName,
this.columns, currentRegion == 0 ? this.startRow : EMPTY_START_ROW);
break;
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
// No more tries
throw e;
}
findRegion(info);
loadRegions();
}
}
} catch(IOException e) {
close();
@ -743,6 +894,7 @@ public class HClient implements HConstants {
public void close() throws IOException {
if(this.scannerId != -1L) {
this.server.close(this.scannerId);
this.scannerId = -1L;
}
this.server = null;
this.closed = true;

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.io.Text;
public interface HInternalScannerInterface {
public boolean next(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException;
public void close() throws IOException;
public void close();
/** Returns true if the scanner is matching a column family or regex */
public boolean isWildcardScanner();

View File

@ -15,6 +15,8 @@
*/
package org.apache.hadoop.hbase;
import java.util.concurrent.atomic.AtomicInteger;
/*******************************************************************************
* HLocking is a set of lock primitives that does not rely on a
* particular thread holding the monitor for an object. This is
@ -33,12 +35,12 @@ public class HLocking {
// If lockers > 0, locked for read
// If lockers == -1 locked for write
private int lockers;
private AtomicInteger lockers;
/** Constructor */
public HLocking() {
this.mutex = new Integer(0);
this.lockers = 0;
this.lockers = new AtomicInteger(0);
}
/**
@ -46,13 +48,13 @@ public class HLocking {
*/
public void obtainReadLock() {
synchronized(mutex) {
while(lockers < 0) {
while(lockers.get() < 0) {
try {
mutex.wait();
} catch(InterruptedException ie) {
}
}
lockers++;
lockers.incrementAndGet();
mutex.notifyAll();
}
}
@ -62,8 +64,7 @@ public class HLocking {
*/
public void releaseReadLock() {
synchronized(mutex) {
lockers--;
if(lockers < 0) {
if(lockers.decrementAndGet() < 0) {
throw new IllegalStateException("lockers: " + lockers);
}
mutex.notifyAll();
@ -75,13 +76,12 @@ public class HLocking {
*/
public void obtainWriteLock() {
synchronized(mutex) {
while(lockers != 0) {
while(!lockers.compareAndSet(0, -1)) {
try {
mutex.wait();
} catch (InterruptedException ie) {
}
}
lockers = -1;
mutex.notifyAll();
}
}
@ -91,10 +91,9 @@ public class HLocking {
*/
public void releaseWriteLock() {
synchronized(mutex) {
if(lockers != -1) {
if(!lockers.compareAndSet(-1, 0)) {
throw new IllegalStateException("lockers: " + lockers);
}
lockers = 0;
mutex.notifyAll();
}
}

View File

@ -270,8 +270,14 @@ public class HLog implements HConstants {
/** Shut down the log. */
public synchronized void close() throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("closing log writer");
}
this.writer.close();
this.closed = true;
if(LOG.isDebugEnabled()) {
LOG.debug("log writer closed");
}
}
/**

View File

@ -177,11 +177,11 @@ public class HMaster implements HConstants, HMasterInterface,
}
} finally {
try {
if (scannerId != -1L) {
server.close(scannerId);
}
if (scannerId != -1L) {
server.close(scannerId);
}
} catch (IOException e) {
e.printStackTrace();
e.printStackTrace();
}
scannerId = -1L;
}
@ -581,13 +581,13 @@ public class HMaster implements HConstants, HMasterInterface,
// Main processing loop
for(PendingOperation op = null; !closed; ) {
synchronized(msgQueue) {
while(msgQueue.size() == 0 && serversToServerInfo.size() != 0) {
while(msgQueue.size() == 0 && !closed) {
try {
msgQueue.wait(threadWakeFrequency);
} catch(InterruptedException iex) {
}
}
if(msgQueue.size() == 0 || closed) {
if(closed) {
continue;
}
op = msgQueue.remove(msgQueue.size()-1);
@ -616,14 +616,6 @@ public class HMaster implements HConstants, HMasterInterface,
}
server.stop(); // Stop server
serverLeases.close(); // Turn off the lease monitor
try {
fs.close();
client.close(); // Shut down the client
} catch(IOException iex) {
// Print if ever there is an interrupt (Just for kicks. Remove if it
// ever happens).
iex.printStackTrace();
}
// Join up with all threads
@ -652,6 +644,7 @@ public class HMaster implements HConstants, HMasterInterface,
// ever happens).
iex.printStackTrace();
}
if(LOG.isDebugEnabled()) {
LOG.debug("HMaster main thread exiting");
}
@ -774,19 +767,9 @@ public class HMaster implements HConstants, HMasterInterface,
HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[]) throws IOException {
Vector<HMsg> returnMsgs = new Vector<HMsg>();
// Process the kill list
TreeMap<Text, HRegionInfo> regionsToKill =
killList.remove(info.getServerAddress().toString());
if(regionsToKill != null) {
for(Iterator<HRegionInfo> i = regionsToKill.values().iterator();
i.hasNext(); ) {
returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_AND_DELETE, i.next()));
}
}
// Get reports on what the RegionServer did.
for(int i = 0; i < incomingMsgs.length; i++) {
@ -872,18 +855,11 @@ public class HMaster implements HConstants, HMasterInterface,
} else {
boolean reassignRegion = true;
synchronized(regionsToKill) {
if(regionsToKill.containsKey(region.regionName)) {
regionsToKill.remove(region.regionName);
if(regionsToKill.size() > 0) {
killList.put(info.toString(), regionsToKill);
} else {
killList.remove(info.toString());
}
reassignRegion = false;
}
if(regionsToKill.containsKey(region.regionName)) {
regionsToKill.remove(region.regionName);
unassignedRegions.remove(region.regionName);
assignAttempts.remove(region.regionName);
reassignRegion = false;
}
synchronized(msgQueue) {
@ -903,13 +879,14 @@ public class HMaster implements HConstants, HMasterInterface,
LOG.debug("new region " + region.regionName);
}
// A region has split and the old server is serving the two new regions.
if(region.regionName.find(META_TABLE_NAME.toString()) == 0) {
// A meta region has split.
allMetaRegionsScanned = false;
}
unassignedRegions.put(region.regionName, region);
assignAttempts.put(region.regionName, 0L);
break;
default:
@ -918,6 +895,16 @@ public class HMaster implements HConstants, HMasterInterface,
}
}
// Process the kill list
if(regionsToKill != null) {
for(Iterator<HRegionInfo> i = regionsToKill.values().iterator();
i.hasNext(); ) {
returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_AND_DELETE, i.next()));
}
}
// Figure out what the RegionServer ought to do, and write back.
if(unassignedRegions.size() > 0) {
@ -1460,109 +1447,168 @@ public class HMaster implements HConstants, HMasterInterface,
} else {
firstMetaRegion = knownMetaRegions.headMap(tableName).lastKey();
}
for(Iterator<MetaRegion> it =
knownMetaRegions.tailMap(firstMetaRegion).values().iterator();
it.hasNext(); ) {
// Find all the regions that make up this table
synchronized(metaScannerLock) { // Prevent meta scanner from running
for(Iterator<MetaRegion> it =
knownMetaRegions.tailMap(firstMetaRegion).values().iterator();
it.hasNext(); ) {
MetaRegion m = it.next();
HRegionInterface server = client.getHRegionConnection(m.server);
Vector<Text> rowsToDelete = new Vector<Text>();
// Find all the regions that make up this table
long scannerId = -1L;
try {
scannerId = server.openScanner(m.regionName, METACOLUMNS, tableName);
MetaRegion m = it.next();
HRegionInterface server = client.getHRegionConnection(m.server);
// Rows in the meta table we will need to delete
Vector<Text> rowsToDelete = new Vector<Text>();
// Regions that are being served. We will get the HRegionServers
// to delete them for us, but we don't tell them that until after
// we are done scanning to prevent lock contention
TreeMap<String, TreeMap<Text, HRegionInfo>> localKillList =
new TreeMap<String, TreeMap<Text, HRegionInfo>>();
// Regions that are not being served. We will have to delete
// them ourselves
TreeSet<Text> unservedRegions = new TreeSet<Text>();
long scannerId = -1L;
try {
scannerId = server.openScanner(m.regionName, METACOLUMNS, tableName);
DataInputBuffer inbuf = new DataInputBuffer();
byte[] bytes;
while(true) {
LabelledData[] values = null;
HStoreKey key = new HStoreKey();
values = server.next(scannerId, key);
if(values == null || values.length == 0) {
break;
}
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
for(int i = 0; i < values.length; i++) {
bytes = new byte[values[i].getData().getSize()];
System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
results.put(values[i].getLabel(), bytes);
}
bytes = results.get(COL_REGIONINFO);
if(bytes == null || bytes.length == 0) {
break;
}
inbuf.reset(bytes, bytes.length);
HRegionInfo info = new HRegionInfo();
info.readFields(inbuf);
DataInputBuffer inbuf = new DataInputBuffer();
byte[] bytes;
while(true) {
LabelledData[] values = null;
HStoreKey key = new HStoreKey();
values = server.next(scannerId, key);
if(values == null || values.length == 0) {
break;
}
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
for(int i = 0; i < values.length; i++) {
bytes = new byte[values[i].getData().getSize()];
System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
results.put(values[i].getLabel(), bytes);
}
bytes = results.get(COL_REGIONINFO);
if(bytes == null || bytes.length == 0) {
break;
}
inbuf.reset(bytes, bytes.length);
HRegionInfo info = new HRegionInfo();
info.readFields(inbuf);
if(info.tableDesc.getName().compareTo(tableName) > 0) {
break; // Beyond any more entries for this table
}
if(info.tableDesc.getName().compareTo(tableName) > 0) {
break; // Beyond any more entries for this table
}
rowsToDelete.add(info.regionName);
rowsToDelete.add(info.regionName);
// Is it being served?
// Is it being served?
bytes = results.get(COL_SERVER);
if(bytes != null && bytes.length != 0) {
String serverName = new String(bytes, UTF8_ENCODING);
bytes = results.get(COL_STARTCODE);
bytes = results.get(COL_SERVER);
if(bytes != null && bytes.length != 0) {
long startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
String serverName = new String(bytes, UTF8_ENCODING);
HServerInfo s = serversToServerInfo.get(serverName);
if(s != null && s.getStartCode() == startCode) {
bytes = results.get(COL_STARTCODE);
if(bytes != null && bytes.length != 0) {
long startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
// It is being served.
// Tell the server to stop it and not report back.
HServerInfo s = serversToServerInfo.get(serverName);
if(s != null && s.getStartCode() == startCode) {
TreeMap<Text, HRegionInfo> regionsToKill =
killList.get(serverName);
// It is being served.
// Tell the server to stop it and not report back.
if(regionsToKill == null) {
regionsToKill = new TreeMap<Text, HRegionInfo>();
TreeMap<Text, HRegionInfo> regionsToKill =
localKillList.get(serverName);
if(regionsToKill == null) {
regionsToKill = new TreeMap<Text, HRegionInfo>();
}
regionsToKill.put(info.regionName, info);
localKillList.put(serverName, regionsToKill);
continue;
}
regionsToKill.put(info.regionName, info);
killList.put(serverName, regionsToKill);
}
}
// Region is not currently being served.
// Prevent it from getting assigned and add it to the list of
// regions we need to delete here.
unassignedRegions.remove(info.regionName);
assignAttempts.remove(info.regionName);
unservedRegions.add(info.regionName);
}
} catch(IOException e) {
e.printStackTrace();
} finally {
if(scannerId != -1L) {
try {
server.close(scannerId);
} catch(IOException e) {
e.printStackTrace();
}
}
scannerId = -1L;
}
// Wipe the existence of the regions out of the meta table
for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext(); ) {
Text rowName = row.next();
if(LOG.isDebugEnabled()) {
LOG.debug("deleting columns in row: " + rowName);
}
long lockid = -1L;
long clientId = rand.nextLong();
try {
lockid = server.startUpdate(m.regionName, clientId, rowName);
server.delete(m.regionName, clientId, lockid, COL_REGIONINFO);
server.delete(m.regionName, clientId, lockid, COL_SERVER);
server.delete(m.regionName, clientId, lockid, COL_STARTCODE);
server.commit(m.regionName, clientId, lockid);
lockid = -1L;
if(LOG.isDebugEnabled()) {
LOG.debug("deleted columns in row: " + rowName);
}
} catch(Exception e) {
if(lockid != -1L) {
server.abort(m.regionName, clientId, lockid);
}
LOG.error("columns deletion failed in row: " + rowName);
LOG.error(e);
}
}
} catch(IOException e) {
e.printStackTrace();
} finally {
if(scannerId != -1L) {
// Notify region servers that some regions need to be closed and deleted
if(localKillList.size() != 0) {
killList.putAll(localKillList);
}
// Delete any regions that are not being served
for(Iterator<Text> i = unservedRegions.iterator(); i.hasNext(); ) {
Text regionName = i.next();
try {
server.close(scannerId);
HRegion.deleteRegion(fs, dir, regionName);
} catch(IOException e) {
e.printStackTrace();
LOG.error("failed to delete region " + regionName);
LOG.error(e);
}
}
scannerId = -1L;
}
for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext(); ) {
Text rowName = row.next();
if(LOG.isDebugEnabled()) {
LOG.debug("deleting columns in row: " + rowName);
}
try {
long clientId = rand.nextLong();
long lockid = server.startUpdate(m.regionName, clientId, rowName);
server.delete(m.regionName, clientId, lockid, COL_REGIONINFO);
server.delete(m.regionName, clientId, lockid, COL_SERVER);
server.delete(m.regionName, clientId, lockid, COL_STARTCODE);
server.commit(m.regionName, clientId, lockid);
} catch(Exception e) {
e.printStackTrace();
}
}
}
if(LOG.isDebugEnabled()) {

View File

@ -322,8 +322,10 @@ public class HMemcache {
}
}
} catch(Exception ex) {
} catch(IOException ex) {
LOG.error(ex);
close();
throw ex;
}
}
@ -365,7 +367,7 @@ public class HMemcache {
}
/** Shut down map iterators, and release the lock */
public void close() throws IOException {
public void close() {
if(! scannerClosed) {
try {
for(int i = 0; i < keys.length; i++) {

View File

@ -64,6 +64,19 @@ public class HRegion implements HConstants {
private static final Log LOG = LogFactory.getLog(HRegion.class);
/**
* Deletes all the files for a HRegion
*
* @param fs - the file system object
* @param baseDirectory - base directory for HBase
* @param regionName - name of the region to delete
* @throws IOException
*/
public static void deleteRegion(FileSystem fs, Path baseDirectory,
Text regionName) throws IOException {
fs.delete(HStoreFile.getHRegionDir(baseDirectory, regionName));
}
/**
* Merge two HRegions. They must be available on the current
* HRegionServer. Returns a brand-new active HRegion, also
@ -245,7 +258,7 @@ public class HRegion implements HConstants {
TreeMap<Long, TreeMap<Text, BytesWritable>> targetColumns
= new TreeMap<Long, TreeMap<Text, BytesWritable>>();
HMemcache memcache = new HMemcache();
HMemcache memcache;
Path dir;
HLog log;
@ -255,9 +268,9 @@ public class HRegion implements HConstants {
Path regiondir;
class WriteState {
public boolean writesOngoing;
public boolean writesEnabled;
public boolean closed;
public volatile boolean writesOngoing;
public volatile boolean writesEnabled;
public volatile boolean closed;
public WriteState() {
this.writesOngoing = true;
this.writesEnabled = true;
@ -265,12 +278,13 @@ public class HRegion implements HConstants {
}
}
WriteState writestate = new WriteState();
volatile WriteState writestate = new WriteState();
int recentCommits = 0;
int commitsSinceFlush = 0;
volatile int commitsSinceFlush = 0;
int maxUnflushedEntries = 0;
int compactionThreshold = 0;
HLocking lock = null;
//////////////////////////////////////////////////////////////////////////////
// Constructor
@ -302,11 +316,15 @@ public class HRegion implements HConstants {
this.fs = fs;
this.conf = conf;
this.regionInfo = regionInfo;
this.memcache = new HMemcache();
this.writestate.writesOngoing = true;
this.writestate.writesEnabled = true;
this.writestate.closed = false;
this.lock = new HLocking();
// Declare the regionName. This is a unique string for the region, used to
// build a unique filename.
@ -355,11 +373,21 @@ public class HRegion implements HConstants {
return this.regionInfo;
}
/** returns true if region is closed */
public boolean isClosed() {
boolean closed = false;
synchronized(writestate) {
closed = writestate.closed;
}
return closed;
}
/** Closes and deletes this HRegion. Called when doing a table deletion, for example */
public void closeAndDelete() throws IOException {
LOG.info("deleting region: " + regionInfo.regionName);
close();
fs.delete(regiondir);
deleteRegion(fs, dir, regionInfo.regionName);
LOG.info("region deleted: " + regionInfo.regionName);
}
/**
@ -373,42 +401,47 @@ public class HRegion implements HConstants {
* time-sensitive thread.
*/
public Vector<HStoreFile> close() throws IOException {
boolean shouldClose = false;
synchronized(writestate) {
if(writestate.closed) {
LOG.info("region " + this.regionInfo.regionName + " closed");
return new Vector<HStoreFile>();
lock.obtainWriteLock();
try {
boolean shouldClose = false;
synchronized(writestate) {
if(writestate.closed) {
LOG.info("region " + this.regionInfo.regionName + " closed");
return new Vector<HStoreFile>();
}
while(writestate.writesOngoing) {
try {
writestate.wait();
} catch (InterruptedException iex) {
}
}
writestate.writesOngoing = true;
shouldClose = true;
}
while(writestate.writesOngoing) {
if(! shouldClose) {
return null;
} else {
LOG.info("closing region " + this.regionInfo.regionName);
Vector<HStoreFile> allHStoreFiles = internalFlushcache();
for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
HStore store = it.next();
store.close();
}
try {
writestate.wait();
} catch (InterruptedException iex) {
return allHStoreFiles;
} finally {
synchronized(writestate) {
writestate.closed = true;
writestate.writesOngoing = false;
}
LOG.info("region " + this.regionInfo.regionName + " closed");
}
}
writestate.writesOngoing = true;
shouldClose = true;
}
if(! shouldClose) {
return null;
} else {
LOG.info("closing region " + this.regionInfo.regionName);
Vector<HStoreFile> allHStoreFiles = internalFlushcache();
for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
HStore store = it.next();
store.close();
}
try {
return allHStoreFiles;
} finally {
synchronized(writestate) {
writestate.closed = true;
writestate.writesOngoing = false;
}
LOG.info("region " + this.regionInfo.regionName + " closed");
}
} finally {
lock.releaseWriteLock();
}
}
@ -418,7 +451,9 @@ public class HRegion implements HConstants {
*
* Returns two brand-new (and open) HRegions
*/
public HRegion[] closeAndSplit(Text midKey) throws IOException {
public HRegion[] closeAndSplit(Text midKey, RegionUnavailableListener listener)
throws IOException {
if(((regionInfo.startKey.getLength() != 0)
&& (regionInfo.startKey.compareTo(midKey) > 0))
|| ((regionInfo.endKey.getLength() != 0)
@ -428,9 +463,6 @@ public class HRegion implements HConstants {
LOG.info("splitting region " + this.regionInfo.regionName);
// Flush this HRegion out to storage, and turn off flushes
// or compactions until close() is called.
Path splits = new Path(regiondir, SPLITDIR);
if(! fs.exists(splits)) {
fs.mkdirs(splits);
@ -453,6 +485,10 @@ public class HRegion implements HConstants {
}
TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
// Flush this HRegion out to storage, and turn off flushes
// or compactions until close() is called.
Vector<HStoreFile> hstoreFilesToSplit = flushcache(true);
for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
@ -472,8 +508,12 @@ public class HRegion implements HConstants {
alreadySplit.add(hsf);
}
// We just copied most of the data. Now close the HRegion
// and copy the small remainder
// We just copied most of the data.
// Notify the caller that we are about to close the region
listener.regionIsUnavailable(this.getRegionName());
// Now close the HRegion and copy the small remainder
hstoreFilesToSplit = close();
for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext(); ) {
@ -577,19 +617,26 @@ public class HRegion implements HConstants {
* @return - true if the region should be split
*/
public boolean needsSplit(Text midKey) {
Text key = new Text();
long maxSize = 0;
lock.obtainReadLock();
for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
long size = i.next().getLargestFileSize(key);
try {
Text key = new Text();
long maxSize = 0;
if(size > maxSize) { // Largest so far
maxSize = size;
midKey.set(key);
for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
long size = i.next().getLargestFileSize(key);
if(size > maxSize) { // Largest so far
maxSize = size;
midKey.set(key);
}
}
}
return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2)));
return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2)));
} finally {
lock.releaseReadLock();
}
}
/**
@ -597,11 +644,16 @@ public class HRegion implements HConstants {
*/
public boolean needsCompaction() {
boolean needsCompaction = false;
for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
if(i.next().getNMaps() > compactionThreshold) {
needsCompaction = true;
break;
lock.obtainReadLock();
try {
for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
if(i.next().getNMaps() > compactionThreshold) {
needsCompaction = true;
break;
}
}
} finally {
lock.releaseReadLock();
}
return needsCompaction;
}
@ -621,15 +673,20 @@ public class HRegion implements HConstants {
*/
public boolean compactStores() throws IOException {
boolean shouldCompact = false;
synchronized(writestate) {
if((! writestate.writesOngoing)
&& writestate.writesEnabled
&& (! writestate.closed)
&& recentCommits > MIN_COMMITS_FOR_COMPACTION) {
lock.obtainReadLock();
try {
synchronized(writestate) {
if((! writestate.writesOngoing)
&& writestate.writesEnabled
&& (! writestate.closed)
&& recentCommits > MIN_COMMITS_FOR_COMPACTION) {
writestate.writesOngoing = true;
shouldCompact = true;
writestate.writesOngoing = true;
shouldCompact = true;
}
}
} finally {
lock.releaseReadLock();
}
if(! shouldCompact) {
@ -637,6 +694,7 @@ public class HRegion implements HConstants {
return false;
} else {
lock.obtainWriteLock();
try {
LOG.info("starting compaction on region " + this.regionInfo.regionName);
for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
@ -652,6 +710,7 @@ public class HRegion implements HConstants {
recentCommits = 0;
writestate.notifyAll();
}
lock.releaseWriteLock();
}
}
}
@ -872,22 +931,28 @@ public class HRegion implements HConstants {
private BytesWritable[] get(HStoreKey key, int numVersions) throws IOException {
// Check the memcache
lock.obtainReadLock();
try {
// Check the memcache
BytesWritable[] result = memcache.get(key, numVersions);
if(result != null) {
return result;
BytesWritable[] result = memcache.get(key, numVersions);
if(result != null) {
return result;
}
// If unavailable in memcache, check the appropriate HStore
Text colFamily = HStoreKey.extractFamily(key.getColumn());
HStore targetStore = stores.get(colFamily);
if(targetStore == null) {
return null;
}
return targetStore.get(key, numVersions);
} finally {
lock.releaseReadLock();
}
// If unavailable in memcache, check the appropriate HStore
Text colFamily = HStoreKey.extractFamily(key.getColumn());
HStore targetStore = stores.get(colFamily);
if(targetStore == null) {
return null;
}
return targetStore.get(key, numVersions);
}
/**
@ -903,13 +968,19 @@ public class HRegion implements HConstants {
public TreeMap<Text, BytesWritable> getFull(Text row) throws IOException {
HStoreKey key = new HStoreKey(row, System.currentTimeMillis());
TreeMap<Text, BytesWritable> memResult = memcache.getFull(key);
for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) {
Text colFamily = it.next();
HStore targetStore = stores.get(colFamily);
targetStore.getFull(key, memResult);
lock.obtainReadLock();
try {
TreeMap<Text, BytesWritable> memResult = memcache.getFull(key);
for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) {
Text colFamily = it.next();
HStore targetStore = stores.get(colFamily);
targetStore.getFull(key, memResult);
}
return memResult;
} finally {
lock.releaseReadLock();
}
return memResult;
}
/**
@ -917,18 +988,24 @@ public class HRegion implements HConstants {
* columns. This Iterator must be closed by the caller.
*/
public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException {
TreeSet<Text> families = new TreeSet<Text>();
for(int i = 0; i < cols.length; i++) {
families.add(HStoreKey.extractFamily(cols[i]));
}
lock.obtainReadLock();
try {
TreeSet<Text> families = new TreeSet<Text>();
for(int i = 0; i < cols.length; i++) {
families.add(HStoreKey.extractFamily(cols[i]));
}
HStore[] storelist = new HStore[families.size()];
int i = 0;
for(Iterator<Text> it = families.iterator(); it.hasNext(); ) {
Text family = it.next();
storelist[i++] = stores.get(family);
HStore[] storelist = new HStore[families.size()];
int i = 0;
for(Iterator<Text> it = families.iterator(); it.hasNext(); ) {
Text family = it.next();
storelist[i++] = stores.get(family);
}
return new HScanner(cols, firstRow, memcache, storelist);
} finally {
lock.releaseReadLock();
}
return new HScanner(cols, firstRow, memcache, storelist);
}
//////////////////////////////////////////////////////////////////////////////
@ -950,7 +1027,13 @@ public class HRegion implements HConstants {
// We obtain a per-row lock, so other clients will
// block while one client performs an update.
return obtainLock(row);
lock.obtainReadLock();
try {
return obtainLock(row);
} finally {
lock.releaseReadLock();
}
}
/**
@ -1176,9 +1259,16 @@ public class HRegion implements HConstants {
/** Create an HScanner with a handle on many HStores. */
@SuppressWarnings("unchecked")
public HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores) throws IOException {
public HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores)
throws IOException {
long scanTime = System.currentTimeMillis();
this.scanners = new HInternalScannerInterface[stores.length + 1];
for(int i = 0; i < this.scanners.length; i++) {
this.scanners[i] = null;
}
this.resultSets = new TreeMap[scanners.length];
this.keys = new HStoreKey[scanners.length];
this.wildcardMatch = false;
@ -1189,28 +1279,38 @@ public class HRegion implements HConstants {
// NOTE: the memcache scanner should be the first scanner
HInternalScannerInterface scanner =
memcache.getScanner(scanTime, cols, firstRow);
try {
HInternalScannerInterface scanner =
memcache.getScanner(scanTime, cols, firstRow);
if(scanner.isWildcardScanner()) {
this.wildcardMatch = true;
}
if(scanner.isMultipleMatchScanner()) {
this.multipleMatchers = true;
}
scanners[0] = scanner;
for(int i = 0; i < stores.length; i++) {
scanner = stores[i].getScanner(scanTime, cols, firstRow);
if(scanner.isWildcardScanner()) {
this.wildcardMatch = true;
}
if(scanner.isMultipleMatchScanner()) {
this.multipleMatchers = true;
}
scanners[i + 1] = scanner;
}
scanners[0] = scanner;
for(int i = 0; i < stores.length; i++) {
scanner = stores[i].getScanner(scanTime, cols, firstRow);
if(scanner.isWildcardScanner()) {
this.wildcardMatch = true;
}
if(scanner.isMultipleMatchScanner()) {
this.multipleMatchers = true;
}
scanners[i + 1] = scanner;
}
} catch(IOException e) {
for(int i = 0; i < this.scanners.length; i++) {
if(scanners[i] != null) {
closeScanner(i);
}
}
throw e;
}
for(int i = 0; i < scanners.length; i++) {
keys[i] = new HStoreKey();
resultSets[i] = new TreeMap<Text, BytesWritable>();
@ -1319,7 +1419,7 @@ public class HRegion implements HConstants {
}
/** Shut down a single scanner */
void closeScanner(int i) throws IOException {
void closeScanner(int i) {
try {
scanners[i].close();
@ -1331,7 +1431,7 @@ public class HRegion implements HConstants {
}
/** All done with the scanner. */
public void close() throws IOException {
public void close() {
for(int i = 0; i < scanners.length; i++) {
if(scanners[i] != null) {
closeScanner(i);

View File

@ -30,7 +30,7 @@ public interface HRegionInterface extends VersionedProtocol {
// Get metainfo about an HRegion
public HRegionInfo getRegionInfo(Text regionName);
public HRegionInfo getRegionInfo(Text regionName) throws NotServingRegionException;
// GET methods for an HRegion.

View File

@ -46,7 +46,7 @@ public class HRegionServer
private volatile boolean stopRequested;
private Path regionDir;
private HServerAddress address;
private HServerInfo info;
private Configuration conf;
private Random rand;
private TreeMap<Text, HRegion> regions; // region name -> HRegion
@ -64,24 +64,26 @@ public class HRegionServer
private Thread splitOrCompactCheckerThread;
private Integer splitOrCompactLock = new Integer(0);
private class SplitOrCompactChecker implements Runnable {
private class SplitOrCompactChecker implements Runnable, RegionUnavailableListener {
private HClient client = new HClient(conf);
private class SplitRegion {
public HRegion region;
public Text midKey;
SplitRegion(HRegion region, Text midKey) {
this.region = region;
this.midKey = midKey;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text)
*/
public void regionIsUnavailable(Text regionName) {
lock.obtainWriteLock();
regions.remove(regionName);
lock.releaseWriteLock();
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
public void run() {
while(! stopRequested) {
long startTime = System.currentTimeMillis();
synchronized(splitOrCompactLock) {
synchronized(splitOrCompactLock) { // Don't interrupt us while we're working
// Grab a list of regions to check
@ -93,85 +95,81 @@ public class HRegionServer
lock.releaseReadLock();
}
// Check to see if they need splitting or compacting
Vector<SplitRegion> toSplit = new Vector<SplitRegion>();
Vector<HRegion> toCompact = new Vector<HRegion>();
for(Iterator<HRegion> it = regionsToCheck.iterator(); it.hasNext(); ) {
HRegion cur = it.next();
Text midKey = new Text();
if(cur.needsCompaction()) {
toCompact.add(cur);
} else if(cur.needsSplit(midKey)) {
toSplit.add(new SplitRegion(cur, midKey));
}
}
try {
for(Iterator<HRegion>it = toCompact.iterator(); it.hasNext(); ) {
it.next().compactStores();
}
for(Iterator<HRegion>it = regionsToCheck.iterator(); it.hasNext(); ) {
HRegion cur = it.next();
for(Iterator<SplitRegion> it = toSplit.iterator(); it.hasNext(); ) {
SplitRegion r = it.next();
lock.obtainWriteLock();
regions.remove(r.region.getRegionName());
lock.releaseWriteLock();
HRegion[] newRegions = null;
Text oldRegion = r.region.getRegionName();
LOG.info("splitting region: " + oldRegion);
newRegions = r.region.closeAndSplit(r.midKey);
// When a region is split, the META table needs to updated if we're
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
Text tableToUpdate =
(oldRegion.find(META_TABLE_NAME.toString()) == 0) ?
ROOT_TABLE_NAME : META_TABLE_NAME;
if(LOG.isDebugEnabled()) {
LOG.debug("region split complete. updating meta");
if(cur.isClosed()) {
continue; // Skip if closed
}
client.openTable(tableToUpdate);
long lockid = client.startUpdate(oldRegion);
client.delete(lockid, COL_REGIONINFO);
client.delete(lockid, COL_SERVER);
client.delete(lockid, COL_STARTCODE);
client.commit(lockid);
if(cur.needsCompaction()) {
for(int i = 0; i < newRegions.length; i++) {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(bytes);
newRegions[i].getRegionInfo().write(out);
// The best time to split a region is right after it has been compacted
lockid = client.startUpdate(newRegions[i].getRegionName());
client.put(lockid, COL_REGIONINFO, bytes.toByteArray());
client.commit(lockid);
if(cur.compactStores()) {
Text midKey = new Text();
if(cur.needsSplit(midKey)) {
Text oldRegion = cur.getRegionName();
LOG.info("splitting region: " + oldRegion);
HRegion[] newRegions = cur.closeAndSplit(midKey, this);
// When a region is split, the META table needs to updated if we're
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
if(LOG.isDebugEnabled()) {
LOG.debug("region split complete. updating meta");
}
Text tableToUpdate =
(oldRegion.find(META_TABLE_NAME.toString()) == 0) ?
ROOT_TABLE_NAME : META_TABLE_NAME;
client.openTable(tableToUpdate);
long lockid = client.startUpdate(oldRegion);
client.delete(lockid, COL_REGIONINFO);
client.delete(lockid, COL_SERVER);
client.delete(lockid, COL_STARTCODE);
client.commit(lockid);
for(int i = 0; i < newRegions.length; i++) {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(bytes);
newRegions[i].getRegionInfo().write(out);
lockid = client.startUpdate(newRegions[i].getRegionName());
client.put(lockid, COL_REGIONINFO, bytes.toByteArray());
client.put(lockid, COL_SERVER,
info.getServerAddress().toString().getBytes(UTF8_ENCODING));
client.put(lockid, COL_STARTCODE,
String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING));
client.commit(lockid);
}
// Now tell the master about the new regions
if(LOG.isDebugEnabled()) {
LOG.debug("reporting region split to master");
}
reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
LOG.info("region split successful. old region=" + oldRegion
+ ", new regions: " + newRegions[0].getRegionName() + ", "
+ newRegions[1].getRegionName());
// Finally, start serving the new regions
lock.obtainWriteLock();
regions.put(newRegions[0].getRegionName(), newRegions[0]);
regions.put(newRegions[1].getRegionName(), newRegions[1]);
lock.releaseWriteLock();
}
}
}
// Now tell the master about the new regions
if(LOG.isDebugEnabled()) {
LOG.debug("reporting region split to master");
}
reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
LOG.info("region split successful. old region=" + oldRegion
+ ", new regions: " + newRegions[0].getRegionName() + ", "
+ newRegions[1].getRegionName());
newRegions[0].close();
newRegions[1].close();
}
} catch(IOException e) {
//TODO: What happens if this fails? Are we toast?
@ -229,6 +227,10 @@ public class HRegionServer
for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext(); ) {
HRegion cur = it.next();
if(cur.isClosed()) { // Skip if closed
continue;
}
try {
cur.optionallyFlush();
@ -330,8 +332,7 @@ public class HRegionServer
/** Start a HRegionServer at an indicated location */
public HRegionServer(Path regionDir, HServerAddress address,
Configuration conf)
throws IOException {
Configuration conf) throws IOException {
// Basic setup
this.stopRequested = false;
@ -369,19 +370,25 @@ public class HRegionServer
try {
// Server to handle client requests
this.server = RPC.getServer(this, address.getBindAddress().toString(),
address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
false, conf);
this.address = new HServerAddress(server.getListenerAddress());
this.info = new HServerInfo(new HServerAddress(server.getListenerAddress()),
this.rand.nextLong());
// Local file paths
String serverName =
this.address.getBindAddress() + "_" + this.address.getPort();
this.info.getServerAddress().getBindAddress() + "_"
+ this.info.getServerAddress().getPort();
Path newlogdir = new Path(regionDir, "log" + "_" + serverName);
this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + serverName);
// Logging
this.fs = FileSystem.get(conf);
HLog.consolidateOldLog(newlogdir, oldlogfile, fs, conf);
// TODO: Now we have a consolidated log for all regions, sort and
@ -393,13 +400,14 @@ public class HRegionServer
this.logRollerThread = new Thread(logRoller, "HRegionServer.logRoller");
// Remote HMaster
this.hbaseMaster = (HMasterRegionInterface)RPC.
waitForProxy(HMasterRegionInterface.class,
HMasterRegionInterface.versionID,
new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
conf);
this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
HMasterRegionInterface.class, HMasterRegionInterface.versionID,
new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
conf);
// Threads
this.workerThread.start();
this.cacheFlusherThread.start();
this.splitOrCompactCheckerThread.start();
@ -452,7 +460,7 @@ public class HRegionServer
this.server.join();
} catch(InterruptedException iex) {
}
LOG.info("HRegionServer stopped at: " + address.toString());
LOG.info("HRegionServer stopped at: " + info.getServerAddress().toString());
}
/**
@ -462,7 +470,6 @@ public class HRegionServer
*/
public void run() {
while(! stopRequested) {
HServerInfo info = new HServerInfo(address, rand.nextLong());
long lastMsg = 0;
long waitTime;
@ -557,7 +564,7 @@ public class HRegionServer
}
} catch(IOException e) {
e.printStackTrace();
LOG.error(e);
}
}
@ -580,7 +587,7 @@ public class HRegionServer
}
}
try {
LOG.info("stopping server at: " + address.toString());
LOG.info("stopping server at: " + info.getServerAddress().toString());
// Send interrupts to wake up threads if sleeping so they notice shutdown.
@ -761,58 +768,68 @@ public class HRegionServer
throws IOException {
this.lock.obtainWriteLock();
HRegion region = null;
try {
HRegion region = regions.remove(info.regionName);
if(region != null) {
region.close();
if(reportWhenCompleted) {
reportClose(region);
}
}
region = regions.remove(info.regionName);
} finally {
this.lock.releaseWriteLock();
}
if(region != null) {
region.close();
if(reportWhenCompleted) {
reportClose(region);
}
}
}
private void closeAndDeleteRegion(HRegionInfo info) throws IOException {
this.lock.obtainWriteLock();
HRegion region = null;
try {
HRegion region = regions.remove(info.regionName);
if(region != null) {
region.closeAndDelete();
}
region = regions.remove(info.regionName);
} finally {
this.lock.releaseWriteLock();
}
if(region != null) {
if(LOG.isDebugEnabled()) {
LOG.debug("deleting region " + info.regionName);
}
region.closeAndDelete();
if(LOG.isDebugEnabled()) {
LOG.debug("region " + info.regionName + " deleted");
}
}
}
/** Called either when the master tells us to restart or from stop() */
private void closeAllRegions() {
Vector<HRegion> regionsToClose = new Vector<HRegion>();
this.lock.obtainWriteLock();
try {
for(Iterator<HRegion> it = regions.values().iterator(); it.hasNext(); ) {
HRegion region = it.next();
if (LOG.isDebugEnabled()) {
LOG.debug("closing region " + region.getRegionName());
}
try {
region.close();
} catch(IOException e) {
e.printStackTrace();
}
}
regionsToClose.addAll(regions.values());
regions.clear();
} finally {
this.lock.releaseWriteLock();
}
for(Iterator<HRegion> it = regionsToClose.iterator(); it.hasNext(); ) {
HRegion region = it.next();
if (LOG.isDebugEnabled()) {
LOG.debug("closing region " + region.getRegionName());
}
try {
region.close();
LOG.debug("region closed " + region.getRegionName());
} catch(IOException e) {
LOG.error("error closing region " + region.getRegionName(), e);
}
}
}
/*****************************************************************************
@ -847,20 +864,14 @@ public class HRegionServer
//////////////////////////////////////////////////////////////////////////////
/** Obtain a table descriptor for the given region */
public HRegionInfo getRegionInfo(Text regionName) {
public HRegionInfo getRegionInfo(Text regionName) throws NotServingRegionException {
HRegion region = getRegion(regionName);
if(region == null) {
return null;
}
return region.getRegionInfo();
}
/** Get the indicated row/column */
public BytesWritable get(Text regionName, Text row, Text column) throws IOException {
HRegion region = getRegion(regionName);
if(region == null) {
throw new IOException("Not serving region " + regionName);
}
if (LOG.isDebugEnabled()) {
LOG.debug("get " + row.toString() + ", " + column.toString());
@ -877,9 +888,6 @@ public class HRegionServer
int numVersions) throws IOException {
HRegion region = getRegion(regionName);
if(region == null) {
throw new IOException("Not serving region " + regionName);
}
BytesWritable[] results = region.get(row, column, numVersions);
if(results != null) {
@ -893,9 +901,6 @@ public class HRegionServer
long timestamp, int numVersions) throws IOException {
HRegion region = getRegion(regionName);
if(region == null) {
throw new IOException("Not serving region " + regionName);
}
BytesWritable[] results = region.get(row, column, timestamp, numVersions);
if(results != null) {
@ -907,9 +912,6 @@ public class HRegionServer
/** Get all the columns (along with their names) for a given row. */
public LabelledData[] getRow(Text regionName, Text row) throws IOException {
HRegion region = getRegion(regionName);
if(region == null) {
throw new IOException("Not serving region " + regionName);
}
TreeMap<Text, BytesWritable> map = region.getFull(row);
LabelledData result[] = new LabelledData[map.size()];
@ -949,9 +951,6 @@ public class HRegionServer
throws IOException {
HRegion region = getRegion(regionName);
if(region == null) {
throw new IOException("Not serving region " + regionName);
}
long lockid = region.startUpdate(row);
leases.createLease(new Text(String.valueOf(clientid)),
@ -966,9 +965,6 @@ public class HRegionServer
BytesWritable val) throws IOException {
HRegion region = getRegion(regionName);
if(region == null) {
throw new IOException("Not serving region " + regionName);
}
leases.renewLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
@ -981,9 +977,6 @@ public class HRegionServer
throws IOException {
HRegion region = getRegion(regionName);
if(region == null) {
throw new IOException("Not serving region " + regionName);
}
leases.renewLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
@ -996,9 +989,6 @@ public class HRegionServer
throws IOException {
HRegion region = getRegion(regionName);
if(region == null) {
throw new IOException("Not serving region " + regionName);
}
leases.cancelLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
@ -1011,9 +1001,6 @@ public class HRegionServer
throws IOException {
HRegion region = getRegion(regionName);
if(region == null) {
throw new IOException("Not serving region " + regionName);
}
leases.cancelLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
@ -1028,14 +1015,20 @@ public class HRegionServer
}
/** Private utility method for safely obtaining an HRegion handle. */
private HRegion getRegion(Text regionName) {
private HRegion getRegion(Text regionName) throws NotServingRegionException {
this.lock.obtainReadLock();
HRegion region = null;
try {
return regions.get(regionName);
region = regions.get(regionName);
} finally {
this.lock.releaseReadLock();
}
if(region == null) {
throw new NotServingRegionException(regionName.toString());
}
return region;
}
//////////////////////////////////////////////////////////////////////////////
@ -1051,14 +1044,12 @@ public class HRegionServer
}
public void leaseExpired() {
HInternalScannerInterface s = scanners.remove(scannerName);
HInternalScannerInterface s = null;
synchronized(scanners) {
s = scanners.remove(scannerName);
}
if(s != null) {
try {
s.close();
} catch(IOException e) {
e.printStackTrace();
}
s.close();
}
}
}
@ -1068,16 +1059,14 @@ public class HRegionServer
throws IOException {
HRegion r = getRegion(regionName);
if(r == null) {
throw new IOException("Not serving region " + regionName);
}
long scannerId = -1L;
try {
HInternalScannerInterface s = r.getScanner(cols, firstRow);
scannerId = rand.nextLong();
Text scannerName = new Text(String.valueOf(scannerId));
scanners.put(scannerName, s);
synchronized(scanners) {
scanners.put(scannerName, s);
}
leases.createLease(scannerName, scannerName, new ScannerListener(scannerName));
} catch(IOException e) {
@ -1121,16 +1110,14 @@ public class HRegionServer
public void close(long scannerId) throws IOException {
Text scannerName = new Text(String.valueOf(scannerId));
HInternalScannerInterface s = scanners.remove(scannerName);
HInternalScannerInterface s = null;
synchronized(scanners) {
s = scanners.remove(scannerName);
}
if(s == null) {
throw new IOException("unknown scanner");
}
try {
s.close();
} catch(IOException ex) {
ex.printStackTrace();
}
s.close();
leases.cancelLease(scannerName, scannerName);
}

View File

@ -342,8 +342,14 @@ public class HStore {
}
}
public synchronized Vector<HStoreFile> getAllMapFiles() {
return new Vector<HStoreFile>(mapFiles.values());
public Vector<HStoreFile> getAllMapFiles() {
this.lock.obtainReadLock();
try {
return new Vector<HStoreFile>(mapFiles.values());
} finally {
this.lock.releaseReadLock();
}
}
//////////////////////////////////////////////////////////////////////////////
@ -938,7 +944,9 @@ public class HStore {
class HStoreScanner extends HAbstractScanner {
private MapFile.Reader[] readers;
public HStoreScanner(long timestamp, Text[] targetCols, Text firstRow) throws IOException {
public HStoreScanner(long timestamp, Text[] targetCols, Text firstRow)
throws IOException {
super(timestamp, targetCols);
lock.obtainReadLock();
@ -976,6 +984,7 @@ public class HStore {
}
} catch (Exception ex) {
LOG.error(ex);
close();
}
}
@ -1021,10 +1030,15 @@ public class HStore {
}
/** Close down the indicated reader. */
void closeSubScanner(int i) throws IOException {
void closeSubScanner(int i) {
try {
if(readers[i] != null) {
readers[i].close();
try {
readers[i].close();
} catch(IOException e) {
LOG.error(e);
}
}
} finally {
@ -1035,12 +1049,17 @@ public class HStore {
}
/** Shut it down! */
public void close() throws IOException {
public void close() {
if(! scannerClosed) {
try {
for(int i = 0; i < readers.length; i++) {
if(readers[i] != null) {
readers[i].close();
try {
readers[i].close();
} catch(IOException e) {
LOG.error(e);
}
}
}

View File

@ -15,6 +15,8 @@
*/
package org.apache.hadoop.hbase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.*;
import java.io.*;
@ -36,6 +38,8 @@ import java.util.*;
* You should close() the instance if you want to clean up the thread properly.
******************************************************************************/
public class Leases {
private static final Log LOG = LogFactory.getLog(Leases.class);
long leasePeriod;
long leaseCheckFrequency;
LeaseMonitor leaseMonitor;
@ -47,7 +51,7 @@ public class Leases {
/** Indicate the length of the lease, in milliseconds */
public Leases(long leasePeriod, long leaseCheckFrequency) {
this.leasePeriod = leasePeriod;
this.leaseCheckFrequency = leaseCheckFrequency;
this.leaseMonitor = new LeaseMonitor();
this.leaseMonitorThread = new Thread(leaseMonitor);
this.leaseMonitorThread.setName("Lease.monitor");
@ -59,6 +63,9 @@ public class Leases {
* without any cancellation calls.
*/
public void close() {
if(LOG.isDebugEnabled()) {
LOG.debug("closing leases");
}
this.running = false;
try {
this.leaseMonitorThread.interrupt();
@ -71,6 +78,9 @@ public class Leases {
sortedLeases.clear();
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("leases closed");
}
}
/** A client obtains a lease... */

View File

@ -0,0 +1,30 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
public class NotServingRegionException extends IOException {
private static final long serialVersionUID = 1L << 17 - 1L;
public NotServingRegionException() {
super();
}
public NotServingRegionException(String s) {
super(s);
}
}

View File

@ -0,0 +1,27 @@
/**
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.io.Text;
/**
* Used as a callback mechanism so that an HRegion can notify the HRegionServer
* when a region is about to be closed during a split operation. This is done
* to minimize the amount of time the region is off-line.
*/
public interface RegionUnavailableListener {
public void regionIsUnavailable(Text regionName);
}

View File

@ -1,58 +0,0 @@
/**
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.log4j.Level;
/**
* Retrieve environment variables that control debugging and logging environment
*/
public class Environment {
public static boolean debugging = false;
public static Level logLevel = Level.INFO;
private Environment() {}; // Not instantiable
public static void getenv() {
String value = null;
value = System.getenv("DEBUGGING");
if(value != null && value.equalsIgnoreCase("TRUE")) {
debugging = true;
}
value = System.getenv("LOGGING_LEVEL");
if(value != null && value.length() != 0) {
if(value.equalsIgnoreCase("ALL")) {
logLevel = Level.ALL;
} else if(value.equalsIgnoreCase("DEBUG")) {
logLevel = Level.DEBUG;
} else if(value.equalsIgnoreCase("ERROR")) {
logLevel = Level.ERROR;
} else if(value.equalsIgnoreCase("FATAL")) {
logLevel = Level.FATAL;
} else if(value.equalsIgnoreCase("INFO")) {
logLevel = Level.INFO;
} else if(value.equalsIgnoreCase("OFF")) {
logLevel = Level.OFF;
} else if(value.equalsIgnoreCase("TRACE")) {
logLevel = Level.TRACE;
} else if(value.equalsIgnoreCase("WARN")) {
logLevel = Level.WARN;
}
}
}
}

View File

@ -0,0 +1,55 @@
/**
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
/**
* Abstract base class for HBase cluster junit tests. Spins up cluster on
* {@link #setUp()} and takes it down again in {@link #tearDown()}.
*/
public abstract class HBaseClusterTestCase extends HBaseTestCase {
protected MiniHBaseCluster cluster;
final boolean miniHdfs;
protected HBaseClusterTestCase() {
this(false);
}
protected HBaseClusterTestCase(String name) {
this(name, false);
}
protected HBaseClusterTestCase(final boolean miniHdfs) {
super();
this.miniHdfs = miniHdfs;
}
protected HBaseClusterTestCase(String name, final boolean miniHdfs) {
super(name);
this.miniHdfs = miniHdfs;
}
public void setUp() throws Exception {
super.setUp();
this.cluster = new MiniHBaseCluster(this.conf, 1, this.miniHdfs);
}
public void tearDown() throws Exception {
super.tearDown();
if (this.cluster != null) {
this.cluster.shutdown();
}
}
}

View File

@ -0,0 +1,46 @@
/**
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
/**
* Abstract base class for test cases. Performs all static initialization
*/
public abstract class HBaseTestCase extends TestCase {
static {
StaticTestEnvironment.initialize();
}
protected Configuration conf;
protected HBaseTestCase() {
super();
conf = new HBaseConfiguration();
}
protected HBaseTestCase(String name) {
super(name);
conf = new HBaseConfiguration();
}
protected Path getUnitTestdir(String testName) {
return new Path(StaticTestEnvironment.TEST_DIRECTORY_KEY, testName);
}
}

View File

@ -35,9 +35,9 @@ public class MiniHBaseCluster implements HConstants {
private MiniDFSCluster cluster;
private FileSystem fs;
private Path parentdir;
private HMasterRunner masterRunner;
private Thread masterRunnerThread;
private HRegionServerRunner[] regionServers;
private HMaster master;
private Thread masterThread;
private HRegionServer[] regionServers;
private Thread[] regionThreads;
public MiniHBaseCluster(Configuration conf, int nRegionNodes) {
@ -58,13 +58,13 @@ public class MiniHBaseCluster implements HConstants {
try {
try {
if(System.getProperty("test.build.data") == null) {
if(System.getProperty(StaticTestEnvironment.TEST_DIRECTORY_KEY) == null) {
File testDir = new File(new File("").getAbsolutePath(),
"build/contrib/hbase/test");
String dir = testDir.getAbsolutePath();
LOG.info("Setting test.build.data to " + dir);
System.setProperty("test.build.data", dir);
System.setProperty(StaticTestEnvironment.TEST_DIRECTORY_KEY, dir);
}
if (miniHdfsFilesystem) {
@ -85,26 +85,15 @@ public class MiniHBaseCluster implements HConstants {
}
// Create the master
this.masterRunner = new HMasterRunner();
this.masterRunnerThread = new Thread(masterRunner, "masterRunner");
this.master = new HMaster(conf);
this.masterThread = new Thread(this.master, "HMaster");
// Start up the master
LOG.info("Starting HMaster");
masterRunnerThread.start();
while(! masterRunner.isCrashed() && ! masterRunner.isInitialized()) {
try {
LOG.info("...waiting for HMaster to initialize...");
Thread.sleep(1000);
} catch(InterruptedException e) {
}
if(masterRunner.isCrashed()) {
throw new RuntimeException("HMaster crashed");
}
}
LOG.info("HMaster started.");
masterThread.start();
// Set the master's port for the HRegionServers
String address = masterRunner.getHMasterAddress().toString();
String address = master.getMasterAddress().toString();
this.conf.set(MASTER_ADDRESS, address);
// Start the HRegionServers
@ -115,34 +104,20 @@ public class MiniHBaseCluster implements HConstants {
LOG.info("Starting HRegionServers");
startRegionServers(this.conf, nRegionNodes);
LOG.info("HRegionServers running");
// Wait for things to get started
while(! masterRunner.isCrashed() && ! masterRunner.isUp()) {
try {
LOG.info("Waiting for Mini HBase cluster to start...");
Thread.sleep(1000);
} catch(InterruptedException e) {
}
if(masterRunner.isCrashed()) {
throw new RuntimeException("HMaster crashed");
}
}
} catch(Throwable e) {
// Delete all DFS files
deleteFile(new File(System.getProperty("test.build.data"), "dfs"));
throw new RuntimeException("Mini HBase cluster did not start");
e.printStackTrace();
shutdown();
}
}
private void startRegionServers(Configuration conf, int nRegionNodes) {
this.regionServers = new HRegionServerRunner[nRegionNodes];
private void startRegionServers(Configuration conf, int nRegionNodes)
throws IOException {
this.regionServers = new HRegionServer[nRegionNodes];
this.regionThreads = new Thread[nRegionNodes];
for(int i = 0; i < nRegionNodes; i++) {
regionServers[i] = new HRegionServerRunner(conf);
regionServers[i] = new HRegionServer(conf);
regionThreads[i] = new Thread(regionServers[i], "HRegionServer-" + i);
regionThreads[i].start();
}
@ -153,35 +128,48 @@ public class MiniHBaseCluster implements HConstants {
* supplied port is not necessarily the actual port used.
*/
public HServerAddress getHMasterAddress() {
return masterRunner.getHMasterAddress();
return master.getMasterAddress();
}
/** Shut down the HBase cluster */
public void shutdown() {
LOG.info("Shutting down the HBase Cluster");
for(int i = 0; i < regionServers.length; i++) {
regionServers[i].shutdown();
}
masterRunner.shutdown();
for(int i = 0; i < regionServers.length; i++) {
try {
regionThreads[i].join();
} catch (InterruptedException e) {
regionServers[i].stop();
} catch(IOException e) {
e.printStackTrace();
}
}
try {
masterRunnerThread.join();
} catch (InterruptedException e) {
master.shutdown();
} catch(IOException e) {
e.printStackTrace();
}
if (cluster != null) {
for(int i = 0; i < regionServers.length; i++) {
try {
regionThreads[i].join();
} catch(InterruptedException e) {
}
}
try {
masterThread.join();
} catch(InterruptedException e) {
}
LOG.info("HBase Cluster shutdown complete");
if(cluster != null) {
LOG.info("Shutting down Mini DFS cluster");
cluster.shutdown();
}
// Delete all DFS files
deleteFile(new File(System.getProperty("test.build.data"), "dfs"));
deleteFile(new File(System.getProperty(
StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs"));
}
private void deleteFile(File f) {
@ -193,126 +181,4 @@ public class MiniHBaseCluster implements HConstants {
}
f.delete();
}
private class HMasterRunner implements Runnable {
private HMaster master = null;
private Thread masterThread = null;
private volatile boolean isInitialized = false;
private boolean isCrashed = false;
private boolean isRunning = true;
private long threadSleepTime = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
public HServerAddress getHMasterAddress() {
return this.master.getMasterAddress();
}
public synchronized boolean isInitialized() {
return isInitialized;
}
public synchronized boolean isCrashed() {
return isCrashed;
}
public boolean isUp() {
if(master == null) {
return false;
}
synchronized(this) {
return isInitialized;
}
}
/** Create the HMaster and run it */
public void run() {
try {
synchronized(this) {
if(isRunning) {
this.master = new HMaster(conf);
masterThread = new Thread(this.master);
masterThread.start();
}
isInitialized = true;
}
} catch(Throwable e) {
shutdown();
LOG.error("HMaster crashed:", e);
synchronized(this) {
isCrashed = true;
}
}
while(this.master != null && this.master.isMasterRunning()) {
try {
Thread.sleep(threadSleepTime);
} catch(InterruptedException e) {
}
}
synchronized(this) {
isCrashed = true;
}
shutdown();
}
/** Shut down the HMaster and wait for it to finish */
public synchronized void shutdown() {
isRunning = false;
if (this.master != null) {
try {
this.master.shutdown();
} catch(IOException e) {
LOG.error("Master crashed during stop", e);
} finally {
try {
masterThread.join();
} catch(InterruptedException e) {
}
master = null;
}
}
}
}
private class HRegionServerRunner implements Runnable {
private HRegionServer server = null;
private boolean isRunning = true;
private Configuration conf;
public HRegionServerRunner(Configuration conf) {
this.conf = conf;
}
/** Start up the HRegionServer */
public void run() {
try {
synchronized(this) {
if(isRunning) {
server = new HRegionServer(conf);
}
}
server.run();
} catch(Throwable e) {
shutdown();
LOG.error("HRegionServer crashed:", e);
}
}
/** Shut down the HRegionServer */
public synchronized void shutdown() {
isRunning = false;
if(server != null) {
try {
server.stop();
} catch(IOException e) {
LOG.error("HRegionServer crashed during stop", e);
} finally {
server.join();
server = null;
}
}
}
}
}

View File

@ -0,0 +1,92 @@
/**
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.File;
import java.util.Enumeration;
import org.apache.log4j.Appender;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
public class StaticTestEnvironment {
private StaticTestEnvironment() {}; // Not instantiable
public static final String TEST_DIRECTORY_KEY = "test.build.data";
public static boolean debugging = false;
@SuppressWarnings("unchecked")
public static void initialize() {
String value = null;
if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
System.setProperty(TEST_DIRECTORY_KEY, new File(
"build/contrib/hbase/test").getAbsolutePath());
}
value = System.getenv("DEBUGGING");
if(value != null && value.equalsIgnoreCase("TRUE")) {
debugging = true;
Logger rootLogger = Logger.getRootLogger();
rootLogger.setLevel(Level.WARN);
Level logLevel = Level.INFO;
value = System.getenv("LOGGING_LEVEL");
if(value != null && value.length() != 0) {
if(value.equalsIgnoreCase("ALL")) {
logLevel = Level.ALL;
} else if(value.equalsIgnoreCase("DEBUG")) {
logLevel = Level.DEBUG;
} else if(value.equalsIgnoreCase("ERROR")) {
logLevel = Level.ERROR;
} else if(value.equalsIgnoreCase("FATAL")) {
logLevel = Level.FATAL;
} else if(value.equalsIgnoreCase("INFO")) {
logLevel = Level.INFO;
} else if(value.equalsIgnoreCase("OFF")) {
logLevel = Level.OFF;
} else if(value.equalsIgnoreCase("TRACE")) {
logLevel = Level.TRACE;
} else if(value.equalsIgnoreCase("WARN")) {
logLevel = Level.WARN;
}
}
ConsoleAppender consoleAppender = null;
for(Enumeration<Appender> e = rootLogger.getAllAppenders();
e.hasMoreElements();) {
Appender a = e.nextElement();
if(a instanceof ConsoleAppender) {
consoleAppender = (ConsoleAppender)a;
break;
}
}
if(consoleAppender != null) {
Layout layout = consoleAppender.getLayout();
if(layout instanceof PatternLayout) {
PatternLayout consoleLayout = (PatternLayout)layout;
consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
}
}
Logger.getLogger(
HBaseTestCase.class.getPackage().getName()).setLevel(logLevel);
}
}
}

View File

@ -17,29 +17,17 @@ package org.apache.hadoop.hbase;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Appender;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import junit.framework.TestCase;
public class TestGet extends TestCase {
public class TestGet extends HBaseTestCase {
private static final Text CONTENTS = new Text("contents:");
private static final Text ROW_KEY = new Text(HGlobals.rootRegionInfo.regionName);
@ -71,7 +59,6 @@ public class TestGet extends TestCase {
}
}
@SuppressWarnings("unchecked")
public void testGet() throws IOException {
MiniDFSCluster cluster = null;
@ -79,37 +66,6 @@ public class TestGet extends TestCase {
// Initialization
if(System.getProperty("test.build.data") == null) {
String dir = new File(new File("").getAbsolutePath(), "build/contrib/hbase/test").getAbsolutePath();
System.out.println(dir);
System.setProperty("test.build.data", dir);
}
Configuration conf = new HBaseConfiguration();
Environment.getenv();
if(Environment.debugging) {
Logger rootLogger = Logger.getRootLogger();
rootLogger.setLevel(Level.WARN);
ConsoleAppender consoleAppender = null;
for(Enumeration<Appender> e = (Enumeration<Appender>)rootLogger.getAllAppenders();
e.hasMoreElements();) {
Appender a = e.nextElement();
if(a instanceof ConsoleAppender) {
consoleAppender = (ConsoleAppender)a;
break;
}
}
if(consoleAppender != null) {
Layout layout = consoleAppender.getLayout();
if(layout instanceof PatternLayout) {
PatternLayout consoleLayout = (PatternLayout)layout;
consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
}
}
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Environment.logLevel);
}
cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
FileSystem fs = cluster.getFileSystem();
Path dir = new Path("/hbase");

View File

@ -15,45 +15,52 @@
*/
package org.apache.hadoop.hbase;
import java.io.File;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.TreeMap;
import java.util.TreeSet;
import junit.framework.TestCase;
import junit.framework.Test;
import junit.framework.TestSuite;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Appender;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import org.apache.log4j.PatternLayout;
/**
* Test HBase Master and Region servers, client API
*/
public class TestHBaseCluster extends TestCase {
public class TestHBaseCluster extends HBaseClusterTestCase {
private HTableDescriptor desc;
private HClient client;
/** constructor */
public TestHBaseCluster(String name) {
super(name);
public TestHBaseCluster() {
super(true);
this.desc = null;
this.client = null;
}
/** Test suite so that all tests get run */
public static Test suite() {
TestSuite suite = new TestSuite();
suite.addTest(new TestHBaseCluster("testSetup"));
suite.addTest(new TestHBaseCluster("testBasic"));
suite.addTest(new TestHBaseCluster("testScanner"));
suite.addTest(new TestHBaseCluster("testCleanup"));
return suite;
/**
* Since all the "tests" depend on the results of the previous test, they are
* not Junit tests that can stand alone. Consequently we have a single Junit
* test that runs the "sub-tests" as private methods.
*/
public void testHBaseCluster() {
try {
setup();
basic();
scanner();
listTables();
cleanup();
} catch(IOException e) {
e.printStackTrace();
fail();
}
}
public void tearDown() throws Exception {
super.tearDown();
if(client != null) {
client.close();
}
}
private static final int FIRST_ROW = 1;
@ -65,126 +72,61 @@ public class TestHBaseCluster extends TestCase {
private static final String ANCHORNUM = "anchor:anchornum-";
private static final String ANCHORSTR = "anchorstr";
private static Configuration conf = null;
private static boolean failures = false;
private static boolean initialized = false;
private static MiniHBaseCluster cluster = null;
private static HTableDescriptor desc = null;
private static HClient client = null;
// Set up environment, start mini cluster, etc.
@SuppressWarnings("unchecked")
public void testSetup() throws Exception {
try {
if(System.getProperty("test.build.data") == null) {
String dir = new File(new File("").getAbsolutePath(), "build/contrib/hbase/test").getAbsolutePath();
System.out.println(dir);
System.setProperty("test.build.data", dir);
}
conf = new HBaseConfiguration();
Environment.getenv();
Logger rootLogger = Logger.getRootLogger();
if(Environment.debugging) {
rootLogger.setLevel(Level.WARN);
}
ConsoleAppender consoleAppender = null;
for(Enumeration<Appender> e = (Enumeration<Appender>)rootLogger.getAllAppenders();
e.hasMoreElements();) {
Appender a = e.nextElement();
if(a instanceof ConsoleAppender) {
consoleAppender = (ConsoleAppender)a;
break;
}
}
if(consoleAppender != null) {
Layout layout = consoleAppender.getLayout();
if(layout instanceof PatternLayout) {
PatternLayout consoleLayout = (PatternLayout)layout;
consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
}
}
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Environment.logLevel);
cluster = new MiniHBaseCluster(conf, 1);
client = new HClient(conf);
desc = new HTableDescriptor("test", 3);
desc.addFamily(new Text(CONTENTS));
desc.addFamily(new Text(ANCHOR));
client.createTable(desc);
} catch(Exception e) {
failures = true;
throw e;
}
initialized = true;
private void setup() throws IOException {
client = new HClient(conf);
desc = new HTableDescriptor("test", 3);
desc.addFamily(new Text(CONTENTS));
desc.addFamily(new Text(ANCHOR));
client.createTable(desc);
}
// Test basic functionality. Writes to contents:basic and anchor:anchornum-*
public void testBasic() throws IOException {
if(!initialized) {
throw new IllegalStateException();
private void basic() throws IOException {
long startTime = System.currentTimeMillis();
client.openTable(desc.getName());
// Write out a bunch of values
for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
long writeid = client.startUpdate(new Text("row_" + k));
client.put(writeid, CONTENTS_BASIC, (CONTENTSTR + k).getBytes());
client.put(writeid, new Text(ANCHORNUM + k), (ANCHORSTR + k).getBytes());
client.commit(writeid);
}
System.out.println("Write " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// Read them back in
startTime = System.currentTimeMillis();
Text collabel = null;
for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
Text rowlabel = new Text("row_" + k);
byte bodydata[] = client.get(rowlabel, CONTENTS_BASIC);
assertNotNull(bodydata);
String bodystr = new String(bodydata).toString().trim();
String teststr = CONTENTSTR + k;
assertEquals("Incorrect value for key: (" + rowlabel + "," + CONTENTS_BASIC
+ "), expected: '" + teststr + "' got: '" + bodystr + "'",
bodystr, teststr);
collabel = new Text(ANCHORNUM + k);
bodydata = client.get(rowlabel, collabel);
bodystr = new String(bodydata).toString().trim();
teststr = ANCHORSTR + k;
assertEquals("Incorrect value for key: (" + rowlabel + "," + collabel
+ "), expected: '" + teststr + "' got: '" + bodystr + "'",
bodystr, teststr);
}
try {
long startTime = System.currentTimeMillis();
client.openTable(desc.getName());
// Write out a bunch of values
for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
long writeid = client.startUpdate(new Text("row_" + k));
client.put(writeid, CONTENTS_BASIC, (CONTENTSTR + k).getBytes());
client.put(writeid, new Text(ANCHORNUM + k), (ANCHORSTR + k).getBytes());
client.commit(writeid);
}
System.out.println("Write " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// Read them back in
startTime = System.currentTimeMillis();
Text collabel = null;
for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
Text rowlabel = new Text("row_" + k);
byte bodydata[] = client.get(rowlabel, CONTENTS_BASIC);
assertNotNull(bodydata);
String bodystr = new String(bodydata).toString().trim();
String teststr = CONTENTSTR + k;
assertEquals("Incorrect value for key: (" + rowlabel + "," + CONTENTS_BASIC
+ "), expected: '" + teststr + "' got: '" + bodystr + "'",
bodystr, teststr);
collabel = new Text(ANCHORNUM + k);
bodydata = client.get(rowlabel, collabel);
bodystr = new String(bodydata).toString().trim();
teststr = ANCHORSTR + k;
assertEquals("Incorrect value for key: (" + rowlabel + "," + collabel
+ "), expected: '" + teststr + "' got: '" + bodystr + "'",
bodystr, teststr);
}
System.out.println("Read " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
} catch(IOException e) {
failures = true;
throw e;
}
System.out.println("Read " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
}
public void testScanner() throws IOException {
if(!initialized || failures) {
throw new IllegalStateException();
}
private void scanner() throws IOException {
Text[] cols = new Text[] {
new Text(ANCHORNUM + "[0-9]+"),
new Text(CONTENTS_BASIC)
@ -234,57 +176,31 @@ public class TestHBaseCluster extends TestCase {
+ " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
} catch(IOException e) {
failures = true;
throw e;
} finally {
s.close();
}
}
public void testListTables() throws IOException {
if(!initialized || failures) {
throw new IllegalStateException();
}
private void listTables() throws IOException {
HTableDescriptor[] tables = client.listTables();
assertEquals(1, tables.length);
assertEquals(desc.getName(), tables[0].getName());
TreeSet<Text> families = tables[0].families();
assertEquals(2, families.size());
assertTrue(families.contains(new Text(CONTENTS)));
assertTrue(families.contains(new Text(ANCHOR)));
}
private void cleanup() throws IOException {
// Delete the table we created
client.deleteTable(desc.getName());
try {
HTableDescriptor[] tables = client.listTables();
assertEquals(1, tables.length);
assertEquals(desc.getName(), tables[0].getName());
TreeSet<Text> families = tables[0].families();
assertEquals(2, families.size());
assertTrue(families.contains(new Text(CONTENTS)));
assertTrue(families.contains(new Text(ANCHOR)));
Thread.sleep(30000); // Wait for table to be deleted
} catch(IOException e) {
failures = true;
throw e;
} catch(InterruptedException e) {
}
}
public void testCleanup() throws IOException {
if(!initialized) {
throw new IllegalStateException();
}
try {
if(!failures) {
// Delete the table we created
client.deleteTable(desc.getName());
try {
Thread.sleep(60000); // Wait for table to be deleted
} catch(InterruptedException e) {
}
}
} finally {
// Shut down the cluster
cluster.shutdown();
client.close();
}
}
}

View File

@ -0,0 +1,97 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.util.TreeMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.Reader;
public class TestHLog extends HBaseTestCase implements HConstants {
protected void setUp() throws Exception {
super.setUp();
}
public void testAppend() throws Exception {
Path dir = getUnitTestdir(getName());
FileSystem fs = FileSystem.get(this.conf);
if (fs.exists(dir)) {
fs.delete(dir);
}
final int COL_COUNT = 10;
final Text regionName = new Text("regionname");
final Text tableName = new Text("tablename");
final Text row = new Text("row");
Reader reader = null;
HLog log = new HLog(fs, dir, this.conf);
try {
// Write columns named 1, 2, 3, etc. and then values of single byte
// 1, 2, 3...
TreeMap<Text, BytesWritable> cols = new TreeMap<Text, BytesWritable>();
for (int i = 0; i < COL_COUNT; i++) {
cols.put(new Text(Integer.toString(i)),
new BytesWritable(new byte[] { (byte)(i + '0') }));
}
long timestamp = System.currentTimeMillis();
log.append(regionName, tableName, row, cols, timestamp);
long logSeqId = log.startCacheFlush();
log.completeCacheFlush(regionName, tableName, logSeqId);
log.close();
Path filename = log.computeFilename(log.filenum - 1);
log = null;
// Now open a reader on the log and assert append worked.
reader = new SequenceFile.Reader(fs, filename, conf);
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
for (int i = 0; i < COL_COUNT; i++) {
reader.next(key, val);
assertEquals(key.getRegionName(), regionName);
assertEquals(key.getTablename(), tableName);
assertEquals(key.getRow(), row);
assertEquals(val.getVal().get()[0], (byte)(i + '0'));
System.out.println(key + " " + val);
}
while (reader.next(key, val)) {
// Assert only one more row... the meta flushed row.
assertEquals(key.getRegionName(), regionName);
assertEquals(key.getTablename(), tableName);
assertEquals(key.getRow(), HLog.METAROW);
assertEquals(val.getColumn(), HLog.METACOLUMN);
assertEquals(0, val.getVal().compareTo(COMPLETE_CACHEFLUSH));
System.out.println(key + " " + val);
}
} finally {
if (log != null) {
log.close();
}
if (reader != null) {
reader.close();
}
if (fs.exists(dir)) {
fs.delete(dir);
}
}
}
protected void tearDown() throws Exception {
super.tearDown();
}
}

View File

@ -15,31 +15,20 @@
*/
package org.apache.hadoop.hbase;
import junit.framework.TestCase;
import junit.framework.Test;
import junit.framework.TestSuite;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Appender;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import org.apache.log4j.PatternLayout;
/**
* Basic stand-alone testing of HRegion.
@ -47,27 +36,35 @@ import org.apache.log4j.PatternLayout;
* A lot of the meta information for an HRegion now lives inside other
* HRegions or in the HBaseMaster, so only basic testing is possible.
*/
public class TestHRegion extends TestCase {
public class TestHRegion extends HBaseTestCase implements RegionUnavailableListener {
private Logger LOG = Logger.getLogger(this.getClass().getName());
/** Constructor */
public TestHRegion(String name) {
super(name);
public TestHRegion() {
super();
}
/** Test suite so that all tests get run */
public static Test suite() {
TestSuite suite = new TestSuite();
suite.addTest(new TestHRegion("testSetup"));
suite.addTest(new TestHRegion("testLocks"));
suite.addTest(new TestHRegion("testBadPuts"));
suite.addTest(new TestHRegion("testBasic"));
suite.addTest(new TestHRegion("testScan"));
suite.addTest(new TestHRegion("testBatchWrite"));
suite.addTest(new TestHRegion("testSplitAndMerge"));
suite.addTest(new TestHRegion("testRead"));
suite.addTest(new TestHRegion("testCleanup"));
return suite;
/**
* Since all the "tests" depend on the results of the previous test, they are
* not Junit tests that can stand alone. Consequently we have a single Junit
* test that runs the "sub-tests" as private methods.
*/
public void testHRegion() {
try {
setup();
locks();
badPuts();
basic();
scan();
batchWrite();
splitAndMerge();
read();
cleanup();
} catch(Exception e) {
e.printStackTrace();
fail();
}
}
@ -82,9 +79,6 @@ public class TestHRegion extends TestCase {
private static final Text CONTENTS_FIRSTCOL = new Text("contents:firstcol");
private static final Text ANCHOR_SECONDCOL = new Text("anchor:secondcol");
private static boolean initialized = false;
private static boolean failures = false;
private static Configuration conf = null;
private static MiniDFSCluster cluster = null;
private static FileSystem fs = null;
private static Path parentdir = null;
@ -96,138 +90,86 @@ public class TestHRegion extends TestCase {
private static int numInserted = 0;
// Set up environment, start mini cluster, etc.
// Create directories, start mini cluster, etc.
@SuppressWarnings("unchecked")
public void testSetup() throws IOException {
try {
if(System.getProperty("test.build.data") == null) {
String dir = new File(new File("").getAbsolutePath(), "build/contrib/hbase/test").getAbsolutePath();
System.out.println(dir);
System.setProperty("test.build.data", dir);
}
conf = new HBaseConfiguration();
private void setup() throws IOException {
Environment.getenv();
if(Environment.debugging) {
Logger rootLogger = Logger.getRootLogger();
rootLogger.setLevel(Level.WARN);
cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
fs = cluster.getFileSystem();
parentdir = new Path("/hbase");
fs.mkdirs(parentdir);
newlogdir = new Path(parentdir, "log");
oldlogfile = new Path(parentdir, "oldlogfile");
ConsoleAppender consoleAppender = null;
for(Enumeration<Appender> e = (Enumeration<Appender>)rootLogger.getAllAppenders();
e.hasMoreElements();) {
Appender a = e.nextElement();
if(a instanceof ConsoleAppender) {
consoleAppender = (ConsoleAppender)a;
break;
}
}
if(consoleAppender != null) {
Layout layout = consoleAppender.getLayout();
if(layout instanceof PatternLayout) {
PatternLayout consoleLayout = (PatternLayout)layout;
consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
}
}
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Environment.logLevel);
}
cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
fs = cluster.getFileSystem();
parentdir = new Path("/hbase");
fs.mkdirs(parentdir);
newlogdir = new Path(parentdir, "log");
oldlogfile = new Path(parentdir, "oldlogfile");
log = new HLog(fs, newlogdir, conf);
desc = new HTableDescriptor("test", 3);
desc.addFamily(new Text("contents:"));
desc.addFamily(new Text("anchor:"));
region = new HRegion(parentdir, log, fs, conf,
new HRegionInfo(1, desc, null, null), null, oldlogfile);
} catch(IOException e) {
failures = true;
throw e;
}
initialized = true;
log = new HLog(fs, newlogdir, conf);
desc = new HTableDescriptor("test", 3);
desc.addFamily(new Text("contents:"));
desc.addFamily(new Text("anchor:"));
region = new HRegion(parentdir, log, fs, conf,
new HRegionInfo(1, desc, null, null), null, oldlogfile);
}
// Test basic functionality. Writes to contents:basic and anchor:anchornum-*
public void testBasic() throws IOException {
if(!initialized) {
throw new IllegalStateException();
private void basic() throws IOException {
long startTime = System.currentTimeMillis();
// Write out a bunch of values
for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
long writeid = region.startUpdate(new Text("row_" + k));
region.put(writeid, CONTENTS_BASIC,
new BytesWritable((CONTENTSTR + k).getBytes()));
region.put(writeid, new Text(ANCHORNUM + k),
new BytesWritable((ANCHORSTR + k).getBytes()));
region.commit(writeid);
}
System.out.println("Write " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// Flush cache
startTime = System.currentTimeMillis();
region.flushcache(false);
System.out.println("Cache flush elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// Read them back in
startTime = System.currentTimeMillis();
Text collabel = null;
for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
Text rowlabel = new Text("row_" + k);
BytesWritable bodydata = region.get(rowlabel, CONTENTS_BASIC);
assertNotNull(bodydata);
byte[] bytes = new byte[bodydata.getSize()];
System.arraycopy(bodydata.get(), 0, bytes, 0, bytes.length);
String bodystr = new String(bytes).toString().trim();
String teststr = CONTENTSTR + k;
assertEquals("Incorrect value for key: (" + rowlabel + "," + CONTENTS_BASIC
+ "), expected: '" + teststr + "' got: '" + bodystr + "'",
bodystr, teststr);
collabel = new Text(ANCHORNUM + k);
bodydata = region.get(rowlabel, collabel);
bytes = new byte[bodydata.getSize()];
System.arraycopy(bodydata.get(), 0, bytes, 0, bytes.length);
bodystr = new String(bytes).toString().trim();
teststr = ANCHORSTR + k;
assertEquals("Incorrect value for key: (" + rowlabel + "," + collabel
+ "), expected: '" + teststr + "' got: '" + bodystr + "'",
bodystr, teststr);
}
try {
long startTime = System.currentTimeMillis();
// Write out a bunch of values
for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
long writeid = region.startUpdate(new Text("row_" + k));
region.put(writeid, CONTENTS_BASIC,
new BytesWritable((CONTENTSTR + k).getBytes()));
region.put(writeid, new Text(ANCHORNUM + k),
new BytesWritable((ANCHORSTR + k).getBytes()));
region.commit(writeid);
}
System.out.println("Write " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// Flush cache
startTime = System.currentTimeMillis();
region.flushcache(false);
System.out.println("Cache flush elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// Read them back in
startTime = System.currentTimeMillis();
Text collabel = null;
for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
Text rowlabel = new Text("row_" + k);
BytesWritable bodydata = region.get(rowlabel, CONTENTS_BASIC);
assertNotNull(bodydata);
byte[] bytes = new byte[bodydata.getSize()];
System.arraycopy(bodydata.get(), 0, bytes, 0, bytes.length);
String bodystr = new String(bytes).toString().trim();
String teststr = CONTENTSTR + k;
assertEquals("Incorrect value for key: (" + rowlabel + "," + CONTENTS_BASIC
+ "), expected: '" + teststr + "' got: '" + bodystr + "'",
bodystr, teststr);
collabel = new Text(ANCHORNUM + k);
bodydata = region.get(rowlabel, collabel);
bytes = new byte[bodydata.getSize()];
System.arraycopy(bodydata.get(), 0, bytes, 0, bytes.length);
bodystr = new String(bytes).toString().trim();
teststr = ANCHORSTR + k;
assertEquals("Incorrect value for key: (" + rowlabel + "," + collabel
+ "), expected: '" + teststr + "' got: '" + bodystr + "'",
bodystr, teststr);
}
System.out.println("Read " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
} catch(IOException e) {
failures = true;
throw e;
}
System.out.println("Read " + NUM_VALS + " rows. Elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
}
public void testBadPuts() throws IOException {
if(!initialized) {
throw new IllegalStateException();
}
private void badPuts() throws IOException {
// Try put with bad lockid.
boolean exceptionThrown = false;
@ -259,7 +201,7 @@ public class TestHRegion extends TestCase {
/**
* Test getting and releasing locks.
*/
public void testLocks() {
private void locks() {
final int threadCount = 10;
final int lockCount = 10;
@ -317,11 +259,7 @@ public class TestHRegion extends TestCase {
// Test scanners. Writes contents:firstcol and anchor:secondcol
public void testScan() throws IOException {
if(!initialized) {
throw new IllegalStateException();
}
private void scan() throws IOException {
Text cols[] = new Text[] {
CONTENTS_FIRSTCOL,
ANCHOR_SECONDCOL
@ -583,138 +521,126 @@ public class TestHRegion extends TestCase {
// long time to run.
// Creates contents:body
public void testBatchWrite() throws IOException {
if(!initialized || failures) {
throw new IllegalStateException();
}
if(! Environment.debugging) {
private void batchWrite() throws IOException {
if(! StaticTestEnvironment.debugging) {
return;
}
try {
long totalFlush = 0;
long totalCompact = 0;
long totalLog = 0;
long startTime = System.currentTimeMillis();
long totalFlush = 0;
long totalCompact = 0;
long totalLog = 0;
long startTime = System.currentTimeMillis();
// 1M writes
// 1M writes
int valsize = 1000;
for (int k = FIRST_ROW; k <= N_ROWS; k++) {
// Come up with a random 1000-byte string
String randstr1 = "" + System.currentTimeMillis();
StringBuffer buf1 = new StringBuffer("val_" + k + "__");
while (buf1.length() < valsize) {
buf1.append(randstr1);
}
int valsize = 1000;
for (int k = FIRST_ROW; k <= N_ROWS; k++) {
// Come up with a random 1000-byte string
String randstr1 = "" + System.currentTimeMillis();
StringBuffer buf1 = new StringBuffer("val_" + k + "__");
while (buf1.length() < valsize) {
buf1.append(randstr1);
}
// Write to the HRegion
long writeid = region.startUpdate(new Text("row_" + k));
region.put(writeid, CONTENTS_BODY, new BytesWritable(buf1.toString().getBytes()));
region.commit(writeid);
if (k > 0 && k % (N_ROWS / 100) == 0) {
System.out.println("Flushing write #" + k);
// Write to the HRegion
long writeid = region.startUpdate(new Text("row_" + k));
region.put(writeid, CONTENTS_BODY, new BytesWritable(buf1.toString().getBytes()));
region.commit(writeid);
if (k > 0 && k % (N_ROWS / 100) == 0) {
System.out.println("Flushing write #" + k);
long flushStart = System.currentTimeMillis();
region.flushcache(false);
long flushEnd = System.currentTimeMillis();
totalFlush += (flushEnd - flushStart);
long flushStart = System.currentTimeMillis();
region.flushcache(false);
long flushEnd = System.currentTimeMillis();
totalFlush += (flushEnd - flushStart);
if (k % (N_ROWS / 10) == 0) {
System.out.print("Rolling log...");
long logStart = System.currentTimeMillis();
log.rollWriter();
long logEnd = System.currentTimeMillis();
totalLog += (logEnd - logStart);
System.out.println(" elapsed time: " + ((logEnd - logStart) / 1000.0));
}
if (k % (N_ROWS / 10) == 0) {
System.out.print("Rolling log...");
long logStart = System.currentTimeMillis();
log.rollWriter();
long logEnd = System.currentTimeMillis();
totalLog += (logEnd - logStart);
System.out.println(" elapsed time: " + ((logEnd - logStart) / 1000.0));
}
}
long startCompact = System.currentTimeMillis();
if(region.compactStores()) {
totalCompact = System.currentTimeMillis() - startCompact;
System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0));
} else {
System.out.println("No compaction required.");
}
long endTime = System.currentTimeMillis();
long totalElapsed = (endTime - startTime);
System.out.println();
System.out.println("Batch-write complete.");
System.out.println("Wrote " + N_ROWS + " rows, each of ~" + valsize + " bytes");
System.out.println("Total flush-time: " + (totalFlush / 1000.0));
System.out.println("Total compact-time: " + (totalCompact / 1000.0));
System.out.println("Total log-time: " + (totalLog / 1000.0));
System.out.println("Total time elapsed: " + (totalElapsed / 1000.0));
System.out.println("Total time, rows/second: " + (N_ROWS / (totalElapsed / 1000.0)));
System.out.println("Adjusted time (not including flush, compact, or log): " + ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0));
System.out.println("Adjusted time, rows/second: " + (N_ROWS / ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0)));
System.out.println();
} catch(IOException e) {
failures = true;
throw e;
}
long startCompact = System.currentTimeMillis();
if(region.compactStores()) {
totalCompact = System.currentTimeMillis() - startCompact;
System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0));
} else {
System.out.println("No compaction required.");
}
long endTime = System.currentTimeMillis();
long totalElapsed = (endTime - startTime);
System.out.println();
System.out.println("Batch-write complete.");
System.out.println("Wrote " + N_ROWS + " rows, each of ~" + valsize + " bytes");
System.out.println("Total flush-time: " + (totalFlush / 1000.0));
System.out.println("Total compact-time: " + (totalCompact / 1000.0));
System.out.println("Total log-time: " + (totalLog / 1000.0));
System.out.println("Total time elapsed: " + (totalElapsed / 1000.0));
System.out.println("Total time, rows/second: " + (N_ROWS / (totalElapsed / 1000.0)));
System.out.println("Adjusted time (not including flush, compact, or log): " + ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0));
System.out.println("Adjusted time, rows/second: " + (N_ROWS / ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0)));
System.out.println();
}
// NOTE: This test depends on testBatchWrite succeeding
public void testSplitAndMerge() throws IOException {
if(!initialized || failures) {
throw new IllegalStateException();
private void splitAndMerge() throws IOException {
Text midKey = new Text();
if(region.needsSplit(midKey)) {
System.out.println("Needs split");
}
try {
Text midKey = new Text();
// Split it anyway
if(region.needsSplit(midKey)) {
System.out.println("Needs split");
}
Text midkey = new Text("row_"
+ (StaticTestEnvironment.debugging ? (N_ROWS / 2) : (NUM_VALS/2)));
// Split it anyway
Path oldRegionPath = region.getRegionDir();
Text midkey = new Text("row_" + (Environment.debugging ? (N_ROWS / 2) : (NUM_VALS/2)));
Path oldRegionPath = region.getRegionDir();
long startTime = System.currentTimeMillis();
long startTime = System.currentTimeMillis();
HRegion subregions[] = region.closeAndSplit(midkey, this);
HRegion subregions[] = region.closeAndSplit(midkey);
System.out.println("Split region elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
System.out.println("Split region elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
assertEquals("Number of subregions", subregions.length, 2);
assertEquals("Number of subregions", subregions.length, 2);
// Now merge it back together
// Now merge it back together
Path oldRegion1 = subregions[0].getRegionDir();
Path oldRegion2 = subregions[1].getRegionDir();
Path oldRegion1 = subregions[0].getRegionDir();
Path oldRegion2 = subregions[1].getRegionDir();
startTime = System.currentTimeMillis();
startTime = System.currentTimeMillis();
region = HRegion.closeAndMerge(subregions[0], subregions[1]);
region = HRegion.closeAndMerge(subregions[0], subregions[1]);
System.out.println("Merge regions elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
System.out.println("Merge regions elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
fs.delete(oldRegionPath);
fs.delete(oldRegion1);
fs.delete(oldRegion2);
}
fs.delete(oldRegionPath);
fs.delete(oldRegion1);
fs.delete(oldRegion2);
} catch(IOException e) {
failures = true;
throw e;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text)
*/
public void regionIsUnavailable(Text regionName) {
// We don't use this here. It is only for the HRegionServer
}
// This test verifies that everything is still there after splitting and merging
public void testRead() throws IOException {
if(!initialized || failures) {
throw new IllegalStateException();
}
private void read() throws IOException {
// First verify the data written by testBasic()
@ -820,9 +746,8 @@ public class TestHRegion extends TestCase {
// Verify testBatchWrite data
if(Environment.debugging) {
if(StaticTestEnvironment.debugging) {
startTime = System.currentTimeMillis();
s = region.getScanner(new Text[] { CONTENTS_BODY }, new Text());
try {
int numFetched = 0;
@ -884,7 +809,6 @@ public class TestHRegion extends TestCase {
}
}
private static void deleteFile(File f) {
if(f.isDirectory()) {
File[] children = f.listFiles();
@ -895,10 +819,7 @@ public class TestHRegion extends TestCase {
f.delete();
}
public void testCleanup() throws IOException {
if(!initialized) {
throw new IllegalStateException();
}
private void cleanup() throws IOException {
// Shut down the mini cluster
@ -907,6 +828,5 @@ public class TestHRegion extends TestCase {
// Delete all the DFS files
deleteFile(new File(System.getProperty("test.build.data"), "dfs"));
}
}
}

View File

@ -17,9 +17,7 @@ package org.apache.hadoop.hbase;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Enumeration;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
@ -30,16 +28,7 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Appender;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import junit.framework.TestCase;
public class TestScanner extends TestCase {
public class TestScanner extends HBaseTestCase {
private static final Text FIRST_ROW = new Text();
private static final Text[] COLS = {
HConstants.COLUMN_FAMILY
@ -127,12 +116,7 @@ public class TestScanner extends TestCase {
} finally {
if(scanner != null) {
try {
scanner.close();
} catch(IOException e) {
e.printStackTrace();
}
scanner.close();
scanner = null;
}
}
@ -146,7 +130,6 @@ public class TestScanner extends TestCase {
}
/** The test! */
@SuppressWarnings("unchecked")
public void testScanner() throws IOException {
MiniDFSCluster cluster = null;
FileSystem fs = null;
@ -155,37 +138,7 @@ public class TestScanner extends TestCase {
// Initialization
if(System.getProperty("test.build.data") == null) {
String dir = new File(new File("").getAbsolutePath(), "build/contrib/hbase/test").getAbsolutePath();
System.out.println(dir);
System.setProperty("test.build.data", dir);
}
Configuration conf = new HBaseConfiguration();
Environment.getenv();
if(Environment.debugging) {
Logger rootLogger = Logger.getRootLogger();
rootLogger.setLevel(Level.WARN);
ConsoleAppender consoleAppender = null;
for(Enumeration<Appender> e = (Enumeration<Appender>)rootLogger.getAllAppenders();
e.hasMoreElements();) {
Appender a = e.nextElement();
if(a instanceof ConsoleAppender) {
consoleAppender = (ConsoleAppender)a;
break;
}
}
if(consoleAppender != null) {
Layout layout = consoleAppender.getLayout();
if(layout instanceof PatternLayout) {
PatternLayout consoleLayout = (PatternLayout)layout;
consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
}
}
Logger.getLogger("org.apache.hadoop.hbase").setLevel(Environment.logLevel);
}
cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
fs = cluster.getFileSystem();
Path dir = new Path("/hbase");

View File

@ -0,0 +1,77 @@
/**
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
/** Tests table creation restrictions*/
public class TestTable extends HBaseClusterTestCase {
public TestTable() {
super(true);
}
public void testTable() {
HClient client = new HClient(conf);
try {
client.createTable(HGlobals.rootTableDesc);
} catch(IllegalArgumentException e) {
// Expected - ignore it
} catch(Exception e) {
System.err.println("Unexpected exception");
e.printStackTrace();
fail();
}
try {
client.createTable(HGlobals.metaTableDesc);
} catch(IllegalArgumentException e) {
// Expected - ignore it
} catch(Exception e) {
System.err.println("Unexpected exception");
e.printStackTrace();
fail();
}
HTableDescriptor desc = new HTableDescriptor("test", 1);
desc.addFamily(HConstants.COLUMN_FAMILY);
try {
client.createTable(desc);
} catch(Exception e) {
System.err.println("Unexpected exception");
e.printStackTrace();
fail();
}
try {
client.createTable(desc);
} catch(IOException e) {
// Expected. Ignore it.
} catch(Exception e) {
System.err.println("Unexpected exception");
e.printStackTrace();
fail();
}
}
}