HBASE-19010 Reimplement getMasterInfoPort for Admin
This commit is contained in:
parent
cb5c4776de
commit
592d541f5d
|
@ -81,6 +81,7 @@ public class ClusterStatus {
|
|||
private String clusterId;
|
||||
private String[] masterCoprocessors;
|
||||
private Boolean balancerOn;
|
||||
private int masterInfoPort;
|
||||
|
||||
/**
|
||||
* Use {@link ClusterStatus.Builder} to construct a ClusterStatus instead.
|
||||
|
@ -95,7 +96,8 @@ public class ClusterStatus {
|
|||
final Collection<ServerName> backupMasters,
|
||||
final List<RegionState> rit,
|
||||
final String[] masterCoprocessors,
|
||||
final Boolean balancerOn) {
|
||||
final Boolean balancerOn,
|
||||
final int masterInfoPort) {
|
||||
// TODO: make this constructor private
|
||||
this.hbaseVersion = hbaseVersion;
|
||||
this.liveServers = servers;
|
||||
|
@ -106,6 +108,7 @@ public class ClusterStatus {
|
|||
this.clusterId = clusterid;
|
||||
this.masterCoprocessors = masterCoprocessors;
|
||||
this.balancerOn = balancerOn;
|
||||
this.masterInfoPort = masterInfoPort;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -202,15 +205,17 @@ public class ClusterStatus {
|
|||
getDeadServerNames().containsAll(other.getDeadServerNames()) &&
|
||||
Arrays.equals(getMasterCoprocessors(), other.getMasterCoprocessors()) &&
|
||||
Objects.equal(getMaster(), other.getMaster()) &&
|
||||
getBackupMasters().containsAll(other.getBackupMasters());
|
||||
getBackupMasters().containsAll(other.getBackupMasters()) &&
|
||||
Objects.equal(getClusterId(), other.getClusterId()) &&
|
||||
getMasterInfoPort() == other.getMasterInfoPort();
|
||||
}
|
||||
|
||||
/**
|
||||
* @see java.lang.Object#hashCode()
|
||||
*/
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(hbaseVersion, liveServers, deadServers,
|
||||
master, backupMasters);
|
||||
return Objects.hashCode(hbaseVersion, liveServers, deadServers, master, backupMasters,
|
||||
clusterId, masterInfoPort);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -312,6 +317,10 @@ public class ClusterStatus {
|
|||
return balancerOn;
|
||||
}
|
||||
|
||||
public int getMasterInfoPort() {
|
||||
return masterInfoPort;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder(1024);
|
||||
sb.append("Master: " + master);
|
||||
|
@ -372,6 +381,7 @@ public class ClusterStatus {
|
|||
private String clusterId = null;
|
||||
private String[] masterCoprocessors = null;
|
||||
private Boolean balancerOn = null;
|
||||
private int masterInfoPort = -1;
|
||||
|
||||
private Builder() {}
|
||||
|
||||
|
@ -420,10 +430,15 @@ public class ClusterStatus {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setMasterInfoPort(int masterInfoPort) {
|
||||
this.masterInfoPort = masterInfoPort;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ClusterStatus build() {
|
||||
return new ClusterStatus(hbaseVersion, clusterId, liveServers,
|
||||
deadServers, master, backupMasters, intransition, masterCoprocessors,
|
||||
balancerOn);
|
||||
balancerOn, masterInfoPort);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -439,6 +454,7 @@ public class ClusterStatus {
|
|||
MASTER, /** status about master */
|
||||
BACKUP_MASTERS, /** status about backup masters */
|
||||
MASTER_COPROCESSORS, /** status about master coprocessors */
|
||||
REGIONS_IN_TRANSITION; /** status about regions in transition */
|
||||
REGIONS_IN_TRANSITION, /** status about regions in transition */
|
||||
MASTER_INFO_PORT; /** master info port **/
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2194,7 +2194,9 @@ public interface Admin extends Abortable, Closeable {
|
|||
* @return master info port
|
||||
* @throws IOException
|
||||
*/
|
||||
int getMasterInfoPort() throws IOException;
|
||||
default int getMasterInfoPort() throws IOException {
|
||||
return getClusterStatus(EnumSet.of(Option.MASTER_INFO_PORT)).getMasterInfoPort();
|
||||
}
|
||||
|
||||
/**
|
||||
* Compact a table. Asynchronous operation in that this method requests that a
|
||||
|
|
|
@ -814,6 +814,15 @@ public interface AsyncAdmin {
|
|||
return getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).thenApply(ClusterStatus::getServers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the info port of the current master if one is available.
|
||||
* @return master info port
|
||||
*/
|
||||
default CompletableFuture<Integer> getMasterInfoPort() {
|
||||
return getClusterStatus(EnumSet.of(Option.MASTER_INFO_PORT)).thenApply(
|
||||
ClusterStatus::getMasterInfoPort);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a list of {@link RegionLoad} of all regions hosted on a region seerver.
|
||||
* @param serverName
|
||||
|
|
|
@ -3185,18 +3185,6 @@ public class HBaseAdmin implements Admin {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMasterInfoPort() throws IOException {
|
||||
// TODO: Fix! Reaching into internal implementation!!!!
|
||||
ConnectionImplementation connection = (ConnectionImplementation)this.connection;
|
||||
ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
|
||||
try {
|
||||
return MasterAddressTracker.getMasterInfoPort(zkw);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed to get master info port from MasterAddressTracker", e);
|
||||
}
|
||||
}
|
||||
|
||||
private ServerName getMasterAddress() throws IOException {
|
||||
// TODO: Fix! Reaching into internal implementation!!!!
|
||||
ConnectionImplementation connection = (ConnectionImplementation)this.connection;
|
||||
|
|
|
@ -3009,6 +3009,7 @@ public final class ProtobufUtil {
|
|||
if (proto.hasBalancerOn()) {
|
||||
balancerOn = proto.getBalancerOn();
|
||||
}
|
||||
|
||||
builder.setHBaseVersion(hbaseVersion)
|
||||
.setClusterId(clusterId)
|
||||
.setLiveServers(servers)
|
||||
|
@ -3018,6 +3019,9 @@ public final class ProtobufUtil {
|
|||
.setRegionState(rit)
|
||||
.setMasterCoprocessors(masterCoprocessors)
|
||||
.setBalancerOn(balancerOn);
|
||||
if (proto.hasMasterInfoPort()) {
|
||||
builder.setMasterInfoPort(proto.getMasterInfoPort());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
@ -3037,6 +3041,7 @@ public final class ProtobufUtil {
|
|||
case MASTER: return ClusterStatus.Option.MASTER;
|
||||
case BACKUP_MASTERS: return ClusterStatus.Option.BACKUP_MASTERS;
|
||||
case BALANCER_ON: return ClusterStatus.Option.BALANCER_ON;
|
||||
case MASTER_INFO_PORT: return ClusterStatus.Option.MASTER_INFO_PORT;
|
||||
// should not reach here
|
||||
default: throw new IllegalArgumentException("Invalid option: " + option);
|
||||
}
|
||||
|
@ -3058,6 +3063,7 @@ public final class ProtobufUtil {
|
|||
case MASTER: return ClusterStatusProtos.Option.MASTER;
|
||||
case BACKUP_MASTERS: return ClusterStatusProtos.Option.BACKUP_MASTERS;
|
||||
case BALANCER_ON: return ClusterStatusProtos.Option.BALANCER_ON;
|
||||
case MASTER_INFO_PORT: return ClusterStatusProtos.Option.MASTER_INFO_PORT;
|
||||
// should not reach here
|
||||
default: throw new IllegalArgumentException("Invalid option: " + option);
|
||||
}
|
||||
|
@ -3155,6 +3161,7 @@ public final class ProtobufUtil {
|
|||
builder.setBalancerOn(status.getBalancerOn());
|
||||
}
|
||||
|
||||
builder.setMasterInfoPort(status.getMasterInfoPort());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -224,6 +224,7 @@ message ClusterStatus {
|
|||
optional ServerName master = 7;
|
||||
repeated ServerName backup_masters = 8;
|
||||
optional bool balancer_on = 9;
|
||||
optional int32 master_info_port = 10 [default = -1];
|
||||
}
|
||||
|
||||
enum Option {
|
||||
|
@ -236,4 +237,5 @@ enum Option {
|
|||
MASTER_COPROCESSORS = 6;
|
||||
REGIONS_IN_TRANSITION = 7;
|
||||
BALANCER_ON = 8;
|
||||
MASTER_INFO_PORT = 9;
|
||||
}
|
||||
|
|
|
@ -2503,6 +2503,12 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
break;
|
||||
}
|
||||
case MASTER_INFO_PORT: {
|
||||
if (infoServer != null) {
|
||||
builder.setMasterInfoPort(infoServer.getPort());
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -28,6 +29,7 @@ import java.net.URL;
|
|||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -70,6 +72,14 @@ public class TestInfoServers {
|
|||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMasterInfoPort() throws Exception {
|
||||
try (Admin admin = UTIL.getAdmin()) {
|
||||
assertEquals(UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(),
|
||||
admin.getMasterInfoPort());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure when we go to top level index pages that we get redirected to an info-server specific status
|
||||
* page.
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
|
|||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
|
@ -63,6 +65,23 @@ public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
|
|||
private final Path cnf2Path = FileSystems.getDefault().getPath("target/test-classes/hbase-site2.xml");
|
||||
private final Path cnf3Path = FileSystems.getDefault().getPath("target/test-classes/hbase-site3.xml");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMasterInfoPort() throws Exception {
|
||||
assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(), (int) admin
|
||||
.getMasterInfoPort().get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionServerOnlineConfigChange() throws Exception {
|
||||
replaceHBaseSiteXML();
|
||||
|
|
|
@ -81,6 +81,7 @@ public class TestClientClusterStatus {
|
|||
Assert.assertTrue(origin.getDeadServersSize() == defaults.getDeadServersSize());
|
||||
Assert.assertTrue(origin.getRegionsCount() == defaults.getRegionsCount());
|
||||
Assert.assertTrue(origin.getServersSize() == defaults.getServersSize());
|
||||
Assert.assertTrue(origin.getMasterInfoPort() == defaults.getMasterInfoPort());
|
||||
Assert.assertTrue(origin.equals(defaults));
|
||||
}
|
||||
|
||||
|
@ -97,6 +98,7 @@ public class TestClientClusterStatus {
|
|||
Assert.assertTrue(status.getDeadServerNames().isEmpty());
|
||||
Assert.assertNull(status.getMaster());
|
||||
Assert.assertTrue(status.getBackupMasters().isEmpty());
|
||||
Assert.assertEquals(-1, status.getMasterInfoPort());
|
||||
// No npe thrown is expected
|
||||
Assert.assertNotNull(status.hashCode());
|
||||
ClusterStatus nullEqualsCheck =
|
||||
|
|
Loading…
Reference in New Issue