HBASE-10483 Provide API for retrieving info port when hbase.master.info.port is set to 0 (Liu Shaohui)
This commit is contained in:
parent
1ba615657b
commit
fb6c5c7fe8
|
@ -1306,4 +1306,11 @@ public interface Admin extends Abortable, Closeable {
|
|||
* @throws IOException
|
||||
*/
|
||||
void updateConfiguration() throws IOException;
|
||||
|
||||
/**
|
||||
* Get the info port of the current master if one is available.
|
||||
* @return master info port
|
||||
* @throws IOException
|
||||
*/
|
||||
public int getMasterInfoPort() throws IOException;
|
||||
}
|
||||
|
|
|
@ -142,6 +142,7 @@ import org.apache.hadoop.hbase.util.Addressing;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
@ -3750,4 +3751,16 @@ public class HBaseAdmin implements Admin {
|
|||
updateConfiguration(server);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMasterInfoPort() throws IOException {
|
||||
ConnectionManager.HConnectionImplementation connection =
|
||||
(ConnectionManager.HConnectionImplementation) HConnectionManager.getConnection(conf);
|
||||
ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
|
||||
try {
|
||||
return MasterAddressTracker.getMasterInfoPort(zkw);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed to get master info port from MasterAddressTracker", e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
|||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
/**
|
||||
* Manages the location of the current active Master for the RegionServer.
|
||||
* <p>
|
||||
|
@ -75,6 +77,36 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
|
|||
return getMasterAddress(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the info port of the current master of one is available.
|
||||
* Return 0 if no current master or zookeeper is unavailable
|
||||
* @return info port or 0 if timed out
|
||||
*/
|
||||
public int getMasterInfoPort() {
|
||||
try {
|
||||
return parse(this.getData(false)).getInfoPort();
|
||||
} catch (DeserializationException e) {
|
||||
LOG.warn("Failed parse master zk node data", e);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Get the info port of the backup master if it is available.
|
||||
* Return 0 if no current master or zookeeper is unavailable
|
||||
* @param sn server name of backup master
|
||||
* @return info port or 0 if timed out or exceptions
|
||||
*/
|
||||
public int getBackupMasterInfoPort(final ServerName sn) {
|
||||
String backupZNode = ZKUtil.joinZNode(watcher.backupMasterAddressesZNode, sn.toString());
|
||||
try {
|
||||
byte[] data = ZKUtil.getData(watcher, backupZNode);
|
||||
return parse(data).getInfoPort();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to get backup master: " + sn + "'s info port.", e);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the address of the current master if one is available. Returns null
|
||||
* if no current master. If refresh is set, try to load the data from ZK again,
|
||||
|
@ -122,6 +154,36 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get master info port.
|
||||
* Use this instead of {@link #getMasterInfoPort()} if you do not have an
|
||||
* instance of this tracker in your context.
|
||||
* @param zkw ZooKeeperWatcher to use
|
||||
* @return master info port in the the master address znode or null if no
|
||||
* znode present.
|
||||
* @throws KeeperException
|
||||
* @throws IOException
|
||||
*/
|
||||
public static int getMasterInfoPort(final ZooKeeperWatcher zkw) throws KeeperException,
|
||||
IOException {
|
||||
byte[] data;
|
||||
try {
|
||||
data = ZKUtil.getData(zkw, zkw.getMasterAddressZNode());
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
if (data == null) {
|
||||
throw new IOException("Can't get master address from ZooKeeper; znode data == null");
|
||||
}
|
||||
try {
|
||||
return parse(data).getInfoPort();
|
||||
} catch (DeserializationException e) {
|
||||
KeeperException ke = new KeeperException.DataInconsistencyException();
|
||||
ke.initCause(e);
|
||||
throw ke;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set master address into the <code>master</code> znode or into the backup
|
||||
* subdirectory of backup masters; switch off the passed in <code>znode</code>
|
||||
|
@ -134,9 +196,9 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
|
|||
* @throws KeeperException
|
||||
*/
|
||||
public static boolean setMasterAddress(final ZooKeeperWatcher zkw,
|
||||
final String znode, final ServerName master)
|
||||
final String znode, final ServerName master, int infoPort)
|
||||
throws KeeperException {
|
||||
return ZKUtil.createEphemeralNodeAndWatch(zkw, znode, toByteArray(master));
|
||||
return ZKUtil.createEphemeralNodeAndWatch(zkw, znode, toByteArray(master, infoPort));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -152,17 +214,31 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
|
|||
* @return Content of the master znode as a serialized pb with the pb
|
||||
* magic as prefix.
|
||||
*/
|
||||
static byte [] toByteArray(final ServerName sn) {
|
||||
ZooKeeperProtos.Master.Builder mbuilder = ZooKeeperProtos.Master.newBuilder();
|
||||
HBaseProtos.ServerName.Builder snbuilder = HBaseProtos.ServerName.newBuilder();
|
||||
snbuilder.setHostName(sn.getHostname());
|
||||
snbuilder.setPort(sn.getPort());
|
||||
snbuilder.setStartCode(sn.getStartcode());
|
||||
mbuilder.setMaster(snbuilder.build());
|
||||
mbuilder.setRpcVersion(HConstants.RPC_CURRENT_VERSION);
|
||||
return ProtobufUtil.prependPBMagic(mbuilder.build().toByteArray());
|
||||
}
|
||||
static byte[] toByteArray(final ServerName sn, int infoPort) {
|
||||
ZooKeeperProtos.Master.Builder mbuilder = ZooKeeperProtos.Master.newBuilder();
|
||||
HBaseProtos.ServerName.Builder snbuilder = HBaseProtos.ServerName.newBuilder();
|
||||
snbuilder.setHostName(sn.getHostname());
|
||||
snbuilder.setPort(sn.getPort());
|
||||
snbuilder.setStartCode(sn.getStartcode());
|
||||
mbuilder.setMaster(snbuilder.build());
|
||||
mbuilder.setRpcVersion(HConstants.RPC_CURRENT_VERSION);
|
||||
mbuilder.setInfoPort(infoPort);
|
||||
return ProtobufUtil.prependPBMagic(mbuilder.build().toByteArray());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param data zookeeper data
|
||||
* @return pb object of master
|
||||
* @throws DeserializationException
|
||||
*/
|
||||
public static ZooKeeperProtos.Master parse(byte[] data) throws DeserializationException {
|
||||
int prefixLen = ProtobufUtil.lengthOfPBMagic();
|
||||
try {
|
||||
return ZooKeeperProtos.Master.PARSER.parseFrom(data, prefixLen, data.length - prefixLen);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new DeserializationException(e);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* delete the master znode if its content is same as the parameter
|
||||
*/
|
||||
|
|
|
@ -969,6 +969,16 @@ public final class ZooKeeperProtos {
|
|||
* </pre>
|
||||
*/
|
||||
int getRpcVersion();
|
||||
|
||||
// optional uint32 info_port = 3;
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
boolean hasInfoPort();
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
int getInfoPort();
|
||||
}
|
||||
/**
|
||||
* Protobuf type {@code Master}
|
||||
|
@ -1044,6 +1054,11 @@ public final class ZooKeeperProtos {
|
|||
rpcVersion_ = input.readUInt32();
|
||||
break;
|
||||
}
|
||||
case 24: {
|
||||
bitField0_ |= 0x00000004;
|
||||
infoPort_ = input.readUInt32();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
||||
|
@ -1142,9 +1157,26 @@ public final class ZooKeeperProtos {
|
|||
return rpcVersion_;
|
||||
}
|
||||
|
||||
// optional uint32 info_port = 3;
|
||||
public static final int INFO_PORT_FIELD_NUMBER = 3;
|
||||
private int infoPort_;
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
public boolean hasInfoPort() {
|
||||
return ((bitField0_ & 0x00000004) == 0x00000004);
|
||||
}
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
public int getInfoPort() {
|
||||
return infoPort_;
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
master_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
|
||||
rpcVersion_ = 0;
|
||||
infoPort_ = 0;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -1172,6 +1204,9 @@ public final class ZooKeeperProtos {
|
|||
if (((bitField0_ & 0x00000002) == 0x00000002)) {
|
||||
output.writeUInt32(2, rpcVersion_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
output.writeUInt32(3, infoPort_);
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -1189,6 +1224,10 @@ public final class ZooKeeperProtos {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeUInt32Size(2, rpcVersion_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeUInt32Size(3, infoPort_);
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -1222,6 +1261,11 @@ public final class ZooKeeperProtos {
|
|||
result = result && (getRpcVersion()
|
||||
== other.getRpcVersion());
|
||||
}
|
||||
result = result && (hasInfoPort() == other.hasInfoPort());
|
||||
if (hasInfoPort()) {
|
||||
result = result && (getInfoPort()
|
||||
== other.getInfoPort());
|
||||
}
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -1243,6 +1287,10 @@ public final class ZooKeeperProtos {
|
|||
hash = (37 * hash) + RPC_VERSION_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getRpcVersion();
|
||||
}
|
||||
if (hasInfoPort()) {
|
||||
hash = (37 * hash) + INFO_PORT_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getInfoPort();
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
memoizedHashCode = hash;
|
||||
return hash;
|
||||
|
@ -1366,6 +1414,8 @@ public final class ZooKeeperProtos {
|
|||
bitField0_ = (bitField0_ & ~0x00000001);
|
||||
rpcVersion_ = 0;
|
||||
bitField0_ = (bitField0_ & ~0x00000002);
|
||||
infoPort_ = 0;
|
||||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -1406,6 +1456,10 @@ public final class ZooKeeperProtos {
|
|||
to_bitField0_ |= 0x00000002;
|
||||
}
|
||||
result.rpcVersion_ = rpcVersion_;
|
||||
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
to_bitField0_ |= 0x00000004;
|
||||
}
|
||||
result.infoPort_ = infoPort_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -1428,6 +1482,9 @@ public final class ZooKeeperProtos {
|
|||
if (other.hasRpcVersion()) {
|
||||
setRpcVersion(other.getRpcVersion());
|
||||
}
|
||||
if (other.hasInfoPort()) {
|
||||
setInfoPort(other.getInfoPort());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -1665,6 +1722,39 @@ public final class ZooKeeperProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// optional uint32 info_port = 3;
|
||||
private int infoPort_ ;
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
public boolean hasInfoPort() {
|
||||
return ((bitField0_ & 0x00000004) == 0x00000004);
|
||||
}
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
public int getInfoPort() {
|
||||
return infoPort_;
|
||||
}
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
public Builder setInfoPort(int value) {
|
||||
bitField0_ |= 0x00000004;
|
||||
infoPort_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
public Builder clearInfoPort() {
|
||||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
infoPort_ = 0;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:Master)
|
||||
}
|
||||
|
||||
|
@ -9565,38 +9655,38 @@ public final class ZooKeeperProtos {
|
|||
"\n\017ZooKeeper.proto\032\013HBase.proto\032\023ClusterS" +
|
||||
"tatus.proto\"g\n\020MetaRegionServer\022\033\n\006serve" +
|
||||
"r\030\001 \002(\0132\013.ServerName\022\023\n\013rpc_version\030\002 \001(" +
|
||||
"\r\022!\n\005state\030\003 \001(\0162\022.RegionState.State\":\n\006" +
|
||||
"\r\022!\n\005state\030\003 \001(\0162\022.RegionState.State\"M\n\006" +
|
||||
"Master\022\033\n\006master\030\001 \002(\0132\013.ServerName\022\023\n\013r" +
|
||||
"pc_version\030\002 \001(\r\"\037\n\tClusterUp\022\022\n\nstart_d" +
|
||||
"ate\030\001 \002(\t\"\214\002\n\014SplitLogTask\022\"\n\005state\030\001 \002(" +
|
||||
"\0162\023.SplitLogTask.State\022 \n\013server_name\030\002 " +
|
||||
"\002(\0132\013.ServerName\0221\n\004mode\030\003 \001(\0162\032.SplitLo" +
|
||||
"gTask.RecoveryMode:\007UNKNOWN\"C\n\005State\022\016\n\n",
|
||||
"UNASSIGNED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002\022\010\n" +
|
||||
"\004DONE\020\003\022\007\n\003ERR\020\004\">\n\014RecoveryMode\022\013\n\007UNKN" +
|
||||
"OWN\020\000\022\021\n\rLOG_SPLITTING\020\001\022\016\n\nLOG_REPLAY\020\002" +
|
||||
"\"\214\001\n\024DeprecatedTableState\0223\n\005state\030\001 \002(\016" +
|
||||
"2\033.DeprecatedTableState.State:\007ENABLED\"?" +
|
||||
"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\022\r\n\tDI" +
|
||||
"SABLING\020\002\022\014\n\010ENABLING\020\003\"\215\001\n\017ReplicationP" +
|
||||
"eer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replicationEn" +
|
||||
"dpointImpl\030\002 \001(\t\022\035\n\004data\030\003 \003(\0132\017.BytesBy" +
|
||||
"tesPair\022&\n\rconfiguration\030\004 \003(\0132\017.NameStr",
|
||||
"ingPair\"^\n\020ReplicationState\022&\n\005state\030\001 \002" +
|
||||
"(\0162\027.ReplicationState.State\"\"\n\005State\022\013\n\007" +
|
||||
"ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027ReplicationHL" +
|
||||
"ogPosition\022\020\n\010position\030\001 \002(\003\"%\n\017Replicat" +
|
||||
"ionLock\022\022\n\nlock_owner\030\001 \002(\t\"\230\001\n\tTableLoc" +
|
||||
"k\022\036\n\ntable_name\030\001 \001(\0132\n.TableName\022\037\n\nloc" +
|
||||
"k_owner\030\002 \001(\0132\013.ServerName\022\021\n\tthread_id\030" +
|
||||
"\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(" +
|
||||
"\t\022\023\n\013create_time\030\006 \001(\003\";\n\017StoreSequenceI" +
|
||||
"d\022\023\n\013family_name\030\001 \002(\014\022\023\n\013sequence_id\030\002 ",
|
||||
"\002(\004\"g\n\026RegionStoreSequenceIds\022 \n\030last_fl" +
|
||||
"ushed_sequence_id\030\001 \002(\004\022+\n\021store_sequenc" +
|
||||
"e_id\030\002 \003(\0132\020.StoreSequenceIdBE\n*org.apac" +
|
||||
"he.hadoop.hbase.protobuf.generatedB\017ZooK" +
|
||||
"eeperProtosH\001\210\001\001\240\001\001"
|
||||
"pc_version\030\002 \001(\r\022\021\n\tinfo_port\030\003 \001(\r\"\037\n\tC" +
|
||||
"lusterUp\022\022\n\nstart_date\030\001 \002(\t\"\214\002\n\014SplitLo" +
|
||||
"gTask\022\"\n\005state\030\001 \002(\0162\023.SplitLogTask.Stat" +
|
||||
"e\022 \n\013server_name\030\002 \002(\0132\013.ServerName\0221\n\004m" +
|
||||
"ode\030\003 \001(\0162\032.SplitLogTask.RecoveryMode:\007U",
|
||||
"NKNOWN\"C\n\005State\022\016\n\nUNASSIGNED\020\000\022\t\n\005OWNED" +
|
||||
"\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022\007\n\003ERR\020\004\">\n\014R" +
|
||||
"ecoveryMode\022\013\n\007UNKNOWN\020\000\022\021\n\rLOG_SPLITTIN" +
|
||||
"G\020\001\022\016\n\nLOG_REPLAY\020\002\"\214\001\n\024DeprecatedTableS" +
|
||||
"tate\0223\n\005state\030\001 \002(\0162\033.DeprecatedTableSta" +
|
||||
"te.State:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022" +
|
||||
"\014\n\010DISABLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING" +
|
||||
"\020\003\"\215\001\n\017ReplicationPeer\022\022\n\nclusterkey\030\001 \002" +
|
||||
"(\t\022\037\n\027replicationEndpointImpl\030\002 \001(\t\022\035\n\004d" +
|
||||
"ata\030\003 \003(\0132\017.BytesBytesPair\022&\n\rconfigurat",
|
||||
"ion\030\004 \003(\0132\017.NameStringPair\"^\n\020Replicatio" +
|
||||
"nState\022&\n\005state\030\001 \002(\0162\027.ReplicationState" +
|
||||
".State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED" +
|
||||
"\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010positio" +
|
||||
"n\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlock_owner" +
|
||||
"\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntable_name\030\001 \001(\013" +
|
||||
"2\n.TableName\022\037\n\nlock_owner\030\002 \001(\0132\013.Serve" +
|
||||
"rName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_shared\030\004 " +
|
||||
"\001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_time\030\006 \001(" +
|
||||
"\003\";\n\017StoreSequenceId\022\023\n\013family_name\030\001 \002(",
|
||||
"\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026RegionStoreSeq" +
|
||||
"uenceIds\022 \n\030last_flushed_sequence_id\030\001 \002" +
|
||||
"(\004\022+\n\021store_sequence_id\030\002 \003(\0132\020.StoreSeq" +
|
||||
"uenceIdBE\n*org.apache.hadoop.hbase.proto" +
|
||||
"buf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -9614,7 +9704,7 @@ public final class ZooKeeperProtos {
|
|||
internal_static_Master_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_Master_descriptor,
|
||||
new java.lang.String[] { "Master", "RpcVersion", });
|
||||
new java.lang.String[] { "Master", "RpcVersion", "InfoPort", });
|
||||
internal_static_ClusterUp_descriptor =
|
||||
getDescriptor().getMessageTypes().get(2);
|
||||
internal_static_ClusterUp_fieldAccessorTable = new
|
||||
|
|
|
@ -52,6 +52,7 @@ message Master {
|
|||
required ServerName master = 1;
|
||||
// Major RPC version so that clients can know what version the master can accept.
|
||||
optional uint32 rpc_version = 2;
|
||||
optional uint32 info_port = 3;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -28,12 +28,11 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
|||
</%import>
|
||||
<%java>
|
||||
Collection<ServerName> masters = null;
|
||||
|
||||
MasterAddressTracker masterAddressTracker = master.getMasterAddressTracker();
|
||||
if (master.isActiveMaster()) {
|
||||
ClusterStatus status = master.getClusterStatus();
|
||||
masters = status.getBackupMasters();
|
||||
} else{
|
||||
MasterAddressTracker masterAddressTracker = master.getMasterAddressTracker();
|
||||
ServerName sn = masterAddressTracker == null ? null
|
||||
: masterAddressTracker.getMasterAddress();
|
||||
assert sn != null : "Failed to retreive master's ServerName!";
|
||||
|
@ -43,7 +42,7 @@ if (master.isActiveMaster()) {
|
|||
|
||||
<%java>
|
||||
ServerName [] serverNames = masters.toArray(new ServerName[masters.size()]);
|
||||
int infoPort = master.getConfiguration().getInt("hbase.master.info.port", 16010);
|
||||
int infoPort = masterAddressTracker == null ? 0 : masterAddressTracker.getMasterInfoPort();
|
||||
</%java>
|
||||
<%if (!master.isActiveMaster()) %>
|
||||
<%if serverNames[0] != null %>
|
||||
|
@ -66,6 +65,7 @@ int infoPort = master.getConfiguration().getInt("hbase.master.info.port", 16010)
|
|||
<%java>
|
||||
Arrays.sort(serverNames);
|
||||
for (ServerName serverName : serverNames) {
|
||||
infoPort = masterAddressTracker == null ? 0 : masterAddressTracker.getBackupMasterInfoPort(serverName);
|
||||
</%java>
|
||||
<tr>
|
||||
<td><a href="//<% serverName.getHostname() %>:<%
|
||||
|
|
|
@ -45,10 +45,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
|||
ServerInfo serverInfo = ProtobufUtil.getServerInfo(regionServer.getRSRpcServices());
|
||||
ServerName serverName = ProtobufUtil.toServerName(serverInfo.getServerName());
|
||||
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(regionServer.getRSRpcServices());
|
||||
int infoPort = regionServer.getConfiguration().getInt("hbase.master.info.port", 16010);
|
||||
MasterAddressTracker masterAddressTracker = regionServer.getMasterAddressTracker();
|
||||
ServerName masterServerName = masterAddressTracker == null ? null
|
||||
: masterAddressTracker.getMasterAddress();
|
||||
int infoPort = masterAddressTracker == null ? 0 : masterAddressTracker.getMasterInfoPort();
|
||||
</%java>
|
||||
<!--[if IE]>
|
||||
<!DOCTYPE html>
|
||||
|
|
|
@ -56,6 +56,7 @@ public class ActiveMasterManager extends ZooKeeperListener {
|
|||
final AtomicBoolean clusterShutDown = new AtomicBoolean(false);
|
||||
|
||||
private final ServerName sn;
|
||||
private final int infoPort;
|
||||
private final Server master;
|
||||
|
||||
/**
|
||||
|
@ -63,10 +64,11 @@ public class ActiveMasterManager extends ZooKeeperListener {
|
|||
* @param sn ServerName
|
||||
* @param master In an instance of a Master.
|
||||
*/
|
||||
ActiveMasterManager(ZooKeeperWatcher watcher, ServerName sn, Server master) {
|
||||
ActiveMasterManager(ZooKeeperWatcher watcher, ServerName sn, int infoPort, Server master) {
|
||||
super(watcher);
|
||||
watcher.registerListener(this);
|
||||
this.sn = sn;
|
||||
this.infoPort = infoPort;
|
||||
this.master = master;
|
||||
}
|
||||
|
||||
|
@ -156,7 +158,7 @@ public class ActiveMasterManager extends ZooKeeperListener {
|
|||
// Write out our ServerName as versioned bytes.
|
||||
try {
|
||||
if (MasterAddressTracker.setMasterAddress(this.watcher,
|
||||
this.watcher.getMasterAddressZNode(), this.sn)) {
|
||||
this.watcher.getMasterAddressZNode(), this.sn, infoPort)) {
|
||||
|
||||
// If we were a backup master before, delete our ZNode from the backup
|
||||
// master directory since we are the active now)
|
||||
|
|
|
@ -241,6 +241,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
/** jetty server for master to redirect requests to regionserver infoServer */
|
||||
private org.mortbay.jetty.Server masterJettyServer;
|
||||
|
||||
private int masterInfoPort;
|
||||
public static class RedirectServlet extends HttpServlet {
|
||||
private static final long serialVersionUID = 2894774810058302472L;
|
||||
private static int regionServerInfoPort;
|
||||
|
@ -352,6 +353,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
} catch (Exception e) {
|
||||
throw new IOException("Failed to start redirecting jetty server", e);
|
||||
}
|
||||
masterInfoPort = connector.getPort();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1283,11 +1285,13 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
* this node for us since it is ephemeral.
|
||||
*/
|
||||
LOG.info("Adding backup master ZNode " + backupZNode);
|
||||
if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName)) {
|
||||
if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
|
||||
serverName, masterInfoPort)) {
|
||||
LOG.warn("Failed create of " + backupZNode + " by " + serverName);
|
||||
}
|
||||
|
||||
activeMasterManager = new ActiveMasterManager(zooKeeper, serverName, this);
|
||||
activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName,
|
||||
masterInfoPort, this);
|
||||
// Start a thread to try to become the active master, so we won't block here
|
||||
Threads.setDaemonThreadRunning(new Thread(new Runnable() {
|
||||
public void run() {
|
||||
|
|
|
@ -259,9 +259,9 @@ public class TestActiveMasterManager {
|
|||
this.clusterStatusTracker =
|
||||
new ClusterStatusTracker(zk, this);
|
||||
clusterStatusTracker.start();
|
||||
|
||||
|
||||
this.activeMasterManager =
|
||||
new ActiveMasterManager(zk, master, this);
|
||||
new ActiveMasterManager(zk, master, 0, this);
|
||||
zk.registerListener(activeMasterManager);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -77,9 +78,10 @@ public class TestMasterAddressTracker {
|
|||
// Create the master node with a dummy address
|
||||
String host = "localhost";
|
||||
int port = 1234;
|
||||
int infoPort = 1235;
|
||||
ServerName sn = ServerName.valueOf(host, port, System.currentTimeMillis());
|
||||
LOG.info("Creating master node");
|
||||
MasterAddressTracker.setMasterAddress(zk, zk.getMasterAddressZNode(), sn);
|
||||
MasterAddressTracker.setMasterAddress(zk, zk.getMasterAddressZNode(), sn, infoPort);
|
||||
|
||||
// Wait for the node to be created
|
||||
LOG.info("Waiting for master address manager to be notified");
|
||||
|
@ -88,7 +90,7 @@ public class TestMasterAddressTracker {
|
|||
assertTrue(addressTracker.hasMaster());
|
||||
ServerName pulledAddress = addressTracker.getMasterAddress();
|
||||
assertTrue(pulledAddress.equals(sn));
|
||||
|
||||
assertEquals(infoPort, addressTracker.getMasterInfoPort());
|
||||
}
|
||||
|
||||
public static class NodeCreationListener extends ZooKeeperListener {
|
||||
|
|
|
@ -334,12 +334,12 @@ public class TestZooKeeperNodeTracker {
|
|||
Assert.assertFalse(ZKUtil.getData(zkw, nodeName) == null);
|
||||
|
||||
// Check that we don't delete if we're not supposed to
|
||||
ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn));
|
||||
ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0));
|
||||
MasterAddressTracker.deleteIfEquals(zkw, ServerName.valueOf("127.0.0.2:52", 45L).toString());
|
||||
Assert.assertFalse(ZKUtil.getData(zkw, nodeName) == null);
|
||||
|
||||
// Check that we delete when we're supposed to
|
||||
ZKUtil.setData(zkw, nodeName,MasterAddressTracker.toByteArray(sn));
|
||||
ZKUtil.setData(zkw, nodeName,MasterAddressTracker.toByteArray(sn, 0));
|
||||
MasterAddressTracker.deleteIfEquals(zkw, sn.toString());
|
||||
Assert.assertTrue( ZKUtil.getData(zkw, nodeName)== null );
|
||||
|
||||
|
|
Loading…
Reference in New Issue