diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 76b085d43c8..4f1a3cf1635 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -312,8 +312,9 @@ enum ServerCrashState { SERVER_CRASH_WAIT_ON_ASSIGN = 9; SERVER_CRASH_SPLIT_META_LOGS = 10; SERVER_CRASH_ASSIGN_META = 11; - SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR=12; - SERVER_CRASH_DELETE_SPLIT_WALS_DIR=13; + SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR = 12; + SERVER_CRASH_DELETE_SPLIT_WALS_DIR = 13; + SERVER_CRASH_CLAIM_REPLICATION_QUEUES = 14; SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true]; SERVER_CRASH_FINISH = 100; } @@ -624,3 +625,23 @@ enum SplitWALState{ DISPATCH_WAL_TO_WORKER = 2; RELEASE_SPLIT_WORKER = 3; } + +message ClaimReplicationQueuesStateData { + required ServerName crashed_server = 1; +} + +message ClaimReplicationQueueRemoteStateData { + required ServerName crashed_server = 1; + required string queue = 2; + required ServerName target_server = 3; +} + +message ClaimReplicationQueueRemoteParameter { + required ServerName crashed_server = 1; + required string queue = 2; +} + +enum ClaimReplicationQueuesState { + CLAIM_REPLICATION_QUEUES_DISPATCH = 1; + CLAIM_REPLICATION_QUEUES_FINISH = 2; +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java deleted file mode 100644 index c55a82ee33d..00000000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hbase.ChoreService; -import org.apache.hadoop.hbase.ScheduledChore; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A {@link ReplicationTracker} implementation which polls the region servers list periodically from - * master. - */ -@InterfaceAudience.Private -class MasterReplicationTracker extends ReplicationTrackerBase { - - private static final Logger LOG = LoggerFactory.getLogger(MasterReplicationTracker.class); - - static final String REFRESH_INTERVAL_SECONDS = - "hbase.replication.tracker.master.refresh.interval.secs"; - - // default to refresh every 5 seconds - static final int REFRESH_INTERVAL_SECONDS_DEFAULT = 5; - - private final ChoreService choreService; - - private final ScheduledChore chore; - - private final Admin admin; - - private volatile Set regionServers; - - MasterReplicationTracker(ReplicationTrackerParams params) { - try { - this.admin = params.connection().getAdmin(); - } catch (IOException e) { - // should not happen - throw new AssertionError(e); - } - this.choreService = params.choreService(); - int refreshIntervalSecs = - params.conf().getInt(REFRESH_INTERVAL_SECONDS, REFRESH_INTERVAL_SECONDS_DEFAULT); - this.chore = new ScheduledChore(getClass().getSimpleName(), params.stopptable(), - refreshIntervalSecs, 0, TimeUnit.SECONDS) { - - @Override - protected void chore() { - try { - refresh(); - } catch (IOException e) { - LOG.warn("failed to refresh region server list for replication", e); - } - } - }; - } - - private Set reload() throws IOException { - return Collections.unmodifiableSet(new HashSet<>(admin.getRegionServers())); - - } - - private void refresh() throws IOException { - Set newRegionServers = reload(); - for (ServerName oldRs : regionServers) { - if (!newRegionServers.contains(oldRs)) { - notifyListeners(oldRs); - } - } - this.regionServers = newRegionServers; - } - - @Override - protected Set internalLoadLiveRegionServersAndInitializeListeners() - throws IOException { - Set newRegionServers = reload(); - this.regionServers = newRegionServers; - choreService.scheduleChore(chore); - return newRegionServers; - } -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index b4d33d674de..83421600aa0 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -36,10 +35,4 @@ public final class ReplicationFactory { public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) { return new ReplicationPeers(zk, conf); } - - public static ReplicationTracker getReplicationTracker(ReplicationTrackerParams params) { - Class clazz = params.conf().getClass(REPLICATION_TRACKER_IMPL, - ZKReplicationTracker.class, ReplicationTracker.class); - return ReflectionUtils.newInstance(clazz, params); - } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java deleted file mode 100644 index 2ffc984396e..00000000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.io.IOException; -import java.util.Set; -import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * This is the interface for a Replication Tracker. - *

- * A replication tracker provides the facility to subscribe and track events that reflect a change - * in replication state. These events are used by the ReplicationSourceManager to coordinate - * replication tasks such as addition/deletion of queues and queue failover. These events are - * defined in the ReplicationListener interface. If a class would like to listen to replication - * events it must implement the ReplicationListener interface and register itself with a Replication - * Tracker. - */ -@InterfaceAudience.Private -public interface ReplicationTracker { - - /** - * Register a replication listener to receive replication events. - * @param listener the listener to register - */ - void registerListener(ReplicationListener listener); - - /** - * Remove a replication listener - * @param listener the listener to remove - */ - void removeListener(ReplicationListener listener); - - /** - * In this method, you need to load the newest list of region server list and return it, and all - * later changes to the region server list must be passed to the listeners. - *

- * This is very important for us to not miss a region server crash. - *

- * Notice that this method can only be called once. - * @return Set of region servers. - */ - Set loadLiveRegionServersAndInitializeListeners() throws IOException; -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java deleted file mode 100644 index 96a306110f7..00000000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.io.IOException; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Base implementation class for replication tracker. - */ -@InterfaceAudience.Private -abstract class ReplicationTrackerBase implements ReplicationTracker { - - private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerBase.class); - - // listeners to be notified - private final List listeners = new CopyOnWriteArrayList<>(); - - private final AtomicBoolean initialized = new AtomicBoolean(false); - - @Override - public void registerListener(ReplicationListener listener) { - listeners.add(listener); - } - - @Override - public void removeListener(ReplicationListener listener) { - listeners.remove(listener); - } - - protected final void notifyListeners(ServerName regionServer) { - LOG.info("{} is dead, triggering replicatorRemoved event", regionServer); - for (ReplicationListener listener : listeners) { - listener.regionServerRemoved(regionServer); - } - } - - @Override - public final Set loadLiveRegionServersAndInitializeListeners() throws IOException { - if (!initialized.compareAndSet(false, true)) { - throw new IllegalStateException( - "loadLiveRegionServersAndInitializeListeners can only be called once"); - } - return internalLoadLiveRegionServersAndInitializeListeners(); - } - - protected abstract Set internalLoadLiveRegionServersAndInitializeListeners() - throws IOException; - -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java deleted file mode 100644 index 9aeedcf4740..00000000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.ChoreService; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Parameters for constructing a {@link ReplicationTracker}. - */ -@InterfaceAudience.Private -public final class ReplicationTrackerParams { - - private final Configuration conf; - - private final Stoppable stopptable; - - private ZKWatcher zookeeper; - - private Abortable abortable; - - private Connection conn; - - private ChoreService choreService; - - private ReplicationTrackerParams(Configuration conf, Stoppable stopptable) { - this.conf = conf; - this.stopptable = stopptable; - } - - public ReplicationTrackerParams zookeeper(ZKWatcher zookeeper) { - this.zookeeper = zookeeper; - return this; - } - - public ReplicationTrackerParams abortable(Abortable abortable) { - this.abortable = abortable; - return this; - } - - public ReplicationTrackerParams connection(Connection conn) { - this.conn = conn; - return this; - } - - public ReplicationTrackerParams choreService(ChoreService choreService) { - this.choreService = choreService; - return this; - } - - public Configuration conf() { - return conf; - } - - public Stoppable stopptable() { - return stopptable; - } - - public ZKWatcher zookeeper() { - return zookeeper; - } - - public Abortable abortable() { - return abortable; - } - - public Connection connection() { - return conn; - } - - public ChoreService choreService() { - return choreService; - } - - public static ReplicationTrackerParams create(Configuration conf, Stoppable stopptable) { - return new ReplicationTrackerParams(conf, stopptable); - } -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java deleted file mode 100644 index b74187a4590..00000000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.zookeeper.ZKListener; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; - -import org.apache.hbase.thirdparty.com.google.common.base.Splitter; - -/** - * This class is a ZooKeeper implementation of the ReplicationTracker interface. This class is - * responsible for handling replication events that are defined in the ReplicationListener - * interface. - */ -@InterfaceAudience.Private -class ZKReplicationTracker extends ReplicationTrackerBase { - - // Zookeeper - private final ZKWatcher zookeeper; - // Server to abort. - private final Abortable abortable; - // All about stopping - private final Stoppable stopper; - // List of all the other region servers in this cluster - private final Set regionServers = new HashSet<>(); - - ZKReplicationTracker(ReplicationTrackerParams params) { - this.zookeeper = params.zookeeper(); - this.abortable = params.abortable(); - this.stopper = params.stopptable(); - this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper)); - } - - /** - * Watcher used to be notified of the other region server's death in the local cluster. It - * initiates the process to transfer the queues if it is able to grab the lock. - */ - public class OtherRegionServerWatcher extends ZKListener { - - /** - * Construct a ZooKeeper event listener. - */ - public OtherRegionServerWatcher(ZKWatcher watcher) { - super(watcher); - } - - /** - * Called when a new node has been created. - * @param path full path of the new node - */ - @Override - public void nodeCreated(String path) { - if (stopper.isStopped()) { - return; - } - refreshListIfRightPath(path); - } - - /** - * Called when a node has been deleted - * @param path full path of the deleted node - */ - @Override - public void nodeDeleted(String path) { - if (stopper.isStopped()) { - return; - } - if (!refreshListIfRightPath(path)) { - return; - } - notifyListeners(ServerName.valueOf(getZNodeName(path))); - } - - /** - * Called when an existing node has a child node added or removed. - * @param path full path of the node whose children have changed - */ - @Override - public void nodeChildrenChanged(String path) { - if (stopper.isStopped()) { - return; - } - refreshListIfRightPath(path); - } - - private boolean refreshListIfRightPath(String path) { - if (!path.startsWith(this.watcher.getZNodePaths().rsZNode)) { - return false; - } - return refreshRegionServerList(); - } - } - - /** - * Extracts the znode name of a peer cluster from a ZK path - * @param fullPath Path to extract the id from - * @return the id or an empty string if path is invalid - */ - private String getZNodeName(String fullPath) { - List parts = Splitter.on('/').splitToList(fullPath); - return parts.size() > 0 ? parts.get(parts.size() - 1) : ""; - } - - /** - * Reads the list of region servers from ZK and atomically clears our local view of it and - * replaces it with the updated list. - * @return true if the local list of the other region servers was updated with the ZK data (even - * if it was empty), false if the data was missing in ZK - */ - private boolean refreshRegionServerList() { - Set newRsList = getRegisteredRegionServers(); - if (newRsList == null) { - return false; - } else { - synchronized (regionServers) { - regionServers.clear(); - regionServers.addAll(newRsList); - } - } - return true; - } - - /** - * Get a list of all the other region servers in this cluster and set a watch - * @return a list of server nanes - */ - private Set getRegisteredRegionServers() { - List result = null; - try { - result = - ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.getZNodePaths().rsZNode); - } catch (KeeperException e) { - this.abortable.abort("Get list of registered region servers", e); - } - return result == null ? null : - result.stream().map(ServerName::parseServerName).collect(Collectors.toSet()); - } - - @Override - protected Set internalLoadLiveRegionServersAndInitializeListeners() - throws IOException { - if (!refreshRegionServerList()) { - throw new IOException("failed to refresh region server list"); - } - synchronized (regionServers) { - return new HashSet<>(regionServers); - } - } -} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java deleted file mode 100644 index 270e9f62eb4..00000000000 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import static org.junit.Assert.assertEquals; - -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Base class for testing {@link ReplicationTracker} and {@link ReplicationListener}. - */ -public abstract class ReplicationTrackerTestBase { - - private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerTestBase.class); - - private ReplicationTracker rt; - - private AtomicInteger rsRemovedCount; - - private volatile ServerName rsRemovedData; - - @Before - public void setUp() { - ReplicationTrackerParams params = createParams(); - rt = ReplicationFactory.getReplicationTracker(params); - rsRemovedCount = new AtomicInteger(0); - rsRemovedData = null; - } - - protected abstract ReplicationTrackerParams createParams(); - - protected abstract void addServer(ServerName sn) throws Exception; - - protected abstract void removeServer(ServerName sn) throws Exception; - - @Test - public void testWatchRegionServers() throws Exception { - ServerName sn = - ServerName.valueOf("hostname2.example.org,1234," + EnvironmentEdgeManager.currentTime()); - addServer(sn); - rt.registerListener(new DummyReplicationListener()); - assertEquals(1, rt.loadLiveRegionServersAndInitializeListeners().size()); - // delete one - removeServer(sn); - // wait for event - Waiter.waitFor(HBaseConfiguration.create(), 15000, () -> rsRemovedCount.get() >= 1); - assertEquals(sn, rsRemovedData); - } - - private class DummyReplicationListener implements ReplicationListener { - - @Override - public void regionServerRemoved(ServerName regionServer) { - rsRemovedData = regionServer; - rsRemovedCount.getAndIncrement(); - LOG.debug("Received regionServerRemoved event: " + regionServer); - } - } - - protected static class WarnOnlyStoppable implements Stoppable { - - @Override - public void stop(String why) { - LOG.warn("TestReplicationTracker received stop, ignoring. Reason: " + why); - } - - @Override - public boolean isStopped() { - return false; - } - } - - protected static class WarnOnlyAbortable implements Abortable { - - @Override - public void abort(String why, Throwable e) { - LOG.warn("TestReplicationTracker received abort, ignoring. Reason: " + why); - } - - @Override - public boolean isAborted() { - return false; - } - } -} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java deleted file mode 100644 index 357908f540e..00000000000 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ChoreService; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.experimental.categories.Category; - -@Category({ ReplicationTests.class, MediumTests.class }) -public class TestMasterReplicationTracker extends ReplicationTrackerTestBase { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestMasterReplicationTracker.class); - - private static Configuration CONF; - - private static Connection CONN; - - private static ChoreService CHORE_SERVICE; - - private static List SERVERS = new CopyOnWriteArrayList<>(); - - @BeforeClass - public static void setUpBeforeClass() throws IOException { - CONF = HBaseConfiguration.create(); - CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL, MasterReplicationTracker.class, - ReplicationTracker.class); - Admin admin = mock(Admin.class); - when(admin.getRegionServers()).thenReturn(SERVERS); - CONN = mock(Connection.class); - when(CONN.getAdmin()).thenReturn(admin); - CHORE_SERVICE = new ChoreService("TestMasterReplicationTracker"); - } - - @AfterClass - public static void tearDownAfterClass() { - CHORE_SERVICE.shutdown(); - } - - @Override - protected ReplicationTrackerParams createParams() { - return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable()) - .abortable(new WarnOnlyAbortable()).connection(CONN).choreService(CHORE_SERVICE); - } - - @Override - protected void addServer(ServerName sn) throws Exception { - SERVERS.add(sn); - } - - @Override - protected void removeServer(ServerName sn) throws Exception { - SERVERS.remove(sn); - } -} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java deleted file mode 100644 index 4ef42e33272..00000000000 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseZKTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.experimental.categories.Category; - -import org.apache.hbase.thirdparty.com.google.common.io.Closeables; - -@Category({ ReplicationTests.class, MediumTests.class }) -public class TestZKReplicationTracker extends ReplicationTrackerTestBase { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestZKReplicationTracker.class); - - private static Configuration CONF; - - private static HBaseZKTestingUtility UTIL; - - private static ZKWatcher ZKW; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - UTIL = new HBaseZKTestingUtility(); - UTIL.startMiniZKCluster(); - CONF = UTIL.getConfiguration(); - CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL, ZKReplicationTracker.class, - ReplicationTracker.class); - ZKWatcher zk = HBaseZKTestingUtility.getZooKeeperWatcher(UTIL); - ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode); - ZKW = HBaseZKTestingUtility.getZooKeeperWatcher(UTIL); - } - - @Override - protected ReplicationTrackerParams createParams() { - return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable()) - .abortable(new WarnOnlyAbortable()).zookeeper(ZKW); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - Closeables.close(ZKW, true); - UTIL.shutdownMiniZKCluster(); - } - - @Override - protected void addServer(ServerName sn) throws Exception { - ZKUtil.createAndWatch(ZKW, ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode, sn.toString()), - HConstants.EMPTY_BYTE_ARRAY); - } - - @Override - protected void removeServer(ServerName sn) throws Exception { - ZKUtil.deleteNode(ZKW, ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode, sn.toString())); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index 600c96cc026..a39493cc262 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -294,7 +294,14 @@ public enum EventType { * * RS_REPLAY_SYNC_REPLICATION_WAL */ - RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL); + RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL), + + /** + * RS claim replication queue.
+ * + * RS_CLAIM_REPLICATION_QUEUE + */ + RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE); private final int code; private final ExecutorType executor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 36958c518a6..120f9bea5b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -52,7 +52,8 @@ public enum ExecutorType { RS_REFRESH_PEER(31), RS_REPLAY_SYNC_REPLICATION_WAL(32), RS_SWITCH_RPC_THROTTLE(33), - RS_IN_MEMORY_COMPACTION(34); + RS_IN_MEMORY_COMPACTION(34), + RS_CLAIM_REPLICATION_QUEUE(35); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index e7fba555c9c..49449e397f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.master.SplitWALManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.RegionStateNode; import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; +import org.apache.hadoop.hbase.master.replication.ClaimReplicationQueuesProcedure; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.procedure2.Procedure; @@ -234,11 +235,15 @@ public class ServerCrashProcedure } assignRegions(env, regionsOnCrashedServer); } - setNextState(ServerCrashState.SERVER_CRASH_FINISH); + setNextState(ServerCrashState.SERVER_CRASH_CLAIM_REPLICATION_QUEUES); break; case SERVER_CRASH_HANDLE_RIT2: // Noop. Left in place because we used to call handleRIT here for a second time // but no longer necessary since HBASE-20634. + setNextState(ServerCrashState.SERVER_CRASH_CLAIM_REPLICATION_QUEUES); + break; + case SERVER_CRASH_CLAIM_REPLICATION_QUEUES: + addChildProcedure(new ClaimReplicationQueuesProcedure(serverName)); setNextState(ServerCrashState.SERVER_CRASH_FINISH); break; case SERVER_CRASH_FINISH: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index eb0583bf6ea..a7abfdc13f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -27,18 +27,30 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface ServerProcedureInterface { public enum ServerOperationType { - CRASH_HANDLER, SWITCH_RPC_THROTTLE, + CRASH_HANDLER, + SWITCH_RPC_THROTTLE, + /** - * help find a available region server as worker and release worker after task done - * invoke SPLIT_WAL_REMOTE operation to send real WAL splitting request to worker - * manage the split wal task flow, will retry if SPLIT_WAL_REMOTE failed + * help find a available region server as worker and release worker after task done invoke + * SPLIT_WAL_REMOTE operation to send real WAL splitting request to worker manage the split wal + * task flow, will retry if SPLIT_WAL_REMOTE failed */ SPLIT_WAL, /** * send the split WAL request to region server and handle the response */ - SPLIT_WAL_REMOTE + SPLIT_WAL_REMOTE, + + /** + * Get all the replication queues of a crash server and assign them to other region servers + */ + CLAIM_REPLICATION_QUEUES, + + /** + * send the claim replication queue request to region server to actually assign it + */ + CLAIM_REPLICATION_QUEUE_REMOTE } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java index 1659ab5a3f8..726ee14c979 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java @@ -38,6 +38,8 @@ class ServerQueue extends Queue { case SWITCH_RPC_THROTTLE: case SPLIT_WAL: case SPLIT_WAL_REMOTE: + case CLAIM_REPLICATION_QUEUES: + case CLAIM_REPLICATION_QUEUE_REMOTE: return false; default: break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java new file mode 100644 index 00000000000..c8c5704dfc0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import java.util.Optional; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; +import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; +import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData; + +@InterfaceAudience.Private +public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure + implements ServerProcedureInterface, RemoteProcedure { + + private static final Logger LOG = + LoggerFactory.getLogger(ClaimReplicationQueueRemoteProcedure.class); + + private ServerName crashedServer; + + private String queue; + + public ClaimReplicationQueueRemoteProcedure() { + } + + public ClaimReplicationQueueRemoteProcedure(ServerName crashedServer, String queue, + ServerName targetServer) { + this.crashedServer = crashedServer; + this.queue = queue; + this.targetServer = targetServer; + } + + @Override + public Optional remoteCallBuild(MasterProcedureEnv env, ServerName remote) { + assert targetServer.equals(remote); + return Optional.of(new ServerOperation(this, getProcId(), ClaimReplicationQueueCallable.class, + ClaimReplicationQueueRemoteParameter.newBuilder() + .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue).build() + .toByteArray())); + } + + @Override + public ServerName getServerName() { + // return crashed server here, as we are going to recover its replication queues so we should + // use its scheduler queue instead of the one for the target server. + return crashedServer; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.CLAIM_REPLICATION_QUEUE_REMOTE; + } + + @Override + protected void complete(MasterProcedureEnv env, Throwable error) { + if (error != null) { + LOG.warn("Failed to claim replication queue {} of crashed server on server {} ", queue, + crashedServer, targetServer, error); + this.succ = false; + } else { + this.succ = true; + } + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected boolean waitInitialized(MasterProcedureEnv env) { + return env.waitInitialized(this); + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + serializer.serialize(ClaimReplicationQueueRemoteStateData.newBuilder() + .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue) + .setTargetServer(ProtobufUtil.toServerName(targetServer)).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + ClaimReplicationQueueRemoteStateData data = + serializer.deserialize(ClaimReplicationQueueRemoteStateData.class); + crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); + queue = data.getQueue(); + targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java new file mode 100644 index 00000000000..5a35c3fbc10 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueuesStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * Used to assign the replication queues of a dead server to other region servers. + */ +@InterfaceAudience.Private +public class ClaimReplicationQueuesProcedure extends Procedure + implements ServerProcedureInterface { + + private static final Logger LOG = LoggerFactory.getLogger(ClaimReplicationQueuesProcedure.class); + + private ServerName crashedServer; + + private RetryCounter retryCounter; + + public ClaimReplicationQueuesProcedure() { + } + + public ClaimReplicationQueuesProcedure(ServerName crashedServer) { + this.crashedServer = crashedServer; + } + + @Override + public ServerName getServerName() { + return crashedServer; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.CLAIM_REPLICATION_QUEUES; + } + + @Override + protected Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage(); + try { + List queues = storage.getAllQueues(crashedServer); + if (queues.isEmpty()) { + LOG.debug("Finish claiming replication queues for {}", crashedServer); + storage.removeReplicatorIfQueueIsEmpty(crashedServer); + // we are done + return null; + } + LOG.debug("There are {} replication queues need to be claimed for {}", queues.size(), + crashedServer); + List targetServers = + env.getMasterServices().getServerManager().getOnlineServersList(); + if (targetServers.isEmpty()) { + throw new ReplicationException("no region server available"); + } + Collections.shuffle(targetServers); + ClaimReplicationQueueRemoteProcedure[] procs = + new ClaimReplicationQueueRemoteProcedure[Math.min(queues.size(), targetServers.size())]; + for (int i = 0; i < procs.length; i++) { + procs[i] = new ClaimReplicationQueueRemoteProcedure(crashedServer, queues.get(i), + targetServers.get(i)); + } + return procs; + } catch (ReplicationException e) { + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); + } + long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); + LOG.warn("Failed to claim replication queues for {}, suspend {}secs {}; {};", crashedServer, + backoff / 1000, e); + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + serializer.serialize(ClaimReplicationQueuesStateData.newBuilder() + .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + ClaimReplicationQueuesStateData data = + serializer.deserialize(ClaimReplicationQueuesStateData.class); + crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java new file mode 100644 index 00000000000..76b749e221e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public abstract class BaseRSProcedureCallable implements RSProcedureCallable { + + protected HRegionServer rs; + + private Exception initError; + + @Override + public final Void call() throws Exception { + if (initError != null) { + throw initError; + } + doCall(); + return null; + } + + @Override + public final void init(byte[] parameter, HRegionServer rs) { + this.rs = rs; + try { + initParameter(parameter); + } catch (Exception e) { + initError = e; + } + } + + protected abstract void doCall() throws Exception; + + protected abstract void initParameter(byte[] parameter) throws Exception; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7783bff32c4..c00a8b7d068 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2104,6 +2104,10 @@ public class HRegionServer extends Thread implements conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1); executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( ExecutorType.RS_SWITCH_RPC_THROTTLE).setCorePoolSize(switchRpcThrottleThreads)); + final int claimReplicationQueueThreads = + conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_CLAIM_REPLICATION_QUEUE).setCorePoolSize(claimReplicationQueueThreads)); Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java index 03f2061c17a..319eccab583 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java @@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.regionserver; import java.util.concurrent.locks.Lock; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; /** @@ -43,27 +43,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; * we switch to procedure-based WAL splitting. */ @InterfaceAudience.Private -public class SplitWALCallable implements RSProcedureCallable { - private static final Logger LOG = LoggerFactory.getLogger(SplitWALCallable.class); +public class SplitWALCallable extends BaseRSProcedureCallable { private String walPath; - private Exception initError; - private HRegionServer rs; private final KeyLocker splitWALLocks = new KeyLocker<>(); private volatile Lock splitWALLock = null; @Override - public void init(byte[] parameter, HRegionServer rs) { - try { - this.rs = rs; - MasterProcedureProtos.SplitWALParameter param = - MasterProcedureProtos.SplitWALParameter.parseFrom(parameter); - this.walPath = param.getWalPath(); - } catch (InvalidProtocolBufferException e) { - LOG.error("Parse proto buffer of split WAL request failed ", e); - initError = e; - } + protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException { + MasterProcedureProtos.SplitWALParameter param = + MasterProcedureProtos.SplitWALParameter.parseFrom(parameter); + this.walPath = param.getWalPath(); } @Override @@ -90,10 +81,7 @@ public class SplitWALCallable implements RSProcedureCallable { } @Override - public Void call() throws Exception { - if (initError != null) { - throw initError; - } + protected void doCall() throws Exception { //grab a lock splitWALLock = splitWALLocks.acquireLock(walPath); try { @@ -110,7 +98,6 @@ public class SplitWALCallable implements RSProcedureCallable { } finally { splitWALLock.unlock(); } - return null; } public String getWalPath() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java new file mode 100644 index 00000000000..ddae7311225 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter; + +@InterfaceAudience.Private +public class ClaimReplicationQueueCallable extends BaseRSProcedureCallable { + + private ServerName crashedServer; + + private String queue; + + @Override + public EventType getEventType() { + return EventType.RS_CLAIM_REPLICATION_QUEUE; + } + + @Override + protected void doCall() throws Exception { + PeerProcedureHandler handler = rs.getReplicationSourceService().getPeerProcedureHandler(); + handler.claimReplicationQueue(crashedServer, queue); + } + + @Override + protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException { + ClaimReplicationQueueRemoteParameter param = + ClaimReplicationQueueRemoteParameter.parseFrom(parameter); + crashedServer = ProtobufUtil.toServerName(param.getCrashedServer()); + queue = param.getQueue(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index f27cf083329..11b0c7ca806 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -36,21 +35,17 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.io.WALLink; import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; -import org.apache.hadoop.hbase.replication.ReplicationTracker; -import org.apache.hadoop.hbase.replication.ReplicationTrackerParams; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -302,18 +297,13 @@ public class DumpReplicationQueues extends Configured implements Tool { return sb.toString(); } - public String dumpQueues(ZKWatcher zkw, Set peerIds, - boolean hdfs) throws Exception { + public String dumpQueues(ZKWatcher zkw, Set peerIds, boolean hdfs) throws Exception { ReplicationQueueStorage queueStorage; - ReplicationTracker replicationTracker; StringBuilder sb = new StringBuilder(); queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); - replicationTracker = ReplicationFactory - .getReplicationTracker(ReplicationTrackerParams.create(getConf(), new WarnOnlyStoppable()) - .abortable(new WarnOnlyAbortable()).zookeeper(zkw)); - Set liveRegionServers = - new HashSet<>(replicationTracker.loadLiveRegionServersAndInitializeListeners()); + Set liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode) + .stream().map(ServerName::parseServerName).collect(Collectors.toSet()); // Loops each peer on each RS and dumps the queues List regionservers = queueStorage.getListOfReplicators(); @@ -416,16 +406,4 @@ public class DumpReplicationQueues extends Configured implements Tool { return false; } } - - private static class WarnOnlyStoppable implements Stoppable { - @Override - public void stop(String why) { - LOG.warn("DumpReplicationQueue received stop, ignoring. Reason: " + why); - } - - @Override - public boolean isStopped() { - return false; - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java index 52b604bef04..2fe3110d797 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; @@ -40,4 +41,7 @@ public interface PeerProcedureHandler { void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs) throws ReplicationException, IOException; + + void claimReplicationQueue(ServerName crashedServer, String queue) + throws ReplicationException, IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index d01b1305039..a50d74a448b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.io.InterruptedIOException; import java.util.concurrent.locks.Lock; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.LogRoller; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -221,4 +222,10 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { peerLock.unlock(); } } + + @Override + public void claimReplicationQueue(ServerName crashedServer, String queue) + throws ReplicationException, IOException { + replicationSourceManager.claimQueue(crashedServer, queue); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java index 00991d2272d..0c07b1125b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java @@ -18,8 +18,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; -import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,26 +32,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R * The callable executed at RS side to refresh the peer config/state.
*/ @InterfaceAudience.Private -public class RefreshPeerCallable implements RSProcedureCallable { +public class RefreshPeerCallable extends BaseRSProcedureCallable { private static final Logger LOG = LoggerFactory.getLogger(RefreshPeerCallable.class); - private HRegionServer rs; - private String peerId; private PeerModificationType type; private int stage; - private Exception initError; - @Override - public Void call() throws Exception { - if (initError != null) { - throw initError; - } - + protected void doCall() throws Exception { LOG.info("Received a peer change event, peerId=" + peerId + ", type=" + type); PeerProcedureHandler handler = rs.getReplicationSourceService().getPeerProcedureHandler(); switch (type) { @@ -77,20 +68,14 @@ public class RefreshPeerCallable implements RSProcedureCallable { default: throw new IllegalArgumentException("Unknown peer modification type: " + type); } - return null; } @Override - public void init(byte[] parameter, HRegionServer rs) { - this.rs = rs; - try { - RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter); - this.peerId = param.getPeerId(); - this.type = param.getType(); - this.stage = param.getStage(); - } catch (InvalidProtocolBufferException e) { - initError = e; - } + protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException { + RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter); + this.peerId = param.getPeerId(); + this.type = param.getType(); + this.stage = param.getStage(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java index e03bbe2b1c6..fa4167b1678 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java @@ -22,15 +22,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.Lock; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; @@ -53,7 +50,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R * This callable executed at RS side to replay sync replication wal. */ @InterfaceAudience.Private -public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { +public class ReplaySyncReplicationWALCallable extends BaseRSProcedureCallable { private static final Logger LOG = LoggerFactory.getLogger(ReplaySyncReplicationWALCallable.class); @@ -62,27 +59,16 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { private static final long DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE = 8 * 1024 * 1024; - private HRegionServer rs; - - private FileSystem fs; - - private Configuration conf; - private String peerId; private List wals = new ArrayList<>(); - private Exception initError; - private long batchSize; private final KeyLocker peersLock = new KeyLocker<>(); @Override - public Void call() throws Exception { - if (initError != null) { - throw initError; - } + protected void doCall() throws Exception { LOG.info("Received a replay sync replication wals {} event, peerId={}", wals, peerId); if (rs.getReplicationSinkService() != null) { Lock peerLock = peersLock.acquireLock(wals.get(0)); @@ -94,24 +80,16 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { peerLock.unlock(); } } - return null; } @Override - public void init(byte[] parameter, HRegionServer rs) { - this.rs = rs; - this.fs = rs.getWALFileSystem(); - this.conf = rs.getConfiguration(); - try { - ReplaySyncReplicationWALParameter param = - ReplaySyncReplicationWALParameter.parseFrom(parameter); - this.peerId = param.getPeerId(); - param.getWalList().forEach(this.wals::add); - this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE, - DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE); - } catch (InvalidProtocolBufferException e) { - initError = e; - } + protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException { + ReplaySyncReplicationWALParameter param = + ReplaySyncReplicationWALParameter.parseFrom(parameter); + this.peerId = param.getPeerId(); + param.getWalList().forEach(this.wals::add); + this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE, + DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE); } @Override @@ -139,7 +117,7 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { Path path = new Path(rs.getWALRootDir(), wal); long length = rs.getWALFileSystem().getFileStatus(path).getLen(); try { - RecoverLeaseFSUtils.recoverFileLease(fs, path, conf); + RecoverLeaseFSUtils.recoverFileLease(rs.getWALFileSystem(), path, rs.getConfiguration()); return WALFactory.createReader(rs.getWALFileSystem(), path, rs.getConfiguration()); } catch (EOFException e) { if (length <= 0) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 6c64bd63942..4cf2b495fa1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -37,8 +36,6 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; -import org.apache.hadoop.hbase.replication.ReplicationTracker; -import org.apache.hadoop.hbase.replication.ReplicationTrackerParams; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; @@ -62,7 +59,6 @@ public class Replication implements ReplicationSourceService { private ReplicationSourceManager replicationManager; private ReplicationQueueStorage queueStorage; private ReplicationPeers replicationPeers; - private ReplicationTracker replicationTracker; private Configuration conf; private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider; // Hosting server @@ -102,10 +98,6 @@ public class Replication implements ReplicationSourceService { this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf); this.replicationPeers.init(); - this.replicationTracker = ReplicationFactory - .getReplicationTracker(ReplicationTrackerParams.create(server.getConfiguration(), server) - .abortable(server).zookeeper(server.getZooKeeper()).choreService(server.getChoreService()) - .connection(server.getConnection())); } catch (Exception e) { throw new IOException("Failed replication handler create", e); } @@ -118,9 +110,8 @@ public class Replication implements ReplicationSourceService { SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager(); this.globalMetricsSource = CompatibilitySingletonFactory .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); - this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, - replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walFactory, - mapping, globalMetricsSource); + this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, conf, + this.server, fs, logDir, oldLogDir, clusterId, walFactory, mapping, globalMetricsSource); this.syncReplicationPeerInfoProvider = new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping); PeerActionListener peerActionListener = PeerActionListener.DUMMY; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index fd871e8091c..73efcfe6b5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -34,9 +34,7 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -46,7 +44,6 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; @@ -55,7 +52,6 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationListener; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -63,7 +59,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; @@ -111,23 +106,23 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto *

  • No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which * modify it, {@link #removePeer(String)} , * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and - * {@link ReplicationSourceManager.NodeFailoverWorker#run()}. + * {@link ReplicationSourceManager#claimQueue(ServerName, String)}. * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the * {@link ReplicationSourceInterface} firstly, then remove the wals from - * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()} - * will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start up a - * {@link ReplicationSourceInterface}. So there is no race here. For - * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there - * is already synchronized on {@link #oldsources}. So no need synchronized on - * {@link #walsByIdRecoveredQueues}.
  • + * {@link #walsByIdRecoveredQueues}. And + * {@link ReplicationSourceManager#claimQueue(ServerName, String)} will add the wals to + * {@link #walsByIdRecoveredQueues} firstly, then start up a {@link ReplicationSourceInterface}. So + * there is no race here. For {@link ReplicationSourceManager#claimQueue(ServerName, String)} and + * {@link #removePeer(String)}, there is already synchronized on {@link #oldsources}. So no need + * synchronized on {@link #walsByIdRecoveredQueues}. *
  • Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.
  • *
  • Need synchronized on {@link #oldsources} to avoid adding recovered source for the * to-be-removed peer.
  • * */ @InterfaceAudience.Private -public class ReplicationSourceManager implements ReplicationListener { +public class ReplicationSourceManager { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class); // all the sources that read this RS's logs and every peer only has one replication source private final ConcurrentMap sources; @@ -142,7 +137,6 @@ public class ReplicationSourceManager implements ReplicationListener { */ private final ReplicationQueueStorage queueStorage; - private final ReplicationTracker replicationTracker; private final ReplicationPeers replicationPeers; // UUID for this cluster private final UUID clusterId; @@ -208,7 +202,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param oldLogDir the directory where old logs are archived */ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, - ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, + ReplicationPeers replicationPeers, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, WALFactory walFactory, SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, @@ -216,7 +210,6 @@ public class ReplicationSourceManager implements ReplicationListener { this.sources = new ConcurrentHashMap<>(); this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; - this.replicationTracker = replicationTracker; this.server = server; this.walsById = new ConcurrentHashMap<>(); this.walsByIdRecoveredQueues = new ConcurrentHashMap<>(); @@ -230,7 +223,6 @@ public class ReplicationSourceManager implements ReplicationListener { this.clusterId = clusterId; this.walFactory = walFactory; this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager; - this.replicationTracker.registerListener(this); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. int nbWorkers = conf.getInt("replication.executor.workers", 1); @@ -254,12 +246,9 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * Adds a normal source per registered peer cluster and tries to process all old region server wal - * queues - *

    - * The returned future is for adoptAbandonedQueues task. + * Adds a normal source per registered peer cluster. */ - Future init() throws IOException { + void init() throws IOException { for (String id : this.replicationPeers.getAllPeerIds()) { addSource(id); if (replicationForBulkLoadDataEnabled) { @@ -268,38 +257,6 @@ public class ReplicationSourceManager implements ReplicationListener { throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id)); } } - return this.executor.submit(this::adoptAbandonedQueues); - } - - private void adoptAbandonedQueues() { - List currentReplicators = null; - try { - currentReplicators = queueStorage.getListOfReplicators(); - } catch (ReplicationException e) { - server.abort("Failed to get all replicators", e); - return; - } - Set liveRegionServers; - try { - // must call this method to load the first snapshot of live region servers and initialize - // listeners - liveRegionServers = replicationTracker.loadLiveRegionServersAndInitializeListeners(); - } catch (IOException e) { - server.abort("Failed load live region server list for replication", e); - return; - } - LOG.info("Current list of replicators: {}, live RSes: {}", currentReplicators, - liveRegionServers); - if (currentReplicators == null || currentReplicators.isEmpty()) { - return; - } - - // Look if there's anything to process after a restart - for (ServerName rs : currentReplicators) { - if (!liveRegionServers.contains(rs)) { - transferQueues(rs); - } - } } /** @@ -838,185 +795,116 @@ public class ReplicationSourceManager implements ReplicationListener { } } - @Override - public void regionServerRemoved(ServerName regionserver) { - transferQueues(regionserver); - } - - /** - * Transfer all the queues of the specified to this region server. First it tries to grab a lock - * and if it works it will move the old queues and finally will delete the old queues. - *

    - * It creates one old source for any type of source of the old rs. - */ - private void transferQueues(ServerName deadRS) { - if (server.getServerName().equals(deadRS)) { - // it's just us, give up + void claimQueue(ServerName deadRS, String queue) { + // Wait a bit before transferring the queues, we may be shutting down. + // This sleep may not be enough in some cases. + try { + Thread.sleep(sleepBeforeFailover + + (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover)); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting before transferring a queue."); + Thread.currentThread().interrupt(); + } + // We try to lock that rs' queue directory + if (server.isStopped()) { + LOG.info("Not transferring queue since we are shutting down"); return; } - NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS); + // After claim the queues from dead region server, wewill skip to start the + // RecoveredReplicationSource if the peer has been removed. but there's possible that remove a + // peer with peerId = 2 and add a peer with peerId = 2 again during failover. So we need to get + // a copy of the replication peer first to decide whether we should start the + // RecoveredReplicationSource. If the latest peer is not the old peer, we should also skip to + // start the RecoveredReplicationSource, Otherwise the rs will abort (See HBASE-20475). + String peerId = new ReplicationQueueInfo(queue).getPeerId(); + ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId); + if (oldPeer == null) { + LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist", + peerId, queue); + return; + } + Pair> claimedQueue; try { - this.executor.execute(transfer); - } catch (RejectedExecutionException ex) { - CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) - .getGlobalSource().incrFailedRecoveryQueue(); - LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage()); + claimedQueue = queueStorage.claimQueue(deadRS, queue, server.getServerName()); + } catch (ReplicationException e) { + LOG.error( + "ReplicationException: cannot claim dead region ({})'s " + + "replication queue. Znode : ({})" + + " Possible solution: check if znode size exceeds jute.maxBuffer value. " + + " If so, increase it for both client and server side.", + deadRS, queueStorage.getRsNode(deadRS), e); + server.abort("Failed to claim queue from dead regionserver.", e); + return; } - } - - /** - * Class responsible to setup new ReplicationSources to take care of the queues from dead region - * servers. - */ - class NodeFailoverWorker extends Thread { - - private final ServerName deadRS; - // After claim the queues from dead region server, the NodeFailoverWorker will skip to start - // the RecoveredReplicationSource if the peer has been removed. but there's possible that - // remove a peer with peerId = 2 and add a peer with peerId = 2 again during the - // NodeFailoverWorker. So we need a deep copied map to decide whether we - // should start the RecoveredReplicationSource. If the latest peer is not the old peer when - // NodeFailoverWorker begin, we should skip to start the RecoveredReplicationSource, Otherwise - // the rs will abort (See HBASE-20475). - private final Map peersSnapshot; - - public NodeFailoverWorker(ServerName deadRS) { - super("Failover-for-" + deadRS); - this.deadRS = deadRS; - peersSnapshot = new HashMap<>(replicationPeers.getPeerCache()); + if (claimedQueue.getSecond().isEmpty()) { + return; + } + String queueId = claimedQueue.getFirst(); + Set walsSet = claimedQueue.getSecond(); + ReplicationPeerImpl peer = replicationPeers.getPeer(peerId); + if (peer == null || peer != oldPeer) { + LOG.warn("Skipping failover for peer {} of node {}, peer is null", peerId, deadRS); + abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId)); + return; + } + if (server instanceof ReplicationSyncUp.DummyServer && + peer.getPeerState().equals(PeerState.DISABLED)) { + LOG.warn( + "Peer {} is disabled. ReplicationSyncUp tool will skip " + "replicating data to this peer.", + peerId); + return; } - private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) { - ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId); - return oldPeerRef != null && oldPeerRef == newPeerRef; + ReplicationSourceInterface src; + try { + src = createSource(queueId, peer); + } catch (IOException e) { + LOG.error("Can not create replication source for peer {} and queue {}", peerId, queueId, e); + server.abort("Failed to create replication source after claiming queue.", e); + return; } - - @Override - public void run() { - // Wait a bit before transferring the queues, we may be shutting down. - // This sleep may not be enough in some cases. - try { - Thread.sleep(sleepBeforeFailover + - (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover)); - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting before transferring a queue."); - Thread.currentThread().interrupt(); - } - // We try to lock that rs' queue directory - if (server.isStopped()) { - LOG.info("Not transferring queue since we are shutting down"); + // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer + synchronized (oldsources) { + peer = replicationPeers.getPeer(src.getPeerId()); + if (peer == null || peer != oldPeer) { + src.terminate("Recovered queue doesn't belong to any current peer"); + deleteQueue(queueId); return; } - Map> newQueues = new HashMap<>(); - try { - List queues = queueStorage.getAllQueues(deadRS); - while (!queues.isEmpty()) { - Pair> peer = queueStorage.claimQueue(deadRS, - queues.get(ThreadLocalRandom.current().nextInt(queues.size())), server.getServerName()); - long sleep = sleepBeforeFailover / 2; - if (!peer.getSecond().isEmpty()) { - newQueues.put(peer.getFirst(), peer.getSecond()); - sleep = sleepBeforeFailover; - } - try { - Thread.sleep(sleep); - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting before transferring a queue."); - Thread.currentThread().interrupt(); - } - queues = queueStorage.getAllQueues(deadRS); - } - if (queues.isEmpty()) { - queueStorage.removeReplicatorIfQueueIsEmpty(deadRS); - } - } catch (ReplicationException e) { - LOG.error(String.format("ReplicationException: cannot claim dead region (%s)'s " + - "replication queue. Znode : (%s)" + - " Possible solution: check if znode size exceeds jute.maxBuffer value. " + - " If so, increase it for both client and server side.", - deadRS, queueStorage.getRsNode(deadRS)), e); - server.abort("Failed to claim queue from dead regionserver.", e); - return; - } - // Copying over the failed queue is completed. - if (newQueues.isEmpty()) { - // We either didn't get the lock or the failed region server didn't have any outstanding - // WALs to replicate, so we are done. - return; - } - - for (Map.Entry> entry : newQueues.entrySet()) { - String queueId = entry.getKey(); - Set walsSet = entry.getValue(); - try { - // there is not an actual peer defined corresponding to peerId for the failover. - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); - String actualPeerId = replicationQueueInfo.getPeerId(); - - ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId); - if (peer == null || !isOldPeer(actualPeerId, peer)) { - LOG.warn("Skipping failover for peer {} of node {}, peer is null", actualPeerId, - deadRS); - abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId)); - continue; - } - if (server instanceof ReplicationSyncUp.DummyServer - && peer.getPeerState().equals(PeerState.DISABLED)) { - LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip " - + "replicating data to this peer.", - actualPeerId); - continue; - } - - ReplicationSourceInterface src = createSource(queueId, peer); - // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer - synchronized (oldsources) { - peer = replicationPeers.getPeer(src.getPeerId()); - if (peer == null || !isOldPeer(src.getPeerId(), peer)) { - src.terminate("Recovered queue doesn't belong to any current peer"); - deleteQueue(queueId); - continue; - } - // Do not setup recovered queue if a sync replication peer is in STANDBY state, or is - // transiting to STANDBY state. The only exception is we are in STANDBY state and - // transiting to DA, under this state we will replay the remote WAL and they need to be - // replicated back. - if (peer.getPeerConfig().isSyncReplication()) { - Pair stateAndNewState = - peer.getSyncReplicationStateAndNewState(); - if ((stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY) && - stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) || - stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) { - src.terminate("Sync replication peer is in STANDBY state"); - deleteQueue(queueId); - continue; - } - } - // track sources in walsByIdRecoveredQueues - Map> walsByGroup = new HashMap<>(); - walsByIdRecoveredQueues.put(queueId, walsByGroup); - for (String wal : walsSet) { - String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal); - NavigableSet wals = walsByGroup.get(walPrefix); - if (wals == null) { - wals = new TreeSet<>(); - walsByGroup.put(walPrefix, wals); - } - wals.add(wal); - } - oldsources.add(src); - LOG.info("Added source for recovered queue {}", src.getQueueId()); - for (String wal : walsSet) { - LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId()); - src.enqueueLog(new Path(oldLogDir, wal)); - } - src.startup(); - } - } catch (IOException e) { - // TODO manage it - LOG.error("Failed creating a source", e); + // Do not setup recovered queue if a sync replication peer is in STANDBY state, or is + // transiting to STANDBY state. The only exception is we are in STANDBY state and + // transiting to DA, under this state we will replay the remote WAL and they need to be + // replicated back. + if (peer.getPeerConfig().isSyncReplication()) { + Pair stateAndNewState = + peer.getSyncReplicationStateAndNewState(); + if ((stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY) && + stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) || + stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) { + src.terminate("Sync replication peer is in STANDBY state"); + deleteQueue(queueId); + return; } } + // track sources in walsByIdRecoveredQueues + Map> walsByGroup = new HashMap<>(); + walsByIdRecoveredQueues.put(queueId, walsByGroup); + for (String wal : walsSet) { + String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal); + NavigableSet wals = walsByGroup.get(walPrefix); + if (wals == null) { + wals = new TreeSet<>(); + walsByGroup.put(walPrefix, wals); + } + wals.add(wal); + } + oldsources.add(src); + LOG.info("Added source for recovered queue {}", src.getQueueId()); + for (String wal : walsSet) { + LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId()); + src.enqueueLog(new Path(oldLogDir, wal)); + } + src.startup(); } } @@ -1249,4 +1137,8 @@ public class ReplicationSourceManager implements ReplicationListener { } return crs.startup(); } + + ReplicationQueueStorage getQueueStorage() { + return queueStorage; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index a6362f99d3a..0e938ecf202 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -31,18 +35,21 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; /** * In a scenario of Replication based Disaster/Recovery, when hbase Master-Cluster crashes, this * tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool - * will run on Master-Cluser, and assume ZK, Filesystem and NetWork still available after hbase + * will run on Master-Cluster, and assume ZK, Filesystem and NetWork still available after hbase * crashes * *

    @@ -62,6 +69,29 @@ public class ReplicationSyncUp extends Configured implements Tool {
         System.exit(ret);
       }
     
    +  private Set getLiveRegionServers(ZKWatcher zkw) throws KeeperException {
    +    List rsZNodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
    +    return rsZNodes == null ? Collections.emptySet() :
    +      rsZNodes.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
    +  }
    +
    +  // When using this tool, usually the source cluster is unhealthy, so we should try to claim the
    +  // replication queues for the dead region servers first and then replicate the data out.
    +  private void claimReplicationQueues(ZKWatcher zkw, ReplicationSourceManager mgr)
    +    throws ReplicationException, KeeperException {
    +    List replicators = mgr.getQueueStorage().getListOfReplicators();
    +    Set liveRegionServers = getLiveRegionServers(zkw);
    +    for (ServerName sn : replicators) {
    +      if (!liveRegionServers.contains(sn)) {
    +        List replicationQueues = mgr.getQueueStorage().getAllQueues(sn);
    +        System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues);
    +        for (String queue : replicationQueues) {
    +          mgr.claimQueue(sn, queue);
    +        }
    +      }
    +    }
    +  }
    +
       @Override
       public int run(String[] args) throws Exception {
         Abortable abortable = new Abortable() {
    @@ -88,7 +118,8 @@ public class ReplicationSyncUp extends Configured implements Tool {
           replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
             new WALFactory(conf, "test", null, false));
           ReplicationSourceManager manager = replication.getReplicationManager();
    -      manager.init().get();
    +      manager.init();
    +      claimReplicationQueues(zkw, manager);
           while (manager.activeFailoverTaskCount() > 0) {
             Thread.sleep(SLEEP_TIME);
           }
    diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
    index b2e698f0d03..c78fe40b028 100644
    --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
    +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
    @@ -18,41 +18,30 @@
     package org.apache.hadoop.hbase.replication.regionserver;
     
     import org.apache.hadoop.hbase.executor.EventType;
    -import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
    -import org.apache.hadoop.hbase.regionserver.HRegionServer;
    +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
     import org.apache.yetus.audience.InterfaceAudience;
     
     import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
    +
     import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData;
     
     /**
      * The callable executed at RS side to switch rpc throttle state. 
    */ @InterfaceAudience.Private -public class SwitchRpcThrottleRemoteCallable implements RSProcedureCallable { - private HRegionServer rs; +public class SwitchRpcThrottleRemoteCallable extends BaseRSProcedureCallable { + private boolean rpcThrottleEnabled; - private Exception initError; @Override - public Void call() throws Exception { - if (initError != null) { - throw initError; - } + protected void doCall() throws Exception { rs.getRegionServerRpcQuotaManager().switchRpcThrottle(rpcThrottleEnabled); - return null; } @Override - public void init(byte[] parameter, HRegionServer rs) { - this.rs = rs; - try { - SwitchRpcThrottleRemoteStateData param = - SwitchRpcThrottleRemoteStateData.parseFrom(parameter); - rpcThrottleEnabled = param.getRpcThrottleEnabled(); - } catch (InvalidProtocolBufferException e) { - initError = e; - } + protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException { + SwitchRpcThrottleRemoteStateData param = SwitchRpcThrottleRemoteStateData.parseFrom(parameter); + rpcThrottleEnabled = param.getRpcThrottleEnabled(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java new file mode 100644 index 00000000000..6c86feb70b3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; +import org.apache.hadoop.hbase.master.replication.ClaimReplicationQueuesProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + +/** + * In HBASE-26029, we reimplement the claim queue operation with proc-v2 and make it a step in SCP, + * this is a UT to make sure the {@link ClaimReplicationQueuesProcedure} works correctly. + */ +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestClaimReplicationQueue extends TestReplicationBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestClaimReplicationQueue.class); + + private static final TableName tableName3 = TableName.valueOf("test3"); + + private static final String PEER_ID3 = "3"; + + private static Table table3; + + private static Table table4; + + private static volatile boolean EMPTY = false; + + public static final class ServerManagerForTest extends ServerManager { + + public ServerManagerForTest(MasterServices master) { + super(master); + } + + @Override + public List getOnlineServersList() { + // return no region server to make the procedure hang + if (EMPTY) { + for (StackTraceElement e : Thread.currentThread().getStackTrace()) { + if (e.getClassName().equals(ClaimReplicationQueuesProcedure.class.getName())) { + return Collections.emptyList(); + } + } + } + return super.getOnlineServersList(); + } + } + + public static final class HMasterForTest extends HMaster { + + public HMasterForTest(Configuration conf) throws IOException { + super(conf); + } + + @Override + protected ServerManager createServerManager(MasterServices master) throws IOException { + setupClusterConnection(); + return new ServerManagerForTest(master); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + CONF1.setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class); + TestReplicationBase.setUpBeforeClass(); + createTable(tableName3); + table3 = connection1.getTable(tableName3); + table4 = connection2.getTable(tableName3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + Closeables.close(table3, true); + Closeables.close(table4, true); + TestReplicationBase.tearDownAfterClass(); + } + + @Override + public void setUpBase() throws Exception { + super.setUpBase(); + // set up two replication peers and only 1 rs to test claim replication queue with multiple + // round + addPeer(PEER_ID3, tableName3); + } + + @Override + public void tearDownBase() throws Exception { + super.tearDownBase(); + removePeer(PEER_ID3); + } + + @Test + public void testClaim() throws Exception { + // disable the peers + hbaseAdmin.disableReplicationPeer(PEER_ID2); + hbaseAdmin.disableReplicationPeer(PEER_ID3); + + // put some data + int count1 = UTIL1.loadTable(htable1, famName); + int count2 = UTIL1.loadTable(table3, famName); + + EMPTY = true; + UTIL1.getMiniHBaseCluster().stopRegionServer(0).join(); + UTIL1.getMiniHBaseCluster().startRegionServer(); + + // since there is no active region server to get the replication queue, the procedure should be + // in WAITING_TIMEOUT state for most time to retry + HMaster master = UTIL1.getMiniHBaseCluster().getMaster(); + UTIL1.waitFor(30000, + () -> master.getProcedures().stream() + .filter(p -> p instanceof ClaimReplicationQueuesProcedure) + .anyMatch(p -> p.getState() == ProcedureState.WAITING_TIMEOUT)); + + hbaseAdmin.enableReplicationPeer(PEER_ID2); + hbaseAdmin.enableReplicationPeer(PEER_ID3); + + EMPTY = false; + // wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP + UTIL1.waitFor(30000, () -> master.getProcedures().stream() + .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess)); + + // we should get all the data in the target cluster + waitForReplication(htable2, count1, NB_RETRIES); + waitForReplication(table4, count2, NB_RETRIES); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 967ab75863c..4c442fb1ee0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -55,6 +56,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -68,8 +70,8 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables; */ public class TestReplicationBase { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class); - private static Connection connection1; - private static Connection connection2; + protected static Connection connection1; + protected static Connection connection2; protected static Configuration CONF_WITH_LOCALFS; protected static Admin hbaseAdmin; @@ -147,19 +149,22 @@ public class TestReplicationBase { waitForReplication(htable2, expectedRows, retries); } - protected static void waitForReplication(Table htable2, int expectedRows, int retries) - throws IOException, InterruptedException { + protected static void waitForReplication(Table table, int expectedRows, int retries) + throws IOException, InterruptedException { Scan scan; for (int i = 0; i < retries; i++) { scan = new Scan(); - if (i== retries -1) { + if (i == retries - 1) { fail("Waited too much time for normal batch replication"); } - ResultScanner scanner = htable2.getScanner(scan); - Result[] res = scanner.next(expectedRows); - scanner.close(); - if (res.length != expectedRows) { - LOG.info("Only got " + res.length + " rows"); + int count = 0; + try (ResultScanner scanner = table.getScanner(scan)) { + while (scanner.next() != null) { + count++; + } + } + if (count != expectedRows) { + LOG.info("Only got " + count + " rows"); Thread.sleep(SLEEP_TIME); } else { break; @@ -235,6 +240,18 @@ public class TestReplicationBase { htable2 = UTIL2.getConnection().getTable(tableName); } + protected static void createTable(TableName tableName) + throws IOException { + TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); + UTIL1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + UTIL2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + UTIL1.waitUntilAllRegionsAssigned(tableName); + UTIL2.waitUntilAllRegionsAssigned(tableName); + } + private static void startClusters() throws Exception { UTIL1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = UTIL1.getZkCluster(); @@ -253,22 +270,9 @@ public class TestReplicationBase { connection2 = ConnectionFactory.createConnection(CONF2); hbaseAdmin = connection1.getAdmin(); - TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) - .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) - .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) - .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); - - try ( - Admin admin1 = connection1.getAdmin(); - Admin admin2 = connection2.getAdmin()) { - admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); - admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); - UTIL1.waitUntilAllRegionsAssigned(tableName); - htable1 = connection1.getTable(tableName); - UTIL2.waitUntilAllRegionsAssigned(tableName); - htable2 = connection2.getTable(tableName); - } - + createTable(tableName); + htable1 = connection1.getTable(tableName); + htable2 = connection2.getTable(tableName); } @BeforeClass @@ -281,12 +285,11 @@ public class TestReplicationBase { return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId())); } - @Before - public void setUpBase() throws Exception { - if (!peerExist(PEER_ID2)) { + protected final void addPeer(String peerId, TableName tableName) throws Exception { + if (!peerExist(peerId)) { ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() - .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).setReplicationEndpointImpl( - ReplicationEndpointTest.class.getName()); + .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()) + .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName()); if (isSyncPeer()) { FileSystem fs2 = UTIL2.getTestFileSystem(); // The remote wal dir is not important as we do not use it in DA state, here we only need to @@ -297,15 +300,24 @@ public class TestReplicationBase { .setRemoteWALDir(new Path("/RemoteWAL") .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString()); } - hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build()); + hbaseAdmin.addReplicationPeer(peerId, builder.build()); + } + } + + @Before + public void setUpBase() throws Exception { + addPeer(PEER_ID2, tableName); + } + + protected final void removePeer(String peerId) throws Exception { + if (peerExist(peerId)) { + hbaseAdmin.removeReplicationPeer(peerId); } } @After public void tearDownBase() throws Exception { - if (peerExist(PEER_ID2)) { - hbaseAdmin.removeReplicationPeer(PEER_ID2); - } + removePeer(PEER_ID2); } protected static void runSimplePutDeleteTest() throws IOException, InterruptedException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 0b1e2aa097b..d6dbaf46a56 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -75,7 +75,6 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -406,9 +405,7 @@ public abstract class TestReplicationSourceManager { ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration()); rp1.init(); - NodeFailoverWorker w1 = - manager.new NodeFailoverWorker(server.getServerName()); - w1.run(); + manager.claimQueue(server.getServerName(), "1"); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); @@ -432,8 +429,7 @@ public abstract class TestReplicationSourceManager { rq.addWAL(server.getServerName(), "2", group + ".log1"); rq.addWAL(server.getServerName(), "2", group + ".log2"); - NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName()); - w1.run(); + manager.claimQueue(server.getServerName(), "2"); // The log of the unknown peer should be removed from zk for (String peer : manager.getAllQueues()) {