HBASE-25682 Add a new command to update the configuration of all RSs in a RSGroup (#3080)

* HBASE-25682 Add a new command to update the configuration of all RSs in a RSGroup

Signed-off-by: Pankaj Kumar<pankajkumar@apache.org>
This commit is contained in:
Baiqiang Zhao 2021-05-22 01:19:25 +08:00 committed by GitHub
parent 15e861169f
commit a1177b3e91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 240 additions and 3 deletions

View File

@ -1845,6 +1845,14 @@ public interface Admin extends Abortable, Closeable {
*/ */
void updateConfiguration() throws IOException; void updateConfiguration() throws IOException;
/**
* Update the configuration and trigger an online config change
* on all the regionservers in the RSGroup.
* @param groupName the group name
* @throws IOException if a remote or network exception occurs
*/
void updateConfiguration(String groupName) throws IOException;
/** /**
* Get the info port of the current master if one is available. * Get the info port of the current master if one is available.
* @return master info port * @return master info port

View File

@ -781,6 +781,11 @@ class AdminOverAsyncAdmin implements Admin {
get(admin.updateConfiguration()); get(admin.updateConfiguration());
} }
@Override
public void updateConfiguration(String groupName) throws IOException {
get(admin.updateConfiguration(groupName));
}
@Override @Override
public List<SecurityCapability> getSecurityCapabilities() throws IOException { public List<SecurityCapability> getSecurityCapabilities() throws IOException {
return get(admin.getSecurityCapabilities()); return get(admin.getSecurityCapabilities());

View File

@ -1129,6 +1129,13 @@ public interface AsyncAdmin {
*/ */
CompletableFuture<Void> updateConfiguration(); CompletableFuture<Void> updateConfiguration();
/**
* Update the configuration and trigger an online config change on all the regionservers in
* the RSGroup.
* @param groupName the group name
*/
CompletableFuture<Void> updateConfiguration(String groupName);
/** /**
* Roll the log writer. I.e. for filesystem based write ahead logs, start writing to a new file. * Roll the log writer. I.e. for filesystem based write ahead logs, start writing to a new file.
* <p> * <p>

View File

@ -630,6 +630,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.updateConfiguration()); return wrap(rawAdmin.updateConfiguration());
} }
@Override
public CompletableFuture<Void> updateConfiguration(String groupName) {
return wrap(rawAdmin.updateConfiguration(groupName));
}
@Override @Override
public CompletableFuture<Void> rollWALWriter(ServerName serverName) { public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
return wrap(rawAdmin.rollWALWriter(serverName)); return wrap(rawAdmin.rollWALWriter(serverName));

View File

@ -2905,6 +2905,42 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return future; return future;
} }
@Override
public CompletableFuture<Void> updateConfiguration(String groupName) {
CompletableFuture<Void> future = new CompletableFuture<Void>();
addListener(
getRSGroup(groupName),
(rsGroupInfo, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (rsGroupInfo == null) {
future.completeExceptionally(
new IllegalArgumentException("Group does not exist: " + groupName));
} else {
addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
List<CompletableFuture<Void>> futures = new ArrayList<>();
List<ServerName> groupServers = status.getServersName().stream().filter(
s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
groupServers.forEach(server -> futures.add(updateConfiguration(server)));
addListener(
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
(result, err3) -> {
if (err3 != null) {
future.completeExceptionally(err3);
} else {
future.complete(result);
}
});
}
});
}
});
return future;
}
@Override @Override
public CompletableFuture<Void> rollWALWriter(ServerName serverName) { public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
return this return this

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.AbstractTestUpdateConfiguration;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
@ -63,7 +64,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
public abstract class TestRSGroupsBase { public abstract class TestRSGroupsBase extends AbstractTestUpdateConfiguration {
protected static final Logger LOG = LoggerFactory.getLogger(TestRSGroupsBase.class); protected static final Logger LOG = LoggerFactory.getLogger(TestRSGroupsBase.class);
// shared // shared

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rsgroup;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({MediumTests.class})
public class TestUpdateRSGroupConfiguration extends TestRSGroupsBase {
protected static final Logger LOG = LoggerFactory.getLogger(TestUpdateRSGroupConfiguration.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestUpdateRSGroupConfiguration.class);
private static final String TEST_GROUP = "test";
private static final String TEST2_GROUP = "test2";
@BeforeClass
public static void setUp() throws Exception {
setUpConfigurationFiles(TEST_UTIL);
setUpTestBeforeClass();
addResourceToRegionServerConfiguration(TEST_UTIL);
}
@AfterClass
public static void tearDown() throws Exception {
tearDownAfterClass();
}
@Before
public void beforeMethod() throws Exception {
setUpBeforeMethod();
}
@After
public void afterMethod() throws Exception {
tearDownAfterMethod();
}
@Test
public void testOnlineConfigChangeInRSGroup() throws Exception {
addGroup(TEST_GROUP, 1);
ADMIN.updateConfiguration(TEST_GROUP);
}
@Test
public void testNonexistentRSGroup() throws Exception {
try {
ADMIN.updateConfiguration(TEST2_GROUP);
fail("Group does not exist: test2");
} catch (IllegalArgumentException iae) {
// expected
}
}
@Test
public void testCustomOnlineConfigChangeInRSGroup() throws Exception {
// Check the default configuration of the RegionServers
TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
Configuration conf = thread.getRegionServer().getConfiguration();
assertEquals(0, conf.getInt("hbase.custom.config", 0));
});
replaceHBaseSiteXML();
RSGroupInfo testRSGroup = addGroup(TEST_GROUP, 1);
RSGroupInfo test2RSGroup = addGroup(TEST2_GROUP, 1);
ADMIN.updateConfiguration(TEST_GROUP);
// Check the configuration of the RegionServer in test rsgroup, should be update
Configuration regionServerConfiguration =
TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
.map(JVMClusterUtil.RegionServerThread::getRegionServer)
.filter(regionServer ->
(regionServer.getServerName().getAddress().equals(testRSGroup.getServers().first())))
.collect(Collectors.toList()).get(0).getConfiguration();
int custom = regionServerConfiguration.getInt("hbase.custom.config", 0);
assertEquals(1000, custom);
// Check the configuration of the RegionServer in test2 rsgroup, should not be update
regionServerConfiguration =
TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
.map(JVMClusterUtil.RegionServerThread::getRegionServer)
.filter(regionServer ->
(regionServer.getServerName().getAddress().equals(test2RSGroup.getServers().first())))
.collect(Collectors.toList()).get(0).getConfiguration();
custom = regionServerConfiguration.getInt("hbase.custom.config", 0);
assertEquals(0, custom);
restoreHBaseSiteXML();
}
}

View File

@ -617,6 +617,10 @@ public class VerifyingRSGroupAdmin implements Admin, Closeable {
admin.updateConfiguration(); admin.updateConfiguration();
} }
public void updateConfiguration(String groupName) throws IOException {
admin.updateConfiguration(groupName);
}
public List<SecurityCapability> getSecurityCapabilities() throws IOException { public List<SecurityCapability> getSecurityCapabilities() throws IOException {
return admin.getSecurityCapabilities(); return admin.getSecurityCapabilities();
} }

View File

@ -1379,6 +1379,12 @@ module Hbase
@admin.updateConfiguration @admin.updateConfiguration
end end
#----------------------------------------------------------------------------------------------
# Updates the configuration of all the regionservers in the rsgroup.
def update_rsgroup_config(groupName)
@admin.updateConfiguration(groupName)
end
#---------------------------------------------------------------------------------------------- #----------------------------------------------------------------------------------------------
# Returns namespace's structure description # Returns namespace's structure description
def describe_namespace(namespace_name) def describe_namespace(namespace_name)

View File

@ -553,6 +553,7 @@ Shell.load_command_group(
commands: %w[ commands: %w[
update_config update_config
update_all_config update_all_config
update_rsgroup_config
] ]
) )

View File

@ -0,0 +1,37 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class UpdateRsgroupConfig < Command
def help
<<-EOF
Reload a subset of configuration on all servers in the rsgroup. See
http://hbase.apache.org/book.html#dyn_config for more details. Here is how
you would run the command in the hbase shell:
hbase> update_rsgroup_config 'groupName'
EOF
end
def command(groupName)
admin.update_rsgroup_config(groupName)
end
end
end
end

View File

@ -944,6 +944,11 @@ public class ThriftAdmin implements Admin {
throw new NotImplementedException("updateConfiguration not supported in ThriftAdmin"); throw new NotImplementedException("updateConfiguration not supported in ThriftAdmin");
} }
@Override
public void updateConfiguration(String groupName) {
throw new NotImplementedException("updateConfiguration not supported in ThriftAdmin");
}
@Override @Override
public List<SecurityCapability> getSecurityCapabilities() { public List<SecurityCapability> getSecurityCapabilities() {
throw new NotImplementedException("getSecurityCapabilities not supported in ThriftAdmin"); throw new NotImplementedException("getSecurityCapabilities not supported in ThriftAdmin");

View File

@ -1274,8 +1274,8 @@ The corresponding properties for port configuration are `master.rmi.registry.por
== Dynamic Configuration == Dynamic Configuration
It is possible to change a subset of the configuration without requiring a server restart. In the It is possible to change a subset of the configuration without requiring a server restart. In the
HBase shell, the operations `update_config` and `update_all_config` will prompt a server or all HBase shell, the operations `update_config`, `update_all_config` and `update_rsgroup_config`
servers to reload configuration. will prompt a server, all servers or all servers in the RSGroup to reload configuration.
Only a subset of all configurations can currently be changed in the running server. Only a subset of all configurations can currently be changed in the running server.
Here are those configurations: Here are those configurations: