HBASE-26029 It is not reliable to use nodeDeleted event to track region server's death (#3430)

Signed-off-by: Xin Sun <ddupgs@gmail.com>
This commit is contained in:
Duo Zhang 2021-06-30 08:44:19 +08:00 committed by GitHub
parent 64d4915ca8
commit 51893b9ba3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 856 additions and 1205 deletions

View File

@ -312,8 +312,9 @@ enum ServerCrashState {
SERVER_CRASH_WAIT_ON_ASSIGN = 9;
SERVER_CRASH_SPLIT_META_LOGS = 10;
SERVER_CRASH_ASSIGN_META = 11;
SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR=12;
SERVER_CRASH_DELETE_SPLIT_WALS_DIR=13;
SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR = 12;
SERVER_CRASH_DELETE_SPLIT_WALS_DIR = 13;
SERVER_CRASH_CLAIM_REPLICATION_QUEUES = 14;
SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true];
SERVER_CRASH_FINISH = 100;
}
@ -624,3 +625,23 @@ enum SplitWALState{
DISPATCH_WAL_TO_WORKER = 2;
RELEASE_SPLIT_WORKER = 3;
}
message ClaimReplicationQueuesStateData {
required ServerName crashed_server = 1;
}
message ClaimReplicationQueueRemoteStateData {
required ServerName crashed_server = 1;
required string queue = 2;
required ServerName target_server = 3;
}
message ClaimReplicationQueueRemoteParameter {
required ServerName crashed_server = 1;
required string queue = 2;
}
enum ClaimReplicationQueuesState {
CLAIM_REPLICATION_QUEUES_DISPATCH = 1;
CLAIM_REPLICATION_QUEUES_FINISH = 2;
}

View File

@ -1,103 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link ReplicationTracker} implementation which polls the region servers list periodically from
* master.
*/
@InterfaceAudience.Private
class MasterReplicationTracker extends ReplicationTrackerBase {
private static final Logger LOG = LoggerFactory.getLogger(MasterReplicationTracker.class);
static final String REFRESH_INTERVAL_SECONDS =
"hbase.replication.tracker.master.refresh.interval.secs";
// default to refresh every 5 seconds
static final int REFRESH_INTERVAL_SECONDS_DEFAULT = 5;
private final ChoreService choreService;
private final ScheduledChore chore;
private final Admin admin;
private volatile Set<ServerName> regionServers;
MasterReplicationTracker(ReplicationTrackerParams params) {
try {
this.admin = params.connection().getAdmin();
} catch (IOException e) {
// should not happen
throw new AssertionError(e);
}
this.choreService = params.choreService();
int refreshIntervalSecs =
params.conf().getInt(REFRESH_INTERVAL_SECONDS, REFRESH_INTERVAL_SECONDS_DEFAULT);
this.chore = new ScheduledChore(getClass().getSimpleName(), params.stopptable(),
refreshIntervalSecs, 0, TimeUnit.SECONDS) {
@Override
protected void chore() {
try {
refresh();
} catch (IOException e) {
LOG.warn("failed to refresh region server list for replication", e);
}
}
};
}
private Set<ServerName> reload() throws IOException {
return Collections.unmodifiableSet(new HashSet<>(admin.getRegionServers()));
}
private void refresh() throws IOException {
Set<ServerName> newRegionServers = reload();
for (ServerName oldRs : regionServers) {
if (!newRegionServers.contains(oldRs)) {
notifyListeners(oldRs);
}
}
this.regionServers = newRegionServers;
}
@Override
protected Set<ServerName> internalLoadLiveRegionServersAndInitializeListeners()
throws IOException {
Set<ServerName> newRegionServers = reload();
this.regionServers = newRegionServers;
choreService.scheduleChore(chore);
return newRegionServers;
}
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@ -36,10 +35,4 @@ public final class ReplicationFactory {
public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) {
return new ReplicationPeers(zk, conf);
}
public static ReplicationTracker getReplicationTracker(ReplicationTrackerParams params) {
Class<? extends ReplicationTracker> clazz = params.conf().getClass(REPLICATION_TRACKER_IMPL,
ZKReplicationTracker.class, ReplicationTracker.class);
return ReflectionUtils.newInstance(clazz, params);
}
}

View File

@ -1,61 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
/**
* This is the interface for a Replication Tracker.
* <p/>
* A replication tracker provides the facility to subscribe and track events that reflect a change
* in replication state. These events are used by the ReplicationSourceManager to coordinate
* replication tasks such as addition/deletion of queues and queue failover. These events are
* defined in the ReplicationListener interface. If a class would like to listen to replication
* events it must implement the ReplicationListener interface and register itself with a Replication
* Tracker.
*/
@InterfaceAudience.Private
public interface ReplicationTracker {
/**
* Register a replication listener to receive replication events.
* @param listener the listener to register
*/
void registerListener(ReplicationListener listener);
/**
* Remove a replication listener
* @param listener the listener to remove
*/
void removeListener(ReplicationListener listener);
/**
* In this method, you need to load the newest list of region server list and return it, and all
* later changes to the region server list must be passed to the listeners.
* <p/>
* This is very important for us to not miss a region server crash.
* <p/>
* Notice that this method can only be called once.
* @return Set of region servers.
*/
Set<ServerName> loadLiveRegionServersAndInitializeListeners() throws IOException;
}

View File

@ -1,72 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base implementation class for replication tracker.
*/
@InterfaceAudience.Private
abstract class ReplicationTrackerBase implements ReplicationTracker {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerBase.class);
// listeners to be notified
private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
private final AtomicBoolean initialized = new AtomicBoolean(false);
@Override
public void registerListener(ReplicationListener listener) {
listeners.add(listener);
}
@Override
public void removeListener(ReplicationListener listener) {
listeners.remove(listener);
}
protected final void notifyListeners(ServerName regionServer) {
LOG.info("{} is dead, triggering replicatorRemoved event", regionServer);
for (ReplicationListener listener : listeners) {
listener.regionServerRemoved(regionServer);
}
}
@Override
public final Set<ServerName> loadLiveRegionServersAndInitializeListeners() throws IOException {
if (!initialized.compareAndSet(false, true)) {
throw new IllegalStateException(
"loadLiveRegionServersAndInitializeListeners can only be called once");
}
return internalLoadLiveRegionServersAndInitializeListeners();
}
protected abstract Set<ServerName> internalLoadLiveRegionServersAndInitializeListeners()
throws IOException;
}

View File

@ -1,98 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Parameters for constructing a {@link ReplicationTracker}.
*/
@InterfaceAudience.Private
public final class ReplicationTrackerParams {
private final Configuration conf;
private final Stoppable stopptable;
private ZKWatcher zookeeper;
private Abortable abortable;
private Connection conn;
private ChoreService choreService;
private ReplicationTrackerParams(Configuration conf, Stoppable stopptable) {
this.conf = conf;
this.stopptable = stopptable;
}
public ReplicationTrackerParams zookeeper(ZKWatcher zookeeper) {
this.zookeeper = zookeeper;
return this;
}
public ReplicationTrackerParams abortable(Abortable abortable) {
this.abortable = abortable;
return this;
}
public ReplicationTrackerParams connection(Connection conn) {
this.conn = conn;
return this;
}
public ReplicationTrackerParams choreService(ChoreService choreService) {
this.choreService = choreService;
return this;
}
public Configuration conf() {
return conf;
}
public Stoppable stopptable() {
return stopptable;
}
public ZKWatcher zookeeper() {
return zookeeper;
}
public Abortable abortable() {
return abortable;
}
public Connection connection() {
return conn;
}
public ChoreService choreService() {
return choreService;
}
public static ReplicationTrackerParams create(Configuration conf, Stoppable stopptable) {
return new ReplicationTrackerParams(conf, stopptable);
}
}

View File

@ -1,175 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
/**
* This class is a ZooKeeper implementation of the ReplicationTracker interface. This class is
* responsible for handling replication events that are defined in the ReplicationListener
* interface.
*/
@InterfaceAudience.Private
class ZKReplicationTracker extends ReplicationTrackerBase {
// Zookeeper
private final ZKWatcher zookeeper;
// Server to abort.
private final Abortable abortable;
// All about stopping
private final Stoppable stopper;
// List of all the other region servers in this cluster
private final Set<ServerName> regionServers = new HashSet<>();
ZKReplicationTracker(ReplicationTrackerParams params) {
this.zookeeper = params.zookeeper();
this.abortable = params.abortable();
this.stopper = params.stopptable();
this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
}
/**
* Watcher used to be notified of the other region server's death in the local cluster. It
* initiates the process to transfer the queues if it is able to grab the lock.
*/
public class OtherRegionServerWatcher extends ZKListener {
/**
* Construct a ZooKeeper event listener.
*/
public OtherRegionServerWatcher(ZKWatcher watcher) {
super(watcher);
}
/**
* Called when a new node has been created.
* @param path full path of the new node
*/
@Override
public void nodeCreated(String path) {
if (stopper.isStopped()) {
return;
}
refreshListIfRightPath(path);
}
/**
* Called when a node has been deleted
* @param path full path of the deleted node
*/
@Override
public void nodeDeleted(String path) {
if (stopper.isStopped()) {
return;
}
if (!refreshListIfRightPath(path)) {
return;
}
notifyListeners(ServerName.valueOf(getZNodeName(path)));
}
/**
* Called when an existing node has a child node added or removed.
* @param path full path of the node whose children have changed
*/
@Override
public void nodeChildrenChanged(String path) {
if (stopper.isStopped()) {
return;
}
refreshListIfRightPath(path);
}
private boolean refreshListIfRightPath(String path) {
if (!path.startsWith(this.watcher.getZNodePaths().rsZNode)) {
return false;
}
return refreshRegionServerList();
}
}
/**
* Extracts the znode name of a peer cluster from a ZK path
* @param fullPath Path to extract the id from
* @return the id or an empty string if path is invalid
*/
private String getZNodeName(String fullPath) {
List<String> parts = Splitter.on('/').splitToList(fullPath);
return parts.size() > 0 ? parts.get(parts.size() - 1) : "";
}
/**
* Reads the list of region servers from ZK and atomically clears our local view of it and
* replaces it with the updated list.
* @return true if the local list of the other region servers was updated with the ZK data (even
* if it was empty), false if the data was missing in ZK
*/
private boolean refreshRegionServerList() {
Set<ServerName> newRsList = getRegisteredRegionServers();
if (newRsList == null) {
return false;
} else {
synchronized (regionServers) {
regionServers.clear();
regionServers.addAll(newRsList);
}
}
return true;
}
/**
* Get a list of all the other region servers in this cluster and set a watch
* @return a list of server nanes
*/
private Set<ServerName> getRegisteredRegionServers() {
List<String> result = null;
try {
result =
ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.getZNodePaths().rsZNode);
} catch (KeeperException e) {
this.abortable.abort("Get list of registered region servers", e);
}
return result == null ? null :
result.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
}
@Override
protected Set<ServerName> internalLoadLiveRegionServersAndInitializeListeners()
throws IOException {
if (!refreshRegionServerList()) {
throw new IOException("failed to refresh region server list");
}
synchronized (regionServers) {
return new HashSet<>(regionServers);
}
}
}

View File

@ -1,110 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class for testing {@link ReplicationTracker} and {@link ReplicationListener}.
*/
public abstract class ReplicationTrackerTestBase {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerTestBase.class);
private ReplicationTracker rt;
private AtomicInteger rsRemovedCount;
private volatile ServerName rsRemovedData;
@Before
public void setUp() {
ReplicationTrackerParams params = createParams();
rt = ReplicationFactory.getReplicationTracker(params);
rsRemovedCount = new AtomicInteger(0);
rsRemovedData = null;
}
protected abstract ReplicationTrackerParams createParams();
protected abstract void addServer(ServerName sn) throws Exception;
protected abstract void removeServer(ServerName sn) throws Exception;
@Test
public void testWatchRegionServers() throws Exception {
ServerName sn =
ServerName.valueOf("hostname2.example.org,1234," + EnvironmentEdgeManager.currentTime());
addServer(sn);
rt.registerListener(new DummyReplicationListener());
assertEquals(1, rt.loadLiveRegionServersAndInitializeListeners().size());
// delete one
removeServer(sn);
// wait for event
Waiter.waitFor(HBaseConfiguration.create(), 15000, () -> rsRemovedCount.get() >= 1);
assertEquals(sn, rsRemovedData);
}
private class DummyReplicationListener implements ReplicationListener {
@Override
public void regionServerRemoved(ServerName regionServer) {
rsRemovedData = regionServer;
rsRemovedCount.getAndIncrement();
LOG.debug("Received regionServerRemoved event: " + regionServer);
}
}
protected static class WarnOnlyStoppable implements Stoppable {
@Override
public void stop(String why) {
LOG.warn("TestReplicationTracker received stop, ignoring. Reason: " + why);
}
@Override
public boolean isStopped() {
return false;
}
}
protected static class WarnOnlyAbortable implements Abortable {
@Override
public void abort(String why, Throwable e) {
LOG.warn("TestReplicationTracker received abort, ignoring. Reason: " + why);
}
@Override
public boolean isAborted() {
return false;
}
}
}

View File

@ -1,87 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestMasterReplicationTracker extends ReplicationTrackerTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterReplicationTracker.class);
private static Configuration CONF;
private static Connection CONN;
private static ChoreService CHORE_SERVICE;
private static List<ServerName> SERVERS = new CopyOnWriteArrayList<>();
@BeforeClass
public static void setUpBeforeClass() throws IOException {
CONF = HBaseConfiguration.create();
CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL, MasterReplicationTracker.class,
ReplicationTracker.class);
Admin admin = mock(Admin.class);
when(admin.getRegionServers()).thenReturn(SERVERS);
CONN = mock(Connection.class);
when(CONN.getAdmin()).thenReturn(admin);
CHORE_SERVICE = new ChoreService("TestMasterReplicationTracker");
}
@AfterClass
public static void tearDownAfterClass() {
CHORE_SERVICE.shutdown();
}
@Override
protected ReplicationTrackerParams createParams() {
return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable())
.abortable(new WarnOnlyAbortable()).connection(CONN).choreService(CHORE_SERVICE);
}
@Override
protected void addServer(ServerName sn) throws Exception {
SERVERS.add(sn);
}
@Override
protected void removeServer(ServerName sn) throws Exception {
SERVERS.remove(sn);
}
}

View File

@ -1,84 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestZKReplicationTracker extends ReplicationTrackerTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestZKReplicationTracker.class);
private static Configuration CONF;
private static HBaseZKTestingUtility UTIL;
private static ZKWatcher ZKW;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL = new HBaseZKTestingUtility();
UTIL.startMiniZKCluster();
CONF = UTIL.getConfiguration();
CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL, ZKReplicationTracker.class,
ReplicationTracker.class);
ZKWatcher zk = HBaseZKTestingUtility.getZooKeeperWatcher(UTIL);
ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
ZKW = HBaseZKTestingUtility.getZooKeeperWatcher(UTIL);
}
@Override
protected ReplicationTrackerParams createParams() {
return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable())
.abortable(new WarnOnlyAbortable()).zookeeper(ZKW);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
Closeables.close(ZKW, true);
UTIL.shutdownMiniZKCluster();
}
@Override
protected void addServer(ServerName sn) throws Exception {
ZKUtil.createAndWatch(ZKW, ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode, sn.toString()),
HConstants.EMPTY_BYTE_ARRAY);
}
@Override
protected void removeServer(ServerName sn) throws Exception {
ZKUtil.deleteNode(ZKW, ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode, sn.toString()));
}
}

View File

@ -294,7 +294,14 @@ public enum EventType {
*
* RS_REPLAY_SYNC_REPLICATION_WAL
*/
RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL);
RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL),
/**
* RS claim replication queue.<br>
*
* RS_CLAIM_REPLICATION_QUEUE
*/
RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE);
private final int code;
private final ExecutorType executor;

View File

@ -52,7 +52,8 @@ public enum ExecutorType {
RS_REFRESH_PEER(31),
RS_REPLAY_SYNC_REPLICATION_WAL(32),
RS_SWITCH_RPC_THROTTLE(33),
RS_IN_MEMORY_COMPACTION(34);
RS_IN_MEMORY_COMPACTION(34),
RS_CLAIM_REPLICATION_QUEUE(35);
ExecutorType(int value) {
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.master.SplitWALManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.master.replication.ClaimReplicationQueuesProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.Procedure;
@ -234,11 +235,15 @@ public class ServerCrashProcedure
}
assignRegions(env, regionsOnCrashedServer);
}
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
setNextState(ServerCrashState.SERVER_CRASH_CLAIM_REPLICATION_QUEUES);
break;
case SERVER_CRASH_HANDLE_RIT2:
// Noop. Left in place because we used to call handleRIT here for a second time
// but no longer necessary since HBASE-20634.
setNextState(ServerCrashState.SERVER_CRASH_CLAIM_REPLICATION_QUEUES);
break;
case SERVER_CRASH_CLAIM_REPLICATION_QUEUES:
addChildProcedure(new ClaimReplicationQueuesProcedure(serverName));
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
break;
case SERVER_CRASH_FINISH:

View File

@ -27,18 +27,30 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface ServerProcedureInterface {
public enum ServerOperationType {
CRASH_HANDLER, SWITCH_RPC_THROTTLE,
CRASH_HANDLER,
SWITCH_RPC_THROTTLE,
/**
* help find a available region server as worker and release worker after task done
* invoke SPLIT_WAL_REMOTE operation to send real WAL splitting request to worker
* manage the split wal task flow, will retry if SPLIT_WAL_REMOTE failed
* help find a available region server as worker and release worker after task done invoke
* SPLIT_WAL_REMOTE operation to send real WAL splitting request to worker manage the split wal
* task flow, will retry if SPLIT_WAL_REMOTE failed
*/
SPLIT_WAL,
/**
* send the split WAL request to region server and handle the response
*/
SPLIT_WAL_REMOTE
SPLIT_WAL_REMOTE,
/**
* Get all the replication queues of a crash server and assign them to other region servers
*/
CLAIM_REPLICATION_QUEUES,
/**
* send the claim replication queue request to region server to actually assign it
*/
CLAIM_REPLICATION_QUEUE_REMOTE
}
/**

View File

@ -38,6 +38,8 @@ class ServerQueue extends Queue<ServerName> {
case SWITCH_RPC_THROTTLE:
case SPLIT_WAL:
case SPLIT_WAL_REMOTE:
case CLAIM_REPLICATION_QUEUES:
case CLAIM_REPLICATION_QUEUE_REMOTE:
return false;
default:
break;

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData;
@InterfaceAudience.Private
public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure
implements ServerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
private static final Logger LOG =
LoggerFactory.getLogger(ClaimReplicationQueueRemoteProcedure.class);
private ServerName crashedServer;
private String queue;
public ClaimReplicationQueueRemoteProcedure() {
}
public ClaimReplicationQueueRemoteProcedure(ServerName crashedServer, String queue,
ServerName targetServer) {
this.crashedServer = crashedServer;
this.queue = queue;
this.targetServer = targetServer;
}
@Override
public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
assert targetServer.equals(remote);
return Optional.of(new ServerOperation(this, getProcId(), ClaimReplicationQueueCallable.class,
ClaimReplicationQueueRemoteParameter.newBuilder()
.setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue).build()
.toByteArray()));
}
@Override
public ServerName getServerName() {
// return crashed server here, as we are going to recover its replication queues so we should
// use its scheduler queue instead of the one for the target server.
return crashedServer;
}
@Override
public boolean hasMetaTableRegion() {
return false;
}
@Override
public ServerOperationType getServerOperationType() {
return ServerOperationType.CLAIM_REPLICATION_QUEUE_REMOTE;
}
@Override
protected void complete(MasterProcedureEnv env, Throwable error) {
if (error != null) {
LOG.warn("Failed to claim replication queue {} of crashed server on server {} ", queue,
crashedServer, targetServer, error);
this.succ = false;
} else {
this.succ = true;
}
}
@Override
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(MasterProcedureEnv env) {
return false;
}
@Override
protected boolean waitInitialized(MasterProcedureEnv env) {
return env.waitInitialized(this);
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
serializer.serialize(ClaimReplicationQueueRemoteStateData.newBuilder()
.setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue)
.setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
ClaimReplicationQueueRemoteStateData data =
serializer.deserialize(ClaimReplicationQueueRemoteStateData.class);
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
queue = data.getQueue();
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
}
}

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueuesStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
* Used to assign the replication queues of a dead server to other region servers.
*/
@InterfaceAudience.Private
public class ClaimReplicationQueuesProcedure extends Procedure<MasterProcedureEnv>
implements ServerProcedureInterface {
private static final Logger LOG = LoggerFactory.getLogger(ClaimReplicationQueuesProcedure.class);
private ServerName crashedServer;
private RetryCounter retryCounter;
public ClaimReplicationQueuesProcedure() {
}
public ClaimReplicationQueuesProcedure(ServerName crashedServer) {
this.crashedServer = crashedServer;
}
@Override
public ServerName getServerName() {
return crashedServer;
}
@Override
public boolean hasMetaTableRegion() {
return false;
}
@Override
public ServerOperationType getServerOperationType() {
return ServerOperationType.CLAIM_REPLICATION_QUEUES;
}
@Override
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
try {
List<String> queues = storage.getAllQueues(crashedServer);
if (queues.isEmpty()) {
LOG.debug("Finish claiming replication queues for {}", crashedServer);
storage.removeReplicatorIfQueueIsEmpty(crashedServer);
// we are done
return null;
}
LOG.debug("There are {} replication queues need to be claimed for {}", queues.size(),
crashedServer);
List<ServerName> targetServers =
env.getMasterServices().getServerManager().getOnlineServersList();
if (targetServers.isEmpty()) {
throw new ReplicationException("no region server available");
}
Collections.shuffle(targetServers);
ClaimReplicationQueueRemoteProcedure[] procs =
new ClaimReplicationQueueRemoteProcedure[Math.min(queues.size(), targetServers.size())];
for (int i = 0; i < procs.length; i++) {
procs[i] = new ClaimReplicationQueueRemoteProcedure(crashedServer, queues.get(i),
targetServers.get(i));
}
return procs;
} catch (ReplicationException e) {
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed to claim replication queues for {}, suspend {}secs {}; {};", crashedServer,
backoff / 1000, e);
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
}
}
@Override
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
setState(ProcedureProtos.ProcedureState.RUNNABLE);
env.getProcedureScheduler().addFront(this);
return false;
}
@Override
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(MasterProcedureEnv env) {
return false;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
serializer.serialize(ClaimReplicationQueuesStateData.newBuilder()
.setCrashedServer(ProtobufUtil.toServerName(crashedServer)).build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
ClaimReplicationQueuesStateData data =
serializer.deserialize(ClaimReplicationQueuesStateData.class);
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public abstract class BaseRSProcedureCallable implements RSProcedureCallable {
protected HRegionServer rs;
private Exception initError;
@Override
public final Void call() throws Exception {
if (initError != null) {
throw initError;
}
doCall();
return null;
}
@Override
public final void init(byte[] parameter, HRegionServer rs) {
this.rs = rs;
try {
initParameter(parameter);
} catch (Exception e) {
initError = e;
}
}
protected abstract void doCall() throws Exception;
protected abstract void initParameter(byte[] parameter) throws Exception;
}

View File

@ -2104,6 +2104,10 @@ public class HRegionServer extends Thread implements
conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
ExecutorType.RS_SWITCH_RPC_THROTTLE).setCorePoolSize(switchRpcThrottleThreads));
final int claimReplicationQueueThreads =
conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
ExecutorType.RS_CLAIM_REPLICATION_QUEUE).setCorePoolSize(claimReplicationQueueThreads));
Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
uncaughtExceptionHandler);

View File

@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
/**
@ -43,27 +43,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
* we switch to procedure-based WAL splitting.
*/
@InterfaceAudience.Private
public class SplitWALCallable implements RSProcedureCallable {
private static final Logger LOG = LoggerFactory.getLogger(SplitWALCallable.class);
public class SplitWALCallable extends BaseRSProcedureCallable {
private String walPath;
private Exception initError;
private HRegionServer rs;
private final KeyLocker<String> splitWALLocks = new KeyLocker<>();
private volatile Lock splitWALLock = null;
@Override
public void init(byte[] parameter, HRegionServer rs) {
try {
this.rs = rs;
MasterProcedureProtos.SplitWALParameter param =
MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
this.walPath = param.getWalPath();
} catch (InvalidProtocolBufferException e) {
LOG.error("Parse proto buffer of split WAL request failed ", e);
initError = e;
}
protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException {
MasterProcedureProtos.SplitWALParameter param =
MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
this.walPath = param.getWalPath();
}
@Override
@ -90,10 +81,7 @@ public class SplitWALCallable implements RSProcedureCallable {
}
@Override
public Void call() throws Exception {
if (initError != null) {
throw initError;
}
protected void doCall() throws Exception {
//grab a lock
splitWALLock = splitWALLocks.acquireLock(walPath);
try {
@ -110,7 +98,6 @@ public class SplitWALCallable implements RSProcedureCallable {
} finally {
splitWALLock.unlock();
}
return null;
}
public String getWalPath() {

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter;
@InterfaceAudience.Private
public class ClaimReplicationQueueCallable extends BaseRSProcedureCallable {
private ServerName crashedServer;
private String queue;
@Override
public EventType getEventType() {
return EventType.RS_CLAIM_REPLICATION_QUEUE;
}
@Override
protected void doCall() throws Exception {
PeerProcedureHandler handler = rs.getReplicationSourceService().getPeerProcedureHandler();
handler.claimReplicationQueue(crashedServer, queue);
}
@Override
protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException {
ClaimReplicationQueueRemoteParameter param =
ClaimReplicationQueueRemoteParameter.parseFrom(parameter);
crashedServer = ProtobufUtil.toServerName(param.getCrashedServer());
queue = param.getQueue();
}
}

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -36,21 +35,17 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationTrackerParams;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@ -302,18 +297,13 @@ public class DumpReplicationQueues extends Configured implements Tool {
return sb.toString();
}
public String dumpQueues(ZKWatcher zkw, Set<String> peerIds,
boolean hdfs) throws Exception {
public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception {
ReplicationQueueStorage queueStorage;
ReplicationTracker replicationTracker;
StringBuilder sb = new StringBuilder();
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
replicationTracker = ReplicationFactory
.getReplicationTracker(ReplicationTrackerParams.create(getConf(), new WarnOnlyStoppable())
.abortable(new WarnOnlyAbortable()).zookeeper(zkw));
Set<ServerName> liveRegionServers =
new HashSet<>(replicationTracker.loadLiveRegionServersAndInitializeListeners());
Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode)
.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
// Loops each peer on each RS and dumps the queues
List<ServerName> regionservers = queueStorage.getListOfReplicators();
@ -416,16 +406,4 @@ public class DumpReplicationQueues extends Configured implements Tool {
return false;
}
}
private static class WarnOnlyStoppable implements Stoppable {
@Override
public void stop(String why) {
LOG.warn("DumpReplicationQueue received stop, ignoring. Reason: " + why);
}
@Override
public boolean isStopped() {
return false;
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
@ -40,4 +41,7 @@ public interface PeerProcedureHandler {
void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs)
throws ReplicationException, IOException;
void claimReplicationQueue(ServerName crashedServer, String queue)
throws ReplicationException, IOException;
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.replication.ReplicationException;
@ -221,4 +222,10 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
peerLock.unlock();
}
}
@Override
public void claimReplicationQueue(ServerName crashedServer, String queue)
throws ReplicationException, IOException {
replicationSourceManager.claimQueue(crashedServer, queue);
}
}

View File

@ -18,8 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,26 +32,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
* The callable executed at RS side to refresh the peer config/state. <br/>
*/
@InterfaceAudience.Private
public class RefreshPeerCallable implements RSProcedureCallable {
public class RefreshPeerCallable extends BaseRSProcedureCallable {
private static final Logger LOG = LoggerFactory.getLogger(RefreshPeerCallable.class);
private HRegionServer rs;
private String peerId;
private PeerModificationType type;
private int stage;
private Exception initError;
@Override
public Void call() throws Exception {
if (initError != null) {
throw initError;
}
protected void doCall() throws Exception {
LOG.info("Received a peer change event, peerId=" + peerId + ", type=" + type);
PeerProcedureHandler handler = rs.getReplicationSourceService().getPeerProcedureHandler();
switch (type) {
@ -77,20 +68,14 @@ public class RefreshPeerCallable implements RSProcedureCallable {
default:
throw new IllegalArgumentException("Unknown peer modification type: " + type);
}
return null;
}
@Override
public void init(byte[] parameter, HRegionServer rs) {
this.rs = rs;
try {
RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter);
this.peerId = param.getPeerId();
this.type = param.getType();
this.stage = param.getStage();
} catch (InvalidProtocolBufferException e) {
initError = e;
}
protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException {
RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter);
this.peerId = param.getPeerId();
this.type = param.getType();
this.stage = param.getStage();
}
@Override

View File

@ -22,15 +22,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
@ -53,7 +50,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
* This callable executed at RS side to replay sync replication wal.
*/
@InterfaceAudience.Private
public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
public class ReplaySyncReplicationWALCallable extends BaseRSProcedureCallable {
private static final Logger LOG = LoggerFactory.getLogger(ReplaySyncReplicationWALCallable.class);
@ -62,27 +59,16 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
private static final long DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE = 8 * 1024 * 1024;
private HRegionServer rs;
private FileSystem fs;
private Configuration conf;
private String peerId;
private List<String> wals = new ArrayList<>();
private Exception initError;
private long batchSize;
private final KeyLocker<String> peersLock = new KeyLocker<>();
@Override
public Void call() throws Exception {
if (initError != null) {
throw initError;
}
protected void doCall() throws Exception {
LOG.info("Received a replay sync replication wals {} event, peerId={}", wals, peerId);
if (rs.getReplicationSinkService() != null) {
Lock peerLock = peersLock.acquireLock(wals.get(0));
@ -94,24 +80,16 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
peerLock.unlock();
}
}
return null;
}
@Override
public void init(byte[] parameter, HRegionServer rs) {
this.rs = rs;
this.fs = rs.getWALFileSystem();
this.conf = rs.getConfiguration();
try {
ReplaySyncReplicationWALParameter param =
ReplaySyncReplicationWALParameter.parseFrom(parameter);
this.peerId = param.getPeerId();
param.getWalList().forEach(this.wals::add);
this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE,
DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE);
} catch (InvalidProtocolBufferException e) {
initError = e;
}
protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException {
ReplaySyncReplicationWALParameter param =
ReplaySyncReplicationWALParameter.parseFrom(parameter);
this.peerId = param.getPeerId();
param.getWalList().forEach(this.wals::add);
this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE,
DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE);
}
@Override
@ -139,7 +117,7 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
Path path = new Path(rs.getWALRootDir(), wal);
long length = rs.getWALFileSystem().getFileStatus(path).getLen();
try {
RecoverLeaseFSUtils.recoverFileLease(fs, path, conf);
RecoverLeaseFSUtils.recoverFileLease(rs.getWALFileSystem(), path, rs.getConfiguration());
return WALFactory.createReader(rs.getWALFileSystem(), path, rs.getConfiguration());
} catch (EOFException e) {
if (length <= 0) {

View File

@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -37,8 +36,6 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationTrackerParams;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
@ -62,7 +59,6 @@ public class Replication implements ReplicationSourceService {
private ReplicationSourceManager replicationManager;
private ReplicationQueueStorage queueStorage;
private ReplicationPeers replicationPeers;
private ReplicationTracker replicationTracker;
private Configuration conf;
private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
// Hosting server
@ -102,10 +98,6 @@ public class Replication implements ReplicationSourceService {
this.replicationPeers =
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
this.replicationPeers.init();
this.replicationTracker = ReplicationFactory
.getReplicationTracker(ReplicationTrackerParams.create(server.getConfiguration(), server)
.abortable(server).zookeeper(server.getZooKeeper()).choreService(server.getChoreService())
.connection(server.getConnection()));
} catch (Exception e) {
throw new IOException("Failed replication handler create", e);
}
@ -118,9 +110,8 @@ public class Replication implements ReplicationSourceService {
SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
this.globalMetricsSource = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walFactory,
mapping, globalMetricsSource);
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, conf,
this.server, fs, logDir, oldLogDir, clusterId, walFactory, mapping, globalMetricsSource);
this.syncReplicationPeerInfoProvider =
new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
PeerActionListener peerActionListener = PeerActionListener.DUMMY;

View File

@ -34,9 +34,7 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -46,7 +44,6 @@ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@ -55,7 +52,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -63,7 +59,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
@ -111,23 +106,23 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
* modify it, {@link #removePeer(String)} ,
* {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
* {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
* {@link ReplicationSourceManager#claimQueue(ServerName, String)}.
* {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
* {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
* {@link ReplicationSourceInterface} firstly, then remove the wals from
* {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
* will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start up a
* {@link ReplicationSourceInterface}. So there is no race here. For
* {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there
* is already synchronized on {@link #oldsources}. So no need synchronized on
* {@link #walsByIdRecoveredQueues}.</li>
* {@link #walsByIdRecoveredQueues}. And
* {@link ReplicationSourceManager#claimQueue(ServerName, String)} will add the wals to
* {@link #walsByIdRecoveredQueues} firstly, then start up a {@link ReplicationSourceInterface}. So
* there is no race here. For {@link ReplicationSourceManager#claimQueue(ServerName, String)} and
* {@link #removePeer(String)}, there is already synchronized on {@link #oldsources}. So no need
* synchronized on {@link #walsByIdRecoveredQueues}.</li>
* <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
* <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
* to-be-removed peer.</li>
* </ul>
*/
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
public class ReplicationSourceManager {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
// all the sources that read this RS's logs and every peer only has one replication source
private final ConcurrentMap<String, ReplicationSourceInterface> sources;
@ -142,7 +137,6 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
private final ReplicationQueueStorage queueStorage;
private final ReplicationTracker replicationTracker;
private final ReplicationPeers replicationPeers;
// UUID for this cluster
private final UUID clusterId;
@ -208,7 +202,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param oldLogDir the directory where old logs are archived
*/
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
ReplicationPeers replicationPeers, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
WALFactory walFactory,
SyncReplicationPeerMappingManager syncReplicationPeerMappingManager,
@ -216,7 +210,6 @@ public class ReplicationSourceManager implements ReplicationListener {
this.sources = new ConcurrentHashMap<>();
this.queueStorage = queueStorage;
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.server = server;
this.walsById = new ConcurrentHashMap<>();
this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
@ -230,7 +223,6 @@ public class ReplicationSourceManager implements ReplicationListener {
this.clusterId = clusterId;
this.walFactory = walFactory;
this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
this.replicationTracker.registerListener(this);
// It's preferable to failover 1 RS at a time, but with good zk servers
// more could be processed at the same time.
int nbWorkers = conf.getInt("replication.executor.workers", 1);
@ -254,12 +246,9 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
* Adds a normal source per registered peer cluster and tries to process all old region server wal
* queues
* <p>
* The returned future is for adoptAbandonedQueues task.
* Adds a normal source per registered peer cluster.
*/
Future<?> init() throws IOException {
void init() throws IOException {
for (String id : this.replicationPeers.getAllPeerIds()) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
@ -268,38 +257,6 @@ public class ReplicationSourceManager implements ReplicationListener {
throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
}
}
return this.executor.submit(this::adoptAbandonedQueues);
}
private void adoptAbandonedQueues() {
List<ServerName> currentReplicators = null;
try {
currentReplicators = queueStorage.getListOfReplicators();
} catch (ReplicationException e) {
server.abort("Failed to get all replicators", e);
return;
}
Set<ServerName> liveRegionServers;
try {
// must call this method to load the first snapshot of live region servers and initialize
// listeners
liveRegionServers = replicationTracker.loadLiveRegionServersAndInitializeListeners();
} catch (IOException e) {
server.abort("Failed load live region server list for replication", e);
return;
}
LOG.info("Current list of replicators: {}, live RSes: {}", currentReplicators,
liveRegionServers);
if (currentReplicators == null || currentReplicators.isEmpty()) {
return;
}
// Look if there's anything to process after a restart
for (ServerName rs : currentReplicators) {
if (!liveRegionServers.contains(rs)) {
transferQueues(rs);
}
}
}
/**
@ -838,185 +795,116 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
@Override
public void regionServerRemoved(ServerName regionserver) {
transferQueues(regionserver);
}
/**
* Transfer all the queues of the specified to this region server. First it tries to grab a lock
* and if it works it will move the old queues and finally will delete the old queues.
* <p>
* It creates one old source for any type of source of the old rs.
*/
private void transferQueues(ServerName deadRS) {
if (server.getServerName().equals(deadRS)) {
// it's just us, give up
void claimQueue(ServerName deadRS, String queue) {
// Wait a bit before transferring the queues, we may be shutting down.
// This sleep may not be enough in some cases.
try {
Thread.sleep(sleepBeforeFailover +
(long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting before transferring a queue.");
Thread.currentThread().interrupt();
}
// We try to lock that rs' queue directory
if (server.isStopped()) {
LOG.info("Not transferring queue since we are shutting down");
return;
}
NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
// After claim the queues from dead region server, wewill skip to start the
// RecoveredReplicationSource if the peer has been removed. but there's possible that remove a
// peer with peerId = 2 and add a peer with peerId = 2 again during failover. So we need to get
// a copy of the replication peer first to decide whether we should start the
// RecoveredReplicationSource. If the latest peer is not the old peer, we should also skip to
// start the RecoveredReplicationSource, Otherwise the rs will abort (See HBASE-20475).
String peerId = new ReplicationQueueInfo(queue).getPeerId();
ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
if (oldPeer == null) {
LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist",
peerId, queue);
return;
}
Pair<String, SortedSet<String>> claimedQueue;
try {
this.executor.execute(transfer);
} catch (RejectedExecutionException ex) {
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
.getGlobalSource().incrFailedRecoveryQueue();
LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
claimedQueue = queueStorage.claimQueue(deadRS, queue, server.getServerName());
} catch (ReplicationException e) {
LOG.error(
"ReplicationException: cannot claim dead region ({})'s " +
"replication queue. Znode : ({})" +
" Possible solution: check if znode size exceeds jute.maxBuffer value. " +
" If so, increase it for both client and server side.",
deadRS, queueStorage.getRsNode(deadRS), e);
server.abort("Failed to claim queue from dead regionserver.", e);
return;
}
}
/**
* Class responsible to setup new ReplicationSources to take care of the queues from dead region
* servers.
*/
class NodeFailoverWorker extends Thread {
private final ServerName deadRS;
// After claim the queues from dead region server, the NodeFailoverWorker will skip to start
// the RecoveredReplicationSource if the peer has been removed. but there's possible that
// remove a peer with peerId = 2 and add a peer with peerId = 2 again during the
// NodeFailoverWorker. So we need a deep copied <peerId, peer> map to decide whether we
// should start the RecoveredReplicationSource. If the latest peer is not the old peer when
// NodeFailoverWorker begin, we should skip to start the RecoveredReplicationSource, Otherwise
// the rs will abort (See HBASE-20475).
private final Map<String, ReplicationPeerImpl> peersSnapshot;
public NodeFailoverWorker(ServerName deadRS) {
super("Failover-for-" + deadRS);
this.deadRS = deadRS;
peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
if (claimedQueue.getSecond().isEmpty()) {
return;
}
String queueId = claimedQueue.getFirst();
Set<String> walsSet = claimedQueue.getSecond();
ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
if (peer == null || peer != oldPeer) {
LOG.warn("Skipping failover for peer {} of node {}, peer is null", peerId, deadRS);
abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
return;
}
if (server instanceof ReplicationSyncUp.DummyServer &&
peer.getPeerState().equals(PeerState.DISABLED)) {
LOG.warn(
"Peer {} is disabled. ReplicationSyncUp tool will skip " + "replicating data to this peer.",
peerId);
return;
}
private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
return oldPeerRef != null && oldPeerRef == newPeerRef;
ReplicationSourceInterface src;
try {
src = createSource(queueId, peer);
} catch (IOException e) {
LOG.error("Can not create replication source for peer {} and queue {}", peerId, queueId, e);
server.abort("Failed to create replication source after claiming queue.", e);
return;
}
@Override
public void run() {
// Wait a bit before transferring the queues, we may be shutting down.
// This sleep may not be enough in some cases.
try {
Thread.sleep(sleepBeforeFailover +
(long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting before transferring a queue.");
Thread.currentThread().interrupt();
}
// We try to lock that rs' queue directory
if (server.isStopped()) {
LOG.info("Not transferring queue since we are shutting down");
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
synchronized (oldsources) {
peer = replicationPeers.getPeer(src.getPeerId());
if (peer == null || peer != oldPeer) {
src.terminate("Recovered queue doesn't belong to any current peer");
deleteQueue(queueId);
return;
}
Map<String, Set<String>> newQueues = new HashMap<>();
try {
List<String> queues = queueStorage.getAllQueues(deadRS);
while (!queues.isEmpty()) {
Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
queues.get(ThreadLocalRandom.current().nextInt(queues.size())), server.getServerName());
long sleep = sleepBeforeFailover / 2;
if (!peer.getSecond().isEmpty()) {
newQueues.put(peer.getFirst(), peer.getSecond());
sleep = sleepBeforeFailover;
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting before transferring a queue.");
Thread.currentThread().interrupt();
}
queues = queueStorage.getAllQueues(deadRS);
}
if (queues.isEmpty()) {
queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
}
} catch (ReplicationException e) {
LOG.error(String.format("ReplicationException: cannot claim dead region (%s)'s " +
"replication queue. Znode : (%s)" +
" Possible solution: check if znode size exceeds jute.maxBuffer value. " +
" If so, increase it for both client and server side.",
deadRS, queueStorage.getRsNode(deadRS)), e);
server.abort("Failed to claim queue from dead regionserver.", e);
return;
}
// Copying over the failed queue is completed.
if (newQueues.isEmpty()) {
// We either didn't get the lock or the failed region server didn't have any outstanding
// WALs to replicate, so we are done.
return;
}
for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
String queueId = entry.getKey();
Set<String> walsSet = entry.getValue();
try {
// there is not an actual peer defined corresponding to peerId for the failover.
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
String actualPeerId = replicationQueueInfo.getPeerId();
ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
if (peer == null || !isOldPeer(actualPeerId, peer)) {
LOG.warn("Skipping failover for peer {} of node {}, peer is null", actualPeerId,
deadRS);
abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
continue;
}
if (server instanceof ReplicationSyncUp.DummyServer
&& peer.getPeerState().equals(PeerState.DISABLED)) {
LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
+ "replicating data to this peer.",
actualPeerId);
continue;
}
ReplicationSourceInterface src = createSource(queueId, peer);
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
synchronized (oldsources) {
peer = replicationPeers.getPeer(src.getPeerId());
if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
src.terminate("Recovered queue doesn't belong to any current peer");
deleteQueue(queueId);
continue;
}
// Do not setup recovered queue if a sync replication peer is in STANDBY state, or is
// transiting to STANDBY state. The only exception is we are in STANDBY state and
// transiting to DA, under this state we will replay the remote WAL and they need to be
// replicated back.
if (peer.getPeerConfig().isSyncReplication()) {
Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
peer.getSyncReplicationStateAndNewState();
if ((stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY) &&
stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) ||
stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) {
src.terminate("Sync replication peer is in STANDBY state");
deleteQueue(queueId);
continue;
}
}
// track sources in walsByIdRecoveredQueues
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(queueId, walsByGroup);
for (String wal : walsSet) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
NavigableSet<String> wals = walsByGroup.get(walPrefix);
if (wals == null) {
wals = new TreeSet<>();
walsByGroup.put(walPrefix, wals);
}
wals.add(wal);
}
oldsources.add(src);
LOG.info("Added source for recovered queue {}", src.getQueueId());
for (String wal : walsSet) {
LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId());
src.enqueueLog(new Path(oldLogDir, wal));
}
src.startup();
}
} catch (IOException e) {
// TODO manage it
LOG.error("Failed creating a source", e);
// Do not setup recovered queue if a sync replication peer is in STANDBY state, or is
// transiting to STANDBY state. The only exception is we are in STANDBY state and
// transiting to DA, under this state we will replay the remote WAL and they need to be
// replicated back.
if (peer.getPeerConfig().isSyncReplication()) {
Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
peer.getSyncReplicationStateAndNewState();
if ((stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY) &&
stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) ||
stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) {
src.terminate("Sync replication peer is in STANDBY state");
deleteQueue(queueId);
return;
}
}
// track sources in walsByIdRecoveredQueues
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(queueId, walsByGroup);
for (String wal : walsSet) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
NavigableSet<String> wals = walsByGroup.get(walPrefix);
if (wals == null) {
wals = new TreeSet<>();
walsByGroup.put(walPrefix, wals);
}
wals.add(wal);
}
oldsources.add(src);
LOG.info("Added source for recovered queue {}", src.getQueueId());
for (String wal : walsSet) {
LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId());
src.enqueueLog(new Path(oldLogDir, wal));
}
src.startup();
}
}
@ -1249,4 +1137,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
return crs.startup();
}
ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}
}

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@ -31,18 +35,21 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
/**
* In a scenario of Replication based Disaster/Recovery, when hbase Master-Cluster crashes, this
* tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool
* will run on Master-Cluser, and assume ZK, Filesystem and NetWork still available after hbase
* will run on Master-Cluster, and assume ZK, Filesystem and NetWork still available after hbase
* crashes
*
* <pre>
@ -62,6 +69,29 @@ public class ReplicationSyncUp extends Configured implements Tool {
System.exit(ret);
}
private Set<ServerName> getLiveRegionServers(ZKWatcher zkw) throws KeeperException {
List<String> rsZNodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
return rsZNodes == null ? Collections.emptySet() :
rsZNodes.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
}
// When using this tool, usually the source cluster is unhealthy, so we should try to claim the
// replication queues for the dead region servers first and then replicate the data out.
private void claimReplicationQueues(ZKWatcher zkw, ReplicationSourceManager mgr)
throws ReplicationException, KeeperException {
List<ServerName> replicators = mgr.getQueueStorage().getListOfReplicators();
Set<ServerName> liveRegionServers = getLiveRegionServers(zkw);
for (ServerName sn : replicators) {
if (!liveRegionServers.contains(sn)) {
List<String> replicationQueues = mgr.getQueueStorage().getAllQueues(sn);
System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues);
for (String queue : replicationQueues) {
mgr.claimQueue(sn, queue);
}
}
}
}
@Override
public int run(String[] args) throws Exception {
Abortable abortable = new Abortable() {
@ -88,7 +118,8 @@ public class ReplicationSyncUp extends Configured implements Tool {
replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
new WALFactory(conf, "test", null, false));
ReplicationSourceManager manager = replication.getReplicationManager();
manager.init().get();
manager.init();
claimReplicationQueues(zkw, manager);
while (manager.activeFailoverTaskCount() > 0) {
Thread.sleep(SLEEP_TIME);
}

View File

@ -18,41 +18,30 @@
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData;
/**
* The callable executed at RS side to switch rpc throttle state. <br/>
*/
@InterfaceAudience.Private
public class SwitchRpcThrottleRemoteCallable implements RSProcedureCallable {
private HRegionServer rs;
public class SwitchRpcThrottleRemoteCallable extends BaseRSProcedureCallable {
private boolean rpcThrottleEnabled;
private Exception initError;
@Override
public Void call() throws Exception {
if (initError != null) {
throw initError;
}
protected void doCall() throws Exception {
rs.getRegionServerRpcQuotaManager().switchRpcThrottle(rpcThrottleEnabled);
return null;
}
@Override
public void init(byte[] parameter, HRegionServer rs) {
this.rs = rs;
try {
SwitchRpcThrottleRemoteStateData param =
SwitchRpcThrottleRemoteStateData.parseFrom(parameter);
rpcThrottleEnabled = param.getRpcThrottleEnabled();
} catch (InvalidProtocolBufferException e) {
initError = e;
}
protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException {
SwitchRpcThrottleRemoteStateData param = SwitchRpcThrottleRemoteStateData.parseFrom(parameter);
rpcThrottleEnabled = param.getRpcThrottleEnabled();
}
@Override

View File

@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.replication.ClaimReplicationQueuesProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
/**
* In HBASE-26029, we reimplement the claim queue operation with proc-v2 and make it a step in SCP,
* this is a UT to make sure the {@link ClaimReplicationQueuesProcedure} works correctly.
*/
@Category({ ReplicationTests.class, LargeTests.class })
public class TestClaimReplicationQueue extends TestReplicationBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestClaimReplicationQueue.class);
private static final TableName tableName3 = TableName.valueOf("test3");
private static final String PEER_ID3 = "3";
private static Table table3;
private static Table table4;
private static volatile boolean EMPTY = false;
public static final class ServerManagerForTest extends ServerManager {
public ServerManagerForTest(MasterServices master) {
super(master);
}
@Override
public List<ServerName> getOnlineServersList() {
// return no region server to make the procedure hang
if (EMPTY) {
for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
if (e.getClassName().equals(ClaimReplicationQueuesProcedure.class.getName())) {
return Collections.emptyList();
}
}
}
return super.getOnlineServersList();
}
}
public static final class HMasterForTest extends HMaster {
public HMasterForTest(Configuration conf) throws IOException {
super(conf);
}
@Override
protected ServerManager createServerManager(MasterServices master) throws IOException {
setupClusterConnection();
return new ServerManagerForTest(master);
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
CONF1.setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
TestReplicationBase.setUpBeforeClass();
createTable(tableName3);
table3 = connection1.getTable(tableName3);
table4 = connection2.getTable(tableName3);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
Closeables.close(table3, true);
Closeables.close(table4, true);
TestReplicationBase.tearDownAfterClass();
}
@Override
public void setUpBase() throws Exception {
super.setUpBase();
// set up two replication peers and only 1 rs to test claim replication queue with multiple
// round
addPeer(PEER_ID3, tableName3);
}
@Override
public void tearDownBase() throws Exception {
super.tearDownBase();
removePeer(PEER_ID3);
}
@Test
public void testClaim() throws Exception {
// disable the peers
hbaseAdmin.disableReplicationPeer(PEER_ID2);
hbaseAdmin.disableReplicationPeer(PEER_ID3);
// put some data
int count1 = UTIL1.loadTable(htable1, famName);
int count2 = UTIL1.loadTable(table3, famName);
EMPTY = true;
UTIL1.getMiniHBaseCluster().stopRegionServer(0).join();
UTIL1.getMiniHBaseCluster().startRegionServer();
// since there is no active region server to get the replication queue, the procedure should be
// in WAITING_TIMEOUT state for most time to retry
HMaster master = UTIL1.getMiniHBaseCluster().getMaster();
UTIL1.waitFor(30000,
() -> master.getProcedures().stream()
.filter(p -> p instanceof ClaimReplicationQueuesProcedure)
.anyMatch(p -> p.getState() == ProcedureState.WAITING_TIMEOUT));
hbaseAdmin.enableReplicationPeer(PEER_ID2);
hbaseAdmin.enableReplicationPeer(PEER_ID3);
EMPTY = false;
// wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP
UTIL1.waitFor(30000, () -> master.getProcedures().stream()
.filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
// we should get all the data in the target cluster
waitForReplication(htable2, count1, NB_RETRIES);
waitForReplication(table4, count2, NB_RETRIES);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -55,6 +56,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@ -68,8 +70,8 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
*/
public class TestReplicationBase {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
private static Connection connection1;
private static Connection connection2;
protected static Connection connection1;
protected static Connection connection2;
protected static Configuration CONF_WITH_LOCALFS;
protected static Admin hbaseAdmin;
@ -147,19 +149,22 @@ public class TestReplicationBase {
waitForReplication(htable2, expectedRows, retries);
}
protected static void waitForReplication(Table htable2, int expectedRows, int retries)
throws IOException, InterruptedException {
protected static void waitForReplication(Table table, int expectedRows, int retries)
throws IOException, InterruptedException {
Scan scan;
for (int i = 0; i < retries; i++) {
scan = new Scan();
if (i== retries -1) {
if (i == retries - 1) {
fail("Waited too much time for normal batch replication");
}
ResultScanner scanner = htable2.getScanner(scan);
Result[] res = scanner.next(expectedRows);
scanner.close();
if (res.length != expectedRows) {
LOG.info("Only got " + res.length + " rows");
int count = 0;
try (ResultScanner scanner = table.getScanner(scan)) {
while (scanner.next() != null) {
count++;
}
}
if (count != expectedRows) {
LOG.info("Only got " + count + " rows");
Thread.sleep(SLEEP_TIME);
} else {
break;
@ -235,6 +240,18 @@ public class TestReplicationBase {
htable2 = UTIL2.getConnection().getTable(tableName);
}
protected static void createTable(TableName tableName)
throws IOException {
TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
UTIL1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
UTIL2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
UTIL1.waitUntilAllRegionsAssigned(tableName);
UTIL2.waitUntilAllRegionsAssigned(tableName);
}
private static void startClusters() throws Exception {
UTIL1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
@ -253,22 +270,9 @@ public class TestReplicationBase {
connection2 = ConnectionFactory.createConnection(CONF2);
hbaseAdmin = connection1.getAdmin();
TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
try (
Admin admin1 = connection1.getAdmin();
Admin admin2 = connection2.getAdmin()) {
admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
UTIL1.waitUntilAllRegionsAssigned(tableName);
htable1 = connection1.getTable(tableName);
UTIL2.waitUntilAllRegionsAssigned(tableName);
htable2 = connection2.getTable(tableName);
}
createTable(tableName);
htable1 = connection1.getTable(tableName);
htable2 = connection2.getTable(tableName);
}
@BeforeClass
@ -281,12 +285,11 @@ public class TestReplicationBase {
return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
}
@Before
public void setUpBase() throws Exception {
if (!peerExist(PEER_ID2)) {
protected final void addPeer(String peerId, TableName tableName) throws Exception {
if (!peerExist(peerId)) {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).setReplicationEndpointImpl(
ReplicationEndpointTest.class.getName());
.setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer())
.setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
if (isSyncPeer()) {
FileSystem fs2 = UTIL2.getTestFileSystem();
// The remote wal dir is not important as we do not use it in DA state, here we only need to
@ -297,15 +300,24 @@ public class TestReplicationBase {
.setRemoteWALDir(new Path("/RemoteWAL")
.makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
}
hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build());
hbaseAdmin.addReplicationPeer(peerId, builder.build());
}
}
@Before
public void setUpBase() throws Exception {
addPeer(PEER_ID2, tableName);
}
protected final void removePeer(String peerId) throws Exception {
if (peerExist(peerId)) {
hbaseAdmin.removeReplicationPeer(peerId);
}
}
@After
public void tearDownBase() throws Exception {
if (peerExist(PEER_ID2)) {
hbaseAdmin.removeReplicationPeer(PEER_ID2);
}
removePeer(PEER_ID2);
}
protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {

View File

@ -75,7 +75,6 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -406,9 +405,7 @@ public abstract class TestReplicationSourceManager {
ReplicationPeers rp1 =
ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
rp1.init();
NodeFailoverWorker w1 =
manager.new NodeFailoverWorker(server.getServerName());
w1.run();
manager.claimQueue(server.getServerName(), "1");
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
@ -432,8 +429,7 @@ public abstract class TestReplicationSourceManager {
rq.addWAL(server.getServerName(), "2", group + ".log1");
rq.addWAL(server.getServerName(), "2", group + ".log2");
NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
w1.run();
manager.claimQueue(server.getServerName(), "2");
// The log of the unknown peer should be removed from zk
for (String peer : manager.getAllQueues()) {