HADOOP-1678 On region split, master should designate which host should serve daughter splits. Phase 2: Master assigns children of split region instead of HRegionServer serving both children.
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@565616 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
931d452cb2
commit
0c7ac6795f
|
@ -89,3 +89,7 @@ Trunk (unreleased changes)
|
|||
56. HADOOP-1678 On region split, master should designate which host should
|
||||
serve daughter splits. Phase 1: Master balances load for new regions and
|
||||
when a region server fails.
|
||||
57. HADOOP-1678 On region split, master should designate which host should
|
||||
serve daughter splits. Phase 2: Master assigns children of split region
|
||||
instead of HRegionServer serving both children.
|
||||
|
||||
|
|
|
@ -28,10 +28,11 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.hbase.io.KeyedData;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
/**
|
||||
* Provides administrative functions for HBase
|
||||
*/
|
||||
|
@ -170,7 +171,6 @@ public class HBaseAdmin implements HConstants {
|
|||
// Wait until first region is deleted
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(firstMetaServer.getServerAddress());
|
||||
DataInputBuffer inbuf = new DataInputBuffer();
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
long scannerId = -1L;
|
||||
|
@ -185,8 +185,8 @@ public class HBaseAdmin implements HConstants {
|
|||
boolean found = false;
|
||||
for (int j = 0; j < values.length; j++) {
|
||||
if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
|
||||
inbuf.reset(values[j].getData(), values[j].getData().length);
|
||||
info.readFields(inbuf);
|
||||
info =
|
||||
(HRegionInfo) Writables.getWritable(values[j].getData(), info);
|
||||
if (info.tableDesc.getName().equals(tableName)) {
|
||||
found = true;
|
||||
}
|
||||
|
@ -249,7 +249,6 @@ public class HBaseAdmin implements HConstants {
|
|||
HRegionInterface server =
|
||||
connection.getHRegionConnection(firstMetaServer.getServerAddress());
|
||||
|
||||
DataInputBuffer inbuf = new DataInputBuffer();
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
int valuesfound = 0;
|
||||
|
@ -272,8 +271,8 @@ public class HBaseAdmin implements HConstants {
|
|||
valuesfound += 1;
|
||||
for (int j = 0; j < values.length; j++) {
|
||||
if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
|
||||
inbuf.reset(values[j].getData(), values[j].getData().length);
|
||||
info.readFields(inbuf);
|
||||
info =
|
||||
(HRegionInfo) Writables.getWritable(values[j].getData(), info);
|
||||
isenabled = !info.offLine;
|
||||
break;
|
||||
}
|
||||
|
@ -349,7 +348,6 @@ public class HBaseAdmin implements HConstants {
|
|||
HRegionInterface server =
|
||||
connection.getHRegionConnection(firstMetaServer.getServerAddress());
|
||||
|
||||
DataInputBuffer inbuf = new DataInputBuffer();
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
for(int tries = 0; tries < numRetries; tries++) {
|
||||
int valuesfound = 0;
|
||||
|
@ -371,8 +369,8 @@ public class HBaseAdmin implements HConstants {
|
|||
valuesfound += 1;
|
||||
for (int j = 0; j < values.length; j++) {
|
||||
if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
|
||||
inbuf.reset(values[j].getData(), values[j].getData().length);
|
||||
info.readFields(inbuf);
|
||||
info =
|
||||
(HRegionInfo) Writables.getWritable(values[j].getData(), info);
|
||||
disabled = info.offLine;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -34,10 +34,10 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
import org.apache.hadoop.hbase.io.KeyedData;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
/**
|
||||
* A non-instantiable class that manages connections to multiple tables in
|
||||
|
@ -237,7 +237,6 @@ public class HConnectionManager implements HConstants {
|
|||
COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, System.currentTimeMillis(),
|
||||
null);
|
||||
|
||||
DataInputBuffer inbuf = new DataInputBuffer();
|
||||
while (true) {
|
||||
KeyedData[] values = server.next(scannerId);
|
||||
if (values.length == 0) {
|
||||
|
@ -245,9 +244,9 @@ public class HConnectionManager implements HConstants {
|
|||
}
|
||||
for (int i = 0; i < values.length; i++) {
|
||||
if (values[i].getKey().getColumn().equals(COL_REGIONINFO)) {
|
||||
inbuf.reset(values[i].getData(), values[i].getData().length);
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
info.readFields(inbuf);
|
||||
HRegionInfo info =
|
||||
(HRegionInfo) Writables.getWritable(values[i].getData(),
|
||||
new HRegionInfo());
|
||||
|
||||
// Only examine the rows where the startKey is zero length
|
||||
if (info.startKey.getLength() == 0) {
|
||||
|
@ -658,7 +657,6 @@ public class HConnectionManager implements HConstants {
|
|||
server.openScanner(t.getRegionInfo().getRegionName(),
|
||||
COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null);
|
||||
|
||||
DataInputBuffer inbuf = new DataInputBuffer();
|
||||
while (true) {
|
||||
HRegionInfo regionInfo = null;
|
||||
String serverAddress = null;
|
||||
|
@ -684,9 +682,8 @@ public class HConnectionManager implements HConstants {
|
|||
results.put(values[i].getKey().getColumn(), values[i].getData());
|
||||
}
|
||||
regionInfo = new HRegionInfo();
|
||||
bytes = results.get(COL_REGIONINFO);
|
||||
inbuf.reset(bytes, bytes.length);
|
||||
regionInfo.readFields(inbuf);
|
||||
regionInfo = (HRegionInfo) Writables.getWritable(
|
||||
results.get(COL_REGIONINFO), regionInfo);
|
||||
|
||||
if (!regionInfo.tableDesc.getName().equals(tableName)) {
|
||||
// We're done
|
||||
|
@ -697,7 +694,7 @@ public class HConnectionManager implements HConstants {
|
|||
break;
|
||||
}
|
||||
|
||||
if (regionInfo.offLine) {
|
||||
if (regionInfo.isOffline() && !regionInfo.isSplit()) {
|
||||
throw new IllegalStateException("table offline: " + tableName);
|
||||
}
|
||||
|
||||
|
@ -710,7 +707,7 @@ public class HConnectionManager implements HConstants {
|
|||
servers.clear();
|
||||
break;
|
||||
}
|
||||
serverAddress = new String(bytes, UTF8_ENCODING);
|
||||
serverAddress = Writables.bytesToString(bytes);
|
||||
servers.put(regionInfo.startKey, new HRegionLocation(
|
||||
regionInfo, new HServerAddress(serverAddress)));
|
||||
}
|
||||
|
|
|
@ -120,6 +120,11 @@ public interface HConstants {
|
|||
/** ROOT/META column family member - contains server start code (a long) */
|
||||
static final Text COL_STARTCODE = new Text(COLUMN_FAMILY + "serverstartcode");
|
||||
|
||||
/** the lower half of a split region */
|
||||
static final Text COL_SPLITA = new Text(COLUMN_FAMILY_STR + "splitA");
|
||||
|
||||
/** the upper half of a split region */
|
||||
static final Text COL_SPLITB = new Text(COLUMN_FAMILY_STR + "splitB");
|
||||
// Other constants
|
||||
|
||||
/** used by scanners, etc when they want to start at the beginning of a region */
|
||||
|
|
|
@ -97,7 +97,7 @@ public class HLog implements HConstants {
|
|||
*/
|
||||
static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
|
||||
Configuration conf) throws IOException {
|
||||
Path logfiles[] = fs.listPaths(srcDir);
|
||||
Path logfiles[] = fs.listPaths(new Path[] {srcDir});
|
||||
LOG.info("splitting " + logfiles.length + " log(s) in " +
|
||||
srcDir.toString());
|
||||
HashMap<Text, SequenceFile.Writer> logWriters =
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -46,15 +44,16 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.io.KeyedData;
|
||||
import org.apache.hadoop.hbase.util.Keying;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
|
||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
|
||||
|
||||
/**
|
||||
* HMaster is the "master server" for a HBase.
|
||||
|
@ -208,9 +207,11 @@ HMasterRegionInterface, Runnable {
|
|||
results.put(values[i].getKey().getColumn(), values[i].getData());
|
||||
}
|
||||
|
||||
HRegionInfo info = HRegion.getRegionInfo(results);
|
||||
String serverName = HRegion.getServerName(results);
|
||||
long startCode = HRegion.getStartCode(results);
|
||||
HRegionInfo info = (HRegionInfo) Writables.getWritable(
|
||||
results.get(COL_REGIONINFO), new HRegionInfo());
|
||||
|
||||
String serverName = Writables.bytesToString(results.get(COL_SERVER));
|
||||
long startCode = Writables.bytesToLong(results.get(COL_STARTCODE));
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(Thread.currentThread().getName() + " scanner: " +
|
||||
|
@ -263,9 +264,11 @@ HMasterRegionInterface, Runnable {
|
|||
splitParents.entrySet()) {
|
||||
|
||||
TreeMap<Text, byte[]> results = e.getValue();
|
||||
cleanupSplits(e.getKey(),
|
||||
HRegion.getSplit(results, HRegion.COL_SPLITA),
|
||||
HRegion.getSplit(results, HRegion.COL_SPLITB));
|
||||
cleanupSplits(region.regionName, regionServer, e.getKey(),
|
||||
(HRegionInfo) Writables.getWritable(results.get(COL_SPLITA),
|
||||
new HRegionInfo()),
|
||||
(HRegionInfo) Writables.getWritable(results.get(COL_SPLITB),
|
||||
new HRegionInfo()));
|
||||
}
|
||||
}
|
||||
LOG.info(Thread.currentThread().getName() + " scan of meta region " +
|
||||
|
@ -286,17 +289,19 @@ HMasterRegionInterface, Runnable {
|
|||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* @param info
|
||||
* @param splitA
|
||||
* @param splitB
|
||||
/**
|
||||
* @param metaRegionName
|
||||
* @param server HRegionInterface of meta server to talk to
|
||||
* @param info HRegionInfo of split parent
|
||||
* @param splitA low key range child region
|
||||
* @param splitB upper key range child region
|
||||
* @return True if we removed <code>info</code> and this region has
|
||||
* been cleaned up.
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean cleanupSplits(final HRegionInfo info,
|
||||
final HRegionInfo splitA, final HRegionInfo splitB)
|
||||
throws IOException {
|
||||
private boolean cleanupSplits(final Text metaRegionName,
|
||||
final HRegionInterface server, final HRegionInfo info,
|
||||
final HRegionInfo splitA, final HRegionInfo splitB) throws IOException {
|
||||
|
||||
boolean result = false;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -308,11 +313,11 @@ HMasterRegionInterface, Runnable {
|
|||
|
||||
if (!noReferencesA) {
|
||||
noReferencesA =
|
||||
hasReferences(info.getRegionName(), splitA, HRegion.COL_SPLITA);
|
||||
hasReferences(metaRegionName, server, info.getRegionName(), splitA, COL_SPLITA);
|
||||
}
|
||||
if (!noReferencesB) {
|
||||
noReferencesB =
|
||||
hasReferences(info.getRegionName(), splitB, HRegion.COL_SPLITB);
|
||||
hasReferences(metaRegionName, server, info.getRegionName(), splitB, COL_SPLITB);
|
||||
}
|
||||
if (!(noReferencesA && noReferencesB)) {
|
||||
|
||||
|
@ -322,9 +327,16 @@ HMasterRegionInterface, Runnable {
|
|||
LOG.info("Deleting region " + info.getRegionName() +
|
||||
" because daughter splits no longer hold references");
|
||||
|
||||
HRegion.deleteRegion(fs, dir, info.getRegionName());
|
||||
HRegion.removeRegionFromMETA(conf, this.tableName,
|
||||
info.getRegionName());
|
||||
if (!HRegion.deleteRegion(fs, dir, info.getRegionName())) {
|
||||
LOG.warn("Deletion of " + info.getRegionName() + " failed");
|
||||
}
|
||||
|
||||
BatchUpdate b = new BatchUpdate();
|
||||
long lockid = b.startUpdate(info.getRegionName());
|
||||
b.delete(lockid, COL_REGIONINFO);
|
||||
b.delete(lockid, COL_SERVER);
|
||||
b.delete(lockid, COL_STARTCODE);
|
||||
server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
|
||||
|
||||
result = true;
|
||||
}
|
||||
|
@ -336,11 +348,30 @@ HMasterRegionInterface, Runnable {
|
|||
return result;
|
||||
}
|
||||
|
||||
protected boolean hasReferences(final Text regionName,
|
||||
protected boolean hasReferences(final Text metaRegionName,
|
||||
final HRegionInterface server, final Text regionName,
|
||||
final HRegionInfo split, final Text column) throws IOException {
|
||||
|
||||
boolean result =
|
||||
HRegion.hasReferences(fs, fs.makeQualified(dir), split);
|
||||
boolean result = false;
|
||||
for (Text family: split.getTableDesc().families().keySet()) {
|
||||
Path p = HStoreFile.getMapDir(fs.makeQualified(dir),
|
||||
split.getRegionName(), HStoreKey.extractFamily(family));
|
||||
|
||||
// Look for reference files.
|
||||
|
||||
Path [] ps = fs.listPaths(p,
|
||||
new PathFilter () {
|
||||
public boolean accept(Path path) {
|
||||
return HStoreFile.isReference(path);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
if (ps != null && ps.length > 0) {
|
||||
result = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (result) {
|
||||
return result;
|
||||
|
@ -351,12 +382,11 @@ HMasterRegionInterface, Runnable {
|
|||
+" no longer has references to " + regionName.toString());
|
||||
}
|
||||
|
||||
HTable t = new HTable(conf, this.tableName);
|
||||
try {
|
||||
HRegion.removeSplitFromMETA(t, regionName, column);
|
||||
} finally {
|
||||
t.close();
|
||||
}
|
||||
BatchUpdate b = new BatchUpdate();
|
||||
long lockid = b.startUpdate(regionName);
|
||||
b.delete(lockid, column);
|
||||
server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -380,7 +410,7 @@ HMasterRegionInterface, Runnable {
|
|||
}
|
||||
|
||||
HServerInfo storedInfo = null;
|
||||
if (serverName != null) {
|
||||
if (serverName.length() != 0) {
|
||||
Map<Text, HRegionInfo> regionsToKill = killList.get(serverName);
|
||||
if (regionsToKill != null &&
|
||||
regionsToKill.containsKey(info.regionName)) {
|
||||
|
@ -691,7 +721,7 @@ HMasterRegionInterface, Runnable {
|
|||
* We fill 'unassignedRecords' by scanning ROOT and META tables, learning the
|
||||
* set of all known valid regions.
|
||||
*/
|
||||
Map<Text, HRegionInfo> unassignedRegions;
|
||||
SortedMap<Text, HRegionInfo> unassignedRegions;
|
||||
|
||||
/**
|
||||
* The 'assignAttempts' table maps from regions to a timestamp that indicates
|
||||
|
@ -775,10 +805,12 @@ HMasterRegionInterface, Runnable {
|
|||
if (!fs.exists(rootRegionDir)) {
|
||||
LOG.info("bootstrap: creating ROOT and first META regions");
|
||||
try {
|
||||
HRegion root = HRegion.createHRegion(0L, HGlobals.rootTableDesc,
|
||||
this.dir, this.conf);
|
||||
HRegion meta = HRegion.createHRegion(1L, HGlobals.metaTableDesc,
|
||||
this.dir, this.conf);
|
||||
HRegion root = HRegion.createHRegion(HGlobals.rootRegionInfo, this.dir,
|
||||
this.conf, null);
|
||||
|
||||
HRegion meta =
|
||||
HRegion.createHRegion(new HRegionInfo(1L, HGlobals.metaTableDesc,
|
||||
null, null), this.dir, this.conf, null);
|
||||
|
||||
// Add first region from the META table to the ROOT region.
|
||||
|
||||
|
@ -842,7 +874,7 @@ HMasterRegionInterface, Runnable {
|
|||
this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner");
|
||||
|
||||
this.unassignedRegions =
|
||||
Collections.synchronizedMap(new HashMap<Text, HRegionInfo>());
|
||||
Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
|
||||
|
||||
this.unassignedRegions.put(HGlobals.rootRegionInfo.regionName,
|
||||
HGlobals.rootRegionInfo);
|
||||
|
@ -1372,7 +1404,12 @@ HMasterRegionInterface, Runnable {
|
|||
// A region has split.
|
||||
|
||||
HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo();
|
||||
unassignedRegions.put(newRegionA.getRegionName(), newRegionA);
|
||||
assignAttempts.put(newRegionA.getRegionName(), Long.valueOf(0L));
|
||||
|
||||
HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo();
|
||||
unassignedRegions.put(newRegionB.getRegionName(), newRegionB);
|
||||
assignAttempts.put(newRegionB.getRegionName(), Long.valueOf(0L));
|
||||
|
||||
LOG.info("region " + region.regionName + " split. New regions are: "
|
||||
+ newRegionA.regionName + ", " + newRegionB.regionName);
|
||||
|
@ -1381,14 +1418,6 @@ HMasterRegionInterface, Runnable {
|
|||
// A meta region has split.
|
||||
|
||||
onlineMetaRegions.remove(region.getStartKey());
|
||||
onlineMetaRegions.put(newRegionA.getStartKey(),
|
||||
new MetaRegion(info.getServerAddress(),
|
||||
newRegionA.getRegionName(), newRegionA.getStartKey()));
|
||||
|
||||
onlineMetaRegions.put(newRegionB.getStartKey(),
|
||||
new MetaRegion(info.getServerAddress(),
|
||||
newRegionB.getRegionName(), newRegionB.getStartKey()));
|
||||
|
||||
numberOfMetaRegions.incrementAndGet();
|
||||
}
|
||||
break;
|
||||
|
@ -1673,15 +1702,15 @@ HMasterRegionInterface, Runnable {
|
|||
// region had been on shutdown server (could be null because we
|
||||
// missed edits in hlog because hdfs does not do write-append).
|
||||
|
||||
String serverName = null;
|
||||
String serverName;
|
||||
try {
|
||||
serverName = Keying.bytesToString(results.get(COL_SERVER));
|
||||
serverName = Writables.bytesToString(results.get(COL_SERVER));
|
||||
|
||||
} catch(UnsupportedEncodingException e) {
|
||||
LOG.error("Server name", e);
|
||||
break;
|
||||
}
|
||||
if (serverName != null && serverName.length() > 0 &&
|
||||
if (serverName.length() > 0 &&
|
||||
deadServerName.compareTo(serverName) != 0) {
|
||||
|
||||
// This isn't the server you're looking for - move along
|
||||
|
@ -1776,11 +1805,8 @@ HMasterRegionInterface, Runnable {
|
|||
|
||||
} else if (e.regionOffline) {
|
||||
e.info.offLine = true;
|
||||
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
|
||||
DataOutputStream s = new DataOutputStream(byteValue);
|
||||
e.info.write(s);
|
||||
server.put(regionName, clientId, lockid, COL_REGIONINFO,
|
||||
byteValue.toByteArray());
|
||||
Writables.getBytes(e.info));
|
||||
}
|
||||
server.delete(regionName, clientId, lockid, COL_SERVER);
|
||||
server.delete(regionName, clientId, lockid, COL_STARTCODE);
|
||||
|
@ -2037,12 +2063,8 @@ HMasterRegionInterface, Runnable {
|
|||
|
||||
} else if (!reassignRegion ) {
|
||||
regionInfo.offLine = true;
|
||||
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
|
||||
DataOutputStream s = new DataOutputStream(byteValue);
|
||||
regionInfo.write(s);
|
||||
|
||||
server.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
|
||||
byteValue.toByteArray());
|
||||
Writables.getBytes(regionInfo));
|
||||
}
|
||||
server.delete(metaRegionName, clientId, lockid, COL_SERVER);
|
||||
server.delete(metaRegionName, clientId, lockid, COL_STARTCODE);
|
||||
|
@ -2097,7 +2119,7 @@ HMasterRegionInterface, Runnable {
|
|||
private HServerAddress serverAddress;
|
||||
private byte [] startCode;
|
||||
|
||||
PendingOpenReport(HServerInfo info, HRegionInfo region) {
|
||||
PendingOpenReport(HServerInfo info, HRegionInfo region) throws IOException {
|
||||
if (region.tableDesc.getName().equals(META_TABLE_NAME)) {
|
||||
// The region which just came on-line is a META region.
|
||||
// We need to look in the ROOT region for its information.
|
||||
|
@ -2111,12 +2133,7 @@ HMasterRegionInterface, Runnable {
|
|||
}
|
||||
this.region = region;
|
||||
this.serverAddress = info.getServerAddress();
|
||||
try {
|
||||
this.startCode =
|
||||
String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING);
|
||||
} catch(UnsupportedEncodingException e) {
|
||||
LOG.error("Start code", e);
|
||||
}
|
||||
this.startCode = Writables.longToBytes(info.getStartCode());
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
|
@ -2188,7 +2205,7 @@ HMasterRegionInterface, Runnable {
|
|||
region.getRegionName());
|
||||
|
||||
server.put(metaRegionName, clientId, lockid, COL_SERVER,
|
||||
serverAddress.toString().getBytes(UTF8_ENCODING));
|
||||
Writables.stringToBytes(serverAddress.toString()));
|
||||
|
||||
server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode);
|
||||
|
||||
|
@ -2343,21 +2360,18 @@ HMasterRegionInterface, Runnable {
|
|||
|
||||
// 2. Create the HRegion
|
||||
|
||||
HRegion region = HRegion.createHRegion(newRegion.regionId,
|
||||
newRegion.getTableDesc(), this.dir, this.conf);
|
||||
HRegion region =
|
||||
HRegion.createHRegion(newRegion, this.dir, this.conf, null);
|
||||
|
||||
// 3. Insert into meta
|
||||
|
||||
HRegionInfo info = region.getRegionInfo();
|
||||
Text regionName = region.getRegionName();
|
||||
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
|
||||
DataOutputStream s = new DataOutputStream(byteValue);
|
||||
info.write(s);
|
||||
long clientId = rand.nextLong();
|
||||
long lockid = r.startUpdate(metaRegionName, clientId, regionName);
|
||||
|
||||
r.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
|
||||
byteValue.toByteArray());
|
||||
Writables.getBytes(info));
|
||||
|
||||
r.commit(metaRegionName, clientId, lockid, System.currentTimeMillis());
|
||||
|
||||
|
@ -2468,7 +2482,6 @@ HMasterRegionInterface, Runnable {
|
|||
System.currentTimeMillis(), null);
|
||||
|
||||
try {
|
||||
DataInputBuffer inbuf = new DataInputBuffer();
|
||||
while (true) {
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
String serverName = null;
|
||||
|
@ -2486,14 +2499,13 @@ HMasterRegionInterface, Runnable {
|
|||
Text column = values[i].getKey().getColumn();
|
||||
if (column.equals(COL_REGIONINFO)) {
|
||||
haveRegionInfo = true;
|
||||
inbuf.reset(values[i].getData(),
|
||||
values[i].getData().length);
|
||||
info.readFields(inbuf);
|
||||
info = (HRegionInfo) Writables.getWritable(
|
||||
values[i].getData(), info);
|
||||
|
||||
} else if (column.equals(COL_SERVER)) {
|
||||
try {
|
||||
serverName =
|
||||
new String(values[i].getData(), UTF8_ENCODING);
|
||||
Writables.bytesToString(values[i].getData());
|
||||
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
assert(false);
|
||||
|
@ -2501,8 +2513,7 @@ HMasterRegionInterface, Runnable {
|
|||
|
||||
} else if (column.equals(COL_STARTCODE)) {
|
||||
try {
|
||||
startCode = Long.valueOf(new String(values[i].getData(),
|
||||
UTF8_ENCODING)).longValue();
|
||||
startCode = Writables.bytesToLong(values[i].getData());
|
||||
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
assert(false);
|
||||
|
@ -2568,7 +2579,7 @@ HMasterRegionInterface, Runnable {
|
|||
|
||||
protected boolean isBeingServed(String serverName, long startCode) {
|
||||
boolean result = false;
|
||||
if (serverName != null && startCode != -1L) {
|
||||
if (serverName != null && serverName.length() > 0 && startCode != -1L) {
|
||||
HServerInfo s;
|
||||
synchronized (serversToServerInfo) {
|
||||
s = serversToServerInfo.get(serverName);
|
||||
|
@ -2731,13 +2742,8 @@ HMasterRegionInterface, Runnable {
|
|||
final Text regionName, final HRegionInfo i) throws IOException {
|
||||
|
||||
i.offLine = !online;
|
||||
|
||||
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
|
||||
DataOutputStream s = new DataOutputStream(byteValue);
|
||||
i.write(s);
|
||||
|
||||
server.put(regionName, clientId, lockid, COL_REGIONINFO,
|
||||
byteValue.toByteArray());
|
||||
Writables.getBytes(i));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2811,15 +2817,12 @@ HMasterRegionInterface, Runnable {
|
|||
protected void updateRegionInfo(HRegionInterface server, Text regionName,
|
||||
HRegionInfo i) throws IOException {
|
||||
|
||||
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
|
||||
DataOutputStream s = new DataOutputStream(byteValue);
|
||||
i.write(s);
|
||||
long lockid = -1L;
|
||||
long clientId = rand.nextLong();
|
||||
try {
|
||||
lockid = server.startUpdate(regionName, clientId, i.regionName);
|
||||
server.put(regionName, clientId, lockid, COL_REGIONINFO,
|
||||
byteValue.toByteArray());
|
||||
Writables.getBytes(i));
|
||||
|
||||
server.commit(regionName, clientId, lockid, System.currentTimeMillis());
|
||||
lockid = -1L;
|
||||
|
|
|
@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
/**
|
||||
* A non-instantiable class that has a static method capable of compacting
|
||||
* a table by merging adjacent regions that have grown too small.
|
||||
|
@ -220,7 +222,9 @@ class HMerge implements HConstants {
|
|||
throw new NoSuchElementException("meta region entry missing "
|
||||
+ COL_REGIONINFO);
|
||||
}
|
||||
HRegionInfo region = new HRegionInfo(bytes);
|
||||
HRegionInfo region =
|
||||
(HRegionInfo) Writables.getWritable(bytes, new HRegionInfo());
|
||||
|
||||
if(!region.offLine) {
|
||||
throw new TableNotDisabledException("region " + region.regionName
|
||||
+ " is not disabled");
|
||||
|
|
|
@ -19,10 +19,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
@ -36,7 +33,6 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
@ -81,8 +77,6 @@ public class HRegion implements HConstants {
|
|||
static final Log LOG = LogFactory.getLog(HRegion.class);
|
||||
final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
private long noFlushCount = 0;
|
||||
static final Text COL_SPLITA = new Text(COLUMN_FAMILY_STR + "splitA");
|
||||
static final Text COL_SPLITB = new Text(COLUMN_FAMILY_STR + "splitB");
|
||||
|
||||
/**
|
||||
* Merge two HRegions. They must be available on the current
|
||||
|
@ -1667,26 +1661,6 @@ public class HRegion implements HConstants {
|
|||
}
|
||||
|
||||
// Utility methods
|
||||
|
||||
/**
|
||||
* Convenience method creating new HRegions.
|
||||
* Note, this method creates an {@link HLog} for the created region. It
|
||||
* needs to be closed explicitly. Use {@link HRegion#getLog()} to get
|
||||
* access.
|
||||
* @param regionId ID to use
|
||||
* @param tableDesc Descriptor
|
||||
* @param rootDir Root directory of HBase instance
|
||||
* @param conf
|
||||
* @return New META region (ROOT or META).
|
||||
* @throws IOException
|
||||
*/
|
||||
static HRegion createHRegion(final long regionId,
|
||||
final HTableDescriptor tableDesc, final Path rootDir,
|
||||
final Configuration conf)
|
||||
throws IOException {
|
||||
return createHRegion(new HRegionInfo(regionId, tableDesc, null, null),
|
||||
rootDir, conf, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method creating new HRegions. Used by createTable and by the
|
||||
|
@ -1727,218 +1701,10 @@ public class HRegion implements HConstants {
|
|||
throws IOException {
|
||||
// The row key is the region name
|
||||
long writeid = meta.startUpdate(r.getRegionName());
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
DataOutputStream s = new DataOutputStream(bytes);
|
||||
r.getRegionInfo().write(s);
|
||||
meta.put(writeid, COL_REGIONINFO, bytes.toByteArray());
|
||||
meta.put(writeid, COL_REGIONINFO, Writables.getBytes(r.getRegionInfo()));
|
||||
meta.commit(writeid, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
static void addRegionToMETA(final Configuration conf,
|
||||
final Text table, final HRegion region,
|
||||
final HServerAddress serverAddress,
|
||||
final long startCode)
|
||||
throws IOException {
|
||||
HTable t = new HTable(conf, table);
|
||||
try {
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
DataOutputStream out = new DataOutputStream(bytes);
|
||||
region.getRegionInfo().write(out);
|
||||
long lockid = t.startUpdate(region.getRegionName());
|
||||
t.put(lockid, COL_REGIONINFO, bytes.toByteArray());
|
||||
t.put(lockid, COL_SERVER,
|
||||
serverAddress.toString().getBytes(UTF8_ENCODING));
|
||||
t.put(lockid, COL_STARTCODE,
|
||||
String.valueOf(startCode).getBytes(UTF8_ENCODING));
|
||||
t.commit(lockid);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info("Added region " + region.getRegionName() + " to table " +
|
||||
table);
|
||||
}
|
||||
} finally {
|
||||
t.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete <code>region</code> from META <code>table</code>.
|
||||
* @param conf Configuration object
|
||||
* @param table META table we are to delete region from.
|
||||
* @param regionName Region to remove.
|
||||
* @throws IOException
|
||||
*/
|
||||
static void removeRegionFromMETA(final Configuration conf,
|
||||
final Text table, final Text regionName)
|
||||
throws IOException {
|
||||
HTable t = new HTable(conf, table);
|
||||
try {
|
||||
removeRegionFromMETA(t, regionName);
|
||||
} finally {
|
||||
t.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete <code>region</code> from META <code>table</code>.
|
||||
* @param conf Configuration object
|
||||
* @param table META table we are to delete region from.
|
||||
* @param regionName Region to remove.
|
||||
* @throws IOException
|
||||
*/
|
||||
static void removeRegionFromMETA(final HTable t, final Text regionName)
|
||||
throws IOException {
|
||||
long lockid = t.startBatchUpdate(regionName);
|
||||
t.delete(lockid, COL_REGIONINFO);
|
||||
t.delete(lockid, COL_SERVER);
|
||||
t.delete(lockid, COL_STARTCODE);
|
||||
t.commit(lockid);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removed " + regionName + " from table " + t.getTableName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete <code>split</code> column from META <code>table</code>.
|
||||
* @param t
|
||||
* @param split
|
||||
* @param regionName Region to remove.
|
||||
* @throws IOException
|
||||
*/
|
||||
static void removeSplitFromMETA(final HTable t, final Text regionName,
|
||||
final Text split)
|
||||
throws IOException {
|
||||
long lockid = t.startBatchUpdate(regionName);
|
||||
t.delete(lockid, split);
|
||||
t.commit(lockid);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removed " + split + " from " + regionName +
|
||||
" from table " + t.getTableName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <code>region</code> has split. Update META <code>table</code>.
|
||||
* @param client Client to use running update.
|
||||
* @param table META table we are to delete region from.
|
||||
* @param regionName Region to remove.
|
||||
* @throws IOException
|
||||
*/
|
||||
static void writeSplitToMETA(final Configuration conf,
|
||||
final Text table, final Text regionName, final HRegionInfo splitA,
|
||||
final HRegionInfo splitB)
|
||||
throws IOException {
|
||||
HTable t = new HTable(conf, table);
|
||||
try {
|
||||
HRegionInfo hri = getRegionInfo(t.get(regionName, COL_REGIONINFO));
|
||||
hri.offLine = true;
|
||||
hri.split = true;
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
DataOutputStream dos = new DataOutputStream(bytes);
|
||||
hri.write(dos);
|
||||
dos.close();
|
||||
long lockid = t.startBatchUpdate(regionName);
|
||||
t.put(lockid, COL_REGIONINFO, bytes.toByteArray());
|
||||
t.put(lockid, COL_SPLITA, Writables.getBytes(splitA));
|
||||
t.put(lockid, COL_SPLITB, Writables.getBytes(splitB));
|
||||
t.commitBatch(lockid);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Updated " + regionName + " in table " + table +
|
||||
" on its being split");
|
||||
}
|
||||
} finally {
|
||||
t.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param whichSplit COL_SPLITA or COL_SPLITB?
|
||||
* @param data Map of META row labelled column data.
|
||||
* @return HRegionInfo or null if not found.
|
||||
* @throws IOException
|
||||
*/
|
||||
static HRegionInfo getSplit(final TreeMap<Text, byte[]> data,
|
||||
final Text whichSplit)
|
||||
throws IOException {
|
||||
if (!(whichSplit.equals(COL_SPLITA) || whichSplit.equals(COL_SPLITB))) {
|
||||
throw new IOException("Illegal Argument: " + whichSplit);
|
||||
}
|
||||
byte [] bytes = data.get(whichSplit);
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
return null;
|
||||
}
|
||||
return (HRegionInfo) Writables.getWritable(bytes, new HRegionInfo());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param data Map of META row labelled column data.
|
||||
* @return An HRegionInfo instance.
|
||||
* @throws IOException
|
||||
*/
|
||||
static HRegionInfo getRegionInfo(final TreeMap<Text, byte[]> data)
|
||||
throws IOException {
|
||||
return getRegionInfo(data.get(COL_REGIONINFO));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bytes Bytes of a HRegionInfo.
|
||||
* @return An HRegionInfo instance.
|
||||
* @throws IOException
|
||||
*/
|
||||
static HRegionInfo getRegionInfo(final byte[] bytes) throws IOException {
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
throw new IOException("no value for " + COL_REGIONINFO);
|
||||
}
|
||||
return (HRegionInfo)Writables.getWritable(bytes, new HRegionInfo());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param data Map of META row labelled column data.
|
||||
* @return Server
|
||||
*/
|
||||
static String getServerName(final TreeMap<Text, byte[]> data) {
|
||||
byte [] bytes = data.get(COL_SERVER);
|
||||
String name = null;
|
||||
try {
|
||||
name = (bytes != null && bytes.length != 0) ?
|
||||
new String(bytes, UTF8_ENCODING): null;
|
||||
|
||||
} catch(UnsupportedEncodingException e) {
|
||||
assert(false);
|
||||
}
|
||||
return (name != null)? name.trim(): name;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param data Map of META row labelled column data.
|
||||
* @return Start code.
|
||||
*/
|
||||
static long getStartCode(final TreeMap<Text, byte[]> data) {
|
||||
long startCode = -1L;
|
||||
byte [] bytes = data.get(COL_STARTCODE);
|
||||
if(bytes != null && bytes.length != 0) {
|
||||
try {
|
||||
startCode = Long.parseLong(new String(bytes, UTF8_ENCODING).trim());
|
||||
} catch(NumberFormatException e) {
|
||||
LOG.error("Failed getting " + COL_STARTCODE, e);
|
||||
} catch(UnsupportedEncodingException e) {
|
||||
LOG.error("Failed getting " + COL_STARTCODE, e);
|
||||
}
|
||||
}
|
||||
return startCode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the Path of the HRegion
|
||||
*
|
||||
* @param dir parent directory
|
||||
* @param regionName name of the region
|
||||
* @return Path of HRegion directory
|
||||
*/
|
||||
public static Path getRegionDir(final Path dir, final Text regionName) {
|
||||
return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Deletes all the files for a HRegion
|
||||
*
|
||||
|
@ -1953,32 +1719,15 @@ public class HRegion implements HConstants {
|
|||
Path p = HRegion.getRegionDir(fs.makeQualified(baseDirectory), regionName);
|
||||
return fs.delete(p);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Look for HStoreFile references in passed region.
|
||||
* @param fs
|
||||
* @param baseDirectory
|
||||
* @param hri
|
||||
* @return True if we found references.
|
||||
* @throws IOException
|
||||
* Computes the Path of the HRegion
|
||||
*
|
||||
* @param dir parent directory
|
||||
* @param regionName name of the region
|
||||
* @return Path of HRegion directory
|
||||
*/
|
||||
static boolean hasReferences(final FileSystem fs, final Path baseDirectory,
|
||||
final HRegionInfo hri)
|
||||
throws IOException {
|
||||
boolean result = false;
|
||||
for (Text family: hri.getTableDesc().families().keySet()) {
|
||||
Path p = HStoreFile.getMapDir(baseDirectory, hri.getRegionName(),
|
||||
HStoreKey.extractFamily(family));
|
||||
// Look for reference files.
|
||||
Path [] ps = fs.listPaths(p, new PathFilter () {
|
||||
public boolean accept(Path path) {
|
||||
return HStoreFile.isReference(path);
|
||||
}});
|
||||
if (ps != null && ps.length > 0) {
|
||||
result = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
public static Path getRegionDir(final Path dir, final Text regionName) {
|
||||
return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName));
|
||||
}
|
||||
}
|
|
@ -19,9 +19,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -80,17 +78,6 @@ public class HRegionInfo implements WritableComparable {
|
|||
this.split = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a HRegionInfo object from byte array
|
||||
*
|
||||
* @param serializedBytes
|
||||
* @throws IOException
|
||||
*/
|
||||
public HRegionInfo(final byte [] serializedBytes) throws IOException {
|
||||
this();
|
||||
readFields(new DataInputStream(new ByteArrayInputStream(serializedBytes)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct HRegionInfo with explicit parameters
|
||||
*
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
|||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
import org.apache.hadoop.hbase.io.BatchOperation;
|
||||
import org.apache.hadoop.hbase.io.KeyedData;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
|
@ -79,7 +80,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
// Go down hard. Used debugging and in unit tests.
|
||||
protected volatile boolean abortRequested;
|
||||
|
||||
private final Path rootDir;
|
||||
final Path rootDir;
|
||||
protected final HServerInfo serverInfo;
|
||||
protected final Configuration conf;
|
||||
private final Random rand;
|
||||
|
@ -103,6 +104,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
|
||||
/** Runs periodically to determine if regions need to be compacted or split */
|
||||
class SplitOrCompactChecker implements Runnable, RegionUnavailableListener {
|
||||
private HTable root = null;
|
||||
private HTable meta = null;
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
|
@ -199,65 +202,67 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
// When a region is split, the META table needs to updated if we're
|
||||
// splitting a 'normal' region, and the ROOT table needs to be
|
||||
// updated if we are splitting a META region.
|
||||
final Text tableToUpdate =
|
||||
region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME)?
|
||||
ROOT_TABLE_NAME : META_TABLE_NAME;
|
||||
LOG.info("Updating " + tableToUpdate + " with region split info");
|
||||
|
||||
HTable t = null;
|
||||
if (region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME)) {
|
||||
// We need to update the root region
|
||||
|
||||
if (root == null) {
|
||||
root = new HTable(conf, ROOT_TABLE_NAME);
|
||||
}
|
||||
t = root;
|
||||
|
||||
} else {
|
||||
// For normal regions we need to update the meta region
|
||||
|
||||
if (meta == null) {
|
||||
meta = new HTable(conf, META_TABLE_NAME);
|
||||
}
|
||||
t = meta;
|
||||
}
|
||||
LOG.info("Updating " + t.getTableName() + " with region split info");
|
||||
|
||||
// Remove old region from META
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
try {
|
||||
HRegion.writeSplitToMETA(conf, tableToUpdate,
|
||||
region.getRegionName(), newRegions[0].getRegionInfo(),
|
||||
newRegions[1].getRegionInfo());
|
||||
break;
|
||||
} catch (IOException e) {
|
||||
if(tries == numRetries - 1) {
|
||||
if(e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
// NOTE: there is no need for retry logic here. HTable does it for us.
|
||||
|
||||
long lockid = t.startBatchUpdate(oldRegionInfo.getRegionName());
|
||||
oldRegionInfo.offLine = true;
|
||||
oldRegionInfo.split = true;
|
||||
t.put(lockid, COL_REGIONINFO, Writables.getBytes(oldRegionInfo));
|
||||
|
||||
t.put(lockid, COL_SPLITA, Writables.getBytes(
|
||||
newRegions[0].getRegionInfo()));
|
||||
|
||||
t.put(lockid, COL_SPLITB, Writables.getBytes(
|
||||
newRegions[1].getRegionInfo()));
|
||||
t.commitBatch(lockid);
|
||||
|
||||
// Add new regions to META
|
||||
|
||||
for (int i = 0; i < newRegions.length; i++) {
|
||||
for (int tries = 0; tries < numRetries; tries ++) {
|
||||
try {
|
||||
HRegion.addRegionToMETA(conf, tableToUpdate, newRegions[i],
|
||||
serverInfo.getServerAddress(), serverInfo.getStartCode());
|
||||
break;
|
||||
} catch(IOException e) {
|
||||
if(tries == numRetries - 1) {
|
||||
if(e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
lockid = t.startBatchUpdate(newRegions[i].getRegionName());
|
||||
|
||||
t.put(lockid, COL_REGIONINFO, Writables.getBytes(
|
||||
newRegions[i].getRegionInfo()));
|
||||
|
||||
t.commitBatch(lockid);
|
||||
}
|
||||
|
||||
// Now tell the master about the new regions
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Reporting region split to master");
|
||||
}
|
||||
reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
|
||||
newRegions[1].getRegionInfo());
|
||||
|
||||
LOG.info("region split, META update, and report to master all" +
|
||||
" successful. Old region=" + oldRegionInfo.getRegionName() +
|
||||
", new regions: " + newRegions[0].getRegionName() + ", " +
|
||||
newRegions[1].getRegionName());
|
||||
|
||||
// Finally, start serving the new regions
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
onlineRegions.put(newRegions[0].getRegionName(), newRegions[0]);
|
||||
onlineRegions.put(newRegions[1].getRegionName(), newRegions[1]);
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
// Do not serve the new regions. Let the Master assign them.
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
/**
|
||||
* A standalone HRegion directory reader. Currently reads content on
|
||||
* file system only.
|
||||
|
@ -194,7 +196,7 @@ class HRegiondirReader {
|
|||
byte [] colvalue = es.getValue();
|
||||
Object value = null;
|
||||
if (colname.toString().equals("info:regioninfo")) {
|
||||
value = new HRegionInfo(colvalue);
|
||||
value = Writables.getWritable(colvalue, new HRegionInfo());
|
||||
} else {
|
||||
value = new String(colvalue, HConstants.UTF8_ENCODING);
|
||||
}
|
||||
|
|
|
@ -380,7 +380,7 @@ public class HStoreFile implements HConstants, WritableComparable {
|
|||
// Look first at info files. If a reference, these contain info we need
|
||||
// to create the HStoreFile.
|
||||
Path infodir = HStoreFile.getInfoDir(dir, regionName, colFamily);
|
||||
Path infofiles[] = fs.listPaths(infodir);
|
||||
Path infofiles[] = fs.listPaths(new Path[] {infodir});
|
||||
Vector<HStoreFile> results = new Vector<HStoreFile>(infofiles.length);
|
||||
Vector<Path> mapfiles = new Vector<Path>(infofiles.length);
|
||||
for (int i = 0; i < infofiles.length; i++) {
|
||||
|
@ -411,7 +411,7 @@ public class HStoreFile implements HConstants, WritableComparable {
|
|||
Path mapdir = HStoreFile.getMapDir(dir, regionName, colFamily);
|
||||
// List paths by experience returns fully qualified names -- at least when
|
||||
// running on a mini hdfs cluster.
|
||||
Path datfiles[] = fs.listPaths(mapdir);
|
||||
Path datfiles[] = fs.listPaths(new Path[] {mapdir});
|
||||
for (int i = 0; i < datfiles.length; i++) {
|
||||
// If does not have sympathetic info file, delete.
|
||||
if (!mapfiles.contains(fs.makeQualified(datfiles[i]))) {
|
||||
|
|
|
@ -91,6 +91,19 @@ public class HStoreKey implements WritableComparable {
|
|||
return offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns row and column bytes out of an HStoreKey.
|
||||
* @param hsk Store key.
|
||||
* @return byte array encoding of HStoreKey
|
||||
* @throws UnsupportedEncodingException
|
||||
*/
|
||||
public static byte[] getBytes(final HStoreKey hsk)
|
||||
throws UnsupportedEncodingException {
|
||||
StringBuilder s = new StringBuilder(hsk.getRow().toString());
|
||||
s.append(hsk.getColumn().toString());
|
||||
return s.toString().getBytes(HConstants.UTF8_ENCODING);
|
||||
}
|
||||
|
||||
Text row;
|
||||
Text column;
|
||||
long timestamp;
|
||||
|
|
|
@ -886,7 +886,17 @@ public class HTable implements HConstants {
|
|||
public void close() throws IOException {
|
||||
checkClosed();
|
||||
if (this.scannerId != -1L) {
|
||||
this.server.close(this.scannerId);
|
||||
try {
|
||||
this.server.close(this.scannerId);
|
||||
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
if (!(e instanceof NotServingRegionException)) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
this.scannerId = -1L;
|
||||
}
|
||||
this.server = null;
|
||||
|
|
|
@ -94,9 +94,8 @@ public class Leases {
|
|||
* without any cancellation calls.
|
||||
*/
|
||||
public void close() {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("closing leases");
|
||||
}
|
||||
LOG.info("closing leases");
|
||||
|
||||
this.running = false;
|
||||
try {
|
||||
this.leaseMonitorThread.interrupt();
|
||||
|
@ -110,9 +109,7 @@ public class Leases {
|
|||
sortedLeases.clear();
|
||||
}
|
||||
}
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("leases closed");
|
||||
}
|
||||
LOG.info("leases closed");
|
||||
}
|
||||
|
||||
/* A client obtains a lease... */
|
||||
|
@ -139,9 +136,9 @@ public class Leases {
|
|||
sortedLeases.add(lease);
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Created lease " + name);
|
||||
}
|
||||
// if (LOG.isDebugEnabled()) {
|
||||
// LOG.debug("Created lease " + name);
|
||||
// }
|
||||
}
|
||||
|
||||
/* A client renews a lease... */
|
||||
|
@ -170,9 +167,9 @@ public class Leases {
|
|||
sortedLeases.add(lease);
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Renewed lease " + name);
|
||||
}
|
||||
// if (LOG.isDebugEnabled()) {
|
||||
// LOG.debug("Renewed lease " + name);
|
||||
// }
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -196,9 +193,9 @@ public class Leases {
|
|||
leases.remove(name);
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Cancel lease " + name);
|
||||
}
|
||||
// if (LOG.isDebugEnabled()) {
|
||||
// LOG.debug("Cancel lease " + name);
|
||||
// }
|
||||
}
|
||||
|
||||
/** LeaseMonitor is a thread that expires Leases that go on too long. */
|
||||
|
@ -327,9 +324,8 @@ public class Leases {
|
|||
}
|
||||
|
||||
void expired() {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Lease expired " + getLeaseName());
|
||||
}
|
||||
LOG.info("Lease expired " + getLeaseName());
|
||||
|
||||
listener.leaseExpired();
|
||||
}
|
||||
|
||||
|
|
|
@ -19,18 +19,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HStoreKey;
|
||||
|
||||
/**
|
||||
* Utility creating hbase friendly keys.
|
||||
* Use fabricating row names or column qualifiers.
|
||||
|
@ -119,61 +111,4 @@ public class Keying {
|
|||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param i
|
||||
* @return <code>i</code> as byte array.
|
||||
*/
|
||||
public static byte[] intToBytes(final int i){
|
||||
ByteBuffer buffer = ByteBuffer.allocate(Integer.SIZE);
|
||||
buffer.putInt(i);
|
||||
return buffer.array();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param l
|
||||
* @return <code>i</code> as byte array.
|
||||
*/
|
||||
public static byte[] longToBytes(final long l){
|
||||
ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE);
|
||||
buffer.putLong(l);
|
||||
return buffer.array();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns row and column bytes out of an HStoreKey.
|
||||
* @param hsk Store key.
|
||||
* @throws UnsupportedEncodingException
|
||||
*/
|
||||
public static byte[] getBytes(final HStoreKey hsk)
|
||||
throws UnsupportedEncodingException {
|
||||
StringBuilder s = new StringBuilder(hsk.getRow().toString());
|
||||
s.append(hsk.getColumn().toString());
|
||||
return s.toString().getBytes(HConstants.UTF8_ENCODING);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bytes
|
||||
* @return String made of the bytes or null if bytes are null.
|
||||
* @throws UnsupportedEncodingException
|
||||
*/
|
||||
public static String bytesToString(final byte [] bytes)
|
||||
throws UnsupportedEncodingException {
|
||||
if(bytes == null) {
|
||||
return null;
|
||||
}
|
||||
return new String(bytes, HConstants.UTF8_ENCODING);
|
||||
}
|
||||
|
||||
public static long bytesToLong(final byte [] bytes) throws IOException {
|
||||
long result = -1;
|
||||
DataInputStream dis = null;
|
||||
try {
|
||||
dis = new DataInputStream(new ByteArrayInputStream(bytes));
|
||||
result = dis.readLong();
|
||||
} finally {
|
||||
dis.close();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -22,11 +22,17 @@ import java.io.ByteArrayOutputStream;
|
|||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
||||
/**
|
||||
* Utility class with methods for manipulating Writable objects
|
||||
*/
|
||||
public class Writables {
|
||||
/**
|
||||
* @param w
|
||||
|
@ -36,6 +42,9 @@ public class Writables {
|
|||
* @see #getWritable(byte[], Writable)
|
||||
*/
|
||||
public static byte [] getBytes(final Writable w) throws IOException {
|
||||
if (w == null) {
|
||||
throw new IllegalArgumentException("Writable cannot be null");
|
||||
}
|
||||
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
|
||||
DataOutputStream out = new DataOutputStream(byteStream);
|
||||
try {
|
||||
|
@ -64,7 +73,11 @@ public class Writables {
|
|||
public static Writable getWritable(final byte [] bytes, final Writable w)
|
||||
throws IOException {
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
throw new IOException("Con't build a writable with empty bytes array");
|
||||
throw new IllegalArgumentException(
|
||||
"Con't build a writable with empty bytes array");
|
||||
}
|
||||
if (w == null) {
|
||||
throw new IllegalArgumentException("Writable cannot be null");
|
||||
}
|
||||
DataInputBuffer in = new DataInputBuffer();
|
||||
try {
|
||||
|
@ -85,14 +98,67 @@ public class Writables {
|
|||
*/
|
||||
public static Writable copyWritable(final Writable src, final Writable tgt)
|
||||
throws IOException {
|
||||
if (src == null || tgt == null) {
|
||||
throw new IllegalArgumentException("Writables cannot be null");
|
||||
}
|
||||
byte [] bytes = getBytes(src);
|
||||
DataInputStream dis = null;
|
||||
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
|
||||
try {
|
||||
dis = new DataInputStream(new ByteArrayInputStream(bytes));
|
||||
tgt.readFields(dis);
|
||||
} finally {
|
||||
dis.close();
|
||||
}
|
||||
return tgt;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a long value to a byte array
|
||||
* @param val
|
||||
* @return the byte array
|
||||
* @throws IOException
|
||||
*/
|
||||
public static byte[] longToBytes(long val) throws IOException {
|
||||
return getBytes(new LongWritable(val));
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a byte array to a long value
|
||||
* @param bytes
|
||||
* @return the long value
|
||||
* @throws IOException
|
||||
*/
|
||||
public static long bytesToLong(byte[] bytes) throws IOException {
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
return -1L;
|
||||
}
|
||||
return ((LongWritable) getWritable(bytes, new LongWritable())).get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a string to a byte array in a consistent manner.
|
||||
* @param s
|
||||
* @return the byte array
|
||||
* @throws UnsupportedEncodingException
|
||||
*/
|
||||
public static byte[] stringToBytes(String s)
|
||||
throws UnsupportedEncodingException {
|
||||
if (s == null) {
|
||||
throw new IllegalArgumentException("string cannot be null");
|
||||
}
|
||||
return s.getBytes(HConstants.UTF8_ENCODING);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a byte array to a string in a consistent manner.
|
||||
* @param bytes
|
||||
* @return the string
|
||||
* @throws UnsupportedEncodingException
|
||||
*/
|
||||
public static String bytesToString(byte[] bytes)
|
||||
throws UnsupportedEncodingException {
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
return "";
|
||||
}
|
||||
return new String(bytes, HConstants.UTF8_ENCODING);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
/** Test case for get */
|
||||
public class TestGet extends HBaseTestCase {
|
||||
private static final Log LOG = LogFactory.getLog(TestGet.class.getName());
|
||||
|
@ -59,8 +61,7 @@ public class TestGet extends HBaseTestCase {
|
|||
for(Iterator<Text> i = values.keySet().iterator(); i.hasNext(); ) {
|
||||
Text column = i.next();
|
||||
if (column.equals(HConstants.COL_SERVER)) {
|
||||
byte [] val = values.get(column);
|
||||
String server = new String(val, HConstants.UTF8_ENCODING);
|
||||
String server = Writables.bytesToString(values.get(column));
|
||||
assertEquals(expectedServer, server);
|
||||
LOG.info(server);
|
||||
}
|
||||
|
@ -106,20 +107,17 @@ public class TestGet extends HBaseTestCase {
|
|||
bytes.reset();
|
||||
HGlobals.rootRegionInfo.write(s);
|
||||
|
||||
r.put(lockid, HConstants.COL_REGIONINFO, bytes.toByteArray());
|
||||
r.put(lockid, HConstants.COL_REGIONINFO,
|
||||
Writables.getBytes(HGlobals.rootRegionInfo));
|
||||
|
||||
r.commit(lockid, System.currentTimeMillis());
|
||||
|
||||
lockid = r.startUpdate(ROW_KEY);
|
||||
|
||||
r.put(lockid, HConstants.COL_SERVER,
|
||||
new HServerAddress(SERVER_ADDRESS).toString().
|
||||
getBytes(HConstants.UTF8_ENCODING)
|
||||
);
|
||||
Writables.stringToBytes(new HServerAddress(SERVER_ADDRESS).toString()));
|
||||
|
||||
r.put(lockid, HConstants.COL_STARTCODE,
|
||||
String.valueOf(lockid).getBytes(HConstants.UTF8_ENCODING)
|
||||
);
|
||||
r.put(lockid, HConstants.COL_STARTCODE, Writables.longToBytes(lockid));
|
||||
|
||||
r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "region"),
|
||||
"region".getBytes(HConstants.UTF8_ENCODING));
|
||||
|
@ -150,8 +148,7 @@ public class TestGet extends HBaseTestCase {
|
|||
|
||||
String otherServerName = "bar.foo.com:4321";
|
||||
r.put(lockid, HConstants.COL_SERVER,
|
||||
new HServerAddress(otherServerName).toString().
|
||||
getBytes(HConstants.UTF8_ENCODING));
|
||||
Writables.stringToBytes(new HServerAddress(otherServerName).toString()));
|
||||
|
||||
r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "junk"),
|
||||
"junk".getBytes());
|
||||
|
|
|
@ -28,9 +28,10 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.dfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
/**
|
||||
* Test of a long-lived scanner validating as we go.
|
||||
*/
|
||||
|
@ -52,13 +53,11 @@ public class TestScanner extends HBaseTestCase {
|
|||
private static final long START_CODE = Long.MAX_VALUE;
|
||||
|
||||
private HRegion region;
|
||||
private DataInputBuffer in = new DataInputBuffer();
|
||||
|
||||
/** Compare the HRegionInfo we read from HBase to what we stored */
|
||||
private void validateRegionInfo(byte [] regionBytes) throws IOException {
|
||||
in.reset(regionBytes, regionBytes.length);
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
info.readFields(in);
|
||||
HRegionInfo info =
|
||||
(HRegionInfo) Writables.getWritable(regionBytes, new HRegionInfo());
|
||||
|
||||
assertEquals(REGION_INFO.regionId, info.regionId);
|
||||
assertEquals(0, info.startKey.getLength());
|
||||
|
@ -94,8 +93,7 @@ public class TestScanner extends HBaseTestCase {
|
|||
val = results.get(HConstants.COL_STARTCODE);
|
||||
assertNotNull(val);
|
||||
assertFalse(val.length == 0);
|
||||
long startCode =
|
||||
Long.valueOf(new String(val, HConstants.UTF8_ENCODING));
|
||||
long startCode = Writables.bytesToLong(val);
|
||||
assertEquals(START_CODE, startCode);
|
||||
}
|
||||
|
||||
|
@ -104,7 +102,7 @@ public class TestScanner extends HBaseTestCase {
|
|||
val = results.get(HConstants.COL_SERVER);
|
||||
assertNotNull(val);
|
||||
assertFalse(val.length == 0);
|
||||
String server = new String(val, HConstants.UTF8_ENCODING);
|
||||
String server = Writables.bytesToString(val);
|
||||
assertEquals(0, server.compareTo(serverName));
|
||||
}
|
||||
results.clear();
|
||||
|
@ -187,10 +185,10 @@ public class TestScanner extends HBaseTestCase {
|
|||
lockid = region.startUpdate(ROW_KEY);
|
||||
|
||||
region.put(lockid, HConstants.COL_SERVER,
|
||||
address.toString().getBytes(HConstants.UTF8_ENCODING));
|
||||
Writables.stringToBytes(address.toString()));
|
||||
|
||||
region.put(lockid, HConstants.COL_STARTCODE,
|
||||
String.valueOf(START_CODE).getBytes(HConstants.UTF8_ENCODING));
|
||||
region.put(lockid, HConstants.COL_STARTCODE,
|
||||
Writables.longToBytes(START_CODE));
|
||||
|
||||
region.commit(lockid, System.currentTimeMillis());
|
||||
|
||||
|
@ -227,7 +225,7 @@ public class TestScanner extends HBaseTestCase {
|
|||
lockid = region.startUpdate(ROW_KEY);
|
||||
|
||||
region.put(lockid, HConstants.COL_SERVER,
|
||||
address.toString().getBytes(HConstants.UTF8_ENCODING));
|
||||
Writables.stringToBytes(address.toString()));
|
||||
|
||||
region.commit(lockid, System.currentTimeMillis());
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.filter.RowFilterSet;
|
|||
import org.apache.hadoop.hbase.filter.StopRowFilter;
|
||||
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
|
||||
import org.apache.hadoop.hbase.io.KeyedData;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
/**
|
||||
|
@ -57,7 +58,7 @@ public class TestScanner2 extends HBaseClusterTestCase {
|
|||
final char LAST_COLKEY = '3';
|
||||
final byte[] GOOD_BYTES = "goodstuff".getBytes();
|
||||
final byte[] BAD_BYTES = "badstuff".getBytes();
|
||||
|
||||
|
||||
/**
|
||||
* Test the scanner's handling of various filters.
|
||||
*
|
||||
|
@ -170,7 +171,7 @@ public class TestScanner2 extends HBaseClusterTestCase {
|
|||
region.regionName.toString().startsWith(getName()));
|
||||
// Now do what happens at split time; remove old region and then add two
|
||||
// new ones in its place.
|
||||
HRegion.removeRegionFromMETA(conf, HConstants.META_TABLE_NAME,
|
||||
removeRegionFromMETA(new HTable(conf, HConstants.META_TABLE_NAME),
|
||||
region.regionName);
|
||||
HTableDescriptor desc = region.tableDesc;
|
||||
Path homedir = new Path(getName());
|
||||
|
@ -183,7 +184,7 @@ public class TestScanner2 extends HBaseClusterTestCase {
|
|||
homedir, this.conf, null));
|
||||
try {
|
||||
for (HRegion r : newRegions) {
|
||||
HRegion.addRegionToMETA(conf, HConstants.META_TABLE_NAME, r,
|
||||
addRegionToMETA(conf, HConstants.META_TABLE_NAME, r,
|
||||
this.cluster.getHMasterAddress(), -1L);
|
||||
}
|
||||
regions = scan(conf, HConstants.META_TABLE_NAME);
|
||||
|
@ -219,9 +220,15 @@ public class TestScanner2 extends HBaseClusterTestCase {
|
|||
results.put(values[i].getKey().getColumn(), values[i].getData());
|
||||
}
|
||||
|
||||
HRegionInfo info = HRegion.getRegionInfo(results);
|
||||
String serverName = HRegion.getServerName(results);
|
||||
long startCode = HRegion.getStartCode(results);
|
||||
HRegionInfo info = (HRegionInfo) Writables.getWritable(
|
||||
results.get(HConstants.COL_REGIONINFO), new HRegionInfo());
|
||||
|
||||
byte[] bytes = results.get(HConstants.COL_SERVER);
|
||||
String serverName = Writables.bytesToString(bytes);
|
||||
|
||||
long startCode =
|
||||
Writables.bytesToLong(results.get(HConstants.COL_STARTCODE));
|
||||
|
||||
LOG.info(Thread.currentThread().getName() + " scanner: "
|
||||
+ Long.valueOf(scannerId) + ": regioninfo: {" + info.toString()
|
||||
+ "}, server: " + serverName + ", startCode: " + startCode);
|
||||
|
@ -240,4 +247,49 @@ public class TestScanner2 extends HBaseClusterTestCase {
|
|||
}
|
||||
return regions;
|
||||
}
|
||||
|
||||
private void addRegionToMETA(final Configuration conf,
|
||||
final Text table, final HRegion region,
|
||||
final HServerAddress serverAddress,
|
||||
final long startCode)
|
||||
throws IOException {
|
||||
HTable t = new HTable(conf, table);
|
||||
try {
|
||||
long lockid = t.startUpdate(region.getRegionName());
|
||||
t.put(lockid, HConstants.COL_REGIONINFO, Writables.getBytes(region.getRegionInfo()));
|
||||
t.put(lockid, HConstants.COL_SERVER,
|
||||
Writables.stringToBytes(serverAddress.toString()));
|
||||
t.put(lockid, HConstants.COL_STARTCODE, Writables.longToBytes(startCode));
|
||||
t.commit(lockid);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info("Added region " + region.getRegionName() + " to table " +
|
||||
table);
|
||||
}
|
||||
} finally {
|
||||
t.close();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Delete <code>region</code> from META <code>table</code>.
|
||||
* @param conf Configuration object
|
||||
* @param table META table we are to delete region from.
|
||||
* @param regionName Region to remove.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void removeRegionFromMETA(final HTable t, final Text regionName)
|
||||
throws IOException {
|
||||
try {
|
||||
long lockid = t.startBatchUpdate(regionName);
|
||||
t.delete(lockid, HConstants.COL_REGIONINFO);
|
||||
t.delete(lockid, HConstants.COL_SERVER);
|
||||
t.delete(lockid, HConstants.COL_STARTCODE);
|
||||
t.commit(lockid);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removed " + regionName + " from table " + t.getTableName());
|
||||
}
|
||||
} finally {
|
||||
t.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ConcurrentModificationException;
|
||||
import java.util.List;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
|
@ -31,6 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
/**
|
||||
* {@Link TestHRegion} does a split but this TestCase adds testing of fast
|
||||
|
@ -45,7 +46,13 @@ public class TestSplit extends HBaseTestCase {
|
|||
private FileSystem fs = null;
|
||||
private static final char FIRST_CHAR = 'a';
|
||||
private static final char LAST_CHAR = 'z';
|
||||
|
||||
|
||||
/** constructor */
|
||||
public TestSplit() {
|
||||
Logger.getRootLogger().setLevel(Level.WARN);
|
||||
Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
|
@ -63,12 +70,14 @@ public class TestSplit extends HBaseTestCase {
|
|||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
try {
|
||||
if (this.fs.exists(testDir)) {
|
||||
this.fs.delete(testDir);
|
||||
if (fs != null) {
|
||||
try {
|
||||
if (this.fs.exists(testDir)) {
|
||||
this.fs.delete(testDir);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
super.tearDown();
|
||||
}
|
||||
|
@ -175,13 +184,13 @@ public class TestSplit extends HBaseTestCase {
|
|||
* @throws Exception
|
||||
*/
|
||||
public void testSplitRegionIsDeleted() throws Exception {
|
||||
final int timeout = 60;
|
||||
final int retries = 10;
|
||||
this.testDir = null;
|
||||
this.fs = null;
|
||||
// Start up a hbase cluster
|
||||
this.conf.set(HConstants.HBASE_DIR, this.testDir.toString());
|
||||
MiniHBaseCluster.MasterThread masterThread =
|
||||
MiniHBaseCluster.startMaster(this.conf);
|
||||
List<MiniHBaseCluster.RegionServerThread> regionServerThreads =
|
||||
MiniHBaseCluster.startRegionServers(this.conf, 1);
|
||||
MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1);
|
||||
Path testDir = cluster.regionThreads.get(0).getRegionServer().rootDir;
|
||||
FileSystem fs = cluster.getDFSCluster().getFileSystem();
|
||||
HTable meta = null;
|
||||
HTable t = null;
|
||||
try {
|
||||
|
@ -197,17 +206,15 @@ public class TestSplit extends HBaseTestCase {
|
|||
// region instance and bring on a split.
|
||||
HRegionInfo hri =
|
||||
t.getRegionLocation(HConstants.EMPTY_START_ROW).getRegionInfo();
|
||||
HRegion r = null;
|
||||
synchronized(regionServerThreads) {
|
||||
r = regionServerThreads.get(0).getRegionServer().onlineRegions.
|
||||
get(hri.getRegionName());
|
||||
}
|
||||
HRegion r =
|
||||
cluster.regionThreads.get(0).getRegionServer().onlineRegions.get(
|
||||
hri.getRegionName());
|
||||
// Flush will provoke a split next time the split-checker thread runs.
|
||||
r.flushcache(false);
|
||||
// Now, wait until split makes it into the meta table.
|
||||
for (int i = 0; i < timeout &&
|
||||
(count(meta, HConstants.COLUMN_FAMILY_STR) <= count); i++) {
|
||||
Thread.sleep(1000);
|
||||
for (int i = 0; i < retries &&
|
||||
(count(meta, HConstants.COLUMN_FAMILY_STR) <= count); i++) {
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
int oldCount = count;
|
||||
count = count(meta, HConstants.COLUMN_FAMILY_STR);
|
||||
|
@ -217,47 +224,72 @@ public class TestSplit extends HBaseTestCase {
|
|||
HRegionInfo parent = getSplitParent(meta);
|
||||
assertTrue(parent.isOffline());
|
||||
Path parentDir =
|
||||
HRegion.getRegionDir(this.testDir, parent.getRegionName());
|
||||
assertTrue(this.fs.exists(parentDir));
|
||||
HRegion.getRegionDir(testDir, parent.getRegionName());
|
||||
assertTrue(fs.exists(parentDir));
|
||||
LOG.info("Split happened and parent " + parent.getRegionName() + " is " +
|
||||
"offline");
|
||||
"offline");
|
||||
for (int i = 0; i < retries; i++) {
|
||||
// Now open a scanner on the table. This will force HTable to recalibrate
|
||||
// and in doing so, will force us to wait until the new child regions
|
||||
// come on-line (since they are no longer automatically served by the
|
||||
// HRegionServer that was serving the parent. In this test they will
|
||||
// end up on the same server (since there is only one), but we have to
|
||||
// wait until the master assigns them.
|
||||
try {
|
||||
HScannerInterface s =
|
||||
t.obtainScanner(new Text[] {new Text(COLFAMILY_NAME3)},
|
||||
HConstants.EMPTY_START_ROW);
|
||||
try {
|
||||
HStoreKey key = new HStoreKey();
|
||||
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
||||
s.next(key, results);
|
||||
break;
|
||||
|
||||
} finally {
|
||||
s.close();
|
||||
}
|
||||
} catch (NotServingRegionException x) {
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
}
|
||||
// Now, force a compaction. This will rewrite references and make it
|
||||
// so the parent region becomes deletable.
|
||||
LOG.info("Starting compaction");
|
||||
synchronized(regionServerThreads) {
|
||||
for (MiniHBaseCluster.RegionServerThread thread: regionServerThreads) {
|
||||
SortedMap<Text, HRegion> regions =
|
||||
thread.getRegionServer().onlineRegions;
|
||||
// Retry if ConcurrentModification... alternative of sync'ing is not
|
||||
// worth it for sake of unit test.
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try {
|
||||
for (HRegion online: regions.values()) {
|
||||
if (online.getRegionName().toString().startsWith(getName())) {
|
||||
online.compactStores();
|
||||
}
|
||||
for (MiniHBaseCluster.RegionServerThread thread: cluster.regionThreads) {
|
||||
SortedMap<Text, HRegion> regions =
|
||||
thread.getRegionServer().onlineRegions;
|
||||
// Retry if ConcurrentModification... alternative of sync'ing is not
|
||||
// worth it for sake of unit test.
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try {
|
||||
for (HRegion online: regions.values()) {
|
||||
if (online.getRegionName().toString().startsWith(getName())) {
|
||||
online.compactStores();
|
||||
}
|
||||
break;
|
||||
} catch (ConcurrentModificationException e) {
|
||||
LOG.warn("Retrying because ..." + e.toString() + " -- one or " +
|
||||
"two should be fine");
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
} catch (ConcurrentModificationException e) {
|
||||
LOG.warn("Retrying because ..." + e.toString() + " -- one or " +
|
||||
"two should be fine");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Now wait until parent disappears.
|
||||
LOG.info("Waiting on parent " + parent.getRegionName() +
|
||||
" to disappear");
|
||||
for (int i = 0; i < timeout && getSplitParent(meta) != null; i++) {
|
||||
Thread.sleep(1000);
|
||||
" to disappear");
|
||||
for (int i = 0; i < retries && getSplitParent(meta) != null; i++) {
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
assertTrue(getSplitParent(meta) == null);
|
||||
// Assert cleaned up.
|
||||
assertFalse(this.fs.exists(parentDir));
|
||||
for (int i = 0; i < retries && fs.exists(parentDir); i++) {
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
assertFalse(fs.exists(parentDir));
|
||||
} finally {
|
||||
MiniHBaseCluster.shutdown(masterThread, regionServerThreads);
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -282,8 +314,13 @@ public class TestSplit extends HBaseTestCase {
|
|||
HStoreKey curKey = new HStoreKey();
|
||||
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
|
||||
while(s.next(curKey, curVals)) {
|
||||
HRegionInfo hri = (HRegionInfo)Writables.
|
||||
getWritable(curVals.get(HConstants.COL_REGIONINFO), new HRegionInfo());
|
||||
byte[] bytes = curVals.get(HConstants.COL_REGIONINFO);
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
continue;
|
||||
}
|
||||
HRegionInfo hri =
|
||||
(HRegionInfo) Writables.getWritable(bytes, new HRegionInfo());
|
||||
|
||||
// Assert that if region is a split region, that it is also offline.
|
||||
// Otherwise, if not a split region, assert that it is online.
|
||||
if (hri.isSplit() && hri.isOffline()) {
|
||||
|
|
Loading…
Reference in New Issue