HBASE-10483 Provide API for retrieving info port when hbase.master.info.port is set to 0 (Liu Shaohui)
Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
This commit is contained in:
parent
86711846d3
commit
f3a38dcf16
|
@ -1285,4 +1285,11 @@ public interface Admin extends Abortable, Closeable {
|
|||
* @throws IOException
|
||||
*/
|
||||
void updateConfiguration() throws IOException;
|
||||
|
||||
/**
|
||||
* Get the info port of the current master if one is available.
|
||||
* @return master info port
|
||||
* @throws IOException
|
||||
*/
|
||||
public int getMasterInfoPort() throws IOException;
|
||||
}
|
||||
|
|
|
@ -139,6 +139,8 @@ import org.apache.hadoop.hbase.util.Addressing;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -3684,7 +3686,7 @@ public class HBaseAdmin implements Admin {
|
|||
public CoprocessorRpcChannel coprocessorService(ServerName sn) {
|
||||
return new RegionServerCoprocessorRpcChannel(connection, sn);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void updateConfiguration(ServerName server) throws IOException {
|
||||
try {
|
||||
|
@ -3701,4 +3703,16 @@ public class HBaseAdmin implements Admin {
|
|||
updateConfiguration(server);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMasterInfoPort() throws IOException {
|
||||
ConnectionManager.HConnectionImplementation connection =
|
||||
(ConnectionManager.HConnectionImplementation) HConnectionManager.getConnection(conf);
|
||||
ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
|
||||
try {
|
||||
return MasterAddressTracker.getMasterInfoPort(zkw);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Failed to get master info port from MasterAddressTracker", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.zookeeper.data.Stat;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
/**
|
||||
* Manages the location of the current active Master for the RegionServer.
|
||||
|
@ -75,6 +76,36 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
|
|||
return getMasterAddress(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the info port of the current master of one is available.
|
||||
* Return 0 if no current master or zookeeper is unavailable
|
||||
* @return info port or 0 if timed out
|
||||
*/
|
||||
public int getMasterInfoPort() {
|
||||
try {
|
||||
return parse(this.getData(false)).getInfoPort();
|
||||
} catch (DeserializationException e) {
|
||||
LOG.warn("Failed parse master zk node data", e);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Get the info port of the backup master if it is available.
|
||||
* Return 0 if no current master or zookeeper is unavailable
|
||||
* @param sn server name of backup master
|
||||
* @return info port or 0 if timed out or exceptions
|
||||
*/
|
||||
public int getBackupMasterInfoPort(final ServerName sn) {
|
||||
String backupZNode = ZKUtil.joinZNode(watcher.backupMasterAddressesZNode, sn.toString());
|
||||
try {
|
||||
byte[] data = ZKUtil.getData(watcher, backupZNode);
|
||||
return parse(data).getInfoPort();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to get backup master: " + sn + "'s info port.", e);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the address of the current master if one is available. Returns null
|
||||
* if no current master. If refresh is set, try to load the data from ZK again,
|
||||
|
@ -99,8 +130,8 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
|
|||
* @param zkw ZooKeeperWatcher to use
|
||||
* @return ServerName stored in the the master address znode or null if no
|
||||
* znode present.
|
||||
* @throws KeeperException
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
* @throws IOException
|
||||
*/
|
||||
public static ServerName getMasterAddress(final ZooKeeperWatcher zkw)
|
||||
throws KeeperException, IOException {
|
||||
|
@ -122,6 +153,36 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get master info port.
|
||||
* Use this instead of {@link #getMasterInfoPort()} if you do not have an
|
||||
* instance of this tracker in your context.
|
||||
* @param zkw ZooKeeperWatcher to use
|
||||
* @return master info port in the the master address znode or null if no
|
||||
* znode present.
|
||||
* @throws KeeperException
|
||||
* @throws IOException
|
||||
*/
|
||||
public static int getMasterInfoPort(final ZooKeeperWatcher zkw) throws KeeperException,
|
||||
IOException {
|
||||
byte[] data;
|
||||
try {
|
||||
data = ZKUtil.getData(zkw, zkw.getMasterAddressZNode());
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
if (data == null) {
|
||||
throw new IOException("Can't get master address from ZooKeeper; znode data == null");
|
||||
}
|
||||
try {
|
||||
return parse(data).getInfoPort();
|
||||
} catch (DeserializationException e) {
|
||||
KeeperException ke = new KeeperException.DataInconsistencyException();
|
||||
ke.initCause(e);
|
||||
throw ke;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set master address into the <code>master</code> znode or into the backup
|
||||
* subdirectory of backup masters; switch off the passed in <code>znode</code>
|
||||
|
@ -134,9 +195,9 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
|
|||
* @throws KeeperException
|
||||
*/
|
||||
public static boolean setMasterAddress(final ZooKeeperWatcher zkw,
|
||||
final String znode, final ServerName master)
|
||||
final String znode, final ServerName master, int infoPort)
|
||||
throws KeeperException {
|
||||
return ZKUtil.createEphemeralNodeAndWatch(zkw, znode, toByteArray(master));
|
||||
return ZKUtil.createEphemeralNodeAndWatch(zkw, znode, toByteArray(master, infoPort));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -152,17 +213,31 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
|
|||
* @return Content of the master znode as a serialized pb with the pb
|
||||
* magic as prefix.
|
||||
*/
|
||||
static byte [] toByteArray(final ServerName sn) {
|
||||
ZooKeeperProtos.Master.Builder mbuilder = ZooKeeperProtos.Master.newBuilder();
|
||||
HBaseProtos.ServerName.Builder snbuilder = HBaseProtos.ServerName.newBuilder();
|
||||
snbuilder.setHostName(sn.getHostname());
|
||||
snbuilder.setPort(sn.getPort());
|
||||
snbuilder.setStartCode(sn.getStartcode());
|
||||
mbuilder.setMaster(snbuilder.build());
|
||||
mbuilder.setRpcVersion(HConstants.RPC_CURRENT_VERSION);
|
||||
return ProtobufUtil.prependPBMagic(mbuilder.build().toByteArray());
|
||||
}
|
||||
static byte[] toByteArray(final ServerName sn, int infoPort) {
|
||||
ZooKeeperProtos.Master.Builder mbuilder = ZooKeeperProtos.Master.newBuilder();
|
||||
HBaseProtos.ServerName.Builder snbuilder = HBaseProtos.ServerName.newBuilder();
|
||||
snbuilder.setHostName(sn.getHostname());
|
||||
snbuilder.setPort(sn.getPort());
|
||||
snbuilder.setStartCode(sn.getStartcode());
|
||||
mbuilder.setMaster(snbuilder.build());
|
||||
mbuilder.setRpcVersion(HConstants.RPC_CURRENT_VERSION);
|
||||
mbuilder.setInfoPort(infoPort);
|
||||
return ProtobufUtil.prependPBMagic(mbuilder.build().toByteArray());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param data zookeeper data
|
||||
* @return pb object of master
|
||||
* @throws DeserializationException
|
||||
*/
|
||||
public static ZooKeeperProtos.Master parse(byte[] data) throws DeserializationException {
|
||||
int prefixLen = ProtobufUtil.lengthOfPBMagic();
|
||||
try {
|
||||
return ZooKeeperProtos.Master.PARSER.parseFrom(data, prefixLen, data.length - prefixLen);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new DeserializationException(e);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* delete the master znode if its content is same as the parameter
|
||||
*/
|
||||
|
|
|
@ -823,6 +823,16 @@ public final class ZooKeeperProtos {
|
|||
* </pre>
|
||||
*/
|
||||
int getRpcVersion();
|
||||
|
||||
// optional uint32 info_port = 3;
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
boolean hasInfoPort();
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
int getInfoPort();
|
||||
}
|
||||
/**
|
||||
* Protobuf type {@code Master}
|
||||
|
@ -898,6 +908,11 @@ public final class ZooKeeperProtos {
|
|||
rpcVersion_ = input.readUInt32();
|
||||
break;
|
||||
}
|
||||
case 24: {
|
||||
bitField0_ |= 0x00000004;
|
||||
infoPort_ = input.readUInt32();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
||||
|
@ -996,9 +1011,26 @@ public final class ZooKeeperProtos {
|
|||
return rpcVersion_;
|
||||
}
|
||||
|
||||
// optional uint32 info_port = 3;
|
||||
public static final int INFO_PORT_FIELD_NUMBER = 3;
|
||||
private int infoPort_;
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
public boolean hasInfoPort() {
|
||||
return ((bitField0_ & 0x00000004) == 0x00000004);
|
||||
}
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
public int getInfoPort() {
|
||||
return infoPort_;
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
master_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
|
||||
rpcVersion_ = 0;
|
||||
infoPort_ = 0;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -1026,6 +1058,9 @@ public final class ZooKeeperProtos {
|
|||
if (((bitField0_ & 0x00000002) == 0x00000002)) {
|
||||
output.writeUInt32(2, rpcVersion_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
output.writeUInt32(3, infoPort_);
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -1043,6 +1078,10 @@ public final class ZooKeeperProtos {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeUInt32Size(2, rpcVersion_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeUInt32Size(3, infoPort_);
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -1076,6 +1115,11 @@ public final class ZooKeeperProtos {
|
|||
result = result && (getRpcVersion()
|
||||
== other.getRpcVersion());
|
||||
}
|
||||
result = result && (hasInfoPort() == other.hasInfoPort());
|
||||
if (hasInfoPort()) {
|
||||
result = result && (getInfoPort()
|
||||
== other.getInfoPort());
|
||||
}
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -1097,6 +1141,10 @@ public final class ZooKeeperProtos {
|
|||
hash = (37 * hash) + RPC_VERSION_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getRpcVersion();
|
||||
}
|
||||
if (hasInfoPort()) {
|
||||
hash = (37 * hash) + INFO_PORT_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getInfoPort();
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
memoizedHashCode = hash;
|
||||
return hash;
|
||||
|
@ -1220,6 +1268,8 @@ public final class ZooKeeperProtos {
|
|||
bitField0_ = (bitField0_ & ~0x00000001);
|
||||
rpcVersion_ = 0;
|
||||
bitField0_ = (bitField0_ & ~0x00000002);
|
||||
infoPort_ = 0;
|
||||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -1260,6 +1310,10 @@ public final class ZooKeeperProtos {
|
|||
to_bitField0_ |= 0x00000002;
|
||||
}
|
||||
result.rpcVersion_ = rpcVersion_;
|
||||
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
to_bitField0_ |= 0x00000004;
|
||||
}
|
||||
result.infoPort_ = infoPort_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -1282,6 +1336,9 @@ public final class ZooKeeperProtos {
|
|||
if (other.hasRpcVersion()) {
|
||||
setRpcVersion(other.getRpcVersion());
|
||||
}
|
||||
if (other.hasInfoPort()) {
|
||||
setInfoPort(other.getInfoPort());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -1519,6 +1576,39 @@ public final class ZooKeeperProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// optional uint32 info_port = 3;
|
||||
private int infoPort_ ;
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
public boolean hasInfoPort() {
|
||||
return ((bitField0_ & 0x00000004) == 0x00000004);
|
||||
}
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
public int getInfoPort() {
|
||||
return infoPort_;
|
||||
}
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
public Builder setInfoPort(int value) {
|
||||
bitField0_ |= 0x00000004;
|
||||
infoPort_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>optional uint32 info_port = 3;</code>
|
||||
*/
|
||||
public Builder clearInfoPort() {
|
||||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
infoPort_ = 0;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:Master)
|
||||
}
|
||||
|
||||
|
@ -10508,40 +10598,41 @@ public final class ZooKeeperProtos {
|
|||
java.lang.String[] descriptorData = {
|
||||
"\n\017ZooKeeper.proto\032\013HBase.proto\"D\n\020MetaRe" +
|
||||
"gionServer\022\033\n\006server\030\001 \002(\0132\013.ServerName\022" +
|
||||
"\023\n\013rpc_version\030\002 \001(\r\":\n\006Master\022\033\n\006master" +
|
||||
"\023\n\013rpc_version\030\002 \001(\r\"M\n\006Master\022\033\n\006master" +
|
||||
"\030\001 \002(\0132\013.ServerName\022\023\n\013rpc_version\030\002 \001(\r" +
|
||||
"\"\037\n\tClusterUp\022\022\n\nstart_date\030\001 \002(\t\"\210\001\n\020Re" +
|
||||
"gionTransition\022\027\n\017event_type_code\030\001 \002(\r\022" +
|
||||
"\023\n\013region_name\030\002 \002(\014\022\023\n\013create_time\030\003 \002(" +
|
||||
"\004\022 \n\013server_name\030\004 \002(\0132\013.ServerName\022\017\n\007p" +
|
||||
"ayload\030\005 \001(\014\"\214\002\n\014SplitLogTask\022\"\n\005state\030\001" +
|
||||
" \002(\0162\023.SplitLogTask.State\022 \n\013server_name",
|
||||
"\030\002 \002(\0132\013.ServerName\0221\n\004mode\030\003 \001(\0162\032.Spli" +
|
||||
"tLogTask.RecoveryMode:\007UNKNOWN\"C\n\005State\022" +
|
||||
"\016\n\nUNASSIGNED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002" +
|
||||
"\022\010\n\004DONE\020\003\022\007\n\003ERR\020\004\">\n\014RecoveryMode\022\013\n\007U" +
|
||||
"NKNOWN\020\000\022\021\n\rLOG_SPLITTING\020\001\022\016\n\nLOG_REPLA" +
|
||||
"Y\020\002\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table.Stat" +
|
||||
"e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
|
||||
"BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"\215\001\n\017" +
|
||||
"ReplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027r" +
|
||||
"eplicationEndpointImpl\030\002 \001(\t\022\035\n\004data\030\003 \003",
|
||||
"(\0132\017.BytesBytesPair\022&\n\rconfiguration\030\004 \003" +
|
||||
"(\0132\017.NameStringPair\"^\n\020ReplicationState\022" +
|
||||
"&\n\005state\030\001 \002(\0162\027.ReplicationState.State\"" +
|
||||
"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027R" +
|
||||
"eplicationHLogPosition\022\020\n\010position\030\001 \002(\003" +
|
||||
"\"%\n\017ReplicationLock\022\022\n\nlock_owner\030\001 \002(\t\"" +
|
||||
"\230\001\n\tTableLock\022\036\n\ntable_name\030\001 \001(\0132\n.Tabl" +
|
||||
"eName\022\037\n\nlock_owner\030\002 \001(\0132\013.ServerName\022\021" +
|
||||
"\n\tthread_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007" +
|
||||
"purpose\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\";\n\017St",
|
||||
"oreSequenceId\022\023\n\013family_name\030\001 \002(\014\022\023\n\013se" +
|
||||
"quence_id\030\002 \002(\004\"g\n\026RegionStoreSequenceId" +
|
||||
"s\022 \n\030last_flushed_sequence_id\030\001 \002(\004\022+\n\021s" +
|
||||
"tore_sequence_id\030\002 \003(\0132\020.StoreSequenceId" +
|
||||
"BE\n*org.apache.hadoop.hbase.protobuf.gen" +
|
||||
"eratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
|
||||
"\022\021\n\tinfo_port\030\003 \001(\r\"\037\n\tClusterUp\022\022\n\nstar" +
|
||||
"t_date\030\001 \002(\t\"\210\001\n\020RegionTransition\022\027\n\017eve" +
|
||||
"nt_type_code\030\001 \002(\r\022\023\n\013region_name\030\002 \002(\014\022" +
|
||||
"\023\n\013create_time\030\003 \002(\004\022 \n\013server_name\030\004 \002(" +
|
||||
"\0132\013.ServerName\022\017\n\007payload\030\005 \001(\014\"\214\002\n\014Spli" +
|
||||
"tLogTask\022\"\n\005state\030\001 \002(\0162\023.SplitLogTask.S",
|
||||
"tate\022 \n\013server_name\030\002 \002(\0132\013.ServerName\0221" +
|
||||
"\n\004mode\030\003 \001(\0162\032.SplitLogTask.RecoveryMode" +
|
||||
":\007UNKNOWN\"C\n\005State\022\016\n\nUNASSIGNED\020\000\022\t\n\005OW" +
|
||||
"NED\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022\007\n\003ERR\020\004\">" +
|
||||
"\n\014RecoveryMode\022\013\n\007UNKNOWN\020\000\022\021\n\rLOG_SPLIT" +
|
||||
"TING\020\001\022\016\n\nLOG_REPLAY\020\002\"n\n\005Table\022$\n\005state" +
|
||||
"\030\001 \002(\0162\014.Table.State:\007ENABLED\"?\n\005State\022\013" +
|
||||
"\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\022\r\n\tDISABLING\020\002" +
|
||||
"\022\014\n\010ENABLING\020\003\"\215\001\n\017ReplicationPeer\022\022\n\ncl" +
|
||||
"usterkey\030\001 \002(\t\022\037\n\027replicationEndpointImp",
|
||||
"l\030\002 \001(\t\022\035\n\004data\030\003 \003(\0132\017.BytesBytesPair\022&" +
|
||||
"\n\rconfiguration\030\004 \003(\0132\017.NameStringPair\"^" +
|
||||
"\n\020ReplicationState\022&\n\005state\030\001 \002(\0162\027.Repl" +
|
||||
"icationState.State\"\"\n\005State\022\013\n\007ENABLED\020\000" +
|
||||
"\022\014\n\010DISABLED\020\001\"+\n\027ReplicationHLogPositio" +
|
||||
"n\022\020\n\010position\030\001 \002(\003\"%\n\017ReplicationLock\022\022" +
|
||||
"\n\nlock_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntabl" +
|
||||
"e_name\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030\002" +
|
||||
" \001(\0132\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\t" +
|
||||
"is_shared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013crea",
|
||||
"te_time\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013fami" +
|
||||
"ly_name\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026Re" +
|
||||
"gionStoreSequenceIds\022 \n\030last_flushed_seq" +
|
||||
"uence_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003(" +
|
||||
"\0132\020.StoreSequenceIdBE\n*org.apache.hadoop" +
|
||||
".hbase.protobuf.generatedB\017ZooKeeperProt" +
|
||||
"osH\001\210\001\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -10559,7 +10650,7 @@ public final class ZooKeeperProtos {
|
|||
internal_static_Master_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_Master_descriptor,
|
||||
new java.lang.String[] { "Master", "RpcVersion", });
|
||||
new java.lang.String[] { "Master", "RpcVersion", "InfoPort", });
|
||||
internal_static_ClusterUp_descriptor =
|
||||
getDescriptor().getMessageTypes().get(2);
|
||||
internal_static_ClusterUp_fieldAccessorTable = new
|
||||
|
|
|
@ -47,6 +47,7 @@ message Master {
|
|||
required ServerName master = 1;
|
||||
// Major RPC version so that clients can know what version the master can accept.
|
||||
optional uint32 rpc_version = 2;
|
||||
optional uint32 info_port = 3;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -28,12 +28,11 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
|||
</%import>
|
||||
<%java>
|
||||
Collection<ServerName> masters = null;
|
||||
|
||||
MasterAddressTracker masterAddressTracker = master.getMasterAddressTracker();
|
||||
if (master.isActiveMaster()) {
|
||||
ClusterStatus status = master.getClusterStatus();
|
||||
masters = status.getBackupMasters();
|
||||
} else{
|
||||
MasterAddressTracker masterAddressTracker = master.getMasterAddressTracker();
|
||||
ServerName sn = masterAddressTracker == null ? null
|
||||
: masterAddressTracker.getMasterAddress();
|
||||
assert sn != null : "Failed to retreive master's ServerName!";
|
||||
|
@ -43,7 +42,7 @@ if (master.isActiveMaster()) {
|
|||
|
||||
<%java>
|
||||
ServerName [] serverNames = masters.toArray(new ServerName[masters.size()]);
|
||||
int infoPort = master.getConfiguration().getInt("hbase.master.info.port", 16010);
|
||||
int infoPort = masterAddressTracker == null ? 0 : masterAddressTracker.getMasterInfoPort();
|
||||
</%java>
|
||||
<%if (!master.isActiveMaster()) %>
|
||||
<%if serverNames[0] != null %>
|
||||
|
@ -66,6 +65,7 @@ int infoPort = master.getConfiguration().getInt("hbase.master.info.port", 16010)
|
|||
<%java>
|
||||
Arrays.sort(serverNames);
|
||||
for (ServerName serverName : serverNames) {
|
||||
infoPort = masterAddressTracker == null ? 0 : masterAddressTracker.getBackupMasterInfoPort(serverName);
|
||||
</%java>
|
||||
<tr>
|
||||
<td><a href="//<% serverName.getHostname() %>:<%
|
||||
|
|
|
@ -45,10 +45,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
|||
ServerInfo serverInfo = ProtobufUtil.getServerInfo(regionServer.getRSRpcServices());
|
||||
ServerName serverName = ProtobufUtil.toServerName(serverInfo.getServerName());
|
||||
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(regionServer.getRSRpcServices());
|
||||
int infoPort = regionServer.getConfiguration().getInt("hbase.master.info.port", 16010);
|
||||
MasterAddressTracker masterAddressTracker = regionServer.getMasterAddressTracker();
|
||||
ServerName masterServerName = masterAddressTracker == null ? null
|
||||
: masterAddressTracker.getMasterAddress();
|
||||
int infoPort = masterAddressTracker == null ? 0 : masterAddressTracker.getMasterInfoPort();
|
||||
</%java>
|
||||
<!--[if IE]>
|
||||
<!DOCTYPE html>
|
||||
|
|
|
@ -56,6 +56,7 @@ public class ActiveMasterManager extends ZooKeeperListener {
|
|||
final AtomicBoolean clusterShutDown = new AtomicBoolean(false);
|
||||
|
||||
private final ServerName sn;
|
||||
private final int infoPort;
|
||||
private final Server master;
|
||||
|
||||
/**
|
||||
|
@ -63,10 +64,11 @@ public class ActiveMasterManager extends ZooKeeperListener {
|
|||
* @param sn ServerName
|
||||
* @param master In an instance of a Master.
|
||||
*/
|
||||
ActiveMasterManager(ZooKeeperWatcher watcher, ServerName sn, Server master) {
|
||||
ActiveMasterManager(ZooKeeperWatcher watcher, ServerName sn, int infoPort, Server master) {
|
||||
super(watcher);
|
||||
watcher.registerListener(this);
|
||||
this.sn = sn;
|
||||
this.infoPort = infoPort;
|
||||
this.master = master;
|
||||
}
|
||||
|
||||
|
@ -156,7 +158,7 @@ public class ActiveMasterManager extends ZooKeeperListener {
|
|||
// Write out our ServerName as versioned bytes.
|
||||
try {
|
||||
if (MasterAddressTracker.setMasterAddress(this.watcher,
|
||||
this.watcher.getMasterAddressZNode(), this.sn)) {
|
||||
this.watcher.getMasterAddressZNode(), this.sn, infoPort)) {
|
||||
|
||||
// If we were a backup master before, delete our ZNode from the backup
|
||||
// master directory since we are the active now)
|
||||
|
|
|
@ -235,6 +235,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
/** jetty server for master to redirect requests to regionserver infoServer */
|
||||
private org.mortbay.jetty.Server masterJettyServer;
|
||||
|
||||
private int masterInfoPort;
|
||||
public static class RedirectServlet extends HttpServlet {
|
||||
private static final long serialVersionUID = 2894774810058302472L;
|
||||
private static int regionServerInfoPort;
|
||||
|
@ -347,6 +348,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
} catch (Exception e) {
|
||||
throw new IOException("Failed to start redirecting jetty server", e);
|
||||
}
|
||||
masterInfoPort = connector.getPort();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1277,11 +1279,13 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
* this node for us since it is ephemeral.
|
||||
*/
|
||||
LOG.info("Adding backup master ZNode " + backupZNode);
|
||||
if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName)) {
|
||||
if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
|
||||
serverName, masterInfoPort)) {
|
||||
LOG.warn("Failed create of " + backupZNode + " by " + serverName);
|
||||
}
|
||||
|
||||
activeMasterManager = new ActiveMasterManager(zooKeeper, serverName, this);
|
||||
activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName,
|
||||
masterInfoPort, this);
|
||||
// Start a thread to try to become the active master, so we won't block here
|
||||
Threads.setDaemonThreadRunning(new Thread(new Runnable() {
|
||||
public void run() {
|
||||
|
|
|
@ -258,9 +258,9 @@ public class TestActiveMasterManager {
|
|||
this.clusterStatusTracker =
|
||||
new ClusterStatusTracker(zk, this);
|
||||
clusterStatusTracker.start();
|
||||
|
||||
|
||||
this.activeMasterManager =
|
||||
new ActiveMasterManager(zk, master, this);
|
||||
new ActiveMasterManager(zk, master, 0, this);
|
||||
zk.registerListener(activeMasterManager);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -75,9 +76,10 @@ public class TestMasterAddressTracker {
|
|||
// Create the master node with a dummy address
|
||||
String host = "localhost";
|
||||
int port = 1234;
|
||||
int infoPort = 1235;
|
||||
ServerName sn = ServerName.valueOf(host, port, System.currentTimeMillis());
|
||||
LOG.info("Creating master node");
|
||||
MasterAddressTracker.setMasterAddress(zk, zk.getMasterAddressZNode(), sn);
|
||||
MasterAddressTracker.setMasterAddress(zk, zk.getMasterAddressZNode(), sn, infoPort);
|
||||
|
||||
// Wait for the node to be created
|
||||
LOG.info("Waiting for master address manager to be notified");
|
||||
|
@ -86,7 +88,7 @@ public class TestMasterAddressTracker {
|
|||
assertTrue(addressTracker.hasMaster());
|
||||
ServerName pulledAddress = addressTracker.getMasterAddress();
|
||||
assertTrue(pulledAddress.equals(sn));
|
||||
|
||||
assertEquals(infoPort, addressTracker.getMasterInfoPort());
|
||||
}
|
||||
|
||||
public static class NodeCreationListener extends ZooKeeperListener {
|
||||
|
|
|
@ -332,12 +332,12 @@ public class TestZooKeeperNodeTracker {
|
|||
Assert.assertFalse(ZKUtil.getData(zkw, nodeName) == null);
|
||||
|
||||
// Check that we don't delete if we're not supposed to
|
||||
ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn));
|
||||
ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0));
|
||||
MasterAddressTracker.deleteIfEquals(zkw, ServerName.valueOf("127.0.0.2:52", 45L).toString());
|
||||
Assert.assertFalse(ZKUtil.getData(zkw, nodeName) == null);
|
||||
|
||||
// Check that we delete when we're supposed to
|
||||
ZKUtil.setData(zkw, nodeName,MasterAddressTracker.toByteArray(sn));
|
||||
ZKUtil.setData(zkw, nodeName,MasterAddressTracker.toByteArray(sn, 0));
|
||||
MasterAddressTracker.deleteIfEquals(zkw, sn.toString());
|
||||
Assert.assertTrue( ZKUtil.getData(zkw, nodeName)== null );
|
||||
|
||||
|
|
Loading…
Reference in New Issue