HBASE-538 Improve exceptions that come out on client-side

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@656341 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2008-05-14 18:07:03 +00:00
parent 8d5e1e7025
commit 595b75b105
9 changed files with 395 additions and 287 deletions

View File

@ -53,6 +53,7 @@ Hbase Change Log
HBASE-611 regionserver should do basic health check before reporting
alls-well to the master
HBASE-614 Retiring regions is not used; exploit or remove
HBASE-538 Improve exceptions that come out on client-side
Release 0.1.1 - 04/11/2008

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.SortedMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
@ -69,6 +68,7 @@ public interface HConnection {
* @param row row key you're trying to find the region of
* @return HRegionLocation that describes where to find the reigon in
* question
* @throws IOException
*/
public HRegionLocation locateRegion(Text tableName, Text row)
throws IOException;
@ -80,6 +80,7 @@ public interface HConnection {
* @param row row key you're trying to find the region of
* @return HRegionLocation that describes where to find the reigon in
* question
* @throws IOException
*/
public HRegionLocation relocateRegion(Text tableName, Text row)
throws IOException;
@ -92,4 +93,29 @@ public interface HConnection {
*/
public HRegionInterface getHRegionConnection(HServerAddress regionServer)
throws IOException;
}
/**
* Find region location hosting passed row
* @param tableName
* @param row Row to find.
* @param reload If true do not use cache, otherwise bypass.
* @return Location of row.
* @throws IOException
*/
HRegionLocation getRegionLocation(Text tableName, Text row, boolean reload)
throws IOException;
/**
* Pass in a ServerCallable with your particular bit of logic defined and
* this method will manage the process of doing retries with timed waits
* and refinds of missing regions.
*
* @param <T> the type of the return value
* @param callable
* @return an object of type T
* @throws IOException
* @throws RuntimeException
*/
public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
throws IOException, RuntimeException;
}

View File

@ -20,9 +20,12 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -239,63 +242,47 @@ public class HConnectionManager implements HConstants {
return exists;
}
/** {@inheritDoc} */
public HRegionLocation getRegionLocation(Text tableName, Text row,
boolean reload) throws IOException {
return reload ?
relocateRegion(tableName, row) :
locateRegion(tableName, row);
}
/** {@inheritDoc} */
public HTableDescriptor[] listTables() throws IOException {
HashSet<HTableDescriptor> uniqueTables = new HashSet<HTableDescriptor>();
long scannerId = -1L;
HRegionInterface server = null;
Text startRow = EMPTY_START_ROW;
HRegionLocation metaLocation = null;
// scan over the each meta region
do {
for (int triesSoFar = 0; triesSoFar < numRetries; triesSoFar++) {
try{
// turn the start row into a location
metaLocation = locateRegion(META_TABLE_NAME, startRow);
// connect to the server hosting the .META. region
server = getHRegionConnection(metaLocation.getServerAddress());
// open a scanner over the meta region
scannerId = server.openScanner(
metaLocation.getRegionInfo().getRegionName(),
new Text[]{COL_REGIONINFO}, startRow, LATEST_TIMESTAMP, null);
// iterate through the scanner, accumulating unique table names
while (true) {
RowResult values = server.next(scannerId);
if (values == null || values.size() == 0) {
break;
}
HRegionInfo info =
Writables.getHRegionInfo(values.get(COL_REGIONINFO));
// Only examine the rows where the startKey is zero length
if (info.getStartKey().getLength() == 0) {
uniqueTables.add(info.getTableDesc());
}
ScannerCallable callable = new ScannerCallable(this, META_TABLE_NAME,
COL_REGIONINFO_ARRAY, startRow, LATEST_TIMESTAMP, null);
try {
// open scanner
getRegionServerWithRetries(callable);
// iterate through the scanner, accumulating unique table names
while (true) {
RowResult values = getRegionServerWithRetries(callable);
if (values == null || values.size() == 0) {
break;
}
server.close(scannerId);
scannerId = -1L;
// advance the startRow to the end key of the current region
startRow = metaLocation.getRegionInfo().getEndKey();
// break out of retry loop
break;
} catch (IOException e) {
// Retry once.
metaLocation = relocateRegion(META_TABLE_NAME, startRow);
continue;
}
finally {
if (scannerId != -1L && server != null) {
server.close(scannerId);
HRegionInfo info =
Writables.getHRegionInfo(values.get(COL_REGIONINFO));
// Only examine the rows where the startKey is zero length
if (info.getStartKey().getLength() == 0) {
uniqueTables.add(info.getTableDesc());
}
}
// advance the startRow to the end key of the current region
startRow = callable.getHRegionInfo().getEndKey();
} finally {
// close scanner
callable.setClose();
getRegionServerWithRetries(callable);
}
} while (startRow.compareTo(LAST_ROW) != 0);
@ -723,5 +710,38 @@ public class HConnectionManager implements HConstants {
return new HRegionLocation(
HRegionInfo.rootRegionInfo, rootRegionAddress);
}
/** {@inheritDoc} */
public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
throws IOException, RuntimeException {
List<Throwable> exceptions = new ArrayList<Throwable>();
for(int tries = 0; tries < numRetries; tries++) {
try {
callable.instantiateServer(tries != 0);
return callable.call();
} catch (Throwable t) {
if (t instanceof UndeclaredThrowableException) {
t = t.getCause();
}
if (t instanceof RemoteException) {
t = RemoteExceptionHandler.decodeRemoteException((RemoteException) t);
}
exceptions.add(t);
if (tries == numRetries - 1) {
throw new RetriesExhaustedException(callable.getServerName(),
callable.getRegionName(), callable.getRow(), tries, exceptions);
}
if (LOG.isDebugEnabled()) {
LOG.debug("reloading table servers because: " + t.getMessage());
}
}
try {
Thread.sleep(pause);
} catch (InterruptedException e) {
// continue
}
}
return null;
}
}
}

View File

@ -25,8 +25,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,7 +33,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.StopRowFilter;
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
@ -45,20 +42,18 @@ import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
/**
* Used to communicate with a single HBase table
*/
public class HTable implements HConstants {
protected final Log LOG = LogFactory.getLog(this.getClass().getName());
protected final Log LOG = LogFactory.getLog(this.getClass());
protected final HConnection connection;
protected final Text tableName;
protected final long pause;
protected final int numRetries;
protected Random rand;
protected AtomicReference<BatchUpdate> batch;
protected volatile boolean tableDoesNotExist;
@ -77,7 +72,6 @@ public class HTable implements HConstants {
this.pause = conf.getLong("hbase.client.pause", 10 * 1000);
this.numRetries = conf.getInt("hbase.client.retries.number", 5);
this.rand = new Random();
this.batch = new AtomicReference<BatchUpdate>();
this.connection.locateRegion(tableName, EMPTY_START_ROW);
}
@ -88,19 +82,7 @@ public class HTable implements HConstants {
* @throws IOException
*/
public HRegionLocation getRegionLocation(Text row) throws IOException {
return this.connection.locateRegion(this.tableName, row);
}
/**
* Find region location hosting passed row
* @param row Row to find.
* @param reload If true do not use cache, otherwise bypass.
* @return Location of row.
*/
HRegionLocation getRegionLocation(Text row, boolean reload) throws IOException {
return reload?
this.connection.relocateRegion(this.tableName, row):
this.connection.locateRegion(tableName, row);
return connection.getRegionLocation(tableName, row, false);
}
@ -109,36 +91,6 @@ public class HTable implements HConstants {
return connection;
}
/**
* Verifies that no update is in progress
*/
public synchronized void checkUpdateInProgress() {
updateInProgress(false);
}
/*
* Checks to see if an update is in progress
*
* @param updateMustBeInProgress
* If true, an update must be in progress. An IllegalStateException will be
* thrown if not.
*
* If false, an update must not be in progress. An IllegalStateException
* will be thrown if an update is in progress.
*/
private void updateInProgress(boolean updateMustBeInProgress) {
if (updateMustBeInProgress) {
if (batch.get() == null) {
throw new IllegalStateException("no update in progress");
}
} else {
if (batch.get() != null) {
throw new IllegalStateException("update in progress");
}
}
}
/** @return the table name */
public Text getTableName() {
return this.tableName;
@ -241,11 +193,14 @@ public class HTable implements HConstants {
* @throws IOException
*/
public Cell get(final Text row, final Text column) throws IOException {
return getRegionServerWithRetries(new ServerCallable<Cell>(row){
public Cell call() throws IOException {
return server.get(location.getRegionInfo().getRegionName(), row, column);
}
});
return connection.getRegionServerWithRetries(
new ServerCallable<Cell>(connection, tableName, row) {
public Cell call() throws IOException {
return server.get(location.getRegionInfo().getRegionName(), row,
column);
}
}
);
}
/**
@ -261,12 +216,14 @@ public class HTable implements HConstants {
throws IOException {
Cell[] values = null;
values = getRegionServerWithRetries(new ServerCallable<Cell[]>(row) {
public Cell[] call() throws IOException {
return server.get(location.getRegionInfo().getRegionName(), row,
column, numVersions);
}
});
values = connection.getRegionServerWithRetries(
new ServerCallable<Cell[]>(connection, tableName, row) {
public Cell[] call() throws IOException {
return server.get(location.getRegionInfo().getRegionName(), row,
column, numVersions);
}
}
);
if (values != null) {
ArrayList<Cell> cellValues = new ArrayList<Cell>();
@ -294,12 +251,14 @@ public class HTable implements HConstants {
throws IOException {
Cell[] values = null;
values = getRegionServerWithRetries(new ServerCallable<Cell[]>(row) {
public Cell[] call() throws IOException {
return server.get(location.getRegionInfo().getRegionName(), row,
column, timestamp, numVersions);
}
});
values = connection.getRegionServerWithRetries(
new ServerCallable<Cell[]>(connection, tableName, row) {
public Cell[] call() throws IOException {
return server.get(location.getRegionInfo().getRegionName(), row,
column, timestamp, numVersions);
}
}
);
if (values != null) {
ArrayList<Cell> cellValues = new ArrayList<Cell>();
@ -332,11 +291,14 @@ public class HTable implements HConstants {
*/
public Map<Text, Cell> getRow(final Text row, final long ts)
throws IOException {
return getRegionServerWithRetries(new ServerCallable<RowResult>(row) {
public RowResult call() throws IOException {
return server.getRow(location.getRegionInfo().getRegionName(), row, ts);
}
});
return connection.getRegionServerWithRetries(
new ServerCallable<RowResult>(connection, tableName, row) {
public RowResult call() throws IOException {
return server.getRow(location.getRegionInfo().getRegionName(), row,
ts);
}
}
);
}
/**
@ -364,12 +326,14 @@ public class HTable implements HConstants {
public Map<Text, Cell> getRow(final Text row, final Text[] columns,
final long ts)
throws IOException {
return getRegionServerWithRetries(new ServerCallable<RowResult>(row) {
public RowResult call() throws IOException {
return server.getRow(location.getRegionInfo().getRegionName(), row,
columns, ts);
}
});
return connection.getRegionServerWithRetries(
new ServerCallable<RowResult>(connection, tableName, row) {
public RowResult call() throws IOException {
return server.getRow(location.getRegionInfo().getRegionName(), row,
columns, ts);
}
}
);
}
/**
@ -520,13 +484,15 @@ public class HTable implements HConstants {
*/
public void deleteAll(final Text row, final Text column, final long ts)
throws IOException {
getRegionServerWithRetries(new ServerCallable<Boolean>(row) {
public Boolean call() throws IOException {
server.deleteAll(location.getRegionInfo().getRegionName(), row,
column, ts);
return null;
}
});
connection.getRegionServerWithRetries(
new ServerCallable<Boolean>(connection, tableName, row) {
public Boolean call() throws IOException {
server.deleteAll(location.getRegionInfo().getRegionName(), row,
column, ts);
return null;
}
}
);
}
/**
@ -537,12 +503,14 @@ public class HTable implements HConstants {
* @throws IOException
*/
public void deleteAll(final Text row, final long ts) throws IOException {
getRegionServerWithRetries(new ServerCallable<Boolean>(row){
public Boolean call() throws IOException {
server.deleteAll(location.getRegionInfo().getRegionName(), row, ts);
return null;
}
});
connection.getRegionServerWithRetries(
new ServerCallable<Boolean>(connection, tableName, row) {
public Boolean call() throws IOException {
server.deleteAll(location.getRegionInfo().getRegionName(), row, ts);
return null;
}
}
);
}
/**
@ -567,13 +535,15 @@ public class HTable implements HConstants {
public void deleteFamily(final Text row, final Text family,
final long timestamp)
throws IOException {
getRegionServerWithRetries(new ServerCallable<Boolean>(row){
public Boolean call() throws IOException {
server.deleteFamily(location.getRegionInfo().getRegionName(), row,
family, timestamp);
return null;
}
});
connection.getRegionServerWithRetries(
new ServerCallable<Boolean>(connection, tableName, row) {
public Boolean call() throws IOException {
server.deleteFamily(location.getRegionInfo().getRegionName(), row,
family, timestamp);
return null;
}
}
);
}
/**
@ -594,8 +564,8 @@ public class HTable implements HConstants {
*/
public synchronized void commit(final BatchUpdate batchUpdate)
throws IOException {
getRegionServerWithRetries(
new ServerCallable<Boolean>(batchUpdate.getRow()){
connection.getRegionServerWithRetries(
new ServerCallable<Boolean>(connection, tableName, batchUpdate.getRow()) {
public Boolean call() throws IOException {
server.batchUpdate(location.getRegionInfo().getRegionName(),
batchUpdate);
@ -610,28 +580,25 @@ public class HTable implements HConstants {
* If there are multiple regions in a table, this scanner will iterate
* through them all.
*/
protected class ClientScanner implements Scanner {
private Text[] columns;
private class ClientScanner implements Scanner {
protected Text[] columns;
private Text startRow;
private long scanTime;
protected long scanTime;
@SuppressWarnings("hiding")
private boolean closed;
private HRegionLocation currentRegionLocation;
private HRegionInterface server;
private long scannerId;
private RowFilterInterface filter;
private boolean closed = false;
private HRegionInfo currentRegion = null;
private ScannerCallable callable = null;
protected RowFilterInterface filter;
protected ClientScanner(Text[] columns, Text startRow, long timestamp,
RowFilterInterface filter)
throws IOException {
LOG.debug("Creating scanner over " + tableName + " starting at key " + startRow);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating scanner over " + tableName + " starting at key '" +
startRow + "'");
}
// defaults
this.closed = false;
this.server = null;
this.scannerId = -1L;
// save off the simple parameters
this.columns = columns;
this.startRow = startRow;
@ -653,74 +620,42 @@ public class HTable implements HConstants {
*/
private boolean nextScanner() throws IOException {
// close the previous scanner if it's open
if (this.scannerId != -1L) {
this.server.close(this.scannerId);
this.scannerId = -1L;
if (this.callable != null) {
this.callable.setClose();
connection.getRegionServerWithRetries(callable);
this.callable = null;
}
// if we're at the end of the table, then close and return false
// to stop iterating
if (currentRegionLocation != null){
LOG.debug("Advancing forward from region "
+ currentRegionLocation.getRegionInfo());
if (currentRegion != null){
if (LOG.isDebugEnabled()) {
LOG.debug("Advancing forward from region " + currentRegion);
}
Text endKey = currentRegionLocation.getRegionInfo().getEndKey();
Text endKey = currentRegion.getEndKey();
if (endKey == null || endKey.equals(EMPTY_TEXT)) {
close();
return false;
}
}
HRegionLocation oldLocation = this.currentRegionLocation;
Text localStartKey = oldLocation == null ?
startRow : oldLocation.getRegionInfo().getEndKey();
HRegionInfo oldRegion = this.currentRegion;
Text localStartKey = oldRegion == null ? startRow : oldRegion.getEndKey();
// advance to the region that starts with the current region's end key
currentRegionLocation = getRegionLocation(localStartKey);
LOG.debug("Advancing internal scanner to startKey " + localStartKey
+ ", new region: " + currentRegionLocation);
if (LOG.isDebugEnabled()) {
LOG.debug("Advancing internal scanner to startKey " + localStartKey);
}
try {
for (int tries = 0; tries < numRetries; tries++) {
// connect to the server
server = connection.getHRegionConnection(
this.currentRegionLocation.getServerAddress());
try {
// open a scanner on the region server starting at the
// beginning of the region
scannerId = server.openScanner(
this.currentRegionLocation.getRegionInfo().getRegionName(),
this.columns, localStartKey, scanTime, filter);
break;
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
}
if (tries == numRetries - 1) {
// No more tries
throw e;
}
try {
Thread.sleep(pause);
} catch (InterruptedException ie) {
// continue
}
if (LOG.isDebugEnabled()) {
LOG.debug("reloading table servers because: " + e.getMessage());
}
currentRegionLocation = getRegionLocation(localStartKey, true);
}
}
callable = new ScannerCallable(connection, tableName, columns,
localStartKey, scanTime, filter);
// open a scanner on the region server starting at the
// beginning of the region
connection.getRegionServerWithRetries(callable);
currentRegion = callable.getHRegionInfo();
} catch (IOException e) {
close();
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
throw e;
}
return true;
@ -734,7 +669,7 @@ public class HTable implements HConstants {
RowResult values = null;
do {
values = server.next(scannerId);
values = connection.getRegionServerWithRetries(callable);
} while (values != null && values.size() == 0 && nextScanner());
if (values != null && values.size() != 0) {
@ -748,18 +683,18 @@ public class HTable implements HConstants {
* {@inheritDoc}
*/
public void close() {
if (scannerId != -1L) {
if (callable != null) {
callable.setClose();
try {
server.close(scannerId);
connection.getRegionServerWithRetries(callable);
} catch (IOException e) {
// We used to catch this error, interpret, and rethrow. However, we
// have since decided that it's not nice for a scanner's close to
// throw exceptions. Chances are it was just an UnknownScanner
// exception due to lease time out.
}
scannerId = -1L;
callable = null;
}
server = null;
closed = true;
}
@ -808,58 +743,4 @@ public class HTable implements HConstants {
};
}
}
/**
* Inherits from Callable, used to define the particular actions you would
* like to take with retry logic.
*/
protected abstract class ServerCallable<T> implements Callable<T> {
HRegionLocation location;
HRegionInterface server;
Text row;
protected ServerCallable(Text row) {
this.row = row;
}
void instantiateServer(boolean reload) throws IOException {
this.location = getRegionLocation(row, reload);
this.server = connection.getHRegionConnection(location.getServerAddress());
}
}
/**
* Pass in a ServerCallable with your particular bit of logic defined and
* this method will manage the process of doing retries with timed waits
* and refinds of missing regions.
*/
protected <T> T getRegionServerWithRetries(ServerCallable<T> callable)
throws IOException, RuntimeException {
List<Exception> exceptions = new ArrayList<Exception>();
for(int tries = 0; tries < numRetries; tries++) {
try {
callable.instantiateServer(tries != 0);
return callable.call();
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
if (tries == numRetries - 1) {
throw new RetriesExhaustedException(callable.row, tries, exceptions);
}
exceptions.add(e);
if (LOG.isDebugEnabled()) {
LOG.debug("reloading table servers because: " + e.getMessage());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
Thread.sleep(pause);
} catch (InterruptedException e) {
// continue
}
}
return null;
}
}

View File

@ -23,7 +23,8 @@ import java.io.IOException;
/** Thrown when a table can not be located */
public class RegionOfflineException extends IOException {
/** default constructor */
private static final long serialVersionUID = 466008402L;
/** default constructor */
public RegionOfflineException() {
super();
}

View File

@ -24,25 +24,36 @@ import org.apache.hadoop.io.Text;
* commit changes) fails after a bunch of retries.
*/
public class RetriesExhaustedException extends IOException {
private static final long serialVersionUID = 1876775844L;
/**
* Create a new RetriesExhaustedException from the list of prior failures.
* @param serverName name of HRegionServer
* @param regionName name of region
* @param row The row we were pursuing when we ran out of retries
* @param numTries The number of tries we made
* @param exceptions List of exceptions that failed before giving up
*/
public RetriesExhaustedException(Text row, int numTries,
List<Exception> exceptions) {
super(getMessage(row, numTries, exceptions));
public RetriesExhaustedException(String serverName, Text regionName, Text row,
int numTries, List<Throwable> exceptions) {
super(getMessage(serverName, regionName, row, numTries, exceptions));
}
private static String getMessage(Text row, int numTries,
List<Exception> exceptions) {
String buffer = "Trying to contact region server for row '" +
row + "', but failed after " + (numTries + 1) + " attempts.\nExceptions:\n";
private static String getMessage(String serverName, Text regionName, Text row,
int numTries, List<Throwable> exceptions) {
StringBuilder buffer = new StringBuilder("Trying to contact region server ");
buffer.append(serverName);
buffer.append(" for region ");
buffer.append(regionName);
buffer.append(", row '");
buffer.append(row);
buffer.append("', but failed after ");
buffer.append(numTries + 1);
buffer.append(" attempts.\nExceptions:\n");
for (Exception e : exceptions) {
buffer += e.toString() + "\n";
for (Throwable t : exceptions) {
buffer.append(t.toString());
buffer.append("\n");
}
return buffer;
return buffer.toString();
}
}

View File

@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import java.util.SortedMap;
import java.util.Iterator;
import org.apache.hadoop.hbase.io.RowResult;
/**

View File

@ -0,0 +1,92 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.io.Text;
/**
* Retryable scanner
*/
public class ScannerCallable extends ServerCallable<RowResult> {
private long scannerId = -1L;
private boolean instantiated = false;
private boolean closed = false;
private final Text[] columns;
private final long timestamp;
private final RowFilterInterface filter;
ScannerCallable (HConnection connection, Text tableName, Text[] columns,
Text startRow, long timestamp, RowFilterInterface filter) {
super(connection, tableName, startRow);
this.columns = columns;
this.timestamp = timestamp;
this.filter = filter;
}
/**
* @param reload
* @throws IOException
*/
@Override
public void instantiateServer(boolean reload) throws IOException {
if (!instantiated || reload) {
super.instantiateServer(reload);
instantiated = true;
}
}
/** {@inheritDoc} */
public RowResult call() throws IOException {
if (scannerId != -1L && closed) {
server.close(scannerId);
scannerId = -1L;
} else if (scannerId == -1L && !closed) {
// open the scanner
scannerId = server.openScanner(
this.location.getRegionInfo().getRegionName(), columns, row,
timestamp, filter);
} else {
return server.next(scannerId);
}
return null;
}
/**
* Call this when the next invocation of call should close the scanner
*/
public void setClose() {
closed = true;
}
/**
* @return the HRegionInfo for the current region
*/
public HRegionInfo getHRegionInfo() {
if (!instantiated) {
return null;
}
return location.getRegionInfo();
}
}

View File

@ -0,0 +1,78 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.Callable;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.io.Text;
/**
* Implements Callable, used to define the particular actions you would
* like to take with retry logic.
* @param <T> the class that the ServerCallable handles
*
*/
public abstract class ServerCallable<T> implements Callable<T> {
protected final HConnection connection;
protected final Text tableName;
protected final Text row;
protected HRegionLocation location;
protected HRegionInterface server;
/**
* @param connection
* @param tableName
* @param row
*/
public ServerCallable(HConnection connection, Text tableName, Text row) {
this.connection = connection;
this.tableName = tableName;
this.row = row;
}
/**
*
* @param reload set this to true if connection should re-find the region
* @throws IOException
*/
public void instantiateServer(boolean reload) throws IOException {
this.location = connection.getRegionLocation(tableName, row, reload);
this.server = connection.getHRegionConnection(location.getServerAddress());
}
/** @return the server name */
public String getServerName() {
return location.getServerAddress().toString();
}
/** @return the region name */
public Text getRegionName() {
return location.getRegionInfo().getRegionName();
}
/** @return the row */
public Text getRow() {
return row;
}
}