HBASE-2421 Put hangs for 10 retries on failed region servers; forward-port from branch
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@945395 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
23ee15f04e
commit
afd149c7aa
|
@ -325,6 +325,7 @@ Release 0.21.0 - Unreleased
|
||||||
HBASE-2382 Don't rely on fs.getDefaultReplication() to roll HLogs
|
HBASE-2382 Don't rely on fs.getDefaultReplication() to roll HLogs
|
||||||
(Nicolas Spiegelberg via Stack)
|
(Nicolas Spiegelberg via Stack)
|
||||||
HBASE-2415 Disable META splitting in 0.20 (Todd Lipcon via Stack)
|
HBASE-2415 Disable META splitting in 0.20 (Todd Lipcon via Stack)
|
||||||
|
HBASE-2421 Put hangs for 10 retries on failed region servers
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-1760 Cleanup TODOs in HTable
|
HBASE-1760 Cleanup TODOs in HTable
|
||||||
|
|
|
@ -192,7 +192,7 @@ public interface HConnection {
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
* @throws RuntimeException other unspecified error
|
* @throws RuntimeException other unspecified error
|
||||||
*/
|
*/
|
||||||
public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable)
|
public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
|
||||||
throws IOException, RuntimeException;
|
throws IOException, RuntimeException;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -680,7 +680,7 @@ public class HConnectionManager implements HConstants {
|
||||||
// This block guards against two threads trying to load the meta
|
// This block guards against two threads trying to load the meta
|
||||||
// region at the same time. The first will load the meta region and
|
// region at the same time. The first will load the meta region and
|
||||||
// the second will use the value that the first one found.
|
// the second will use the value that the first one found.
|
||||||
synchronized(regionLockObject) {
|
synchronized (regionLockObject) {
|
||||||
// Check the cache again for a hit in case some other thread made the
|
// Check the cache again for a hit in case some other thread made the
|
||||||
// same query while we were waiting on the lock. If not supposed to
|
// same query while we were waiting on the lock. If not supposed to
|
||||||
// be using the cache, delete any existing cached location so it won't
|
// be using the cache, delete any existing cached location so it won't
|
||||||
|
@ -1077,15 +1077,19 @@ public class HConnectionManager implements HConstants {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable)
|
public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
|
||||||
throws IOException, RuntimeException {
|
throws IOException, RuntimeException {
|
||||||
try {
|
try {
|
||||||
callable.instantiateServer(false);
|
callable.instantiateServer(false);
|
||||||
return callable.call();
|
return callable.call();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
t = translateException(t);
|
Throwable t2 = translateException(t);
|
||||||
|
if (t2 instanceof IOException) {
|
||||||
|
throw (IOException)t2;
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException(t2);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings({"ConstantConditions"})
|
@SuppressWarnings({"ConstantConditions"})
|
||||||
|
@ -1299,9 +1303,25 @@ public class HConnectionManager implements HConstants {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings({"ConstantConditions"})
|
/**
|
||||||
|
* Process a batch of Puts on the given executor service.
|
||||||
|
*
|
||||||
|
* @param list the puts to make - successful puts will be removed.
|
||||||
|
* @param pool thread pool to execute requests on
|
||||||
|
*
|
||||||
|
* In the case of an exception, we take different actions depending on the
|
||||||
|
* situation:
|
||||||
|
* - If the exception is a DoNotRetryIOException, we rethrow it and leave the
|
||||||
|
* 'list' parameter in an indeterminate state.
|
||||||
|
* - If the 'list' parameter is a singleton, we directly throw the specific
|
||||||
|
* exception for that put.
|
||||||
|
* - Otherwise, we throw a generic exception indicating that an error occurred.
|
||||||
|
* The 'list' parameter is mutated to contain those puts that did not succeed.
|
||||||
|
*/
|
||||||
public void processBatchOfPuts(List<Put> list,
|
public void processBatchOfPuts(List<Put> list,
|
||||||
final byte[] tableName, ExecutorService pool) throws IOException {
|
final byte[] tableName, ExecutorService pool) throws IOException {
|
||||||
|
boolean singletonList = list.size() == 1;
|
||||||
|
Throwable singleRowCause = null;
|
||||||
for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
|
for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
|
||||||
Collections.sort(list);
|
Collections.sort(list);
|
||||||
Map<HServerAddress, MultiPut> regionPuts =
|
Map<HServerAddress, MultiPut> regionPuts =
|
||||||
|
@ -1367,10 +1387,19 @@ public class HConnectionManager implements HConstants {
|
||||||
LOG.debug("Failed all from " + request.address, e);
|
LOG.debug("Failed all from " + request.address, e);
|
||||||
failed.addAll(request.allPuts());
|
failed.addAll(request.allPuts());
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
System.out.println(e);
|
|
||||||
// all go into the failed list.
|
// all go into the failed list.
|
||||||
LOG.debug("Failed all from " + request.address, e);
|
LOG.debug("Failed all from " + request.address, e);
|
||||||
failed.addAll(request.allPuts());
|
failed.addAll(request.allPuts());
|
||||||
|
|
||||||
|
// Just give up, leaving the batch put list in an untouched/semi-committed state
|
||||||
|
if (e.getCause() instanceof DoNotRetryIOException) {
|
||||||
|
throw (DoNotRetryIOException) e.getCause();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (singletonList) {
|
||||||
|
// be richer for reporting in a 1 row case.
|
||||||
|
singleRowCause = e.getCause();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
list.clear();
|
list.clear();
|
||||||
|
@ -1391,9 +1420,13 @@ public class HConnectionManager implements HConstants {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!list.isEmpty()) {
|
if (!list.isEmpty()) {
|
||||||
|
if (singletonList && singleRowCause != null) {
|
||||||
|
throw new IOException(singleRowCause);
|
||||||
|
}
|
||||||
|
|
||||||
// ran out of retries and didn't succeed everything!
|
// ran out of retries and didn't succeed everything!
|
||||||
throw new RetriesExhaustedException("Still had " + list.size() + " puts left after retrying " +
|
throw new RetriesExhaustedException("Still had " + list.size() + " puts left after retrying " +
|
||||||
numRetries + " times. Should have detail on which Regions failed the most");
|
numRetries + " times.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1404,7 +1437,7 @@ public class HConnectionManager implements HConstants {
|
||||||
final HConnection connection = this;
|
final HConnection connection = this;
|
||||||
return new Callable<MultiPutResponse>() {
|
return new Callable<MultiPutResponse>() {
|
||||||
public MultiPutResponse call() throws IOException {
|
public MultiPutResponse call() throws IOException {
|
||||||
return getRegionServerWithRetries(
|
return getRegionServerWithoutRetries(
|
||||||
new ServerCallable<MultiPutResponse>(connection, tableName, null) {
|
new ServerCallable<MultiPutResponse>(connection, tableName, null) {
|
||||||
public MultiPutResponse call() throws IOException {
|
public MultiPutResponse call() throws IOException {
|
||||||
MultiPutResponse resp = server.multiPut(puts);
|
MultiPutResponse resp = server.multiPut(puts);
|
||||||
|
|
|
@ -542,8 +542,8 @@ public class HTable implements HTableInterface {
|
||||||
} finally {
|
} finally {
|
||||||
// the write buffer was adjusted by processBatchOfPuts
|
// the write buffer was adjusted by processBatchOfPuts
|
||||||
currentWriteBufferSize = 0;
|
currentWriteBufferSize = 0;
|
||||||
for (Put aWriteBuffer : writeBuffer) {
|
for (Put aPut : writeBuffer) {
|
||||||
currentWriteBufferSize += aWriteBuffer.heapSize();
|
currentWriteBufferSize += aPut.heapSize();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -233,7 +233,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
||||||
|
|
||||||
// Run HDFS shutdown on exit if this is set. We clear this out when
|
// Run HDFS shutdown on exit if this is set. We clear this out when
|
||||||
// doing a restart() to prevent closing of HDFS.
|
// doing a restart() to prevent closing of HDFS.
|
||||||
private final AtomicBoolean shutdownHDFS = new AtomicBoolean(true);
|
public final AtomicBoolean shutdownHDFS = new AtomicBoolean(true);
|
||||||
|
|
||||||
private final String machineName;
|
private final String machineName;
|
||||||
|
|
||||||
|
|
|
@ -20,15 +20,15 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
public class TestMultiParallelPut extends MultiRegionTable {
|
public class TestMultiParallelPut extends MultiRegionTable {
|
||||||
private static final byte[] VALUE = Bytes.toBytes("value");
|
private static final byte[] VALUE = Bytes.toBytes("value");
|
||||||
|
@ -58,7 +58,14 @@ public class TestMultiParallelPut extends MultiRegionTable {
|
||||||
|
|
||||||
List<byte[]> keys = new ArrayList<byte[]>();
|
List<byte[]> keys = new ArrayList<byte[]>();
|
||||||
|
|
||||||
public void testMultiPut() throws Exception {
|
public void testParallelPut() throws Exception {
|
||||||
|
doATest(false);
|
||||||
|
}
|
||||||
|
public void testParallelPutWithRSAbort() throws Exception {
|
||||||
|
doATest(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void doATest(boolean doAbort) throws Exception {
|
||||||
|
|
||||||
HTable table = new HTable(TEST_TABLE);
|
HTable table = new HTable(TEST_TABLE);
|
||||||
table.setAutoFlush(false);
|
table.setAutoFlush(false);
|
||||||
|
@ -73,6 +80,19 @@ public class TestMultiParallelPut extends MultiRegionTable {
|
||||||
|
|
||||||
table.flushCommits();
|
table.flushCommits();
|
||||||
|
|
||||||
|
if (doAbort) {
|
||||||
|
cluster.abortRegionServer(0);
|
||||||
|
|
||||||
|
// try putting more keys after the abort.
|
||||||
|
for ( byte [] k : keys ) {
|
||||||
|
Put put = new Put(k);
|
||||||
|
put.add(BYTES_FAMILY, QUALIFIER, VALUE);
|
||||||
|
|
||||||
|
table.put(put);
|
||||||
|
}
|
||||||
|
table.flushCommits();
|
||||||
|
}
|
||||||
|
|
||||||
for (byte [] k : keys ) {
|
for (byte [] k : keys ) {
|
||||||
Get get = new Get(k);
|
Get get = new Get(k);
|
||||||
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
||||||
|
@ -88,10 +108,15 @@ public class TestMultiParallelPut extends MultiRegionTable {
|
||||||
HBaseAdmin admin = new HBaseAdmin(conf);
|
HBaseAdmin admin = new HBaseAdmin(conf);
|
||||||
ClusterStatus cs = admin.getClusterStatus();
|
ClusterStatus cs = admin.getClusterStatus();
|
||||||
|
|
||||||
assertEquals(2, cs.getServers());
|
int expectedServerCount = 2;
|
||||||
|
if (doAbort)
|
||||||
|
expectedServerCount = 1;
|
||||||
|
|
||||||
|
assertEquals(expectedServerCount, cs.getServers());
|
||||||
for ( HServerInfo info : cs.getServerInfo()) {
|
for ( HServerInfo info : cs.getServerInfo()) {
|
||||||
System.out.println(info);
|
System.out.println(info);
|
||||||
assertTrue( info.getLoad().getNumberOfRegions() > 10);
|
assertTrue( info.getLoad().getNumberOfRegions() > 10);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue