HADOOP-2555 Refactor the HTable#get and HTable#getRow methods to avoid repetition of retry-on-failure logic (thanks to Peter Dolan and Bryan Duxbury)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk/src/contrib/hbase@617670 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a1edb70b86
commit
5a033b71f4
|
@ -29,6 +29,7 @@ import java.util.SortedMap;
|
|||
import java.util.TreeMap;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -275,38 +276,15 @@ public class HTable implements HConstants {
|
|||
* @return value for specified row/column
|
||||
* @throws IOException
|
||||
*/
|
||||
public byte[] get(Text row, Text column) throws IOException {
|
||||
checkClosed();
|
||||
byte [] value = null;
|
||||
for(int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(row);
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
|
||||
try {
|
||||
value = server.get(r.getRegionInfo().getRegionName(), row, column);
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
if (tries == numRetries - 1) {
|
||||
throw e;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
r = getRegionLocation(row, true);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
} catch (InterruptedException x) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
return value;
|
||||
}
|
||||
public byte[] get(Text row, final Text column) throws IOException {
|
||||
checkClosed();
|
||||
|
||||
return getRegionServerWithRetries(new ServerCallable<byte[]>(row){
|
||||
public byte[] call() throws IOException {
|
||||
return server.get(location.getRegionInfo().getRegionName(), row, column);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the specified number of versions of the specified row and column
|
||||
|
@ -317,39 +295,17 @@ public class HTable implements HConstants {
|
|||
* @return - array byte values
|
||||
* @throws IOException
|
||||
*/
|
||||
public byte[][] get(Text row, Text column, int numVersions) throws IOException {
|
||||
public byte[][] get(final Text row, final Text column, final int numVersions)
|
||||
throws IOException {
|
||||
checkClosed();
|
||||
byte [][] values = null;
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(row);
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
|
||||
try {
|
||||
values = server.get(r.getRegionInfo().getRegionName(), row, column,
|
||||
numVersions);
|
||||
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
if (tries == numRetries - 1) {
|
||||
// No more tries
|
||||
throw e;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
r = getRegionLocation(row, true);
|
||||
|
||||
values = getRegionServerWithRetries(new ServerCallable<byte[][]>(row) {
|
||||
public byte [][] call() throws IOException {
|
||||
return server.get(location.getRegionInfo().getRegionName(), row,
|
||||
column, numVersions);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
} catch (InterruptedException x) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (values != null) {
|
||||
ArrayList<byte[]> bytes = new ArrayList<byte[]>();
|
||||
|
@ -372,40 +328,18 @@ public class HTable implements HConstants {
|
|||
* @return - array of values that match the above criteria
|
||||
* @throws IOException
|
||||
*/
|
||||
public byte[][] get(Text row, Text column, long timestamp, int numVersions)
|
||||
public byte[][] get(final Text row, final Text column, final long timestamp,
|
||||
final int numVersions)
|
||||
throws IOException {
|
||||
checkClosed();
|
||||
byte [][] values = null;
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(row);
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
|
||||
try {
|
||||
values = server.get(r.getRegionInfo().getRegionName(), row, column,
|
||||
timestamp, numVersions);
|
||||
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
if (tries == numRetries - 1) {
|
||||
// No more tries
|
||||
throw e;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
r = getRegionLocation(row, true);
|
||||
|
||||
values = getRegionServerWithRetries(new ServerCallable<byte[][]>(row) {
|
||||
public byte [][] call() throws IOException {
|
||||
return server.get(location.getRegionInfo().getRegionName(), row,
|
||||
column, timestamp, numVersions);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
} catch (InterruptedException x) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (values != null) {
|
||||
ArrayList<byte[]> bytes = new ArrayList<byte[]>();
|
||||
|
@ -436,37 +370,17 @@ public class HTable implements HConstants {
|
|||
* @return Map of columns to values. Map is empty if row does not exist.
|
||||
* @throws IOException
|
||||
*/
|
||||
public SortedMap<Text, byte[]> getRow(Text row, long ts) throws IOException {
|
||||
public SortedMap<Text, byte[]> getRow(final Text row, final long ts)
|
||||
throws IOException {
|
||||
checkClosed();
|
||||
HbaseMapWritable value = null;
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(row);
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
|
||||
try {
|
||||
value = server.getRow(r.getRegionInfo().getRegionName(), row, ts);
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
if (tries == numRetries - 1) {
|
||||
// No more tries
|
||||
throw e;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
r = getRegionLocation(row, true);
|
||||
|
||||
value = getRegionServerWithRetries(new ServerCallable<HbaseMapWritable>(row) {
|
||||
public HbaseMapWritable call() throws IOException {
|
||||
return server.getRow(location.getRegionInfo().getRegionName(), row, ts);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
} catch (InterruptedException x) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
||||
if (value != null && value.size() != 0) {
|
||||
for (Map.Entry<Writable, Writable> e: value.entrySet()) {
|
||||
|
@ -722,32 +636,14 @@ public class HTable implements HConstants {
|
|||
public void deleteAll(final Text row, final Text column, final long ts)
|
||||
throws IOException {
|
||||
checkClosed();
|
||||
for(int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(row);
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
try {
|
||||
server.deleteAll(r.getRegionInfo().getRegionName(), row, column, ts);
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
if (tries == numRetries - 1) {
|
||||
throw e;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
r = getRegionLocation(row, true);
|
||||
|
||||
getRegionServerWithRetries(new ServerCallable<Boolean>(row) {
|
||||
public Boolean call() throws IOException {
|
||||
server.deleteAll(location.getRegionInfo().getRegionName(), row,
|
||||
column, ts);
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
} catch (InterruptedException x) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -757,34 +653,15 @@ public class HTable implements HConstants {
|
|||
* @param ts Timestamp of cells to delete
|
||||
* @throws IOException
|
||||
*/
|
||||
public void deleteAll(final Text row, long ts) throws IOException {
|
||||
public void deleteAll(final Text row, final long ts) throws IOException {
|
||||
checkClosed();
|
||||
for(int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(row);
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
try {
|
||||
server.deleteAll(r.getRegionInfo().getRegionName(), row, ts);
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
if (tries == numRetries - 1) {
|
||||
throw e;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
r = getRegionLocation(row, true);
|
||||
|
||||
getRegionServerWithRetries(new ServerCallable<Boolean>(row){
|
||||
public Boolean call() throws IOException {
|
||||
server.deleteAll(location.getRegionInfo().getRegionName(), row, ts);
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
} catch (InterruptedException x) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -806,35 +683,18 @@ public class HTable implements HConstants {
|
|||
* @param timestamp Timestamp to match
|
||||
* @throws IOException
|
||||
*/
|
||||
public void deleteFamily(final Text row, final Text family, long timestamp)
|
||||
public void deleteFamily(final Text row, final Text family,
|
||||
final long timestamp)
|
||||
throws IOException {
|
||||
checkClosed();
|
||||
for(int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(row);
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
try {
|
||||
server.deleteFamily(r.getRegionInfo().getRegionName(), row, family, timestamp);
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
if (tries == numRetries - 1) {
|
||||
throw e;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
r = getRegionLocation(row, true);
|
||||
|
||||
getRegionServerWithRetries(new ServerCallable<Boolean>(row){
|
||||
public Boolean call() throws IOException {
|
||||
server.deleteFamily(location.getRegionInfo().getRegionName(), row,
|
||||
family, timestamp);
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
} catch (InterruptedException x) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -891,7 +751,7 @@ public class HTable implements HConstants {
|
|||
* @param timestamp time to associate with the change
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized void commit(long lockid, long timestamp)
|
||||
public synchronized void commit(long lockid, final long timestamp)
|
||||
throws IOException {
|
||||
checkClosed();
|
||||
updateInProgress(true);
|
||||
|
@ -900,34 +760,15 @@ public class HTable implements HConstants {
|
|||
}
|
||||
|
||||
try {
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = getRegionLocation(batch.get().getRow());
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
try {
|
||||
server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp,
|
||||
batch.get());
|
||||
break;
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException(
|
||||
(RemoteException) e);
|
||||
}
|
||||
if (tries < numRetries - 1) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
r = getRegionLocation(batch.get().getRow(), true);
|
||||
} else {
|
||||
throw e;
|
||||
getRegionServerWithRetries(
|
||||
new ServerCallable<Boolean>(batch.get().getRow()){
|
||||
public Boolean call() throws IOException {
|
||||
server.batchUpdate(location.getRegionInfo().getRegionName(),
|
||||
timestamp, batch.get());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
try {
|
||||
Thread.sleep(pause);
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
);
|
||||
} finally {
|
||||
batch.set(null);
|
||||
}
|
||||
|
@ -1150,4 +991,56 @@ public class HTable implements HConstants {
|
|||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inherits from Callable, used to define the particular actions you would
|
||||
* like to take with retry logic.
|
||||
*/
|
||||
protected abstract class ServerCallable<T> implements Callable<T> {
|
||||
HRegionLocation location;
|
||||
HRegionInterface server;
|
||||
Text row;
|
||||
|
||||
protected ServerCallable(Text row) {
|
||||
this.row = row;
|
||||
}
|
||||
|
||||
void instantiateServer(boolean reload) throws IOException {
|
||||
this.location = getRegionLocation(row, reload);
|
||||
this.server = connection.getHRegionConnection(location.getServerAddress());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Pass in a ServerCallable with your particular bit of logic defined and
|
||||
* this method will manage the process of doing retries with timed waits
|
||||
* and refinds of missing regions.
|
||||
*/
|
||||
protected <T> T getRegionServerWithRetries(ServerCallable<T> callable)
|
||||
throws IOException, RuntimeException {
|
||||
for(int tries = 0; tries < numRetries; tries++) {
|
||||
try {
|
||||
callable.instantiateServer(tries != 0);
|
||||
return callable.call();
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
if (tries == numRetries - 1) {
|
||||
throw e;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(pause);
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue