HBASE-12958 SSH doing hbase:meta get but hbase:meta not assigned
This commit is contained in:
parent
8aeb3acaf9
commit
96cdc7987e
|
@ -202,6 +202,7 @@ public class MetaTableAccessor {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private static Result get(final Table t, final Get g) throws IOException {
|
private static Result get(final Table t, final Get g) throws IOException {
|
||||||
|
if (t == null) return null;
|
||||||
try {
|
try {
|
||||||
return t.get(g);
|
return t.get(g);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -571,70 +571,87 @@ public class RegionStates {
|
||||||
/**
|
/**
|
||||||
* A server is offline, all regions on it are dead.
|
* A server is offline, all regions on it are dead.
|
||||||
*/
|
*/
|
||||||
public synchronized List<HRegionInfo> serverOffline(final ServerName sn) {
|
public List<HRegionInfo> serverOffline(final ServerName sn) {
|
||||||
// Offline all regions on this server not already in transition.
|
// Offline all regions on this server not already in transition.
|
||||||
List<HRegionInfo> rits = new ArrayList<HRegionInfo>();
|
List<HRegionInfo> rits = new ArrayList<HRegionInfo>();
|
||||||
Set<HRegionInfo> assignedRegions = serverHoldings.get(sn);
|
Set<HRegionInfo> regionsToCleanIfNoMetaEntry = new HashSet<HRegionInfo>();
|
||||||
if (assignedRegions == null) {
|
synchronized (this) {
|
||||||
assignedRegions = new HashSet<HRegionInfo>();
|
Set<HRegionInfo> assignedRegions = serverHoldings.get(sn);
|
||||||
}
|
if (assignedRegions == null) {
|
||||||
|
assignedRegions = new HashSet<HRegionInfo>();
|
||||||
// Offline regions outside the loop to avoid ConcurrentModificationException
|
|
||||||
Set<HRegionInfo> regionsToOffline = new HashSet<HRegionInfo>();
|
|
||||||
for (HRegionInfo region : assignedRegions) {
|
|
||||||
// Offline open regions, no need to offline if SPLIT/MERGED/OFFLINE
|
|
||||||
if (isRegionOnline(region)) {
|
|
||||||
regionsToOffline.add(region);
|
|
||||||
} else if (isRegionInState(region, State.SPLITTING, State.MERGING)) {
|
|
||||||
LOG.debug("Offline splitting/merging region " + getRegionState(region));
|
|
||||||
regionsToOffline.add(region);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
for (RegionState state : regionsInTransition.values()) {
|
// Offline regions outside the loop to avoid ConcurrentModificationException
|
||||||
HRegionInfo hri = state.getRegion();
|
Set<HRegionInfo> regionsToOffline = new HashSet<HRegionInfo>();
|
||||||
if (assignedRegions.contains(hri)) {
|
for (HRegionInfo region : assignedRegions) {
|
||||||
// Region is open on this region server, but in transition.
|
// Offline open regions, no need to offline if SPLIT/MERGED/OFFLINE
|
||||||
// This region must be moving away from this server, or splitting/merging.
|
if (isRegionOnline(region)) {
|
||||||
// SSH will handle it, either skip assigning, or re-assign.
|
regionsToOffline.add(region);
|
||||||
LOG.info("Transitioning " + state + " will be handled by SSH for " + sn);
|
} else if (isRegionInState(region, State.SPLITTING, State.MERGING)) {
|
||||||
} else if (sn.equals(state.getServerName())) {
|
LOG.debug("Offline splitting/merging region " + getRegionState(region));
|
||||||
// Region is in transition on this region server, and this
|
regionsToOffline.add(region);
|
||||||
// region is not open on this server. So the region must be
|
|
||||||
// moving to this server from another one (i.e. opening or
|
|
||||||
// pending open on this server, was open on another one.
|
|
||||||
// Offline state is also kind of pending open if the region is in
|
|
||||||
// transition. The region could be in failed_close state too if we have
|
|
||||||
// tried several times to open it while this region server is not reachable)
|
|
||||||
if (isOneOfStates(state, State.OPENING, State.PENDING_OPEN,
|
|
||||||
State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
|
|
||||||
LOG.info("Found region in " + state + " to be reassigned by SSH for " + sn);
|
|
||||||
rits.add(hri);
|
|
||||||
} else if (isOneOfStates(state, State.SPLITTING_NEW)) {
|
|
||||||
try {
|
|
||||||
if (MetaTableAccessor.getRegion(server.getConnection(), state.getRegion()
|
|
||||||
.getEncodedNameAsBytes()) == null) {
|
|
||||||
regionsToOffline.add(state.getRegion());
|
|
||||||
FSUtils.deleteRegionDir(server.getConfiguration(), state.getRegion());
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Got exception while deleting " + state.getRegion()
|
|
||||||
+ " directories from file system.", e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
LOG.warn("THIS SHOULD NOT HAPPEN: unexpected " + state);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
for (HRegionInfo hri : regionsToOffline) {
|
for (RegionState state : regionsInTransition.values()) {
|
||||||
regionOffline(hri);
|
HRegionInfo hri = state.getRegion();
|
||||||
}
|
if (assignedRegions.contains(hri)) {
|
||||||
|
// Region is open on this region server, but in transition.
|
||||||
|
// This region must be moving away from this server, or splitting/merging.
|
||||||
|
// SSH will handle it, either skip assigning, or re-assign.
|
||||||
|
LOG.info("Transitioning " + state + " will be handled by SSH for " + sn);
|
||||||
|
} else if (sn.equals(state.getServerName())) {
|
||||||
|
// Region is in transition on this region server, and this
|
||||||
|
// region is not open on this server. So the region must be
|
||||||
|
// moving to this server from another one (i.e. opening or
|
||||||
|
// pending open on this server, was open on another one.
|
||||||
|
// Offline state is also kind of pending open if the region is in
|
||||||
|
// transition. The region could be in failed_close state too if we have
|
||||||
|
// tried several times to open it while this region server is not reachable)
|
||||||
|
if (isOneOfStates(state, State.OPENING, State.PENDING_OPEN,
|
||||||
|
State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
|
||||||
|
LOG.info("Found region in " + state + " to be reassigned by SSH for " + sn);
|
||||||
|
rits.add(hri);
|
||||||
|
} else if (isOneOfStates(state, State.SPLITTING_NEW)) {
|
||||||
|
regionsToCleanIfNoMetaEntry.add(state.getRegion());
|
||||||
|
} else {
|
||||||
|
LOG.warn("THIS SHOULD NOT HAPPEN: unexpected " + state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
this.notifyAll();
|
for (HRegionInfo hri : regionsToOffline) {
|
||||||
|
regionOffline(hri);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.notifyAll();
|
||||||
|
}
|
||||||
|
cleanIfNoMetaEntry(regionsToCleanIfNoMetaEntry);
|
||||||
return rits;
|
return rits;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method does an RPC to hbase:meta. Do not call this method with a lock/synchronize held.
|
||||||
|
* @param hris The hris to check if empty in hbase:meta and if so, clean them up.
|
||||||
|
*/
|
||||||
|
private void cleanIfNoMetaEntry(Set<HRegionInfo> hris) {
|
||||||
|
if (hris.isEmpty()) return;
|
||||||
|
for (HRegionInfo hri: hris) {
|
||||||
|
try {
|
||||||
|
// This is RPC to meta table. It is done while we have a synchronize on
|
||||||
|
// regionstates. No progress will be made if meta is not available at this time.
|
||||||
|
// This is a cleanup task. Not critical.
|
||||||
|
if (MetaTableAccessor.getRegion(server.getConnection(), hri.getEncodedNameAsBytes()) ==
|
||||||
|
null) {
|
||||||
|
regionOffline(hri);
|
||||||
|
FSUtils.deleteRegionDir(server.getConfiguration(), hri);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Got exception while deleting " + hri + " directories from file system.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the online regions of the specified table.
|
* Gets the online regions of the specified table.
|
||||||
* This method looks at the in-memory state. It does not go to <code>hbase:meta</code>.
|
* This method looks at the in-memory state. It does not go to <code>hbase:meta</code>.
|
||||||
|
@ -1000,7 +1017,8 @@ public class RegionStates {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the HRegionInfo from cache, if not there, from the hbase:meta table
|
* Get the HRegionInfo from cache, if not there, from the hbase:meta table.
|
||||||
|
* Be careful. Does RPC. Do not hold a lock or synchronize when you call this method.
|
||||||
* @param regionName
|
* @param regionName
|
||||||
* @return HRegionInfo for the region
|
* @return HRegionInfo for the region
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -22,6 +22,12 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -31,7 +37,15 @@ import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.BrokenBarrierException;
|
||||||
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static junit.framework.Assert.assertFalse;
|
import static junit.framework.Assert.assertFalse;
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -39,7 +53,51 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@Category({MasterTests.class, SmallTests.class})
|
@Category({MasterTests.class, SmallTests.class})
|
||||||
public class TestRegionStates {
|
public class TestRegionStates {
|
||||||
|
@Test (timeout=10000)
|
||||||
|
public void testCanMakeProgressThoughMetaIsDown()
|
||||||
|
throws IOException, InterruptedException, BrokenBarrierException {
|
||||||
|
Server server = mock(Server.class);
|
||||||
|
when(server.getServerName()).thenReturn(ServerName.valueOf("master,1,1"));
|
||||||
|
Connection connection = mock(ClusterConnection.class);
|
||||||
|
// Set up a table that gets 'stuck' when we try to fetch a row from the meta table.
|
||||||
|
// It is stuck on a CyclicBarrier latch. We use CyclicBarrier because it will tell us when
|
||||||
|
// thread is waiting on latch.
|
||||||
|
Table metaTable = Mockito.mock(Table.class);
|
||||||
|
final CyclicBarrier latch = new CyclicBarrier(2);
|
||||||
|
when(metaTable.get((Get)Mockito.any())).thenAnswer(new Answer<Result>() {
|
||||||
|
@Override
|
||||||
|
public Result answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
latch.await();
|
||||||
|
throw new java.net.ConnectException("Connection refused");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
when(connection.getTable(TableName.META_TABLE_NAME)).thenReturn(metaTable);
|
||||||
|
when(server.getConnection()).thenReturn((ClusterConnection)connection);
|
||||||
|
Configuration configuration = mock(Configuration.class);
|
||||||
|
when(server.getConfiguration()).thenReturn(configuration);
|
||||||
|
TableStateManager tsm = mock(TableStateManager.class);
|
||||||
|
ServerManager sm = mock(ServerManager.class);
|
||||||
|
when(sm.isServerOnline(isA(ServerName.class))).thenReturn(true);
|
||||||
|
|
||||||
|
RegionStateStore rss = mock(RegionStateStore.class);
|
||||||
|
final RegionStates regionStates = new RegionStates(server, tsm, sm, rss);
|
||||||
|
final ServerName sn = mockServer("one", 1);
|
||||||
|
regionStates.updateRegionState(HRegionInfo.FIRST_META_REGIONINFO, State.SPLITTING_NEW, sn);
|
||||||
|
Thread backgroundThread = new Thread("Get stuck setting server offline") {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
regionStates.serverOffline(sn);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
assertTrue(latch.getNumberWaiting() == 0);
|
||||||
|
backgroundThread.start();
|
||||||
|
while (latch.getNumberWaiting() == 0);
|
||||||
|
// Verify I can do stuff with synchronized RegionStates methods, that I am not locked out.
|
||||||
|
// Below is a call that is synchronized. Can I do it and not block?
|
||||||
|
regionStates.getRegionServerOfRegion(HRegionInfo.FIRST_META_REGIONINFO);
|
||||||
|
// Done. Trip the barrier on the background thread.
|
||||||
|
latch.await();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWeDontReturnDrainingServersForOurBalancePlans() throws Exception {
|
public void testWeDontReturnDrainingServersForOurBalancePlans() throws Exception {
|
||||||
|
@ -84,4 +142,4 @@ public class TestRegionStates {
|
||||||
when(serverName.getPort()).thenReturn(fakePort);
|
when(serverName.getPort()).thenReturn(fakePort);
|
||||||
return serverName;
|
return serverName;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue