HBASE-12561 Replicas of regions can be cached from different instances of the table in MetaCache
This commit is contained in:
parent 6a4bca86e2
commit e405017a31
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase;
 import java.util.Collection;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@ -200,9 +201,15 @@ public class RegionLocations {
     // in case of region replication going down, we might have a leak here.
     int max = other.locations.length;
 
+    HRegionInfo regionInfo = null;
     for (int i = 0; i < max; i++) {
       HRegionLocation thisLoc = this.getRegionLocation(i);
       HRegionLocation otherLoc = other.getRegionLocation(i);
+      if (regionInfo == null && otherLoc != null && otherLoc.getRegionInfo() != null) {
+        // regionInfo is the first non-null HRI from other RegionLocations. We use it to ensure that
+        // all replica region infos belong to the same region with same region id.
+        regionInfo = otherLoc.getRegionInfo();
+      }
 
       HRegionLocation selectedLoc = selectRegionLocation(thisLoc,
         otherLoc, true, false);
@ -218,6 +225,18 @@ public class RegionLocations {
       }
     }
 
+    // ensure that all replicas share the same start code. Otherwise delete them
+    if (newLocations != null && regionInfo != null) {
+      for (int i=0; i < newLocations.length; i++) {
+        if (newLocations[i] != null) {
+          if (!RegionReplicaUtil.isReplicasForSameRegion(regionInfo,
+              newLocations[i].getRegionInfo())) {
+            newLocations[i] = null;
+          }
+        }
+      }
+    }
+
     return newLocations == null ? this : new RegionLocations(newLocations);
   }
 
@ -264,6 +283,15 @@ public class RegionLocations {
     HRegionLocation[] newLocations = new HRegionLocation[Math.max(locations.length, replicaId +1)];
     System.arraycopy(locations, 0, newLocations, 0, locations.length);
     newLocations[replicaId] = location;
+    // ensure that all replicas share the same start code. Otherwise delete them
+    for (int i=0; i < newLocations.length; i++) {
+      if (newLocations[i] != null) {
+        if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegionInfo(),
+            newLocations[i].getRegionInfo())) {
+          newLocations[i] = null;
+        }
+      }
+    }
     return new RegionLocations(newLocations);
   }
 

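The two RegionLocations hunks above are the core of the fix: when a table is dropped and re-created, MetaCache could end up holding replica locations that belong to different instances of the table, distinguished only by their region id. mergeLocations() now takes a reference HRegionInfo from the incoming locations and nulls out any replica whose region info does not match it, and updateLocation() applies the same check against the updated location, using the new RegionReplicaUtil.isReplicasForSameRegion() helper shown in the next file. The following standalone sketch illustrates the intended merge behavior; it mirrors the testMergeLocationsWithDifferentRegionId test added later in this commit, and the class name, table name, server names and literal region ids are illustrative only.

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;

public class StaleReplicaMergeSketch {
  public static void main(String[] args) {
    // Illustrative names and values; not part of the commit.
    TableName table = TableName.valueOf("t1");
    ServerName sn1 = ServerName.valueOf("host1,16020,1");
    ServerName sn2 = ServerName.valueOf("host2,16020,1");

    // The table was dropped and re-created: the old instance had region id 1000,
    // the new instance has region id 2000.
    HRegionInfo stalePrimary = new HRegionInfo(table, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_END_ROW, false, 1000L, 0);
    HRegionInfo freshReplica1 = new HRegionInfo(table, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_END_ROW, false, 2000L, 1);

    // The cached entry still points at the old instance; a fresh lookup returned
    // replica 1 of the new instance.
    RegionLocations cached = new RegionLocations(new HRegionLocation(stalePrimary, sn1));
    RegionLocations fresh = new RegionLocations(null, new HRegionLocation(freshReplica1, sn2));

    // With this change, the merge keeps only replicas that belong to the same region
    // (same table, start/end key and region id) as the merged-in locations.
    RegionLocations merged = cached.mergeLocations(fresh);
    System.out.println(merged.getRegionLocation(0)); // null -- the stale primary was dropped
    System.out.println(merged.getRegionLocation(1)); // replica 1 of the new instance on host2
  }
}
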
@ -23,6 +23,7 @@ import java.util.Iterator;
 
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * Utility methods which contain the logic for regions and replicas.
@ -92,4 +93,46 @@ public class RegionReplicaUtil {
       }
     }
   }
+
+  public static boolean isReplicasForSameRegion(HRegionInfo regionInfoA, HRegionInfo regionInfoB) {
+    return compareRegionInfosWithoutReplicaId(regionInfoA, regionInfoB) == 0;
+  }
+
+  private static int compareRegionInfosWithoutReplicaId(HRegionInfo regionInfoA,
+      HRegionInfo regionInfoB) {
+    int result = regionInfoA.getTable().compareTo(regionInfoB.getTable());
+    if (result != 0) {
+      return result;
+    }
+
+    // Compare start keys.
+    result = Bytes.compareTo(regionInfoA.getStartKey(), regionInfoB.getStartKey());
+    if (result != 0) {
+      return result;
+    }
+
+    // Compare end keys.
+    result = Bytes.compareTo(regionInfoA.getEndKey(), regionInfoB.getEndKey());
+
+    if (result != 0) {
+      if (regionInfoA.getStartKey().length != 0
+          && regionInfoA.getEndKey().length == 0) {
+        return 1; // regionInfoA is the last region
+      }
+      if (regionInfoB.getStartKey().length != 0
+          && regionInfoB.getEndKey().length == 0) {
+        return -1; // regionInfoB is the last region
+      }
+      return result;
+    }
+
+    // regionId is usually milli timestamp -- this defines older stamps
+    // to be "smaller" than newer stamps in sort order.
+    if (regionInfoA.getRegionId() > regionInfoB.getRegionId()) {
+      return 1;
+    } else if (regionInfoA.getRegionId() < regionInfoB.getRegionId()) {
+      return -1;
+    }
+    return 0;
+  }
 }

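isReplicasForSameRegion() above compares two HRegionInfo instances on table, start key, end key and region id while ignoring the replica id, so replicas of the same region compare equal while a replica left over from a dropped and re-created table (same keys, different region id) does not. Below is a minimal sketch of how the helper behaves; the class name, table name and literal region ids are illustrative only.

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;

public class ReplicaSameRegionSketch {
  public static void main(String[] args) {
    // Illustrative values; not part of the commit.
    TableName table = TableName.valueOf("t1");

    // Primary and replica 1 of the same region: identical table, keys and region id.
    HRegionInfo primary = new HRegionInfo(table, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_END_ROW, false, 1000L, 0);
    HRegionInfo replica1 = new HRegionInfo(table, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_END_ROW, false, 1000L, 1);

    // Replica 1 of a re-created instance of the same table: same keys, newer region id.
    HRegionInfo recreatedReplica1 = new HRegionInfo(table, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_END_ROW, false, 2000L, 1);

    System.out.println(RegionReplicaUtil.isReplicasForSameRegion(primary, replica1));          // true
    System.out.println(RegionReplicaUtil.isReplicasForSameRegion(primary, recreatedReplica1)); // false
  }
}
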
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@ -41,6 +42,9 @@ public class TestRegionLocations {
   HRegionInfo info2 = hri(2);
   HRegionInfo info9 = hri(9);
 
+  long regionId1 = 1000;
+  long regionId2 = 2000;
+
   @Test
   public void testSizeMethods() {
     RegionLocations list = new RegionLocations();
@ -72,10 +76,13 @@ public class TestRegionLocations {
   }
 
   private HRegionInfo hri(int replicaId) {
+    return hri(regionId1, replicaId);
+  }
+
+  private HRegionInfo hri(long regionId, int replicaId) {
     TableName table = TableName.valueOf("table");
     byte[] startKey = HConstants.EMPTY_START_ROW;
     byte[] endKey = HConstants.EMPTY_END_ROW;
-    long regionId = System.currentTimeMillis();
     HRegionInfo info = new HRegionInfo(table, startKey, endKey, false, regionId, replicaId);
     return info;
   }
@ -276,6 +283,54 @@ public class TestRegionLocations {
     assertEquals(sn3, list1.getRegionLocation(9).getServerName());
   }
 
+  @Test
+  public void testMergeLocationsWithDifferentRegionId() {
+    RegionLocations list1, list2;
+
+    // test merging two lists. But the list2 contains region replicas with a different region id
+    HRegionInfo info0 = hri(regionId1, 0);
+    HRegionInfo info1 = hri(regionId1, 1);
+    HRegionInfo info2 = hri(regionId2, 2);
+
+    list1 = hrll(hrl(info2, sn1));
+    list2 = hrll(hrl(info0, sn2), hrl(info1, sn2));
+    list1 = list2.mergeLocations(list1);
+    assertNull(list1.getRegionLocation(0));
+    assertNull(list1.getRegionLocation(1));
+    assertNotNull(list1.getRegionLocation(2));
+    assertEquals(sn1, list1.getRegionLocation(2).getServerName());
+    assertEquals(3, list1.size());
+
+    // try the other way merge
+    list1 = hrll(hrl(info2, sn1));
+    list2 = hrll(hrl(info0, sn2), hrl(info1, sn2));
+    list2 = list1.mergeLocations(list2);
+    assertNotNull(list2.getRegionLocation(0));
+    assertNotNull(list2.getRegionLocation(1));
+    assertNull(list2.getRegionLocation(2));
+  }
+
+  @Test
+  public void testUpdateLocationWithDifferentRegionId() {
+    RegionLocations list;
+
+    HRegionInfo info0 = hri(regionId1, 0);
+    HRegionInfo info1 = hri(regionId2, 1);
+    HRegionInfo info2 = hri(regionId1, 2);
+
+    list = new RegionLocations(hrl(info0, sn1), hrl(info2, sn1));
+
+    list = list.updateLocation(hrl(info1, sn2), false, true); // force update
+
+    // the other locations should be removed now
+    assertNull(list.getRegionLocation(0));
+    assertNotNull(list.getRegionLocation(1));
+    assertNull(list.getRegionLocation(2));
+    assertEquals(sn2, list.getRegionLocation(1).getServerName());
+    assertEquals(3, list.size());
+  }
+
+
   @Test
   public void testConstructWithNullElements() {
     // RegionLocations can contain null elements as well. These null elements can

@ -3550,6 +3550,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   }
 
   public void setReadsEnabled(boolean readsEnabled) {
+    if (readsEnabled && !this.writestate.readsEnabled) {
+      LOG.info(getRegionInfo().getEncodedName() + " : Enabling reads for region.");
+    }
     this.writestate.setReadsEnabled(readsEnabled);
   }
 

@ -192,7 +192,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       keepAliveTime,
       TimeUnit.SECONDS,
       workQueue,
-      Threads.newDaemonThreadFactory(this.getClass().toString() + "-rpc-shared-"));
+      Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + "-rpc-shared-"));
     tpe.allowCoreThreadTimeOut(true);
     return tpe;
   }
@ -342,6 +342,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
         + " is cached as a disabled or dropped table");
+      for (Entry entry : entries) {
+        LOG.trace("Skipping : " + entry);
+      }
     }
     sink.getSkippedEditsCounter().addAndGet(entries.size());
     return;
@ -365,6 +368,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
         + " is dropped. Adding table to cache.");
+      for (Entry entry : entries) {
+        LOG.trace("Skipping : " + entry);
+      }
     }
     disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
     // skip this entry
@ -383,9 +389,12 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     }
     if (LOG.isTraceEnabled()) {
       LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
-        + " because located region region " + primaryLocation.getRegionInfo().getEncodedName()
+        + " because located region " + primaryLocation.getRegionInfo().getEncodedName()
         + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
         + " from WALEdit");
+      for (Entry entry : entries) {
+        LOG.trace("Skipping : " + entry);
+      }
     }
     sink.getSkippedEditsCounter().addAndGet(entries.size());
     return;
@ -438,6 +447,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
         + " because received exception for dropped or disabled table", cause);
+      for (Entry entry : entries) {
+        LOG.trace("Skipping : " + entry);
+      }
     }
     disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
     if (!tasksCancelled) {
@ -518,9 +530,12 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       skip = true;
       if (LOG.isTraceEnabled()) {
         LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
-          + " because located region region " + location.getRegionInfo().getEncodedName()
+          + " because located region " + location.getRegionInfo().getEncodedName()
           + " is different than the original region "
           + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
+        for (Entry entry : entries) {
+          LOG.trace("Skipping : " + entry);
+        }
       }
       return null;
     }