HBASE-22553 NPE in RegionReplicaReplicationEndpoint
This commit is contained in:
parent
6278c98f5d
commit
621dc88c79
|
@ -32,12 +32,15 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.RegionLocations;
|
import org.apache.hadoop.hbase.RegionLocations;
|
||||||
import org.apache.hadoop.hbase.TableDescriptors;
|
import org.apache.hadoop.hbase.TableDescriptors;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
|
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
|
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
|
||||||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||||
|
@ -162,9 +165,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// check if the number of region replicas is correct, and also the primary region name
|
// check if the number of region replicas is correct, and also the primary region name
|
||||||
// matches, and also there is no null elements in the returned RegionLocations
|
// matches.
|
||||||
if (locs.size() == tableDesc.getRegionReplication() &&
|
if (locs.size() == tableDesc.getRegionReplication() &&
|
||||||
locs.size() == locs.numNonNullElements() &&
|
locs.getDefaultRegionLocation() != null &&
|
||||||
Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
|
Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
|
||||||
encodedRegionName)) {
|
encodedRegionName)) {
|
||||||
future.complete(locs);
|
future.complete(locs);
|
||||||
|
@ -182,8 +185,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
||||||
future.complete(Long.valueOf(entries.size()));
|
future.complete(Long.valueOf(entries.size()));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (!Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
|
RegionInfo defaultReplica = locs.getDefaultRegionLocation().getRegion();
|
||||||
encodedRegionName)) {
|
if (!Bytes.equals(defaultReplica.getEncodedNameAsBytes(), encodedRegionName)) {
|
||||||
// the region name is not equal, this usually means the region has been split or merged, so
|
// the region name is not equal, this usually means the region has been split or merged, so
|
||||||
// give up replicating as the new region(s) should already have all the data of the parent
|
// give up replicating as the new region(s) should already have all the data of the parent
|
||||||
// region(s).
|
// region(s).
|
||||||
|
@ -191,7 +194,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
||||||
LOG.trace(
|
LOG.trace(
|
||||||
"Skipping {} entries in table {} because located region {} is different than" +
|
"Skipping {} entries in table {} because located region {} is different than" +
|
||||||
" the original region {} from WALEdit",
|
" the original region {} from WALEdit",
|
||||||
tableDesc.getTableName(), locs.getDefaultRegionLocation().getRegion().getEncodedName(),
|
tableDesc.getTableName(), defaultReplica.getEncodedName(),
|
||||||
Bytes.toStringBinary(encodedRegionName));
|
Bytes.toStringBinary(encodedRegionName));
|
||||||
}
|
}
|
||||||
future.complete(Long.valueOf(entries.size()));
|
future.complete(Long.valueOf(entries.size()));
|
||||||
|
@ -202,12 +205,14 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
||||||
AtomicLong skippedEdits = new AtomicLong(0);
|
AtomicLong skippedEdits = new AtomicLong(0);
|
||||||
|
|
||||||
for (int i = 1, n = locs.size(); i < n; i++) {
|
for (int i = 1, n = locs.size(); i < n; i++) {
|
||||||
final int replicaId = i;
|
// Do not use the elements other than the default replica as they may be null. We will fail
|
||||||
FutureUtils.addListener(connection.replay(tableDesc.getTableName(),
|
// earlier if the location for default replica is null.
|
||||||
locs.getRegionLocation(replicaId).getRegion().getEncodedNameAsBytes(), row, entries,
|
final RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(defaultReplica, i);
|
||||||
replicaId, numRetries, operationTimeoutNs), (r, e) -> {
|
FutureUtils
|
||||||
|
.addListener(connection.replay(tableDesc.getTableName(), replica.getEncodedNameAsBytes(),
|
||||||
|
row, entries, replica.getReplicaId(), numRetries, operationTimeoutNs), (r, e) -> {
|
||||||
if (e != null) {
|
if (e != null) {
|
||||||
LOG.warn("Failed to replicate to {}", locs.getRegionLocation(replicaId), e);
|
LOG.warn("Failed to replicate to {}", replica, e);
|
||||||
error.compareAndSet(null, e);
|
error.compareAndSet(null, e);
|
||||||
} else {
|
} else {
|
||||||
AtomicUtils.updateMax(skippedEdits, r.longValue());
|
AtomicUtils.updateMax(skippedEdits, r.longValue());
|
||||||
|
@ -245,6 +250,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
||||||
FutureUtils.addListener(locateFuture, (locs, error) -> {
|
FutureUtils.addListener(locateFuture, (locs, error) -> {
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
future.completeExceptionally(error);
|
future.completeExceptionally(error);
|
||||||
|
} else if (locs.getDefaultRegionLocation() == null) {
|
||||||
|
future.completeExceptionally(
|
||||||
|
new HBaseIOException("No location found for default replica of table=" +
|
||||||
|
tableDesc.getTableName() + " row='" + Bytes.toStringBinary(row) + "'"));
|
||||||
} else {
|
} else {
|
||||||
replicate(future, locs, tableDesc, encodedRegionName, row, entries);
|
replicate(future, locs, tableDesc, encodedRegionName, row, entries);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue