HBASE-11719 Remove some unused paths in AsyncClient
This commit is contained in:
parent
8b80819a6f
commit
2b9123f938
|
@ -583,8 +583,8 @@ class AsyncProcess {
|
||||||
addReplicaActions(i, actionsByServer, unknownLocActions);
|
addReplicaActions(i, actionsByServer, unknownLocActions);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int i = 0; i < replicaGetIndices.length; ++i) {
|
for (int replicaGetIndice : replicaGetIndices) {
|
||||||
addReplicaActions(replicaGetIndices[i], actionsByServer, unknownLocActions);
|
addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!actionsByServer.isEmpty()) {
|
if (!actionsByServer.isEmpty()) {
|
||||||
|
@ -1001,8 +1001,6 @@ class AsyncProcess {
|
||||||
? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
|
? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
|
||||||
|
|
||||||
int failed = 0, stopped = 0;
|
int failed = 0, stopped = 0;
|
||||||
boolean isReplica = false;
|
|
||||||
boolean firstAction = false;
|
|
||||||
List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
|
List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
|
||||||
for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
|
for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
|
||||||
byte[] regionName = e.getKey();
|
byte[] regionName = e.getKey();
|
||||||
|
@ -1012,10 +1010,6 @@ class AsyncProcess {
|
||||||
// TODO: depending on type of exception we might not want to update cache at all?
|
// TODO: depending on type of exception we might not want to update cache at all?
|
||||||
hConnection.updateCachedLocations(tableName, regionName, row, null, server);
|
hConnection.updateCachedLocations(tableName, regionName, row, null, server);
|
||||||
for (Action<Row> action : e.getValue()) {
|
for (Action<Row> action : e.getValue()) {
|
||||||
if (firstAction) {
|
|
||||||
firstAction = false;
|
|
||||||
isReplica = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
|
|
||||||
}
|
|
||||||
Retry retry = manageError(
|
Retry retry = manageError(
|
||||||
action.getOriginalIndex(), action.getAction(), canRetry, t, server);
|
action.getOriginalIndex(), action.getAction(), canRetry, t, server);
|
||||||
if (retry == Retry.YES) {
|
if (retry == Retry.YES) {
|
||||||
|
@ -1029,19 +1023,18 @@ class AsyncProcess {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (toReplay.isEmpty()) {
|
if (toReplay.isEmpty()) {
|
||||||
logNoResubmit(server, numAttempt, rsActions.size(), t, isReplica, failed, stopped);
|
logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
|
||||||
} else {
|
} else {
|
||||||
resubmit(server, toReplay, numAttempt, rsActions.size(), t, isReplica);
|
resubmit(server, toReplay, numAttempt, rsActions.size(), t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Log as much info as possible, and, if there is something to replay,
|
* Log as much info as possible, and, if there is something to replay,
|
||||||
* submit it again after a back off sleep.
|
* submit it again after a back off sleep.
|
||||||
* @param isReplica
|
|
||||||
*/
|
*/
|
||||||
private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
|
private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
|
||||||
int numAttempt, int failureCount, Throwable throwable, boolean isReplica) {
|
int numAttempt, int failureCount, Throwable throwable) {
|
||||||
// We have something to replay. We're going to sleep a little before.
|
// We have something to replay. We're going to sleep a little before.
|
||||||
|
|
||||||
// We have two contradicting needs here:
|
// We have two contradicting needs here:
|
||||||
|
@ -1054,7 +1047,7 @@ class AsyncProcess {
|
||||||
// We use this value to have some logs when we have multiple failures, but not too many
|
// We use this value to have some logs when we have multiple failures, but not too many
|
||||||
// logs, as errors are to be expected when a region moves, splits and so on
|
// logs, as errors are to be expected when a region moves, splits and so on
|
||||||
LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
|
LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
|
||||||
oldServer, throwable, backOffTime, true, null, isReplica, -1, -1));
|
oldServer, throwable, backOffTime, true, null, -1, -1));
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -1069,11 +1062,11 @@ class AsyncProcess {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void logNoResubmit(ServerName oldServer, int numAttempt,
|
private void logNoResubmit(ServerName oldServer, int numAttempt,
|
||||||
int failureCount, Throwable throwable, boolean isReplica, int failed, int stopped) {
|
int failureCount, Throwable throwable, int failed, int stopped) {
|
||||||
if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
|
if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
|
||||||
String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
|
String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
|
||||||
String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
|
String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
|
||||||
throwable, -1, false, timeStr, isReplica, failed, stopped);
|
throwable, -1, false, timeStr, failed, stopped);
|
||||||
if (failed != 0) {
|
if (failed != 0) {
|
||||||
// Only log final failures as warning
|
// Only log final failures as warning
|
||||||
LOG.warn(logMessage);
|
LOG.warn(logMessage);
|
||||||
|
@ -1108,8 +1101,6 @@ class AsyncProcess {
|
||||||
|
|
||||||
// Go by original action.
|
// Go by original action.
|
||||||
int failed = 0, stopped = 0;
|
int failed = 0, stopped = 0;
|
||||||
boolean isReplica = false;
|
|
||||||
boolean firstAction = false;
|
|
||||||
for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
|
for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
|
||||||
byte[] regionName = regionEntry.getKey();
|
byte[] regionName = regionEntry.getKey();
|
||||||
Map<Integer, Object> regionResults = responses.getResults().get(regionName);
|
Map<Integer, Object> regionResults = responses.getResults().get(regionName);
|
||||||
|
@ -1123,10 +1114,6 @@ class AsyncProcess {
|
||||||
}
|
}
|
||||||
boolean regionFailureRegistered = false;
|
boolean regionFailureRegistered = false;
|
||||||
for (Action<Row> sentAction : regionEntry.getValue()) {
|
for (Action<Row> sentAction : regionEntry.getValue()) {
|
||||||
if (firstAction) {
|
|
||||||
firstAction = false;
|
|
||||||
isReplica = !RegionReplicaUtil.isDefaultReplica(sentAction.getReplicaId());
|
|
||||||
}
|
|
||||||
Object result = regionResults.get(sentAction.getOriginalIndex());
|
Object result = regionResults.get(sentAction.getOriginalIndex());
|
||||||
// Failure: retry if it's make sense else update the errors lists
|
// Failure: retry if it's make sense else update the errors lists
|
||||||
if (result == null || result instanceof Throwable) {
|
if (result == null || result instanceof Throwable) {
|
||||||
|
@ -1188,10 +1175,6 @@ class AsyncProcess {
|
||||||
failureCount += actions.size();
|
failureCount += actions.size();
|
||||||
|
|
||||||
for (Action<Row> action : actions) {
|
for (Action<Row> action : actions) {
|
||||||
if (firstAction) {
|
|
||||||
firstAction = false;
|
|
||||||
isReplica = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
|
|
||||||
}
|
|
||||||
Row row = action.getAction();
|
Row row = action.getAction();
|
||||||
Retry retry = manageError(action.getOriginalIndex(), row,
|
Retry retry = manageError(action.getOriginalIndex(), row,
|
||||||
canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
|
canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
|
||||||
|
@ -1206,18 +1189,18 @@ class AsyncProcess {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (toReplay.isEmpty()) {
|
if (toReplay.isEmpty()) {
|
||||||
logNoResubmit(server, numAttempt, failureCount, throwable, isReplica, failed, stopped);
|
logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
|
||||||
} else {
|
} else {
|
||||||
resubmit(server, toReplay, numAttempt, failureCount, throwable, isReplica);
|
resubmit(server, toReplay, numAttempt, failureCount, throwable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
|
private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
|
||||||
Throwable error, long backOffTime, boolean willRetry, String startTime,
|
Throwable error, long backOffTime, boolean willRetry, String startTime,
|
||||||
boolean isReplica, int failed, int stopped) {
|
int failed, int stopped) {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
sb.append("#").append(id).append(", table=").append(tableName).append(", ")
|
sb.append("#").append(id).append(", table=").append(tableName).append(", ")
|
||||||
.append(isReplica ? "replica, " : "primary, ").append("attempt=").append(numAttempt)
|
.append("attempt=").append(numAttempt)
|
||||||
.append("/").append(numTries).append(" ");
|
.append("/").append(numTries).append(" ");
|
||||||
|
|
||||||
if (failureCount > 0 || error != null){
|
if (failureCount > 0 || error != null){
|
||||||
|
|
Loading…
Reference in New Issue