HBASE-22527 [hbck2] Add a master web ui to show the problematic regions
This commit is contained in:
parent
df2cdc8e5d
commit
727ebbddac
|
@ -17,27 +17,108 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
</%doc>
|
||||
<%import>
|
||||
org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
org.apache.hadoop.hbase.master.assignment.AssignmentManager.RegionInTransitionStat;
|
||||
org.apache.hadoop.hbase.master.assignment.RegionStates.RegionFailedOpen;
|
||||
org.apache.hadoop.hbase.master.RegionState;
|
||||
java.util.Map;
|
||||
java.util.Set;
|
||||
java.util.SortedSet;
|
||||
java.util.concurrent.atomic.AtomicInteger;
|
||||
java.util.stream.Collectors;
|
||||
org.apache.hadoop.conf.Configuration;
|
||||
org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
org.apache.hadoop.hbase.HConstants;
|
||||
org.apache.hadoop.hbase.ServerName;
|
||||
org.apache.hadoop.hbase.client.RegionInfo;
|
||||
org.apache.hadoop.hbase.client.RegionInfoDisplay;
|
||||
java.util.HashSet;
|
||||
java.util.SortedSet;
|
||||
java.util.Map;
|
||||
java.util.concurrent.atomic.AtomicInteger;
|
||||
org.apache.hadoop.hbase.master.RegionState;
|
||||
org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
org.apache.hadoop.hbase.master.assignment.AssignmentManager.RegionInTransitionStat;
|
||||
org.apache.hadoop.hbase.master.assignment.RegionStates.RegionFailedOpen;
|
||||
org.apache.hadoop.hbase.util.Pair;
|
||||
</%import>
|
||||
<%args>
|
||||
AssignmentManager assignmentManager;
|
||||
int limit = 100;
|
||||
</%args>
|
||||
|
||||
<%java SortedSet<RegionState> rit = assignmentManager
|
||||
.getRegionStates().getRegionsInTransitionOrderedByTimestamp();
|
||||
%>
|
||||
<%java>
|
||||
SortedSet<RegionState> rit = assignmentManager.getRegionStates()
|
||||
.getRegionsInTransitionOrderedByTimestamp();
|
||||
Map<String, Pair<ServerName, Set<ServerName>>> problematicRegions = assignmentManager
|
||||
.getProblematicRegions();
|
||||
</%java>
|
||||
|
||||
<%if !problematicRegions.isEmpty() %>
|
||||
<%java>
|
||||
int totalSize = problematicRegions.size();
|
||||
int sizePerPage = Math.min(10, totalSize);
|
||||
int numOfPages = (int) Math.ceil(totalSize * 1.0 / sizePerPage);
|
||||
</%java>
|
||||
<section>
|
||||
<h2><a name="problem-regions">Problematic Regions</a></h2>
|
||||
<p>
|
||||
<span>
|
||||
<% problematicRegions.size() %> problematic region(s). There are three case: 1. Master
|
||||
thought this region opened, but no regionserver reported it. 2. Master thought this
|
||||
region opened on Server1, but regionserver reported Server2. 3. More than one
|
||||
regionservers reported opened this region. Notice: the reported online regionservers
|
||||
may be not right when there are regions in transition. Please check them in
|
||||
regionserver's web UI.
|
||||
</span>
|
||||
</p>
|
||||
<div class="tabbable">
|
||||
<div class="tab-content">
|
||||
<%java int recordItr = 0; %>
|
||||
<%for Map.Entry<String, Pair<ServerName, Set<ServerName>>> entry : problematicRegions.entrySet() %>
|
||||
<%if (recordItr % sizePerPage) == 0 %>
|
||||
<%if recordItr == 0 %>
|
||||
<div class="tab-pane active" id="tab_prs<% (recordItr / sizePerPage) + 1 %>">
|
||||
<%else>
|
||||
<div class="tab-pane" id="tab_prs<% (recordItr / sizePerPage) + 1 %>">
|
||||
</%if>
|
||||
<table class="table table-striped" style="margin-bottom:0px;">
|
||||
<tr>
|
||||
<th>Region</th>
|
||||
<th>Location in META</th>
|
||||
<th>Reported Online Region Servers</th>
|
||||
</tr>
|
||||
</%if>
|
||||
|
||||
<tr>
|
||||
<td><% entry.getKey() %></td>
|
||||
<td><% entry.getValue().getFirst() %></td>
|
||||
<td><% entry.getValue().getSecond().stream().map(ServerName::getServerName)
|
||||
.collect(Collectors.joining(", ")) %></td>
|
||||
</tr>
|
||||
<%java recordItr++; %>
|
||||
<%if (recordItr % sizePerPage) == 0 %>
|
||||
</table>
|
||||
</div>
|
||||
</%if>
|
||||
</%for>
|
||||
|
||||
<%if (recordItr % sizePerPage) != 0 %>
|
||||
<%for ; (recordItr % sizePerPage) != 0 ; recordItr++ %>
|
||||
<tr><td colspan="3" style="height:61px"></td></tr>
|
||||
</%for>
|
||||
</table>
|
||||
</div>
|
||||
</%if>
|
||||
|
||||
</div>
|
||||
<nav>
|
||||
<ul class="nav nav-pills pagination">
|
||||
<%for int i = 1 ; i <= numOfPages; i++ %>
|
||||
<%if i == 1 %>
|
||||
<li class="active">
|
||||
<%else>
|
||||
<li>
|
||||
</%if>
|
||||
<a href="#tab_prs<% i %>"><% i %></a></li>
|
||||
</%for>
|
||||
</ul>
|
||||
</nav>
|
||||
</div>
|
||||
</section>
|
||||
</%if>
|
||||
|
||||
<%if !rit.isEmpty() %>
|
||||
<%java>
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -156,6 +157,8 @@ public class AssignmentManager implements ServerListener {
|
|||
private final RegionStates regionStates = new RegionStates();
|
||||
private final RegionStateStore regionStateStore;
|
||||
|
||||
private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();
|
||||
|
||||
private final boolean shouldAssignRegionsWithFavoredNodes;
|
||||
private final int assignDispatchWaitQueueMaxSize;
|
||||
private final int assignDispatchWaitMillis;
|
||||
|
@ -962,6 +965,11 @@ public class AssignmentManager implements ServerListener {
|
|||
}
|
||||
}
|
||||
|
||||
// Track the regionserver reported online regions in memory.
|
||||
synchronized (rsReports) {
|
||||
rsReports.put(serverName, regionNames);
|
||||
}
|
||||
|
||||
if (regionNames.isEmpty()) {
|
||||
// nothing to do if we don't have regions
|
||||
LOG.trace("no online region found on " + serverName);
|
||||
|
@ -1882,4 +1890,53 @@ public class AssignmentManager implements ServerListener {
|
|||
MasterServices getMaster() {
|
||||
return master;
|
||||
}
|
||||
|
||||
/**
|
||||
* Found the potentially problematic opened regions. There are three case:
|
||||
* case 1. Master thought this region opened, but no regionserver reported it.
|
||||
* case 2. Master thought this region opened on Server1, but regionserver reported Server2
|
||||
* case 3. More than one regionservers reported opened this region
|
||||
*
|
||||
* @return the map of potentially problematic opened regions. Key is the region name. Value is
|
||||
* a pair of location in meta and the regionservers which reported opened this region.
|
||||
*/
|
||||
public Map<String, Pair<ServerName, Set<ServerName>>> getProblematicRegions() {
|
||||
Map<String, Set<ServerName>> reportedOnlineRegions = new HashMap<>();
|
||||
synchronized (rsReports) {
|
||||
for (Map.Entry<ServerName, Set<byte[]>> entry : rsReports.entrySet()) {
|
||||
for (byte[] regionName : entry.getValue()) {
|
||||
reportedOnlineRegions
|
||||
.computeIfAbsent(RegionInfo.getRegionNameAsString(regionName), r -> new HashSet<>())
|
||||
.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Pair<ServerName, Set<ServerName>>> problematicRegions = new HashMap<>();
|
||||
List<RegionState> rits = regionStates.getRegionsStateInTransition();
|
||||
for (RegionState regionState : regionStates.getRegionStates()) {
|
||||
// Only consider the opened region and not in transition
|
||||
if (!rits.contains(regionState) && regionState.isOpened()) {
|
||||
String regionName = regionState.getRegion().getRegionNameAsString();
|
||||
ServerName serverName = regionState.getServerName();
|
||||
if (reportedOnlineRegions.containsKey(regionName)) {
|
||||
Set<ServerName> reportedServers = reportedOnlineRegions.get(regionName);
|
||||
if (reportedServers.contains(serverName)) {
|
||||
if (reportedServers.size() > 1) {
|
||||
// More than one regionserver reported opened this region
|
||||
problematicRegions.put(regionName, new Pair<>(serverName, reportedServers));
|
||||
}
|
||||
} else {
|
||||
// Master thought this region opened on Server1, but regionserver reported Server2
|
||||
problematicRegions.put(regionName, new Pair<>(serverName, reportedServers));
|
||||
}
|
||||
} else {
|
||||
// Master thought this region opened, but no regionserver reported it.
|
||||
problematicRegions.put(regionName, new Pair<>(serverName, new HashSet<>()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return problematicRegions;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestAMProblematicRegions extends TestAssignmentManagerBase {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestAMProblematicRegions.class);
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAMProblematicRegions.class);
|
||||
|
||||
@Test
|
||||
public void testForMeta() throws Exception {
|
||||
byte[] metaRegionNameAsBytes = RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName();
|
||||
String metaRegionName = RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionNameAsString();
|
||||
List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
|
||||
assertEquals(NSERVERS, serverNames.size());
|
||||
|
||||
Map<String, Pair<ServerName, Set<ServerName>>> problematicRegions = am.getProblematicRegions();
|
||||
|
||||
// Test for case1: Master thought this region opened, but no regionserver reported it.
|
||||
assertTrue(problematicRegions.containsKey(metaRegionName));
|
||||
Pair<ServerName, Set<ServerName>> pair = problematicRegions.get(metaRegionName);
|
||||
ServerName locationInMeta = pair.getFirst();
|
||||
Set<ServerName> reportedRegionServers = pair.getSecond();
|
||||
assertTrue(serverNames.contains(locationInMeta));
|
||||
assertEquals(0, reportedRegionServers.size());
|
||||
|
||||
// Reported right region location. Then not in problematic regions.
|
||||
am.reportOnlineRegions(locationInMeta, Collections.singleton(metaRegionNameAsBytes));
|
||||
problematicRegions = am.getProblematicRegions();
|
||||
assertFalse(problematicRegions.containsKey(metaRegionName));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testForUserTable() throws Exception {
|
||||
TableName tableName = TableName.valueOf("testForUserTable");
|
||||
RegionInfo hri = createRegionInfo(tableName, 1);
|
||||
String regionName = hri.getRegionNameAsString();
|
||||
rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
|
||||
Future<byte[]> future = submitProcedure(am.createAssignProcedure(hri));
|
||||
waitOnFuture(future);
|
||||
|
||||
List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
|
||||
assertEquals(NSERVERS, serverNames.size());
|
||||
|
||||
// Test for case1: Master thought this region opened, but no regionserver reported it.
|
||||
Map<String, Pair<ServerName, Set<ServerName>>> problematicRegions = am.getProblematicRegions();
|
||||
assertTrue(problematicRegions.containsKey(regionName));
|
||||
Pair<ServerName, Set<ServerName>> pair = problematicRegions.get(regionName);
|
||||
ServerName locationInMeta = pair.getFirst();
|
||||
Set<ServerName> reportedRegionServers = pair.getSecond();
|
||||
assertTrue(serverNames.contains(locationInMeta));
|
||||
assertEquals(0, reportedRegionServers.size());
|
||||
|
||||
// Test for case2: Master thought this region opened on Server1, but regionserver reported
|
||||
// Server2
|
||||
final ServerName tempLocationInMeta = locationInMeta;
|
||||
final ServerName anotherServer =
|
||||
serverNames.stream().filter(s -> !s.equals(tempLocationInMeta)).findFirst().get();
|
||||
am.reportOnlineRegions(anotherServer, Collections.singleton(hri.getRegionName()));
|
||||
problematicRegions = am.getProblematicRegions();
|
||||
assertTrue(problematicRegions.containsKey(regionName));
|
||||
pair = problematicRegions.get(regionName);
|
||||
locationInMeta = pair.getFirst();
|
||||
reportedRegionServers = pair.getSecond();
|
||||
assertEquals(1, reportedRegionServers.size());
|
||||
assertFalse(reportedRegionServers.contains(locationInMeta));
|
||||
assertTrue(reportedRegionServers.contains(anotherServer));
|
||||
|
||||
// Test for case3: More than one regionservers reported opened this region.
|
||||
am.reportOnlineRegions(locationInMeta, Collections.singleton(hri.getRegionName()));
|
||||
problematicRegions = am.getProblematicRegions();
|
||||
assertTrue(problematicRegions.containsKey(regionName));
|
||||
pair = problematicRegions.get(regionName);
|
||||
locationInMeta = pair.getFirst();
|
||||
reportedRegionServers = pair.getSecond();
|
||||
assertEquals(2, reportedRegionServers.size());
|
||||
assertTrue(reportedRegionServers.contains(locationInMeta));
|
||||
assertTrue(reportedRegionServers.contains(anotherServer));
|
||||
|
||||
// Reported right region location. Then not in problematic regions.
|
||||
am.reportOnlineRegions(anotherServer, Collections.EMPTY_SET);
|
||||
problematicRegions = am.getProblematicRegions();
|
||||
assertFalse(problematicRegions.containsKey(regionName));
|
||||
}
|
||||
}
|
|
@ -18,154 +18,47 @@
|
|||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.List;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||
|
||||
@Category({MasterTests.class, LargeTests.class})
|
||||
public class TestAssignmentManager {
|
||||
public class TestAssignmentManager extends TestAssignmentManagerBase {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManager.class);
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAssignmentManager.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManager.class);
|
||||
|
||||
@Rule public TestName name = new TestName();
|
||||
@Rule public final ExpectedException exception = ExpectedException.none();
|
||||
|
||||
private static final int PROC_NTHREADS = 64;
|
||||
private static final int NREGIONS = 1 * 1000;
|
||||
private static final int NSERVERS = Math.max(1, NREGIONS / 100);
|
||||
|
||||
private HBaseTestingUtility UTIL;
|
||||
private MockRSProcedureDispatcher rsDispatcher;
|
||||
private MockMasterServices master;
|
||||
private AssignmentManager am;
|
||||
private NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers =
|
||||
new ConcurrentSkipListMap<ServerName, SortedSet<byte []>>();
|
||||
// Simple executor to run some simple tasks.
|
||||
private ScheduledExecutorService executor;
|
||||
|
||||
private ProcedureMetrics assignProcMetrics;
|
||||
private ProcedureMetrics unassignProcMetrics;
|
||||
|
||||
private long assignSubmittedCount = 0;
|
||||
private long assignFailedCount = 0;
|
||||
private long unassignSubmittedCount = 0;
|
||||
private long unassignFailedCount = 0;
|
||||
|
||||
private void setupConfiguration(Configuration conf) throws Exception {
|
||||
FSUtils.setRootDir(conf, UTIL.getDataTestDir());
|
||||
conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
|
||||
conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
|
||||
conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
|
||||
conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually.
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
UTIL = new HBaseTestingUtility();
|
||||
this.executor = Executors.newSingleThreadScheduledExecutor();
|
||||
setupConfiguration(UTIL.getConfiguration());
|
||||
master = new MockMasterServices(UTIL.getConfiguration(), this.regionsToRegionServers);
|
||||
rsDispatcher = new MockRSProcedureDispatcher(master);
|
||||
master.start(NSERVERS, rsDispatcher);
|
||||
am = master.getAssignmentManager();
|
||||
assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics();
|
||||
unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics();
|
||||
setUpMeta();
|
||||
}
|
||||
|
||||
private void setUpMeta() throws Exception {
|
||||
rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
|
||||
am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
|
||||
am.wakeMetaLoadedEvent();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
master.stop("tearDown");
|
||||
this.executor.shutdownNow();
|
||||
}
|
||||
|
||||
@Test (expected=NullPointerException.class)
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void testWaitServerReportEventWithNullServer() throws UnexpectedStateException {
|
||||
// Test what happens if we pass in null server. I'd expect it throws NPE.
|
||||
if (this.am.waitServerReportEvent(null, null)) throw new UnexpectedStateException();
|
||||
if (this.am.waitServerReportEvent(null, null)) {
|
||||
throw new UnexpectedStateException();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -471,484 +364,4 @@ public class TestAssignmentManager {
|
|||
assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount());
|
||||
assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount());
|
||||
}
|
||||
|
||||
|
||||
private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
|
||||
return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
|
||||
}
|
||||
|
||||
private byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
|
||||
try {
|
||||
return future.get(5, TimeUnit.SECONDS);
|
||||
} catch (ExecutionException e) {
|
||||
LOG.info("ExecutionException", e);
|
||||
Exception ee = (Exception)e.getCause();
|
||||
if (ee instanceof InterruptedIOException) {
|
||||
for (Procedure<?> p: this.master.getMasterProcedureExecutor().getProcedures()) {
|
||||
LOG.info(p.toStringDetails());
|
||||
}
|
||||
}
|
||||
throw (Exception)e.getCause();
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================================
|
||||
// Helpers
|
||||
// ============================================================================================
|
||||
private void bulkSubmit(final AssignProcedure[] procs) throws Exception {
|
||||
final Thread[] threads = new Thread[PROC_NTHREADS];
|
||||
for (int i = 0; i < threads.length; ++i) {
|
||||
final int threadId = i;
|
||||
threads[i] = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
TableName tableName = TableName.valueOf("table-" + threadId);
|
||||
int n = (procs.length / threads.length);
|
||||
int start = threadId * n;
|
||||
int stop = start + n;
|
||||
for (int j = start; j < stop; ++j) {
|
||||
procs[j] = createAndSubmitAssign(tableName, j);
|
||||
}
|
||||
}
|
||||
};
|
||||
threads[i].start();
|
||||
}
|
||||
for (int i = 0; i < threads.length; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
|
||||
procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
|
||||
}
|
||||
}
|
||||
|
||||
private AssignProcedure createAndSubmitAssign(TableName tableName, int regionId) {
|
||||
RegionInfo hri = createRegionInfo(tableName, regionId);
|
||||
AssignProcedure proc = am.createAssignProcedure(hri);
|
||||
master.getMasterProcedureExecutor().submitProcedure(proc);
|
||||
return proc;
|
||||
}
|
||||
|
||||
private RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
|
||||
return RegionInfoBuilder.newBuilder(tableName)
|
||||
.setStartKey(Bytes.toBytes(regionId))
|
||||
.setEndKey(Bytes.toBytes(regionId + 1))
|
||||
.setSplit(false)
|
||||
.setRegionId(0)
|
||||
.build();
|
||||
}
|
||||
|
||||
private void sendTransitionReport(final ServerName serverName,
|
||||
final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
|
||||
final TransitionCode state) throws IOException {
|
||||
ReportRegionStateTransitionRequest.Builder req =
|
||||
ReportRegionStateTransitionRequest.newBuilder();
|
||||
req.setServer(ProtobufUtil.toServerName(serverName));
|
||||
req.addTransition(RegionStateTransition.newBuilder()
|
||||
.addRegionInfo(regionInfo)
|
||||
.setTransitionCode(state)
|
||||
.setOpenSeqNum(1)
|
||||
.build());
|
||||
am.reportRegionStateTransition(req.build());
|
||||
}
|
||||
|
||||
private void doCrash(final ServerName serverName) {
|
||||
this.am.submitServerCrash(serverName, false/*No WALs here*/);
|
||||
}
|
||||
|
||||
private void doRestart(final ServerName serverName) {
|
||||
try {
|
||||
this.master.restartRegionServer(serverName);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Can not restart RS with new startcode");
|
||||
}
|
||||
}
|
||||
|
||||
private class NoopRsExecutor implements MockRSExecutor {
|
||||
@Override
|
||||
public ExecuteProceduresResponse sendRequest(ServerName server,
|
||||
ExecuteProceduresRequest request) throws IOException {
|
||||
if (request.getOpenRegionCount() > 0) {
|
||||
for (OpenRegionRequest req : request.getOpenRegionList()) {
|
||||
for (RegionOpenInfo openReq : req.getOpenInfoList()) {
|
||||
execOpenRegion(server, openReq);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (request.getCloseRegionCount() > 0) {
|
||||
for (CloseRegionRequest req : request.getCloseRegionList()) {
|
||||
execCloseRegion(server, req.getRegion().getValue().toByteArray());
|
||||
}
|
||||
}
|
||||
return ExecuteProceduresResponse.newBuilder().build();
|
||||
}
|
||||
|
||||
protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private class GoodRsExecutor extends NoopRsExecutor {
|
||||
@Override
|
||||
protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
|
||||
throws IOException {
|
||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
|
||||
// Concurrency?
|
||||
// Now update the state of our cluster in regionsToRegionServers.
|
||||
SortedSet<byte []> regions = regionsToRegionServers.get(server);
|
||||
if (regions == null) {
|
||||
regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
||||
regionsToRegionServers.put(server, regions);
|
||||
}
|
||||
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
|
||||
if (regions.contains(hri.getRegionName())) {
|
||||
throw new UnsupportedOperationException(hri.getRegionNameAsString());
|
||||
}
|
||||
regions.add(hri.getRegionName());
|
||||
return RegionOpeningState.OPENED;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
|
||||
throws IOException {
|
||||
RegionInfo hri = am.getRegionInfo(regionName);
|
||||
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
|
||||
return CloseRegionResponse.newBuilder().setClosed(true).build();
|
||||
}
|
||||
}
|
||||
|
||||
private static class ServerNotYetRunningRsExecutor implements MockRSExecutor {
|
||||
@Override
|
||||
public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
|
||||
throws IOException {
|
||||
throw new ServerNotRunningYetException("wait on server startup");
|
||||
}
|
||||
}
|
||||
|
||||
private static class FaultyRsExecutor implements MockRSExecutor {
|
||||
private final IOException exception;
|
||||
|
||||
public FaultyRsExecutor(final IOException exception) {
|
||||
this.exception = exception;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
|
||||
throws IOException {
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
private class SocketTimeoutRsExecutor extends GoodRsExecutor {
|
||||
private final int maxSocketTimeoutRetries;
|
||||
private final int maxServerRetries;
|
||||
|
||||
private ServerName lastServer;
|
||||
private int sockTimeoutRetries;
|
||||
private int serverRetries;
|
||||
|
||||
public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) {
|
||||
this.maxServerRetries = maxServerRetries;
|
||||
this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
|
||||
throws IOException {
|
||||
// SocketTimeoutException should be a temporary problem
|
||||
// unless the server will be declared dead.
|
||||
if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
|
||||
if (sockTimeoutRetries == 1) assertNotEquals(lastServer, server);
|
||||
lastServer = server;
|
||||
LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries);
|
||||
throw new SocketTimeoutException("simulate socket timeout");
|
||||
} else if (serverRetries++ < maxServerRetries) {
|
||||
LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries);
|
||||
master.getServerManager().moveFromOnlineToDeadServers(server);
|
||||
sockTimeoutRetries = 0;
|
||||
throw new SocketTimeoutException("simulate socket timeout");
|
||||
} else {
|
||||
return super.sendRequest(server, req);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes open request and then returns nothing so acts like a RS that went zombie.
|
||||
* No response (so proc is stuck/suspended on the Master and won't wake up.). We
|
||||
* then send in a crash for this server after a few seconds; crash is supposed to
|
||||
* take care of the suspended procedures.
|
||||
*/
|
||||
private class HangThenRSCrashExecutor extends GoodRsExecutor {
|
||||
private int invocations;
|
||||
|
||||
@Override
|
||||
protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
|
||||
throws IOException {
|
||||
if (this.invocations++ > 0) {
|
||||
// Return w/o problem the second time through here.
|
||||
return super.execOpenRegion(server, openReq);
|
||||
}
|
||||
// The procedure on master will just hang forever because nothing comes back
|
||||
// from the RS in this case.
|
||||
LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Sending in CRASH of " + server);
|
||||
doCrash(server);
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes open request and then returns nothing so acts like a RS that went zombie.
|
||||
* No response (so proc is stuck/suspended on the Master and won't wake up.).
|
||||
* Different with HangThenRSCrashExecutor, HangThenRSCrashExecutor will create
|
||||
* ServerCrashProcedure to handle the server crash. However, this HangThenRSRestartExecutor
|
||||
* will restart RS directly, situation for RS crashed when SCP is not enabled.
|
||||
*/
|
||||
private class HangThenRSRestartExecutor extends GoodRsExecutor {
|
||||
private int invocations;
|
||||
|
||||
@Override
|
||||
protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
|
||||
throws IOException {
|
||||
if (this.invocations++ > 0) {
|
||||
// Return w/o problem the second time through here.
|
||||
return super.execOpenRegion(server, openReq);
|
||||
}
|
||||
// The procedure on master will just hang forever because nothing comes back
|
||||
// from the RS in this case.
|
||||
LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Restarting RS of " + server);
|
||||
doRestart(server);
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
|
||||
public static final int TYPES_OF_FAILURE = 6;
|
||||
private int invocations;
|
||||
|
||||
@Override
|
||||
protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
|
||||
throws IOException {
|
||||
switch (this.invocations++) {
|
||||
case 0: throw new NotServingRegionException("Fake");
|
||||
case 1:
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Sending in CRASH of " + server);
|
||||
doCrash(server);
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
throw new RegionServerAbortedException("Fake!");
|
||||
case 2:
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Sending in CRASH of " + server);
|
||||
doCrash(server);
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
throw new RegionServerStoppedException("Fake!");
|
||||
case 3: throw new ServerNotRunningYetException("Fake!");
|
||||
case 4:
|
||||
LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server);
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Sending in CRASH of " + server);
|
||||
doCrash(server);
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
return null;
|
||||
default:
|
||||
return super.execCloseRegion(server, regionName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class RandRsExecutor extends NoopRsExecutor {
|
||||
private final Random rand = new Random();
|
||||
|
||||
@Override
|
||||
public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
|
||||
throws IOException {
|
||||
switch (rand.nextInt(5)) {
|
||||
case 0: throw new ServerNotRunningYetException("wait on server startup");
|
||||
case 1: throw new SocketTimeoutException("simulate socket timeout");
|
||||
case 2: throw new RemoteException("java.io.IOException", "unexpected exception");
|
||||
default:
|
||||
// fall out
|
||||
}
|
||||
return super.sendRequest(server, req);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
|
||||
throws IOException {
|
||||
switch (rand.nextInt(6)) {
|
||||
case 0:
|
||||
LOG.info("Return OPENED response");
|
||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
|
||||
return OpenRegionResponse.RegionOpeningState.OPENED;
|
||||
case 1:
|
||||
LOG.info("Return transition report that OPENED/ALREADY_OPENED response");
|
||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
|
||||
return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
|
||||
case 2:
|
||||
LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
|
||||
sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN);
|
||||
return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
|
||||
default:
|
||||
// fall out
|
||||
}
|
||||
// The procedure on master will just hang forever because nothing comes back
|
||||
// from the RS in this case.
|
||||
LOG.info("Return null as response; means proc stuck so we send in a crash report after a few seconds...");
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Delayed CRASHING of " + server);
|
||||
doCrash(server);
|
||||
}
|
||||
}, 5, TimeUnit.SECONDS);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
|
||||
throws IOException {
|
||||
CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder();
|
||||
boolean closed = rand.nextBoolean();
|
||||
if (closed) {
|
||||
RegionInfo hri = am.getRegionInfo(regionName);
|
||||
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
|
||||
}
|
||||
resp.setClosed(closed);
|
||||
return resp.build();
|
||||
}
|
||||
}
|
||||
|
||||
protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
|
||||
|
||||
private boolean invoked = false;
|
||||
|
||||
private ServerName lastServer;
|
||||
|
||||
@Override
|
||||
public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
|
||||
throws IOException {
|
||||
if (!invoked) {
|
||||
lastServer = server;
|
||||
invoked = true;
|
||||
throw new CallQueueTooBigException("simulate queue full");
|
||||
}
|
||||
// better select another server since the server is over loaded, but anyway, it is fine to
|
||||
// still select the same server since it is not dead yet...
|
||||
if (lastServer.equals(server)) {
|
||||
LOG.warn("We still select the same server, which is not good.");
|
||||
}
|
||||
return super.sendRequest(server, req);
|
||||
}
|
||||
}
|
||||
|
||||
protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
|
||||
|
||||
private final int queueFullTimes;
|
||||
|
||||
private int retries;
|
||||
|
||||
private ServerName lastServer;
|
||||
|
||||
public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
|
||||
this.queueFullTimes = queueFullTimes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
|
||||
throws IOException {
|
||||
retries++;
|
||||
if (retries == 1) {
|
||||
lastServer = server;
|
||||
throw new CallTimeoutException("simulate call timeout");
|
||||
}
|
||||
// should always retry on the same server
|
||||
assertEquals(lastServer, server);
|
||||
if (retries < queueFullTimes) {
|
||||
throw new CallQueueTooBigException("simulate queue full");
|
||||
}
|
||||
return super.sendRequest(server, req);
|
||||
}
|
||||
}
|
||||
|
||||
private interface MockRSExecutor {
|
||||
ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
|
||||
throws IOException;
|
||||
}
|
||||
|
||||
private class MockRSProcedureDispatcher extends RSProcedureDispatcher {
|
||||
private MockRSExecutor mockRsExec;
|
||||
|
||||
public MockRSProcedureDispatcher(final MasterServices master) {
|
||||
super(master);
|
||||
}
|
||||
|
||||
public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
|
||||
this.mockRsExec = mockRsExec;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
|
||||
submitTask(new MockRemoteCall(serverName, remoteProcedures));
|
||||
}
|
||||
|
||||
private class MockRemoteCall extends ExecuteProceduresRemoteCall {
|
||||
public MockRemoteCall(final ServerName serverName, final Set<RemoteProcedure> operations) {
|
||||
super(serverName, operations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatchOpenRequests(MasterProcedureEnv env,
|
||||
List<RegionOpenOperation> operations) {
|
||||
request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatchCloseRequests(MasterProcedureEnv env,
|
||||
List<RegionCloseOperation> operations) {
|
||||
for (RegionCloseOperation op : operations) {
|
||||
request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
|
||||
final ExecuteProceduresRequest request) throws IOException {
|
||||
return mockRsExec.sendRequest(serverName, request);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void collectAssignmentManagerMetrics() {
|
||||
assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
|
||||
assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
|
||||
unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount();
|
||||
unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,629 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.List;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
|
||||
|
||||
/**
|
||||
* Base class for AM test.
|
||||
*/
|
||||
public class TestAssignmentManagerBase {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManagerBase.class);
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
@Rule
|
||||
public final ExpectedException exception = ExpectedException.none();
|
||||
|
||||
private static final int PROC_NTHREADS = 64;
|
||||
protected static final int NREGIONS = 1 * 1000;
|
||||
protected static final int NSERVERS = Math.max(1, NREGIONS / 100);
|
||||
|
||||
protected HBaseTestingUtility UTIL;
|
||||
protected MockRSProcedureDispatcher rsDispatcher;
|
||||
protected MockMasterServices master;
|
||||
protected AssignmentManager am;
|
||||
protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
|
||||
new ConcurrentSkipListMap<>();
|
||||
// Simple executor to run some simple tasks.
|
||||
protected ScheduledExecutorService executor;
|
||||
|
||||
protected ProcedureMetrics assignProcMetrics;
|
||||
protected ProcedureMetrics unassignProcMetrics;
|
||||
|
||||
protected long assignSubmittedCount = 0;
|
||||
protected long assignFailedCount = 0;
|
||||
protected long unassignSubmittedCount = 0;
|
||||
protected long unassignFailedCount = 0;
|
||||
|
||||
protected void setupConfiguration(Configuration conf) throws Exception {
|
||||
FSUtils.setRootDir(conf, UTIL.getDataTestDir());
|
||||
conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
|
||||
conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
|
||||
conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
|
||||
conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually.
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
UTIL = new HBaseTestingUtility();
|
||||
this.executor = Executors.newSingleThreadScheduledExecutor();
|
||||
setupConfiguration(UTIL.getConfiguration());
|
||||
master = new MockMasterServices(UTIL.getConfiguration(), this.regionsToRegionServers);
|
||||
rsDispatcher = new MockRSProcedureDispatcher(master);
|
||||
master.start(NSERVERS, rsDispatcher);
|
||||
am = master.getAssignmentManager();
|
||||
assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics();
|
||||
unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics();
|
||||
setUpMeta();
|
||||
}
|
||||
|
||||
private void setUpMeta() throws Exception {
|
||||
rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
|
||||
am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
|
||||
am.wakeMetaLoadedEvent();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
master.stop("tearDown");
|
||||
this.executor.shutdownNow();
|
||||
}
|
||||
|
||||
protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
|
||||
return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
|
||||
}
|
||||
|
||||
protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
|
||||
try {
|
||||
return future.get(5, TimeUnit.SECONDS);
|
||||
} catch (ExecutionException e) {
|
||||
LOG.info("ExecutionException", e);
|
||||
Exception ee = (Exception) e.getCause();
|
||||
if (ee instanceof InterruptedIOException) {
|
||||
for (Procedure<?> p : this.master.getMasterProcedureExecutor().getProcedures()) {
|
||||
LOG.info(p.toStringDetails());
|
||||
}
|
||||
}
|
||||
throw (Exception) e.getCause();
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================================
|
||||
// Helpers
|
||||
// ============================================================================================
|
||||
protected void bulkSubmit(final AssignProcedure[] procs) throws Exception {
|
||||
final Thread[] threads = new Thread[PROC_NTHREADS];
|
||||
for (int i = 0; i < threads.length; ++i) {
|
||||
final int threadId = i;
|
||||
threads[i] = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
TableName tableName = TableName.valueOf("table-" + threadId);
|
||||
int n = (procs.length / threads.length);
|
||||
int start = threadId * n;
|
||||
int stop = start + n;
|
||||
for (int j = start; j < stop; ++j) {
|
||||
procs[j] = createAndSubmitAssign(tableName, j);
|
||||
}
|
||||
}
|
||||
};
|
||||
threads[i].start();
|
||||
}
|
||||
for (int i = 0; i < threads.length; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
|
||||
procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
|
||||
}
|
||||
}
|
||||
|
||||
private AssignProcedure createAndSubmitAssign(TableName tableName, int regionId) {
|
||||
RegionInfo hri = createRegionInfo(tableName, regionId);
|
||||
AssignProcedure proc = am.createAssignProcedure(hri);
|
||||
master.getMasterProcedureExecutor().submitProcedure(proc);
|
||||
return proc;
|
||||
}
|
||||
|
||||
protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
|
||||
return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId))
|
||||
.setEndKey(Bytes.toBytes(regionId + 1)).setSplit(false).setRegionId(0).build();
|
||||
}
|
||||
|
||||
private void sendTransitionReport(final ServerName serverName,
|
||||
final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
|
||||
final RegionServerStatusProtos.RegionStateTransition.TransitionCode state)
|
||||
throws IOException {
|
||||
RegionServerStatusProtos.ReportRegionStateTransitionRequest.Builder req =
|
||||
RegionServerStatusProtos.ReportRegionStateTransitionRequest.newBuilder();
|
||||
req.setServer(ProtobufUtil.toServerName(serverName));
|
||||
req.addTransition(
|
||||
RegionServerStatusProtos.RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
|
||||
.setTransitionCode(state).setOpenSeqNum(1).build());
|
||||
am.reportRegionStateTransition(req.build());
|
||||
}
|
||||
|
||||
private void doCrash(final ServerName serverName) {
|
||||
this.am.submitServerCrash(serverName, false/*No WALs here*/);
|
||||
}
|
||||
|
||||
private void doRestart(final ServerName serverName) {
|
||||
try {
|
||||
this.master.restartRegionServer(serverName);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Can not restart RS with new startcode");
|
||||
}
|
||||
}
|
||||
|
||||
private class NoopRsExecutor implements MockRSExecutor {
|
||||
@Override
|
||||
public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
|
||||
AdminProtos.ExecuteProceduresRequest request) throws IOException {
|
||||
if (request.getOpenRegionCount() > 0) {
|
||||
for (AdminProtos.OpenRegionRequest req : request.getOpenRegionList()) {
|
||||
for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : req.getOpenInfoList()) {
|
||||
execOpenRegion(server, openReq);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (request.getCloseRegionCount() > 0) {
|
||||
for (AdminProtos.CloseRegionRequest req : request.getCloseRegionList()) {
|
||||
execCloseRegion(server, req.getRegion().getValue().toByteArray());
|
||||
}
|
||||
}
|
||||
return AdminProtos.ExecuteProceduresResponse.newBuilder().build();
|
||||
}
|
||||
|
||||
protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
|
||||
AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
protected class GoodRsExecutor extends NoopRsExecutor {
|
||||
@Override
|
||||
protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
|
||||
AdminProtos.OpenRegionRequest.RegionOpenInfo openReq) throws IOException {
|
||||
sendTransitionReport(server, openReq.getRegion(),
|
||||
RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
|
||||
// Concurrency?
|
||||
// Now update the state of our cluster in regionsToRegionServers.
|
||||
SortedSet<byte[]> regions = regionsToRegionServers.get(server);
|
||||
if (regions == null) {
|
||||
regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
||||
regionsToRegionServers.put(server, regions);
|
||||
}
|
||||
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
|
||||
if (regions.contains(hri.getRegionName())) {
|
||||
throw new UnsupportedOperationException(hri.getRegionNameAsString());
|
||||
}
|
||||
regions.add(hri.getRegionName());
|
||||
return AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
|
||||
throws IOException {
|
||||
RegionInfo hri = am.getRegionInfo(regionName);
|
||||
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri),
|
||||
RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED);
|
||||
return AdminProtos.CloseRegionResponse.newBuilder().setClosed(true).build();
|
||||
}
|
||||
}
|
||||
|
||||
protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor {
|
||||
@Override
|
||||
public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
|
||||
AdminProtos.ExecuteProceduresRequest req) throws IOException {
|
||||
throw new ServerNotRunningYetException("wait on server startup");
|
||||
}
|
||||
}
|
||||
|
||||
protected static class FaultyRsExecutor implements MockRSExecutor {
|
||||
private final IOException exception;
|
||||
|
||||
public FaultyRsExecutor(final IOException exception) {
|
||||
this.exception = exception;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
|
||||
AdminProtos.ExecuteProceduresRequest req) throws IOException {
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
|
||||
private final int maxSocketTimeoutRetries;
|
||||
private final int maxServerRetries;
|
||||
|
||||
private ServerName lastServer;
|
||||
private int sockTimeoutRetries;
|
||||
private int serverRetries;
|
||||
|
||||
public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) {
|
||||
this.maxServerRetries = maxServerRetries;
|
||||
this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
|
||||
AdminProtos.ExecuteProceduresRequest req) throws IOException {
|
||||
// SocketTimeoutException should be a temporary problem
|
||||
// unless the server will be declared dead.
|
||||
if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
|
||||
if (sockTimeoutRetries == 1) assertNotEquals(lastServer, server);
|
||||
lastServer = server;
|
||||
LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries);
|
||||
throw new SocketTimeoutException("simulate socket timeout");
|
||||
} else if (serverRetries++ < maxServerRetries) {
|
||||
LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries);
|
||||
master.getServerManager().moveFromOnlineToDeadServers(server);
|
||||
sockTimeoutRetries = 0;
|
||||
throw new SocketTimeoutException("simulate socket timeout");
|
||||
} else {
|
||||
return super.sendRequest(server, req);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes open request and then returns nothing so acts like a RS that went zombie.
|
||||
* No response (so proc is stuck/suspended on the Master and won't wake up.). We
|
||||
* then send in a crash for this server after a few seconds; crash is supposed to
|
||||
* take care of the suspended procedures.
|
||||
*/
|
||||
protected class HangThenRSCrashExecutor extends GoodRsExecutor {
|
||||
private int invocations;
|
||||
|
||||
@Override
|
||||
protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
|
||||
final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq)
|
||||
throws IOException {
|
||||
if (this.invocations++ > 0) {
|
||||
// Return w/o problem the second time through here.
|
||||
return super.execOpenRegion(server, openReq);
|
||||
}
|
||||
// The procedure on master will just hang forever because nothing comes back
|
||||
// from the RS in this case.
|
||||
LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Sending in CRASH of " + server);
|
||||
doCrash(server);
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes open request and then returns nothing so acts like a RS that went zombie.
|
||||
* No response (so proc is stuck/suspended on the Master and won't wake up.).
|
||||
* Different with HangThenRSCrashExecutor, HangThenRSCrashExecutor will create
|
||||
* ServerCrashProcedure to handle the server crash. However, this HangThenRSRestartExecutor
|
||||
* will restart RS directly, situation for RS crashed when SCP is not enabled.
|
||||
*/
|
||||
protected class HangThenRSRestartExecutor extends GoodRsExecutor {
|
||||
private int invocations;
|
||||
|
||||
@Override
|
||||
protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
|
||||
final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq)
|
||||
throws IOException {
|
||||
if (this.invocations++ > 0) {
|
||||
// Return w/o problem the second time through here.
|
||||
return super.execOpenRegion(server, openReq);
|
||||
}
|
||||
// The procedure on master will just hang forever because nothing comes back
|
||||
// from the RS in this case.
|
||||
LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Restarting RS of " + server);
|
||||
doRestart(server);
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
|
||||
public static final int TYPES_OF_FAILURE = 6;
|
||||
private int invocations;
|
||||
|
||||
@Override
|
||||
protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
|
||||
throws IOException {
|
||||
switch (this.invocations++) {
|
||||
case 0:
|
||||
throw new NotServingRegionException("Fake");
|
||||
case 1:
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Sending in CRASH of " + server);
|
||||
doCrash(server);
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
throw new RegionServerAbortedException("Fake!");
|
||||
case 2:
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Sending in CRASH of " + server);
|
||||
doCrash(server);
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
throw new RegionServerStoppedException("Fake!");
|
||||
case 3:
|
||||
throw new ServerNotRunningYetException("Fake!");
|
||||
case 4:
|
||||
LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server);
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Sending in CRASH of " + server);
|
||||
doCrash(server);
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
return null;
|
||||
default:
|
||||
return super.execCloseRegion(server, regionName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected class RandRsExecutor extends NoopRsExecutor {
|
||||
private final Random rand = new Random();
|
||||
|
||||
@Override
|
||||
public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
|
||||
AdminProtos.ExecuteProceduresRequest req) throws IOException {
|
||||
switch (rand.nextInt(5)) {
|
||||
case 0:
|
||||
throw new ServerNotRunningYetException("wait on server startup");
|
||||
case 1:
|
||||
throw new SocketTimeoutException("simulate socket timeout");
|
||||
case 2:
|
||||
throw new RemoteException("java.io.IOException", "unexpected exception");
|
||||
default:
|
||||
// fall out
|
||||
}
|
||||
return super.sendRequest(server, req);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
|
||||
final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq)
|
||||
throws IOException {
|
||||
switch (rand.nextInt(6)) {
|
||||
case 0:
|
||||
LOG.info("Return OPENED response");
|
||||
sendTransitionReport(server, openReq.getRegion(),
|
||||
RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
|
||||
return AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED;
|
||||
case 1:
|
||||
LOG.info("Return transition report that OPENED/ALREADY_OPENED response");
|
||||
sendTransitionReport(server, openReq.getRegion(),
|
||||
RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
|
||||
return AdminProtos.OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
|
||||
case 2:
|
||||
LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
|
||||
sendTransitionReport(server, openReq.getRegion(),
|
||||
RegionServerStatusProtos.RegionStateTransition.TransitionCode.FAILED_OPEN);
|
||||
return AdminProtos.OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
|
||||
default:
|
||||
// fall out
|
||||
}
|
||||
// The procedure on master will just hang forever because nothing comes back
|
||||
// from the RS in this case.
|
||||
LOG.info(
|
||||
"Return null as response; means proc stuck so we send in a crash report after a few seconds...");
|
||||
executor.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Delayed CRASHING of " + server);
|
||||
doCrash(server);
|
||||
}
|
||||
}, 5, TimeUnit.SECONDS);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
|
||||
throws IOException {
|
||||
AdminProtos.CloseRegionResponse.Builder resp = AdminProtos.CloseRegionResponse.newBuilder();
|
||||
boolean closed = rand.nextBoolean();
|
||||
if (closed) {
|
||||
RegionInfo hri = am.getRegionInfo(regionName);
|
||||
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri),
|
||||
RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED);
|
||||
}
|
||||
resp.setClosed(closed);
|
||||
return resp.build();
|
||||
}
|
||||
}
|
||||
|
||||
protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
|
||||
|
||||
private boolean invoked = false;
|
||||
|
||||
private ServerName lastServer;
|
||||
|
||||
@Override
|
||||
public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
|
||||
AdminProtos.ExecuteProceduresRequest req) throws IOException {
|
||||
if (!invoked) {
|
||||
lastServer = server;
|
||||
invoked = true;
|
||||
throw new CallQueueTooBigException("simulate queue full");
|
||||
}
|
||||
// better select another server since the server is over loaded, but anyway, it is fine to
|
||||
// still select the same server since it is not dead yet...
|
||||
if (lastServer.equals(server)) {
|
||||
LOG.warn("We still select the same server, which is not good.");
|
||||
}
|
||||
return super.sendRequest(server, req);
|
||||
}
|
||||
}
|
||||
|
||||
protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
|
||||
|
||||
private final int queueFullTimes;
|
||||
|
||||
private int retries;
|
||||
|
||||
private ServerName lastServer;
|
||||
|
||||
public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
|
||||
this.queueFullTimes = queueFullTimes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
|
||||
AdminProtos.ExecuteProceduresRequest req) throws IOException {
|
||||
retries++;
|
||||
if (retries == 1) {
|
||||
lastServer = server;
|
||||
throw new CallTimeoutException("simulate call timeout");
|
||||
}
|
||||
// should always retry on the same server
|
||||
assertEquals(lastServer, server);
|
||||
if (retries < queueFullTimes) {
|
||||
throw new CallQueueTooBigException("simulate queue full");
|
||||
}
|
||||
return super.sendRequest(server, req);
|
||||
}
|
||||
}
|
||||
|
||||
protected interface MockRSExecutor {
|
||||
AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
|
||||
AdminProtos.ExecuteProceduresRequest req) throws IOException;
|
||||
}
|
||||
|
||||
protected class MockRSProcedureDispatcher extends RSProcedureDispatcher {
|
||||
private MockRSExecutor mockRsExec;
|
||||
|
||||
public MockRSProcedureDispatcher(final MasterServices master) {
|
||||
super(master);
|
||||
}
|
||||
|
||||
public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
|
||||
this.mockRsExec = mockRsExec;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
|
||||
submitTask(new MockRemoteCall(serverName, remoteProcedures));
|
||||
}
|
||||
|
||||
private class MockRemoteCall extends ExecuteProceduresRemoteCall {
|
||||
public MockRemoteCall(final ServerName serverName, final Set<RemoteProcedure> operations) {
|
||||
super(serverName, operations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatchOpenRequests(MasterProcedureEnv env,
|
||||
List<RegionOpenOperation> operations) {
|
||||
request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatchCloseRequests(MasterProcedureEnv env,
|
||||
List<RegionCloseOperation> operations) {
|
||||
for (RegionCloseOperation op : operations) {
|
||||
request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
|
||||
final AdminProtos.ExecuteProceduresRequest request) throws IOException {
|
||||
return mockRsExec.sendRequest(serverName, request);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void collectAssignmentManagerMetrics() {
|
||||
assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
|
||||
assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
|
||||
unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount();
|
||||
unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue