HBASE-507 Use Callable pattern to sleep between retries

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@645014 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2008-04-05 02:15:34 +00:00
parent 9c3fba564c
commit 2da5e018af
8 changed files with 316 additions and 265 deletions

View File

@ -12,6 +12,7 @@ Hbase Change Log
HBASE-561 HBase package does not include LICENSE.txt nor build.xml
HBASE-563 TestRowFilterAfterWrite erroneously sets master address to
0.0.0.0:60100 rather than relying on conf
HBASE-507 Use Callable pattern to sleep between retries
NEW FEATURES
HBASE-548 Tool to online single region

View File

@ -591,6 +591,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
if (tries == numRetries - 1) {
throw RemoteExceptionHandler.checkIOException(e);
}
sleeper.sleep();
}
}
}

View File

@ -21,9 +21,9 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
/**
* ProcessRegionClose is the way we do post-processing on a closed region. We
@ -34,8 +34,8 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
* necessary.
*/
class ProcessRegionClose extends ProcessRegionStatusChange {
private boolean offlineRegion;
private boolean deleteRegion;
protected final boolean offlineRegion;
protected final boolean deleteRegion;
/**
* @param master
@ -61,41 +61,34 @@ class ProcessRegionClose extends ProcessRegionStatusChange {
@Override
protected boolean process() throws IOException {
for (int tries = 0; tries < numRetries; tries++) {
if (master.closed.get()) {
return true;
}
LOG.info("region closed: " + regionInfo.getRegionName());
Boolean result =
new RetryableMetaOperation<Boolean>(this.metaRegion, this.master) {
public Boolean call() throws IOException {
LOG.info("region closed: " + regionInfo.getRegionName());
// Mark the Region as unavailable in the appropriate meta table
// Mark the Region as unavailable in the appropriate meta table
if (!metaRegionAvailable()) {
// We can't proceed unless the meta region we are going to update
// is online. metaRegionAvailable() has put this operation on the
// delayedToDoQueue, so return true so the operation is not put
// back on the toDoQueue
return true;
}
if (!metaRegionAvailable()) {
// We can't proceed unless the meta region we are going to update
// is online. metaRegionAvailable() has put this operation on the
// delayedToDoQueue, so return true so the operation is not put
// back on the toDoQueue
return true;
}
try {
if (deleteRegion) {
HRegion.removeRegionFromMETA(getMetaServer(), metaRegionName,
regionInfo.getRegionName());
} else if (offlineRegion) {
// offline the region in meta and then note that we've offlined the
// region.
HRegion.offlineRegionInMETA(getMetaServer(), metaRegionName,
regionInfo);
master.regionManager.regionOfflined(regionInfo.getRegionName());
if (deleteRegion) {
HRegion.removeRegionFromMETA(server, metaRegionName,
regionInfo.getRegionName());
} else if (offlineRegion) {
// offline the region in meta and then note that we've offlined the
// region.
HRegion.offlineRegionInMETA(server, metaRegionName,
regionInfo);
master.regionManager.regionOfflined(regionInfo.getRegionName());
}
return true;
}
break;
} catch (IOException e) {
if (tries == numRetries - 1) {
throw RemoteExceptionHandler.checkIOException(e);
}
continue;
}
}
}.doWithRetries();
// now that meta is updated, if we need to delete the region's files, now's
// the time.
@ -108,7 +101,6 @@ class ProcessRegionClose extends ProcessRegionStatusChange {
throw e;
}
}
return true;
return result == null ? true : result;
}
}

View File

@ -25,9 +25,7 @@ import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
/**
* ProcessRegionOpen is instantiated when a region server reports that it is
@ -35,10 +33,11 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
* root region which is handled specially.
*/
class ProcessRegionOpen extends ProcessRegionStatusChange {
private final HServerAddress serverAddress;
private final byte [] startCode;
protected final HServerAddress serverAddress;
protected final byte [] startCode;
/**
* @param master
* @param info
* @param regionInfo
* @throws IOException
@ -59,61 +58,54 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {
@Override
protected boolean process() throws IOException {
for (int tries = 0; tries < numRetries; tries++) {
if (master.closed.get()) {
return true;
}
LOG.info(regionInfo.toString() + " open on " +
this.serverAddress.toString());
Boolean result =
new RetryableMetaOperation<Boolean>(this.metaRegion, this.master) {
public Boolean call() throws IOException {
LOG.info(regionInfo.toString() + " open on " + serverAddress.toString());
if (!metaRegionAvailable()) {
// We can't proceed unless the meta region we are going to update
// is online. metaRegionAvailable() has put this operation on the
// delayedToDoQueue, so return true so the operation is not put
// back on the toDoQueue
return true;
}
// Register the newly-available Region's location.
HRegionInterface server = getMetaServer();
LOG.info("updating row " + regionInfo.getRegionName() + " in table " +
metaRegionName + " with startcode " +
Writables.bytesToLong(this.startCode) + " and server "+
serverAddress.toString());
try {
BatchUpdate b = new BatchUpdate(regionInfo.getRegionName());
b.put(COL_SERVER, Writables.stringToBytes(serverAddress.toString()));
b.put(COL_STARTCODE, startCode);
server.batchUpdate(metaRegionName, b);
if (isMetaTable) {
// It's a meta region.
MetaRegion m = new MetaRegion(this.serverAddress,
this.regionInfo.getRegionName(), this.regionInfo.getStartKey());
if (!master.regionManager.isInitialMetaScanComplete()) {
// Put it on the queue to be scanned for the first time.
try {
LOG.debug("Adding " + m.toString() + " to regions to scan");
master.regionManager.addMetaRegionToScan(m);
} catch (InterruptedException e) {
throw new RuntimeException(
"Putting into metaRegionsToScan was interrupted.", e);
}
} else {
// Add it to the online meta regions
LOG.debug("Adding to onlineMetaRegions: " + m.toString());
master.regionManager.putMetaRegionOnline(m);
if (!metaRegionAvailable()) {
// We can't proceed unless the meta region we are going to update
// is online. metaRegionAvailable() has put this operation on the
// delayedToDoQueue, so return true so the operation is not put
// back on the toDoQueue
return true;
}
// Register the newly-available Region's location.
LOG.info("updating row " + regionInfo.getRegionName() + " in table " +
metaRegionName + " with startcode " +
Writables.bytesToLong(startCode) + " and server " +
serverAddress.toString());
BatchUpdate b = new BatchUpdate(regionInfo.getRegionName());
b.put(COL_SERVER, Writables.stringToBytes(serverAddress.toString()));
b.put(COL_STARTCODE, startCode);
server.batchUpdate(metaRegionName, b);
if (isMetaTable) {
// It's a meta region.
MetaRegion m = new MetaRegion(serverAddress,
regionInfo.getRegionName(), regionInfo.getStartKey());
if (!master.regionManager.isInitialMetaScanComplete()) {
// Put it on the queue to be scanned for the first time.
try {
LOG.debug("Adding " + m.toString() + " to regions to scan");
master.regionManager.addMetaRegionToScan(m);
} catch (InterruptedException e) {
throw new RuntimeException(
"Putting into metaRegionsToScan was interrupted.", e);
}
} else {
// Add it to the online meta regions
LOG.debug("Adding to onlineMetaRegions: " + m.toString());
master.regionManager.putMetaRegionOnline(m);
}
}
// If updated successfully, remove from pending list.
master.regionManager.noLongerPending(regionInfo.getRegionName());
return true;
}
// If updated successfully, remove from pending list.
master.regionManager.noLongerPending(regionInfo.getRegionName());
break;
} catch (IOException e) {
if (tries == numRetries - 1) {
throw RemoteExceptionHandler.checkIOException(e);
}
}
}
return true;
}.doWithRetries();
return result == null ? true : result;
}
}

View File

@ -19,11 +19,8 @@
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.io.Text;
/**
@ -33,18 +30,26 @@ import org.apache.hadoop.io.Text;
abstract class ProcessRegionStatusChange extends RegionServerOperation {
protected final boolean isMetaTable;
protected final HRegionInfo regionInfo;
private MetaRegion metaRegion;
protected Text metaRegionName;
protected final MetaRegion metaRegion;
protected final Text metaRegionName;
/**
* @param master
* @param regionInfo
*/
public ProcessRegionStatusChange(HMaster master, HRegionInfo regionInfo) {
super(master);
this.regionInfo = regionInfo;
this.isMetaTable = regionInfo.isMetaTable();
this.metaRegion = null;
this.metaRegionName = null;
if (isMetaTable) {
this.metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
this.metaRegion = new MetaRegion(master.getRootRegionLocation(),
this.metaRegionName, HConstants.EMPTY_START_ROW);
} else {
this.metaRegion =
master.regionManager.getFirstMetaRegionForRegion(regionInfo);
this.metaRegionName = this.metaRegion.getRegionName();
}
}
protected boolean metaRegionAvailable() {
@ -66,23 +71,4 @@ abstract class ProcessRegionStatusChange extends RegionServerOperation {
}
return available;
}
protected HRegionInterface getMetaServer() throws IOException {
if (isMetaTable) {
metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
} else {
if (metaRegion == null) {
metaRegion = master.regionManager.getFirstMetaRegionForRegion(regionInfo);
metaRegionName = metaRegion.getRegionName();
}
}
HServerAddress server = null;
if (isMetaTable) {
server = master.getRootRegionLocation();
} else {
server = metaRegion.getServer();
}
return master.connection.getHRegionConnection(server);
}
}

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
@ -90,7 +91,7 @@ class ProcessServerShutdown extends RegionServerOperation {
}
/** Finds regions that the dead region server was serving */
private void scanMetaRegion(HRegionInterface server, long scannerId,
protected void scanMetaRegion(HRegionInterface server, long scannerId,
Text regionName) throws IOException {
List<ToDoEntry> toDoList = new ArrayList<ToDoEntry>();
@ -204,6 +205,46 @@ class ProcessServerShutdown extends RegionServerOperation {
}
}
private class ScanRootRegion extends RetryableMetaOperation<Boolean> {
ScanRootRegion(MetaRegion m, HMaster master) {
super(m, master);
}
/** {@inheritDoc} */
public Boolean call() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown scanning root region on " +
master.getRootRegionLocation().getBindAddress());
}
long scannerId = server.openScanner(
HRegionInfo.rootRegionInfo.getRegionName(), COLUMN_FAMILY_ARRAY,
EMPTY_START_ROW, System.currentTimeMillis(), null);
scanMetaRegion(server, scannerId,
HRegionInfo.rootRegionInfo.getRegionName());
return true;
}
}
private class ScanMetaRegions extends RetryableMetaOperation<Boolean> {
ScanMetaRegions(MetaRegion m, HMaster master) {
super(m, master);
}
/** {@inheritDoc} */
public Boolean call() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown scanning " + m.getRegionName() +
" on " + m.getServer());
}
long scannerId =
server.openScanner(m.getRegionName(), COLUMN_FAMILY_ARRAY,
EMPTY_START_ROW, System.currentTimeMillis(), null);
scanMetaRegion(server, scannerId, m.getRegionName());
return true;
}
}
@Override
protected boolean process() throws IOException {
LOG.info("process shutdown of server " + deadServer + ": logSplit: " +
@ -237,36 +278,16 @@ class ProcessServerShutdown extends RegionServerOperation {
if (!rootRescanned) {
// Scan the ROOT region
HRegionInterface server = null;
long scannerId = -1L;
for (int tries = 0; tries < numRetries; tries ++) {
if (master.closed.get()) {
return true;
}
server = master.connection.getHRegionConnection(
master.getRootRegionLocation());
scannerId = -1L;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown scanning root region on " +
master.getRootRegionLocation().getBindAddress());
}
scannerId =
server.openScanner(HRegionInfo.rootRegionInfo.getRegionName(),
COLUMN_FAMILY_ARRAY, EMPTY_START_ROW,
System.currentTimeMillis(), null);
scanMetaRegion(server, scannerId,
HRegionInfo.rootRegionInfo.getRegionName());
break;
} catch (IOException e) {
if (tries == numRetries - 1) {
throw RemoteExceptionHandler.checkIOException(e);
}
}
Boolean result = new ScanRootRegion(
new MetaRegion(master.getRootRegionLocation(),
HRegionInfo.rootRegionInfo.getRegionName(),
HConstants.EMPTY_START_ROW), this.master).doWithRetries();
if (result == null) {
// Master is closing - give up
return true;
}
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown scanning root region on " +
master.getRootRegionLocation().getBindAddress() +
@ -282,44 +303,18 @@ class ProcessServerShutdown extends RegionServerOperation {
return true;
}
for (int tries = 0; tries < numRetries; tries++) {
try {
if (master.closed.get()) {
return true;
}
List<MetaRegion> regions = master.regionManager.getListOfOnlineMetaRegions();
for (MetaRegion r: regions) {
HRegionInterface server = null;
long scannerId = -1L;
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown scanning " +
r.getRegionName() + " on " + r.getServer() + " " +
Thread.currentThread().getName() + " attempt " + tries);
}
server = master.connection.getHRegionConnection(r.getServer());
scannerId =
server.openScanner(r.getRegionName(), COLUMN_FAMILY_ARRAY,
EMPTY_START_ROW, System.currentTimeMillis(), null);
scanMetaRegion(server, scannerId, r.getRegionName());
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown finished scanning " +
r.getRegionName() + " on " + r.getServer() + " " +
Thread.currentThread().getName());
}
}
master.serverManager.removeDeadServer(deadServerName);
List<MetaRegion> regions = master.regionManager.getListOfOnlineMetaRegions();
for (MetaRegion r: regions) {
Boolean result = new ScanMetaRegions(r, this.master).doWithRetries();
if (result == null) {
break;
} catch (IOException e) {
if (tries == numRetries - 1) {
throw RemoteExceptionHandler.checkIOException(e);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown finished scanning " +
r.getRegionName() + " on " + r.getServer());
}
}
master.serverManager.removeDeadServer(deadServerName);
return true;
}
}

View File

@ -0,0 +1,89 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
/**
* Uses Callable pattern so that operations against meta regions do not need
* to duplicate retry logic.
*/
abstract class RetryableMetaOperation<T> implements Callable<T> {
protected final Log LOG = LogFactory.getLog(this.getClass());
protected final MetaRegion m;
protected final HMaster master;
protected HRegionInterface server;
protected RetryableMetaOperation(MetaRegion m, HMaster master) {
this.m = m;
this.master = master;
}
protected T doWithRetries()
throws IOException, RuntimeException {
List<IOException> exceptions = new ArrayList<IOException>();
for(int tries = 0; tries < master.numRetries; tries++) {
if (master.closed.get()) {
return null;
}
try {
this.server = master.connection.getHRegionConnection(m.getServer());
return this.call();
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
if (tries == master.numRetries - 1) {
if (LOG.isDebugEnabled()) {
StringBuilder message = new StringBuilder(
"Trying to contact region server for regionName '" +
m.getRegionName() + "', but failed after " + (tries + 1) +
" attempts.\n");
int i = 1;
for (IOException e2 : exceptions) {
message.append("Exception " + i + ":\n" + e2);
}
LOG.debug(message);
}
this.master.checkFileSystem();
throw e;
}
if (LOG.isDebugEnabled()) {
exceptions.add(e);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
master.sleeper.sleep();
}
return null;
}
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Sleeper;
/**
* Abstract base class for operations that need to examine all HRegionInfo
@ -53,9 +54,11 @@ abstract class TableOperation implements HConstants {
protected Set<HRegionInfo> unservedRegions;
protected HMaster master;
protected final int numRetries;
protected final Sleeper sleeper;
protected TableOperation(final HMaster master, final Text tableName)
throws IOException {
this.sleeper = master.sleeper;
this.numRetries = master.numRetries;
this.master = master;
@ -78,93 +81,85 @@ abstract class TableOperation implements HConstants {
this.metaRegions = master.regionManager.getMetaRegionsForTable(tableName);
}
void process() throws IOException {
for (int tries = 0; tries < numRetries; tries++) {
private class ProcessTableOperation extends RetryableMetaOperation<Boolean> {
ProcessTableOperation(MetaRegion m, HMaster master) {
super(m, master);
}
/** {@inheritDoc} */
public Boolean call() throws IOException {
boolean tableExists = false;
// Open a scanner on the meta region
long scannerId = server.openScanner(m.getRegionName(),
COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null);
List<Text> emptyRows = new ArrayList<Text>();
try {
// Prevent meta scanner from running
synchronized(master.regionManager.metaScannerThread.scannerLock) {
for (MetaRegion m: metaRegions) {
// Get a connection to a meta server
HRegionInterface server =
master.connection.getHRegionConnection(m.getServer());
while (true) {
RowResult values = server.next(scannerId);
if(values == null || values.size() == 0) {
break;
}
HRegionInfo info = this.master.getHRegionInfo(values.getRow(), values);
if (info == null) {
emptyRows.add(values.getRow());
throw new IOException(COL_REGIONINFO + " not found on " +
values.getRow());
}
String serverName = Writables.cellToString(values.get(COL_SERVER));
long startCode = Writables.cellToLong(values.get(COL_STARTCODE));
if (info.getTableDesc().getName().compareTo(tableName) > 0) {
break; // Beyond any more entries for this table
}
// Open a scanner on the meta region
long scannerId =
server.openScanner(m.getRegionName(), COLUMN_FAMILY_ARRAY,
tableName, System.currentTimeMillis(), null);
List<Text> emptyRows = new ArrayList<Text>();
try {
while (true) {
RowResult values = server.next(scannerId);
if(values == null || values.size() == 0) {
break;
}
HRegionInfo info =
this.master.getHRegionInfo(values.getRow(), values);
if (info == null) {
emptyRows.add(values.getRow());
throw new IOException(COL_REGIONINFO + " not found on " +
values.getRow());
}
String serverName =
Writables.cellToString(values.get(COL_SERVER));
long startCode =
Writables.cellToLong(values.get(COL_STARTCODE));
if (info.getTableDesc().getName().compareTo(tableName) > 0) {
break; // Beyond any more entries for this table
}
tableExists = true;
if (!isBeingServed(serverName, startCode)) {
unservedRegions.add(info);
}
processScanItem(serverName, startCode, info);
} // while(true)
} finally {
if (scannerId != -1L) {
try {
server.close(scannerId);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.error("closing scanner", e);
}
}
scannerId = -1L;
}
// Get rid of any rows that have a null HRegionInfo
if (emptyRows.size() > 0) {
LOG.warn("Found " + emptyRows.size() +
" rows with empty HRegionInfo while scanning meta region " +
m.getRegionName());
master.deleteEmptyMetaRows(server, m.getRegionName(), emptyRows);
}
if (!tableExists) {
throw new IOException(tableName + " does not exist");
}
postProcessMeta(m, server);
unservedRegions.clear();
} // for(MetaRegion m:)
} // synchronized(metaScannerLock)
} catch (IOException e) {
if (tries == numRetries - 1) {
// No retries left
this.master.checkFileSystem();
throw RemoteExceptionHandler.checkIOException(e);
tableExists = true;
if (!isBeingServed(serverName, startCode)) {
unservedRegions.add(info);
}
processScanItem(serverName, startCode, info);
}
continue;
} finally {
if (scannerId != -1L) {
try {
server.close(scannerId);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.error("closing scanner", e);
}
}
scannerId = -1L;
}
break;
} // for(tries...)
// Get rid of any rows that have a null HRegionInfo
if (emptyRows.size() > 0) {
LOG.warn("Found " + emptyRows.size() +
" rows with empty HRegionInfo while scanning meta region " +
m.getRegionName());
master.deleteEmptyMetaRows(server, m.getRegionName(), emptyRows);
}
if (!tableExists) {
throw new IOException(tableName + " does not exist");
}
postProcessMeta(m, server);
unservedRegions.clear();
return true;
}
}
void process() throws IOException {
// Prevent meta scanner from running
synchronized(master.regionManager.metaScannerThread.scannerLock) {
for (MetaRegion m: metaRegions) {
new ProcessTableOperation(m, master).doWithRetries();
}
}
}
protected boolean isBeingServed(String serverName, long startCode) {
boolean result = false;
if (serverName != null && serverName.length() > 0 && startCode != -1L) {