HBASE-9988 DOn't use HRI#getEncodedName in the client
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1543427 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5a6a0fd78b
commit
b2582665f1
|
@ -30,6 +30,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.cloudera.htrace.Trace;
|
||||
|
@ -95,13 +97,12 @@ class AsyncProcess<CResult> {
|
|||
protected final ExecutorService pool;
|
||||
protected final AsyncProcessCallback<CResult> callback;
|
||||
protected final BatchErrors errors = new BatchErrors();
|
||||
protected final BatchErrors retriedErrors = new BatchErrors();
|
||||
protected final AtomicBoolean hasError = new AtomicBoolean(false);
|
||||
protected final AtomicLong tasksSent = new AtomicLong(0);
|
||||
protected final AtomicLong tasksDone = new AtomicLong(0);
|
||||
protected final AtomicLong retriesCnt = new AtomicLong(0);
|
||||
protected final ConcurrentMap<String, AtomicInteger> taskCounterPerRegion =
|
||||
new ConcurrentHashMap<String, AtomicInteger>();
|
||||
protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
|
||||
new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
|
||||
protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
|
||||
new ConcurrentHashMap<ServerName, AtomicInteger>();
|
||||
|
||||
|
@ -290,7 +291,7 @@ class AsyncProcess<CResult> {
|
|||
|
||||
// Remember the previous decisions about regions or region servers we put in the
|
||||
// final multi.
|
||||
Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
|
||||
Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
|
||||
Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
|
||||
|
||||
int posInList = -1;
|
||||
|
@ -376,15 +377,14 @@ class AsyncProcess<CResult> {
|
|||
* We're taking into account the past decision; if we have already accepted
|
||||
* operation on a given region, we accept all operations for this region.
|
||||
*
|
||||
*
|
||||
* @param loc; the region and the server name we want to use.
|
||||
* @return true if this region is considered as busy.
|
||||
*/
|
||||
protected boolean canTakeOperation(HRegionLocation loc,
|
||||
Map<String, Boolean> regionsIncluded,
|
||||
Map<Long, Boolean> regionsIncluded,
|
||||
Map<ServerName, Boolean> serversIncluded) {
|
||||
String encodedRegionName = loc.getRegionInfo().getEncodedName();
|
||||
Boolean regionPrevious = regionsIncluded.get(encodedRegionName);
|
||||
long regionId = loc.getRegionInfo().getRegionId();
|
||||
Boolean regionPrevious = regionsIncluded.get(regionId);
|
||||
|
||||
if (regionPrevious != null) {
|
||||
// We already know what to do with this region.
|
||||
|
@ -394,14 +394,14 @@ class AsyncProcess<CResult> {
|
|||
Boolean serverPrevious = serversIncluded.get(loc.getServerName());
|
||||
if (Boolean.FALSE.equals(serverPrevious)) {
|
||||
// It's a new region, on a region server that we have already excluded.
|
||||
regionsIncluded.put(encodedRegionName, Boolean.FALSE);
|
||||
regionsIncluded.put(regionId, Boolean.FALSE);
|
||||
return false;
|
||||
}
|
||||
|
||||
AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
|
||||
AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
|
||||
if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
|
||||
// Too many tasks on this region already.
|
||||
regionsIncluded.put(encodedRegionName, Boolean.FALSE);
|
||||
regionsIncluded.put(regionId, Boolean.FALSE);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -424,7 +424,7 @@ class AsyncProcess<CResult> {
|
|||
}
|
||||
|
||||
if (!ok) {
|
||||
regionsIncluded.put(encodedRegionName, Boolean.FALSE);
|
||||
regionsIncluded.put(regionId, Boolean.FALSE);
|
||||
serversIncluded.put(loc.getServerName(), Boolean.FALSE);
|
||||
return false;
|
||||
}
|
||||
|
@ -434,7 +434,7 @@ class AsyncProcess<CResult> {
|
|||
assert serverPrevious.equals(Boolean.TRUE);
|
||||
}
|
||||
|
||||
regionsIncluded.put(encodedRegionName, Boolean.TRUE);
|
||||
regionsIncluded.put(regionId, Boolean.TRUE);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -597,18 +597,18 @@ class AsyncProcess<CResult> {
|
|||
if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) {
|
||||
canRetry = false;
|
||||
}
|
||||
byte[] region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
|
||||
|
||||
byte[] region = null;
|
||||
if (canRetry && callback != null) {
|
||||
region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
|
||||
canRetry = callback.retriableFailure(originalIndex, row, region, throwable);
|
||||
}
|
||||
|
||||
if (canRetry) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
retriedErrors.add(throwable, row, location);
|
||||
}
|
||||
} else {
|
||||
if (!canRetry) {
|
||||
if (callback != null) {
|
||||
if (region == null && location != null) {
|
||||
region = location.getRegionInfo().getEncodedNameAsBytes();
|
||||
}
|
||||
callback.failure(originalIndex, region, row, throwable);
|
||||
}
|
||||
errors.add(throwable, row, location);
|
||||
|
@ -890,7 +890,6 @@ class AsyncProcess<CResult> {
|
|||
*/
|
||||
public void clearErrors() {
|
||||
errors.clear();
|
||||
retriedErrors.clear();
|
||||
hasError.set(false);
|
||||
}
|
||||
|
||||
|
@ -912,11 +911,13 @@ class AsyncProcess<CResult> {
|
|||
serverCnt.incrementAndGet();
|
||||
|
||||
for (byte[] regBytes : regions) {
|
||||
String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
|
||||
AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
|
||||
AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
|
||||
if (regionCnt == null) {
|
||||
taskCounterPerRegion.putIfAbsent(encodedRegionName, new AtomicInteger());
|
||||
regionCnt = taskCounterPerRegion.get(encodedRegionName);
|
||||
regionCnt = new AtomicInteger();
|
||||
AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
|
||||
if (oldCnt != null) {
|
||||
regionCnt = oldCnt;
|
||||
}
|
||||
}
|
||||
regionCnt.incrementAndGet();
|
||||
}
|
||||
|
@ -927,8 +928,7 @@ class AsyncProcess<CResult> {
|
|||
*/
|
||||
protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
|
||||
for (byte[] regBytes : regions) {
|
||||
String encodedRegionName = HRegionInfo.encodeRegionName(regBytes);
|
||||
AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
|
||||
AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
|
||||
regionCnt.decrementAndGet();
|
||||
}
|
||||
|
||||
|
|
|
@ -60,11 +60,12 @@ public class TestAsyncProcess {
|
|||
|
||||
private static ServerName sn = new ServerName("localhost:10,1254");
|
||||
private static ServerName sn2 = new ServerName("localhost:140,12540");
|
||||
private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2);
|
||||
private static HRegionInfo hri1 =
|
||||
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
|
||||
private static HRegionInfo hri2 =
|
||||
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW);
|
||||
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
|
||||
private static HRegionInfo hri3 =
|
||||
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW);
|
||||
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
|
||||
private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
|
||||
private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
|
||||
private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
|
||||
|
@ -264,7 +265,7 @@ public class TestAsyncProcess {
|
|||
puts.add(createPut(2, true)); // <== new region, but the rs is ok
|
||||
|
||||
ap.submit(puts, false);
|
||||
Assert.assertEquals(1, puts.size());
|
||||
Assert.assertEquals(" puts=" + puts, 1, puts.size());
|
||||
|
||||
ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
|
||||
ap.submit(puts, false);
|
||||
|
@ -338,7 +339,7 @@ public class TestAsyncProcess {
|
|||
final AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
|
||||
ap.tasksSent.incrementAndGet();
|
||||
final AtomicInteger ai = new AtomicInteger(1);
|
||||
ap.taskCounterPerRegion.put(hri1.getEncodedName(), ai);
|
||||
ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
|
||||
|
||||
final AtomicBoolean checkPoint = new AtomicBoolean(false);
|
||||
final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
|
||||
|
@ -716,7 +717,7 @@ public class TestAsyncProcess {
|
|||
List<Get> gets = new ArrayList<Get>(NB_REGS);
|
||||
for (int i = 0; i < NB_REGS; i++) {
|
||||
HRegionInfo hri = new HRegionInfo(
|
||||
DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L));
|
||||
DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
|
||||
HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
|
||||
hrls.add(hrl);
|
||||
|
||||
|
|
Loading…
Reference in New Issue