YARN-2768 Improved Yarn Registry service record structure (stevel)

Steve Loughran 2014-11-06 20:21:25 +00:00
parent 0f9199fb07
commit 5924e74d55
17 changed files with 626 additions and 409 deletions


@@ -710,6 +710,8 @@ Release 2.6.0 - UNRELEASED
     YARN-2677 registry punycoding of usernames doesn't fix all usernames to be
     DNS-valid (stevel)
+
+    YARN-2768 Improved Yarn Registry service record structure (stevel)
 ---
     YARN-2598 GHS should show N/A instead of null for the inaccessible information


@@ -24,6 +24,7 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.cli.CommandLine;
@@ -174,24 +175,22 @@ public int resolve(String [] args) {
       ServiceRecord record = registry.resolve(argsList.get(1));
 
       for (Endpoint endpoint : record.external) {
-        if ((endpoint.protocolType.equals(ProtocolTypes.PROTOCOL_WEBUI))
-            || (endpoint.protocolType.equals(ProtocolTypes.PROTOCOL_REST))) {
-          sysout.print(" Endpoint(ProtocolType="
-              + endpoint.protocolType + ", Api="
-              + endpoint.api + "); Uris are: ");
-        } else {
-          sysout.print(" Endpoint(ProtocolType="
+        sysout.println(" Endpoint(ProtocolType="
             + endpoint.protocolType + ", Api="
             + endpoint.api + ");"
             + " Addresses(AddressType="
             + endpoint.addressType + ") are: ");
-        }
-        for (List<String> a : endpoint.addresses) {
-          sysout.print(a + " ");
-        }
-        sysout.println();
+        for (Map<String, String> address : endpoint.addresses) {
+          sysout.println(" [ ");
+          for (Map.Entry<String, String> entry : address.entrySet()) {
+            sysout.println("  " + entry.getKey()
+                + ": \"" + entry.getValue() + "\"");
         }
+          sysout.println(" ]");
+        }
+        sysout.println();
+      }
       return 0;
     } catch (Exception e) {
       syserr.println(analyzeException("resolve", e, argsList));
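For illustration only, the reworked resolve command now prints each address map as key/value pairs rather than a bare string list; with an invented host/port endpoint the output would look roughly like:

 Endpoint(ProtocolType=hadoop/IPC, Api=uuid:0663501D-5AD3-4F7E-9419-52F5D6636FCF); Addresses(AddressType=host/port) are: 
 [ 
  host: "example-host"
  port: "8020"
 ]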


@@ -19,6 +19,7 @@
 package org.apache.hadoop.registry.client.binding;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -45,8 +46,6 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
 
 /**
  * Support for marshalling objects to and from JSON.
@@ -62,30 +61,30 @@ public class JsonSerDeser<T> {
 
   private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeser.class);
   private static final String UTF_8 = "UTF-8";
-  public static final String E_NO_SERVICE_RECORD = "No service record at path";
+  public static final String E_NO_DATA = "No data at path";
+  public static final String E_DATA_TOO_SHORT = "Data at path too short";
+  public static final String E_MISSING_MARKER_STRING =
+      "Missing marker string: ";
 
   private final Class<T> classType;
   private final ObjectMapper mapper;
-  private final byte[] header;
 
   /**
    * Create an instance bound to a specific type
    * @param classType class to marshall
-   * @param header byte array to use as header
    */
-  public JsonSerDeser(Class<T> classType, byte[] header) {
+  public JsonSerDeser(Class<T> classType) {
     Preconditions.checkArgument(classType != null, "null classType");
-    Preconditions.checkArgument(header != null, "null header");
     this.classType = classType;
     this.mapper = new ObjectMapper();
     mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,
        false);
-    // make an immutable copy to keep findbugs happy.
-    byte[] h = new byte[header.length];
-    System.arraycopy(header, 0, h, 0, header.length);
-    this.header = h;
   }
 
+  /**
+   * Get the simple name of the class type to be marshalled
+   * @return the name of the class being marshalled
+   */
   public String getName() {
     return classType.getSimpleName();
   }
@@ -183,7 +182,7 @@ public T load(FileSystem fs, Path path)
     if (count != len) {
       throw new EOFException(path.toString() + ": read finished prematurely");
     }
-    return fromBytes(path.toString(), b, 0);
+    return fromBytes(path.toString(), b);
   }
 
   /**
@@ -206,8 +205,7 @@
    * @throws IOException on any failure
    */
   private void writeJsonAsBytes(T instance,
-      DataOutputStream dataOutputStream) throws
-      IOException {
+      DataOutputStream dataOutputStream) throws IOException {
     try {
       byte[] b = toBytes(instance);
       dataOutputStream.write(b);
@@ -227,37 +225,51 @@ public byte[] toBytes(T instance) throws IOException {
     return json.getBytes(UTF_8);
   }
 
-  /**
-   * Convert JSON To bytes, inserting the header
-   * @param instance instance to convert
-   * @return a byte array
-   * @throws IOException
-   */
-  public byte[] toByteswithHeader(T instance) throws IOException {
-    byte[] body = toBytes(instance);
-    ByteBuffer buffer = ByteBuffer.allocate(body.length + header.length);
-    buffer.put(header);
-    buffer.put(body);
-    return buffer.array();
-  }
-
   /**
    * Deserialize from a byte array
    * @param path path the data came from
    * @param bytes byte array
-   * @return offset in the array to read from
    * @throws IOException all problems
    * @throws EOFException not enough data
    * @throws InvalidRecordException if the parsing failed -the record is invalid
    */
-  public T fromBytes(String path, byte[] bytes, int offset) throws IOException,
+  public T fromBytes(String path, byte[] bytes) throws IOException,
       InvalidRecordException {
-    int data = bytes.length - offset;
-    if (data <= 0) {
-      throw new EOFException("No data at " + path);
+    return fromBytes(path, bytes, "");
+  }
+
+  /**
+   * Deserialize from a byte array, optionally checking for a marker string.
+   * <p>
+   * If the marker parameter is supplied (and not empty), then its presence
+   * will be verified before the JSON parsing takes place; it is a fast-fail
+   * check. If not found, an {@link InvalidRecordException} exception will be
+   * raised
+   * @param path path the data came from
+   * @param bytes byte array
+   * @param marker an optional string which, if set, MUST be present in the
+   * UTF-8 parsed payload.
+   * @return The parsed record
+   * @throws IOException all problems
+   * @throws EOFException not enough data
+   * @throws InvalidRecordException if the JSON parsing failed.
+   * @throws NoRecordException if the data is not considered a record: either
+   * it is too short or it did not contain the marker string.
+   */
+  public T fromBytes(String path, byte[] bytes, String marker)
+      throws IOException, NoRecordException, InvalidRecordException {
+    int len = bytes.length;
+    if (len == 0 ) {
+      throw new NoRecordException(path, E_NO_DATA);
+    }
+    if (StringUtils.isNotEmpty(marker) && len < marker.length()) {
+      throw new NoRecordException(path, E_DATA_TOO_SHORT);
+    }
+    String json = new String(bytes, 0, len, UTF_8);
+    if (StringUtils.isNotEmpty(marker)
+        && !json.contains(marker)) {
+      throw new NoRecordException(path, E_MISSING_MARKER_STRING + marker);
     }
-    String json = new String(bytes, offset, data, UTF_8);
     try {
       return fromJson(json);
     } catch (JsonProcessingException e) {
@@ -266,52 +278,7 @@ public T fromBytes(String path, byte[] bytes, int offset) throws IOException,
   }
 
   /**
-   * Read from a byte array to a type, checking the header first
-   * @param path source of data
-   * @param buffer buffer
-   * @return the parsed structure
-   * Null if the record was too short or the header did not match
-   * @throws IOException on a failure
-   * @throws NoRecordException if header checks implied there was no record
-   * @throws InvalidRecordException if record parsing failed
-   */
-  @SuppressWarnings("unchecked")
-  public T fromBytesWithHeader(String path, byte[] buffer) throws IOException {
-    int hlen = header.length;
-    int blen = buffer.length;
-    if (hlen > 0) {
-      if (blen < hlen) {
-        throw new NoRecordException(path, E_NO_SERVICE_RECORD);
-      }
-      byte[] magic = Arrays.copyOfRange(buffer, 0, hlen);
-      if (!Arrays.equals(header, magic)) {
-        LOG.debug("start of entry does not match service record header at {}",
-            path);
-        throw new NoRecordException(path, E_NO_SERVICE_RECORD);
-      }
-    }
-    return fromBytes(path, buffer, hlen);
-  }
-
-  /**
-   * Check if a buffer has a header which matches this record type
-   * @param buffer buffer
-   * @return true if there is a match
-   * @throws IOException
-   */
-  public boolean headerMatches(byte[] buffer) throws IOException {
-    int hlen = header.length;
-    int blen = buffer.length;
-    boolean matches = false;
-    if (blen > hlen) {
-      byte[] magic = Arrays.copyOfRange(buffer, 0, hlen);
-      matches = Arrays.equals(header, magic);
-    }
-    return matches;
-  }
-
-  /**
-   * Convert an object to a JSON string
+   * Convert an instance to a JSON string
    * @param instance instance to convert
    * @return a JSON string description
    * @throws JsonParseException parse problems
@@ -324,4 +291,19 @@ public synchronized String toJson(T instance) throws IOException,
     return mapper.writeValueAsString(instance);
   }
 
+  /**
+   * Convert an instance to a string form for output. This is a robust
+   * operation which will convert any JSON-generating exceptions into
+   * error text.
+   * @param instance non-null instance
+   * @return a JSON string
+   */
+  public String toString(T instance) {
+    Preconditions.checkArgument(instance != null, "Null instance argument");
+    try {
+      return toJson(instance);
+    } catch (IOException e) {
+      return "Failed to convert to a string: " + e;
    }
  }
 }
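A minimal usage sketch of the reworked marshaller (the registry path and empty record below are invented; ServiceRecord.RECORD_TYPE is the marker string the registry passes in practice):

import org.apache.hadoop.registry.client.binding.JsonSerDeser;
import org.apache.hadoop.registry.client.types.ServiceRecord;

public class MarshalSketch {
  public static void main(String[] args) throws Exception {
    JsonSerDeser<ServiceRecord> marshal =
        new JsonSerDeser<ServiceRecord>(ServiceRecord.class);
    ServiceRecord record = new ServiceRecord();
    // serialization is now plain JSON: no binary header is prepended
    byte[] bytes = marshal.toBytes(record);
    // deserialization can fast-fail on the record-type marker before parsing
    ServiceRecord parsed = marshal.fromBytes(
        "/users/example/services/app", bytes, ServiceRecord.RECORD_TYPE);
    System.out.println(marshal.toJson(parsed));
  }
}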


@@ -22,17 +22,19 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
-import org.apache.hadoop.registry.client.types.AddressTypes;
+import static org.apache.hadoop.registry.client.types.AddressTypes.*;
 import org.apache.hadoop.registry.client.types.Endpoint;
 import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
 
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Static methods to work with registry types primarily endpoints and the
@@ -94,79 +96,66 @@ public static Endpoint inetAddrEndpoint(String api,
     Preconditions.checkArgument(protocolType != null, "null protocolType");
     Preconditions.checkArgument(hostname != null, "null hostname");
     return new Endpoint(api,
-        AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
+        ADDRESS_HOSTNAME_AND_PORT,
         protocolType,
-        tuplelist(hostname, Integer.toString(port)));
+        hostnamePortPair(hostname, port));
   }
 
   /**
    * Create an IPC endpoint
    * @param api API
-   * @param protobuf flag to indicate whether or not the IPC uses protocol
-   * buffers
    * @param address the address as a tuple of (hostname, port)
    * @return the new endpoint
    */
-  public static Endpoint ipcEndpoint(String api,
-      boolean protobuf, List<String> address) {
-    ArrayList<List<String>> addressList = new ArrayList<List<String>>();
-    if (address != null) {
-      addressList.add(address);
-    }
+  public static Endpoint ipcEndpoint(String api, InetSocketAddress address) {
     return new Endpoint(api,
-        AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
-        protobuf ? ProtocolTypes.PROTOCOL_HADOOP_IPC_PROTOBUF
-            : ProtocolTypes.PROTOCOL_HADOOP_IPC,
-        addressList);
+        ADDRESS_HOSTNAME_AND_PORT,
+        ProtocolTypes.PROTOCOL_HADOOP_IPC,
+        address== null ? null: hostnamePortPair(address));
   }
 
   /**
-   * Create a single-element list of tuples from the input.
-   * that is, an input ("a","b","c") is converted into a list
-   * in the form [["a","b","c"]]
-   * @param t1 tuple elements
-   * @return a list containing a single tuple
+   * Create a single entry map
+   * @param key map entry key
+   * @param val map entry value
+   * @return a 1 entry map.
    */
-  public static List<List<String>> tuplelist(String... t1) {
-    List<List<String>> outer = new ArrayList<List<String>>();
-    outer.add(tuple(t1));
-    return outer;
+  public static Map<String, String> map(String key, String val) {
+    Map<String, String> map = new HashMap<String, String>(1);
+    map.put(key, val);
+    return map;
   }
 
   /**
-   * Create a tuples from the input.
-   * that is, an input ("a","b","c") is converted into a list
-   * in the form ["a","b","c"]
-   * @param t1 tuple elements
-   * @return a single tuple as a list
+   * Create a URI
+   * @param uri value
+   * @return a 1 entry map.
    */
-  public static List<String> tuple(String... t1) {
-    return Arrays.asList(t1);
+  public static Map<String, String> uri(String uri) {
+    return map(ADDRESS_URI, uri);
   }
 
   /**
-   * Create a tuples from the input, converting all to Strings in the process
-   * that is, an input ("a", 7, true) is converted into a list
-   * in the form ["a","7,"true"]
-   * @param t1 tuple elements
-   * @return a single tuple as a list
+   * Create a (hostname, port) address pair
+   * @param hostname hostname
+   * @param port port
+   * @return a 1 entry map.
    */
-  public static List<String> tuple(Object... t1) {
-    List<String> l = new ArrayList<String>(t1.length);
-    for (Object t : t1) {
-      l.add(t.toString());
-    }
-    return l;
+  public static Map<String, String> hostnamePortPair(String hostname, int port) {
+    Map<String, String> map =
+        map(ADDRESS_HOSTNAME_FIELD, hostname);
+    map.put(ADDRESS_PORT_FIELD, Integer.toString(port));
+    return map;
   }
 
   /**
-   * Convert a socket address pair into a string tuple, (host, port).
-   * TODO JDK7: move to InetAddress.getHostString() to avoid DNS lookups.
-   * @param address an address
-   * @return an element for the address list
+   * Create a (hostname, port) address pair
+   * @param address socket address whose hostname and port are used for the
+   * generated address.
+   * @return a 1 entry map.
    */
-  public static List<String> marshall(InetSocketAddress address) {
-    return tuple(address.getHostName(), address.getPort());
+  public static Map<String, String> hostnamePortPair(InetSocketAddress address) {
+    return hostnamePortPair(address.getHostName(), address.getPort());
   }
 
   /**
@@ -199,24 +188,36 @@ public static List<String> retrieveAddressesUriType(Endpoint epr)
     if (epr == null) {
       return null;
     }
-    requireAddressType(AddressTypes.ADDRESS_URI, epr);
-    List<List<String>> addresses = epr.addresses;
+    requireAddressType(ADDRESS_URI, epr);
+    List<Map<String, String>> addresses = epr.addresses;
     if (addresses.size() < 1) {
       throw new InvalidRecordException(epr.toString(),
           "No addresses in endpoint");
     }
     List<String> results = new ArrayList<String>(addresses.size());
-    for (List<String> address : addresses) {
-      if (address.size() != 1) {
-        throw new InvalidRecordException(epr.toString(),
-            "Address payload invalid: wrong element count: " +
-            address.size());
-      }
-      results.add(address.get(0));
+    for (Map<String, String> address : addresses) {
+      results.add(getAddressField(address, ADDRESS_URI));
     }
     return results;
   }
 
+  /**
+   * Get a specific field from an address -raising an exception if
+   * the field is not present
+   * @param address address to query
+   * @param field field to resolve
+   * @return the resolved value. Guaranteed to be non-null.
+   * @throws InvalidRecordException if the field did not resolve
+   */
+  public static String getAddressField(Map<String, String> address,
+      String field) throws InvalidRecordException {
+    String val = address.get(field);
+    if (val == null) {
+      throw new InvalidRecordException("", "Missing address field: " + field);
+    }
+    return val;
+  }
+
   /**
    * Get the address URLs. Guranteed to return at least one address.
    * @param epr endpoint
@@ -237,4 +238,53 @@ public static List<URL> retrieveAddressURLs(Endpoint epr)
     }
     return results;
   }
 
+  /**
+   * Validate the record by checking for null fields and other invalid
+   * conditions
+   * @param path path for exceptions
+   * @param record record to validate. May be null
+   * @throws InvalidRecordException on invalid entries
+   */
+  public static void validateServiceRecord(String path, ServiceRecord record)
+      throws InvalidRecordException {
+    if (record == null) {
+      throw new InvalidRecordException(path, "Null record");
+    }
+    if (!ServiceRecord.RECORD_TYPE.equals(record.type)) {
+      throw new InvalidRecordException(path,
+          "invalid record type field: \"" + record.type + "\"");
+    }
+
+    if (record.external != null) {
+      for (Endpoint endpoint : record.external) {
+        validateEndpoint(path, endpoint);
+      }
+    }
+    if (record.internal != null) {
+      for (Endpoint endpoint : record.internal) {
+        validateEndpoint(path, endpoint);
+      }
+    }
+  }
+
+  /**
+   * Validate the endpoint by checking for null fields and other invalid
+   * conditions
+   * @param path path for exceptions
+   * @param endpoint endpoint to validate. May be null
+   * @throws InvalidRecordException on invalid entries
+   */
+  public static void validateEndpoint(String path, Endpoint endpoint)
+      throws InvalidRecordException {
+    if (endpoint == null) {
+      throw new InvalidRecordException(path, "Null endpoint");
+    }
+    try {
+      endpoint.validate();
+    } catch (RuntimeException e) {
+      throw new InvalidRecordException(path, e.toString());
    }
  }
 }
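To make the new map-based address helpers concrete, here is a small sketch (hostnames, ports and the URI are invented values):

import java.util.Map;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;

public class AddressSketch {
  public static void main(String[] args) {
    // host/port addresses are now string->string maps rather than string tuples
    Map<String, String> hostPort =
        RegistryTypeUtils.hostnamePortPair("namenode-1.example.org", 8020);
    // map with the entries host=namenode-1.example.org and port=8020
    Map<String, String> webUi =
        RegistryTypeUtils.uri("http://namenode-1.example.org:50070");
    // single-entry map with uri=http://namenode-1.example.org:50070
    System.out.println(hostPort + " " + webUi);
  }
}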


@@ -33,7 +33,6 @@
 import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
 import org.apache.hadoop.registry.client.types.RegistryPathStatus;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -314,7 +313,7 @@ public static Map<String, ServiceRecord> extractServiceRecords(
       Collection<RegistryPathStatus> stats) throws IOException {
     Map<String, ServiceRecord> results = new HashMap<String, ServiceRecord>(stats.size());
     for (RegistryPathStatus stat : stats) {
-      if (stat.size > ServiceRecordHeader.getLength()) {
+      if (stat.size > ServiceRecord.RECORD_TYPE.length()) {
         // maybe has data
         String path = join(parentpath, stat.path);
         try {
@@ -344,7 +343,6 @@ public static Map<String, ServiceRecord> extractServiceRecords(
    * <p>
    * @param operations operation support for fetches
    * @param parentpath path of the parent of all the entries
-   * @param stats a map of name:value mappings.
    * @return a possibly empty map of fullpath:record.
    * @throws IOException for any IO Operation that wasn't ignored.
    */
@@ -362,7 +360,6 @@ public static Map<String, ServiceRecord> extractServiceRecords(
    * <p>
    * @param operations operation support for fetches
    * @param parentpath path of the parent of all the entries
-   * @param stats a map of name:value mappings.
    * @return a possibly empty map of fullpath:record.
    * @throws IOException for any IO Operation that wasn't ignored.
    */
@@ -382,7 +379,7 @@ public static Map<String, ServiceRecord> extractServiceRecords(
    */
   public static class ServiceRecordMarshal extends JsonSerDeser<ServiceRecord> {
     public ServiceRecordMarshal() {
-      super(ServiceRecord.class, ServiceRecordHeader.getData());
+      super(ServiceRecord.class);
     }
   }
 }


@@ -21,17 +21,11 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
 
 /**
  * Raised if there is no {@link ServiceRecord} resolved at the end
- * of the specified path, for reasons such as:
- * <ul>
- * <li>There wasn't enough data to contain a Service Record.</li>
- * <li>The start of the data did not match the {@link ServiceRecordHeader}
- * header.</li>
- * </ul>
- *
+ * of the specified path.
+ * <p>
  * There may be valid data of some form at the end of the path, but it does
  * not appear to be a Service Record.
  */


@@ -24,9 +24,11 @@
 import org.apache.hadoop.registry.client.api.BindFlags;
 import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
 import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
 import org.apache.hadoop.registry.client.types.RegistryPathStatus;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
 import org.apache.zookeeper.CreateMode;
@@ -103,10 +105,12 @@ public void bind(String path,
       int flags) throws IOException {
     Preconditions.checkArgument(record != null, "null record");
     validatePath(path);
+    // validate the record before putting it
+    RegistryTypeUtils.validateServiceRecord(path, record);
     LOG.info("Bound at {} : {}", path, record);
 
     CreateMode mode = CreateMode.PERSISTENT;
-    byte[] bytes = serviceRecordMarshal.toByteswithHeader(record);
+    byte[] bytes = serviceRecordMarshal.toBytes(record);
     zkSet(path, mode, bytes, getClientAcls(),
         ((flags & BindFlags.OVERWRITE) != 0));
   }
@@ -114,7 +118,11 @@ public void bind(String path,
   @Override
   public ServiceRecord resolve(String path) throws IOException {
     byte[] bytes = zkRead(path);
-    return serviceRecordMarshal.fromBytesWithHeader(path, bytes);
+
+    ServiceRecord record = serviceRecordMarshal.fromBytes(path,
+        bytes, ServiceRecord.RECORD_TYPE);
+    RegistryTypeUtils.validateServiceRecord(path, record);
+    return record;
   }
 
   @Override


@@ -38,6 +38,8 @@ public interface AddressTypes {
    * </pre>
    */
   public static final String ADDRESS_HOSTNAME_AND_PORT = "host/port";
+  public static final String ADDRESS_HOSTNAME_FIELD = "host";
+  public static final String ADDRESS_PORT_FIELD = "port";
 
   /**


@@ -21,14 +21,16 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.binding.JsonSerDeser;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Description of a single service/component endpoint.
@@ -67,7 +69,7 @@ public final class Endpoint implements Cloneable {
 
   /**
    * a list of address tuples whose format depends on the address type
   */
-  public List<List<String>> addresses;
+  public List<Map<String, String>> addresses;
 
   /**
    * Create an empty instance.
@@ -84,10 +86,11 @@ public Endpoint(Endpoint that) {
     this.api = that.api;
     this.addressType = that.addressType;
     this.protocolType = that.protocolType;
-    this.addresses = new ArrayList<List<String>>(that.addresses.size());
-    for (List<String> address : addresses) {
-      List<String> addr2 = new ArrayList<String>(address.size());
-      Collections.copy(address, addr2);
+    this.addresses = newAddresses(that.addresses.size());
+    for (Map<String, String> address : that.addresses) {
+      Map<String, String> addr2 = new HashMap<String, String>(address.size());
+      addr2.putAll(address);
+      addresses.add(addr2);
     }
   }
 
@@ -101,16 +104,82 @@ public Endpoint(Endpoint that) {
   public Endpoint(String api,
       String addressType,
       String protocolType,
-      List<List<String>> addrs) {
+      List<Map<String, String>> addrs) {
     this.api = api;
     this.addressType = addressType;
     this.protocolType = protocolType;
-    this.addresses = new ArrayList<List<String>>();
+    this.addresses = newAddresses(0);
     if (addrs != null) {
       addresses.addAll(addrs);
     }
   }
 
+  /**
+   * Build an endpoint with an empty address list
+   * @param api API name
+   * @param addressType address type
+   * @param protocolType protocol type
+   */
+  public Endpoint(String api,
+      String addressType,
+      String protocolType) {
+    this.api = api;
+    this.addressType = addressType;
+    this.protocolType = protocolType;
+    this.addresses = newAddresses(0);
+  }
+
+  /**
+   * Build an endpoint with a single address entry.
+   * <p>
+   * This constructor is superfluous given the varags constructor is equivalent
+   * for a single element argument. However, type-erasure in java generics
+   * causes javac to warn about unchecked generic array creation. This
+   * constructor, which represents the common "one address" case, does
+   * not generate compile-time warnings.
+   * @param api API name
+   * @param addressType address type
+   * @param protocolType protocol type
+   * @param addr address. May be null in which case it is not added
+   */
+  public Endpoint(String api,
+      String addressType,
+      String protocolType,
+      Map<String, String> addr) {
+    this(api, addressType, protocolType);
+    if (addr != null) {
+      addresses.add(addr);
+    }
+  }
+
+  /**
+   * Build an endpoint with a list of addresses
+   * @param api API name
+   * @param addressType address type
+   * @param protocolType protocol type
+   * @param addrs addresses. Null elements will be skipped
+   */
+  public Endpoint(String api,
+      String addressType,
+      String protocolType,
+      Map<String, String>...addrs) {
+    this(api, addressType, protocolType);
+    for (Map<String, String> addr : addrs) {
+      if (addr!=null) {
+        addresses.add(addr);
+      }
+    }
+  }
+
+  /**
+   * Create a new address structure of the requested size
+   * @param size size to create
+   * @return the new list
+   */
+  private List<Map<String, String>> newAddresses(int size) {
+    return new ArrayList<Map<String, String>>(size);
+  }
+
   /**
    * Build an endpoint from a list of URIs; each URI
    * is ASCII-encoded and added to the list of addresses.
@@ -125,40 +194,16 @@ public Endpoint(String api,
     this.addressType = AddressTypes.ADDRESS_URI;
     this.protocolType = protocolType;
 
-    List<List<String>> addrs = new ArrayList<List<String>>(uris.length);
+    List<Map<String, String>> addrs = newAddresses(uris.length);
     for (URI uri : uris) {
-      addrs.add(RegistryTypeUtils.tuple(uri.toString()));
+      addrs.add(RegistryTypeUtils.uri(uri.toString()));
     }
     this.addresses = addrs;
   }
 
   @Override
   public String toString() {
-    final StringBuilder sb = new StringBuilder("Endpoint{");
-    sb.append("api='").append(api).append('\'');
-    sb.append(", addressType='").append(addressType).append('\'');
-    sb.append(", protocolType='").append(protocolType).append('\'');
-    sb.append(", addresses=");
-    if (addresses != null) {
-      sb.append("[ ");
-      for (List<String> address : addresses) {
-        sb.append("[ ");
-        if (address == null) {
-          sb.append("NULL entry in address list");
-        } else {
-          for (String elt : address) {
-            sb.append('"').append(elt).append("\" ");
-          }
-        }
-        sb.append("] ");
-      };
-      sb.append("] ");
-    } else {
-      sb.append("(null) ");
-    }
-    sb.append('}');
-    return sb.toString();
+    return marshalToString.toString(this);
   }
 
   /**
@@ -173,7 +218,7 @@ public void validate() {
     Preconditions.checkNotNull(addressType, "null addressType field");
     Preconditions.checkNotNull(protocolType, "null protocolType field");
     Preconditions.checkNotNull(addresses, "null addresses field");
-    for (List<String> address : addresses) {
+    for (Map<String, String> address : addresses) {
       Preconditions.checkNotNull(address, "null element in address");
     }
   }
@@ -184,7 +229,19 @@ public void validate() {
    * @throws CloneNotSupportedException
    */
   @Override
-  protected Object clone() throws CloneNotSupportedException {
+  public Object clone() throws CloneNotSupportedException {
     return super.clone();
   }
 
+  /**
+   * Static instance of service record marshalling
+   */
+  private static class Marshal extends JsonSerDeser<Endpoint> {
+    private Marshal() {
+      super(Endpoint.class);
    }
  }
+
+  private static final Marshal marshalToString = new Marshal();
 }
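As a sketch of the new address structure (API name, hostname and port below are invented; the JSON shape in the comment is approximate, since toString() now delegates to the JSON marshaller):

import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.types.AddressTypes;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ProtocolTypes;

public class EndpointSketch {
  public static void main(String[] args) {
    // uses the new single-address constructor added by this patch
    Endpoint ipc = new Endpoint("uuid:423C2B93-C927-4050-AEC6-6540E6646437",
        AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
        ProtocolTypes.PROTOCOL_HADOOP_IPC,
        RegistryTypeUtils.hostnamePortPair("namenode-1.example.org", 8020));
    // prints roughly:
    // {"api":"uuid:...","addressType":"host/port","protocolType":"hadoop/IPC",
    //  "addresses":[{"host":"namenode-1.example.org","port":"8020"}]}
    System.out.println(ipc);
  }
}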


@@ -34,15 +34,10 @@ public interface ProtocolTypes {
   String PROTOCOL_FILESYSTEM = "hadoop/filesystem";
 
   /**
-   * Classic Hadoop IPC : {@value}.
+   * Hadoop IPC, "classic" or protobuf : {@value}.
    */
   String PROTOCOL_HADOOP_IPC = "hadoop/IPC";
 
-  /**
-   * Hadoop protocol buffers IPC: {@value}.
-   */
-  String PROTOCOL_HADOOP_IPC_PROTOBUF = "hadoop/protobuf";
-
   /**
    * Corba IIOP: {@value}.
    */


@@ -21,6 +21,7 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
 import org.codehaus.jackson.annotate.JsonAnyGetter;
 import org.codehaus.jackson.annotate.JsonAnySetter;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -40,6 +41,17 @@
 @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 public class ServiceRecord implements Cloneable {
 
+  /**
+   * A type string which MUST be in the serialized json. This permits
+   * fast discarding of invalid entries
+   */
+  public static final String RECORD_TYPE = "JSONServiceRecord";
+
+  /**
+   * The type field. This must be the string {@link #RECORD_TYPE}
+   */
+  public String type = RECORD_TYPE;
+
   /**
    * Description string
    */
@@ -233,17 +245,5 @@ protected Object clone() throws CloneNotSupportedException {
     return super.clone();
   }
 
-  /**
-   * Validate the record by checking for null fields and other invalid
-   * conditions
-   * @throws NullPointerException if a field is null when it
-   * MUST be set.
-   * @throws RuntimeException on invalid entries
-   */
-  public void validate() {
-    for (Endpoint endpoint : external) {
-      Preconditions.checkNotNull("null endpoint", endpoint);
-      endpoint.validate();
-    }
-  }
 }
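A small sketch of the new type marker in a serialized record (the description is an invented value; the JSON in the comment is indicative, not the exact field ordering):

import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;

public class RecordSketch {
  public static void main(String[] args) throws Exception {
    ServiceRecord record = new ServiceRecord();
    record.description = "example record";
    RegistryUtils.ServiceRecordMarshal marshal =
        new RegistryUtils.ServiceRecordMarshal();
    // the serialized form now carries the marker field, e.g.
    // {"type":"JSONServiceRecord","description":"example record", ...}
    System.out.println(marshal.toJson(record));
  }
}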


@@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.client.types;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Service record header; access to the byte array kept private
* to avoid findbugs warnings of mutability
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ServiceRecordHeader {
/**
* Header of a service record: "jsonservicerec"
* By making this over 12 bytes long, we can auto-determine which entries
* in a listing are too short to contain a record without getting their data
*/
private static final byte[] RECORD_HEADER = {
'j', 's', 'o', 'n',
's', 'e', 'r', 'v', 'i', 'c', 'e',
'r', 'e', 'c'
};
/**
* Get the length of the record header
* @return the header length
*/
public static int getLength() {
return RECORD_HEADER.length;
}
/**
* Get a clone of the record header
* @return the new record header.
*/
public static byte[] getData() {
byte[] h = new byte[RECORD_HEADER.length];
System.arraycopy(RECORD_HEADER, 0, h, 0, RECORD_HEADER.length);
return h;
}
}


@@ -4,6 +4,7 @@ EXTENDS FiniteSets, Sequences, Naturals, TLC
 
 (*
+============================================================================
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +20,7 @@ EXTENDS FiniteSets, Sequences, Naturals, TLC
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+============================================================================
 *)
 
 (*
@@ -71,13 +73,22 @@ CONSTANTS
     MknodeActions \* all possible mkdir actions
 
+ASSUME PathChars \in STRING
+ASSUME Paths \in STRING
+
+(* Data in records is JSON, hence a string *)
+ASSUME Data \in STRING
+
+----------------------------------------------------------------------------------------
+
 (* the registry*)
 VARIABLE registry
 
 (* Sequence of actions to apply to the registry *)
 VARIABLE actions
 
 ----------------------------------------------------------------------------------------
 
 (* Tuple of all variables. *)
@@ -92,7 +103,6 @@ vars == << registry, actions >>
 
 (* Persistence policy *)
 PersistPolicySet == {
-    "", \* Undefined; field not present. PERMANENT is implied.
     "permanent", \* persists until explicitly removed
     "application", \* persists until the application finishes
     "application-attempt", \* persists until the application attempt finishes
@@ -104,7 +114,6 @@ TypeInvariant ==
     /\ \A p \in PersistPolicies: p \in PersistPolicySet
 
 ----------------------------------------------------------------------------------------
 
@@ -129,6 +138,14 @@ RegistryEntry == [
     ]
 
+(* Define the set of all string to string mappings *)
+StringMap == [
+    STRING |-> STRING
+]
+
 (*
    An endpoint in a service record
 *)
@@ -140,21 +157,14 @@ Endpoint == [
     addresses: Addresses
 ]
 
-(* Attributes are the set of all string to string mappings *)
-Attributes == [
-    STRING |-> STRING
-]
-
 (*
    A service record
 *)
 ServiceRecord == [
-    \* ID -used when applying the persistence policy
-    yarn_id: STRING,
-
-    \* the persistence policy
-    yarn_persistence: PersistPolicySet,
+    \* This MUST be present: if it is not then the data is not a service record
+    \* This permits shortcut scan & reject of byte arrays without parsing
+    type: "JSONServiceRecord",
 
     \*A description
     description: STRING,
@@ -166,9 +176,34 @@ ServiceRecord == [
     internal: Endpoints,
 
     \* Attributes are a function
-    attributes: Attributes
+    attributes: StringMap
 ]
 
+----------------------------------------------------------------------------------------
+
+(*
+ There is an operation serialize whose internals are not defined,
+ Which converts the service records to JSON
+*)
+CONSTANT serialize(_)
+
+(* A function which returns true iff the byte stream is considered a valid service record. *)
+CONSTANT containsServiceRecord(_)
+
+(* A function to deserialize a string to JSON *)
+CONSTANT deserialize(_)
+
+ASSUME \A json \in STRING: containsServiceRecord(json) \in BOOLEAN
+
+(* Records can be serialized *)
+ASSUME \A r \in ServiceRecord : serialize(r) \in STRING /\ containsServiceRecord(serialize(r))
+
+(* All strings for which containsServiceRecord() holds can be deserialized *)
+ASSUME \A json \in STRING: containsServiceRecord(json) => deserialize(json) \in ServiceRecord
+
 ----------------------------------------------------------------------------------------
 
@@ -304,8 +339,8 @@ validRegistry(R) ==
     \* an entry must be the root entry or have a parent entry
     /\ \A e \in R: isRootEntry(e) \/ exists(R, parent(e.path))
 
-    \* If the entry has data, it must be a service record
-    /\ \A e \in R: (e.data = << >> \/ e.data \in ServiceRecords)
+    \* If the entry has data, it must contain a service record
+    /\ \A e \in R: (e.data = << >> \/ containsServiceRecord(e.data))
 
 ----------------------------------------------------------------------------------------
 
@@ -336,13 +371,13 @@ mknode() adds a new empty entry where there was none before, iff
 *)
 
 mknodeSimple(R, path) ==
-    LET record == [ path |-> path, data |-> <<>> ]
+    LET entry == [ path |-> path, data |-> <<>> ]
     IN \/ exists(R, path)
-       \/ (exists(R, parent(path)) /\ canBind(R, record) /\ (R' = R \union {record} ))
+       \/ (exists(R, parent(path)) /\ canBind(R, entry) /\ (R' = R \union {entry} ))
 
 (*
-For all parents, the mknodeSimpl() criteria must apply.
+For all parents, the mknodeSimple() criteria must apply.
 This could be defined recursively, though as TLA+ does not support recursion,
 an alternative is required
@@ -350,7 +385,8 @@ an alternative is required
 Because this specification is declaring the final state of a operation, not
 the implemental, all that is needed is to describe those parents.
 
-It declares that the mkdirSimple state applies to the path and all its parents in the set R'
+It declares that the mknodeSimple() state applies to the path and all
+its parents in the set R'
 *)
 
 mknodeWithParents(R, path) ==
@@ -402,7 +438,7 @@ purge(R, path, id, persistence) ==
         => recursiveDelete(R, p2.path)
 
 (*
-resolveRecord() resolves the record at a path or fails.
+resolveEntry() resolves the record entry at a path or fails.
 
 It relies on the fact that if the cardinality of a set is 1, then the CHOOSE operator
 is guaranteed to return the single entry of that set, iff the choice predicate holds.
@@ -411,19 +447,28 @@ Using a predicate of TRUE, it always succeeds, so this function selects
 the sole entry of the resolve operation.
 *)
 
-resolveRecord(R, path) ==
+resolveEntry(R, path) ==
     LET l == resolve(R, path) IN
         /\ Cardinality(l) = 1
        /\ CHOOSE e \in l : TRUE
 
+(*
+Resolve a record by resolving the entry and deserializing the result
+*)
+resolveRecord(R, path) ==
+    deserialize(resolveEntry(R, path))
+
 (*
 The specific action of putting an entry into a record includes validating the record
 *)
 
 validRecordToBind(path, record) ==
     \* The root entry must have permanent persistence
-    isRootPath(path) => (record.attributes["yarn:persistence"] = "permanent"
-        \/ record.attributes["yarn:persistence"] = "")
+    isRootPath(path) => (
+        record.attributes["yarn:persistence"] = "permanent"
+        \/ record.attributes["yarn:persistence"]
+        \/ record.attributes["yarn:persistence"] = {})
 
 (*
@@ -432,13 +477,12 @@ marshalled as the data in the entry
 *)
 
 bindRecord(R, path, record) ==
     /\ validRecordToBind(path, record)
-    /\ bind(R, [path |-> path, data |-> record])
+    /\ bind(R, [path |-> path, data |-> serialize(record)])
 
 ----------------------------------------------------------------------------------------
 
 (*
 The action queue can only contain one of the sets of action types, and
 by giving each a unique name, those sets are guaranteed to be disjoint


@@ -20,7 +20,6 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
@@ -46,11 +45,7 @@
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.inetAddrEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.ipcEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.restEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.tuple;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.webEndpoint;
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.*;
 
 /**
  * This is a set of static methods to aid testing the registry operations.
@@ -61,18 +56,18 @@ public class RegistryTestHelper extends Assert {
   public static final String SC_HADOOP = "org-apache-hadoop";
   public static final String USER = "devteam/";
   public static final String NAME = "hdfs";
-  public static final String API_WEBHDFS = "org_apache_hadoop_namenode_webhdfs";
-  public static final String API_HDFS = "org_apache_hadoop_namenode_dfs";
+  public static final String API_WEBHDFS = "classpath:org.apache.hadoop.namenode.webhdfs";
+  public static final String API_HDFS = "classpath:org.apache.hadoop.namenode.dfs";
   public static final String USERPATH = RegistryConstants.PATH_USERS + USER;
   public static final String PARENT_PATH = USERPATH + SC_HADOOP + "/";
   public static final String ENTRY_PATH = PARENT_PATH + NAME;
-  public static final String NNIPC = "nnipc";
-  public static final String IPC2 = "IPC2";
+  public static final String NNIPC = "uuid:423C2B93-C927-4050-AEC6-6540E6646437";
+  public static final String IPC2 = "uuid:0663501D-5AD3-4F7E-9419-52F5D6636FCF";
   private static final Logger LOG =
       LoggerFactory.getLogger(RegistryTestHelper.class);
-  public static final String KTUTIL = "ktutil";
   private static final RegistryUtils.ServiceRecordMarshal recordMarshal =
       new RegistryUtils.ServiceRecordMarshal();
+  public static final String HTTP_API = "http://";
 
   /**
    * Assert the path is valid by ZK rules
@@ -148,9 +143,9 @@ public static void validateEntry(ServiceRecord record) {
     assertEquals(API_WEBHDFS, webhdfs.api);
     assertEquals(AddressTypes.ADDRESS_URI, webhdfs.addressType);
     assertEquals(ProtocolTypes.PROTOCOL_REST, webhdfs.protocolType);
-    List<List<String>> addressList = webhdfs.addresses;
-    List<String> url = addressList.get(0);
-    String addr = url.get(0);
+    List<Map<String, String>> addressList = webhdfs.addresses;
+    Map<String, String> url = addressList.get(0);
+    String addr = url.get("uri");
     assertTrue(addr.contains("http"));
     assertTrue(addr.contains(":8020"));
@@ -159,8 +154,9 @@ public static void validateEntry(ServiceRecord record) {
         nnipc.protocolType);
 
     Endpoint ipc2 = findEndpoint(record, IPC2, false, 1,2);
+    assertNotNull(ipc2);
 
-    Endpoint web = findEndpoint(record, "web", true, 1, 1);
+    Endpoint web = findEndpoint(record, HTTP_API, true, 1, 1);
     assertEquals(1, web.addresses.size());
     assertEquals(1, web.addresses.get(0).size());
   }
@@ -275,14 +271,14 @@ public static ServiceRecord buildExampleServiceEntry(String persistence) throws
   public static void addSampleEndpoints(ServiceRecord entry, String hostname)
       throws URISyntaxException {
     assertNotNull(hostname);
-    entry.addExternalEndpoint(webEndpoint("web",
+    entry.addExternalEndpoint(webEndpoint(HTTP_API,
         new URI("http", hostname + ":80", "/")));
     entry.addExternalEndpoint(
         restEndpoint(API_WEBHDFS,
            new URI("http", hostname + ":8020", "/")));
 
-    Endpoint endpoint = ipcEndpoint(API_HDFS, true, null);
-    endpoint.addresses.add(tuple(hostname, "8030"));
+    Endpoint endpoint = ipcEndpoint(API_HDFS, null);
+    endpoint.addresses.add(RegistryTypeUtils.hostnamePortPair(hostname, 8030));
     entry.addInternalEndpoint(endpoint);
     InetSocketAddress localhost = new InetSocketAddress("localhost", 8050);
     entry.addInternalEndpoint(
@@ -290,9 +286,7 @@ public static void addSampleEndpoints(ServiceRecord entry, String hostname)
             8050));
     entry.addInternalEndpoint(
         RegistryTypeUtils.ipcEndpoint(
-            IPC2,
-            true,
-            RegistryTypeUtils.marshall(localhost)));
+            IPC2, localhost));
   }
 
   /**


@@ -19,9 +19,9 @@
 package org.apache.hadoop.registry.client.binding;
 
 import org.apache.hadoop.registry.RegistryTestHelper;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
 import org.apache.hadoop.registry.client.exceptions.NoRecordException;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
 import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -31,8 +31,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.EOFException;
-
 /**
  * Test record marshalling
  */
@@ -44,6 +42,7 @@ public class TestMarshalling extends RegistryTestHelper {
   public final Timeout testTimeout = new Timeout(10000);
   @Rule
   public TestName methodName = new TestName();
+
   private static RegistryUtils.ServiceRecordMarshal marshal;
 
   @BeforeClass
@@ -55,42 +54,65 @@ public static void setupClass() {
   public void testRoundTrip() throws Throwable {
     String persistence = PersistencePolicies.PERMANENT;
     ServiceRecord record = createRecord(persistence);
-    record.set("customkey","customvalue");
-    record.set("customkey2","customvalue2");
+    record.set("customkey", "customvalue");
+    record.set("customkey2", "customvalue2");
+    RegistryTypeUtils.validateServiceRecord("", record);
     LOG.info(marshal.toJson(record));
     byte[] bytes = marshal.toBytes(record);
-    ServiceRecord r2 = marshal.fromBytes("", bytes, 0);
+    ServiceRecord r2 = marshal.fromBytes("", bytes);
     assertMatches(record, r2);
+    RegistryTypeUtils.validateServiceRecord("", r2);
   }
 
-  @Test
-  public void testRoundTripHeaders() throws Throwable {
-    ServiceRecord record = createRecord(PersistencePolicies.CONTAINER);
-    byte[] bytes = marshal.toByteswithHeader(record);
-    ServiceRecord r2 = marshal.fromBytesWithHeader("", bytes);
-    assertMatches(record, r2);
+  @Test(expected = NoRecordException.class)
+  public void testUnmarshallNoData() throws Throwable {
+    marshal.fromBytes("src", new byte[]{});
   }
 
   @Test(expected = NoRecordException.class)
-  public void testRoundTripBadHeaders() throws Throwable {
-    ServiceRecord record = createRecord(PersistencePolicies.APPLICATION);
-    byte[] bytes = marshal.toByteswithHeader(record);
-    bytes[1] = 0x01;
-    marshal.fromBytesWithHeader("src", bytes);
+  public void testUnmarshallNotEnoughData() throws Throwable {
+    // this is nominally JSON -but without the service record header
+    marshal.fromBytes("src", new byte[]{'{','}'}, ServiceRecord.RECORD_TYPE);
   }
 
-  @Test(expected = NoRecordException.class)
-  public void testUnmarshallHeaderTooShort() throws Throwable {
-    marshal.fromBytesWithHeader("src", new byte[]{'a'});
-  }
-
-  @Test(expected = EOFException.class)
+  @Test(expected = InvalidRecordException.class)
   public void testUnmarshallNoBody() throws Throwable {
-    byte[] bytes = ServiceRecordHeader.getData();
-    marshal.fromBytesWithHeader("src", bytes);
+    byte[] bytes = "this is not valid JSON at all and should fail".getBytes();
+    marshal.fromBytes("src", bytes);
   }
 
+  @Test(expected = InvalidRecordException.class)
+  public void testUnmarshallWrongType() throws Throwable {
+    byte[] bytes = "{'type':''}".getBytes();
+    ServiceRecord serviceRecord = marshal.fromBytes("marshalling", bytes);
+    RegistryTypeUtils.validateServiceRecord("validating", serviceRecord);
+  }
+
+  @Test(expected = NoRecordException.class)
+  public void testUnmarshallWrongLongType() throws Throwable {
+    ServiceRecord record = new ServiceRecord();
+    record.type = "ThisRecordHasALongButNonMatchingType";
+    byte[] bytes = marshal.toBytes(record);
+    ServiceRecord serviceRecord = marshal.fromBytes("marshalling",
+        bytes, ServiceRecord.RECORD_TYPE);
+  }
+
+  @Test(expected = NoRecordException.class)
+  public void testUnmarshallNoType() throws Throwable {
+    ServiceRecord record = new ServiceRecord();
+    record.type = "NoRecord";
+    byte[] bytes = marshal.toBytes(record);
+    ServiceRecord serviceRecord = marshal.fromBytes("marshalling",
+        bytes, ServiceRecord.RECORD_TYPE);
+  }
+
+  @Test(expected = InvalidRecordException.class)
+  public void testRecordValidationWrongType() throws Throwable {
+    ServiceRecord record = new ServiceRecord();
+    record.type = "NotAServiceRecordType";
+    RegistryTypeUtils.validateServiceRecord("validating", record);
+  }
+
   @Test
   public void testUnknownFieldsRoundTrip() throws Throwable {
@@ -102,8 +124,8 @@ public void testUnknownFieldsRoundTrip() throws Throwable {
     assertEquals("2", record.get("intval"));
     assertNull(record.get("null"));
     assertEquals("defval", record.get("null", "defval"));
-    byte[] bytes = marshal.toByteswithHeader(record);
-    ServiceRecord r2 = marshal.fromBytesWithHeader("", bytes);
+    byte[] bytes = marshal.toBytes(record);
+    ServiceRecord r2 = marshal.fromBytes("", bytes);
     assertEquals("value", r2.get("key"));
     assertEquals("2", r2.get("intval"));
   }

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.fs.PathNotFoundException; import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.registry.AbstractRegistryTest; import org.apache.hadoop.registry.AbstractRegistryTest;
import org.apache.hadoop.registry.client.api.BindFlags; import org.apache.hadoop.registry.client.api.BindFlags;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.exceptions.NoRecordException; import org.apache.hadoop.registry.client.exceptions.NoRecordException;
@ -91,10 +92,8 @@ public void testLsParent() throws Throwable {
childStats.values()); childStats.values());
assertEquals(1, records.size()); assertEquals(1, records.size());
ServiceRecord record = records.get(ENTRY_PATH); ServiceRecord record = records.get(ENTRY_PATH);
assertNotNull(record); RegistryTypeUtils.validateServiceRecord(ENTRY_PATH, record);
record.validate();
assertMatches(written, record); assertMatches(written, record);
} }
@Test @Test

View File

@ -352,6 +352,10 @@ application.
<td>Name</td> <td>Name</td>
<td>Description</td> <td>Description</td>
</tr> </tr>
<tr>
<td>type: String</td>
<td>Always: "JSONServiceRecord"</td>
</tr>
<tr> <tr>
<td>description: String</td> <td>description: String</td>
<td>Human-readable description.</td> <td>Human-readable description.</td>
@ -366,6 +370,8 @@ application.
</tr> </tr>
</table> </table>
The `type` field MUST be `"JSONServiceRecord"`. Mandating this string leaves room for future record types *and* permits rapid rejection of byte arrays that lack it, before any JSON parsing is attempted.
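As a non-normative illustration, the sketch below shows how a client can exploit this marker when deserializing. It assumes the `ServiceRecordMarshal` helper and the three-argument `fromBytes(path, bytes, marker)` overload exercised in the updated unit tests, which rejects data lacking the marker with a `NoRecordException` before the parsed result is interpreted; the method name is invented for this example.

    import java.io.IOException;
    import org.apache.hadoop.registry.client.binding.RegistryUtils;
    import org.apache.hadoop.registry.client.exceptions.NoRecordException;
    import org.apache.hadoop.registry.client.types.ServiceRecord;

    /**
     * Sketch: return the service record held in {@code bytes}, or null if
     * the data does not contain the "JSONServiceRecord" marker.
     */
    public static ServiceRecord parseIfServiceRecord(String path, byte[] bytes)
        throws IOException {
      RegistryUtils.ServiceRecordMarshal marshal =
          new RegistryUtils.ServiceRecordMarshal();
      try {
        // scans for ServiceRecord.RECORD_TYPE before parsing; empty data or
        // data without the marker fails fast with NoRecordException
        return marshal.fromBytes(path, bytes, ServiceRecord.RECORD_TYPE);
      } catch (NoRecordException notAServiceRecord) {
        return null;
      }
    }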
### YARN Persistence policies ### YARN Persistence policies
The YARN Resource Manager integration integrates cleanup of service records The YARN Resource Manager integration integrates cleanup of service records
@ -379,7 +385,6 @@ any use of the registry without the RM's participation.
The attributes, `yarn:id` and `yarn:persistence` specify which records The attributes, `yarn:id` and `yarn:persistence` specify which records
*and any child entries* may be deleted as the associated YARN components complete. *and any child entries* may be deleted as the associated YARN components complete.
The `yarn:id` field defines the application, attempt or container ID to match; The `yarn:id` field defines the application, attempt or container ID to match;
the `yarn:persistence` attribute defines the trigger for record cleanup, and the `yarn:persistence` attribute defines the trigger for record cleanup, and
implicitly the type of the contents of the `yarn:id` field. implicitly the type of the contents of the `yarn:id` field.
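For illustration only, here is a minimal sketch of how a component might tag its record with these attributes before publishing it. The `set()` call and the string-valued `PersistencePolicies` constants are those used in the updated marshalling tests; the description text and the helper method are invented for this example.

    import org.apache.hadoop.registry.client.types.ServiceRecord;
    import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;

    /** Sketch: a record that is purged when the named container completes. */
    public static ServiceRecord containerScopedRecord(String containerId) {
      ServiceRecord record = new ServiceRecord();
      record.description = "worker container";
      // which YARN entity the record is bound to...
      record.set("yarn:id", containerId);
      // ...and the trigger for cleanup: completion of that container
      record.set("yarn:persistence", PersistencePolicies.CONTAINER);
      return record;
    }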
@ -432,31 +437,32 @@ up according the lifecycle of that application.
<td>Description</td> <td>Description</td>
</tr> </tr>
<tr> <tr>
<td>addresses: List[List[String]]</td> <td>api: URI as String</td>
<td>a list of address tuples whose format depends on the address type</td> <td>API implemented at the end of the binding</td>
</tr>
<tr>
<td>addressType: String</td>
<td>format of the binding</td>
</tr>
<tr> <tr>
<td>protocol: String</td> <td>protocol: String</td>
<td>Protocol. Examples: <td>Protocol. Examples:
`http`, `https`, `hadoop-rpc`, `zookeeper`, `web`, `REST`, `SOAP`, ...</td> `http`, `https`, `hadoop-rpc`, `zookeeper`, `web`, `REST`, `SOAP`, ...</td>
</tr> </tr>
<tr> <tr>
<td>api: String</td> <td>addressType: String</td>
<td>API implemented at the end of the binding</td> <td>format of the binding</td>
</tr> </tr>
</tr>
<tr>
<td>addresses: List[Map[String, String]]</td>
<td>a list of address maps</td>
</tr>
</table> </table>
All string fields have a limit on size, to dissuade services from hiding All string fields have a limit on size, to dissuade services from hiding
complex JSON structures in the text description. complex JSON structures in the text description.
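To make these fields concrete, the following non-normative sketch builds a record with a single internal IPC endpoint, validates it, and marshals it to the JSON form shown in the examples below. It relies only on calls appearing in this patch (`addInternalEndpoint`, the two-argument `RegistryTypeUtils.ipcEndpoint(api, address)` form used in the updated test, `validateServiceRecord` and the `ServiceRecordMarshal`); the API URI, hostname and port are made up.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
    import org.apache.hadoop.registry.client.binding.RegistryUtils;
    import org.apache.hadoop.registry.client.types.ServiceRecord;

    /** Sketch: build, validate and marshal a one-endpoint record. */
    public static String exampleRecordJson() throws IOException {
      ServiceRecord record = new ServiceRecord();
      record.description = "example service";
      // the socket address is marshalled into a host/port address map
      record.addInternalEndpoint(
          RegistryTypeUtils.ipcEndpoint(
              "classpath:org.example.cluster.ipc",     // hypothetical API URI
              new InetSocketAddress("rack4server3", 48551)));
      // reject malformed records before they are published
      RegistryTypeUtils.validateServiceRecord("/users/example/service", record);
      return new RegistryUtils.ServiceRecordMarshal().toJson(record);
    }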
### Field: Address Type #### Field `addressType`: Address Type
The addressType field defines the string format of entries. The `addressType` field defines the string format of entries.
The purpose of having separate types is that tools (such as a web viewer) can process binding The purpose of having separate types is that tools (such as a web viewer) can process binding
strings without having to recognize the protocol. strings without having to recognize the protocol.
@ -467,43 +473,58 @@ strings without having to recognize the protocol.
<td>binding format</td> <td>binding format</td>
</tr> </tr>
<tr> <tr>
<td>`url`</td> <td>uri</td>
<td>`[URL]`</td> <td>uri:URI of endpoint</td>
</tr> </tr>
<tr> <tr>
<td>`hostname`</td> <td>hostname</td>
<td>`[hostname]`</td> <td>hostname: service host</td>
</tr> </tr>
<tr> <tr>
<td>`inetaddress`</td> <td>inetaddress</td>
<td>`[hostname, port]`</td> <td>hostname: service host, port: service port</td>
</tr> </tr>
<tr> <tr>
<td>`path`</td> <td>path</td>
<td>`[/path/to/something]`</td> <td>path: generic unix filesystem path</td>
</tr> </tr>
<tr> <tr>
<td>`zookeeper`</td> <td>zookeeper</td>
<td>`[quorum-entry, path]`</td> <td>hostname: service host, port: service port, path: ZK path</td>
</tr> </tr>
</table> </table>
An actual zookeeper binding consists of a list of `hostname:port` bindings the In the zookeeper binding, every entry represents a single node in the quorum,
quorum— and the path within. In the proposed schema, every quorum entry will be the `hostname` and `port` fields defining the hostname of the ZK instance
listed as a triple of `[hostname, port, path]`. Client applications do not and the port on which it is listening. The `path` field lists the ZooKeeper
expect the path to de be different across the quorum. The first entry in the path for applications to use. For example, for HBase this would refer to the
list of quorum hosts MUST define the path to be used by all clients. Later znode containing information about the HBase cluster.
entries SHOULD list the same path, though clients MUST ignore these.
The path MUST be identical across all address elements in the `addresses` list.
This ensures that any single address contains enough information to connect
to the quorum and locate the relevant znode.
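As a non-normative sketch of consuming such an endpoint, the helper below assembles the conventional ZooKeeper connection string `host:port,host:port,.../path` from the `addresses` list, using only the `hostname`, `port` and `path` keys defined above. The method name and the connection-string convention belong to ZooKeeper clients, not to this specification.

    import java.util.Map;
    import org.apache.hadoop.registry.client.types.Endpoint;

    /** Sketch: derive a ZooKeeper quorum connection string from an endpoint. */
    public static String zkConnectionString(Endpoint zkEndpoint) {
      StringBuilder quorum = new StringBuilder();
      String path = "";
      for (Map<String, String> address : zkEndpoint.addresses) {
        if (quorum.length() > 0) {
          quorum.append(',');
        }
        quorum.append(address.get("hostname"))
              .append(':')
              .append(address.get("port"));
        // the path is required to be identical in every address element
        path = address.get("path");
      }
      return quorum.append(path).toString();
    }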
New Address types may be defined; if not standard please prefix with the New Address types may be defined; if not standard please prefix with the
character sequence `"x-"`. character sequence `"x-"`.
#### **Field: API** ### Field `api`: API identifier
The `api` field MUST contain a URI that identifies the specific API exported by an endpoint.
These URIs MUST be unique to an API, to avoid confusion.
The following strategies are suggested to provide unique URIs for an API:
1. The SOAP/WS-* convention of using the URL of the WSDL document defining the service.
2. A URL to the svn/git-hosted document defining a REST API.
3. The `classpath` scheme followed by a path to a class or package in an application.
4. The `uuid` scheme with a generated UUID.
It is hoped that standard API URIs will be defined for common APIs. Two such non-normative API URIs are used in this document:
* `http://` : A web site for humans
* `classpath:javax.management.jmx` : an endpoint supporting the JMX management protocol (RMI-based)
APIs may be unique to a service class, or may be common across service
classes. They MUST be given unique names. These MAY be based on service
packages but MAY be derived from other naming schemes.
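Since the API URI is the stable identifier, clients are expected to select endpoints by matching it rather than by position in a list. A minimal, non-normative sketch, assuming the record exposes its external endpoints as a public `external` list of `Endpoint` objects:

    import org.apache.hadoop.registry.client.types.Endpoint;
    import org.apache.hadoop.registry.client.types.ServiceRecord;

    /** Sketch: find the first external endpoint exporting the given API. */
    public static Endpoint externalEndpoint(ServiceRecord record, String api) {
      for (Endpoint endpoint : record.external) {
        if (api.equals(endpoint.api)) {
          return endpoint;
        }
      }
      return null;
    }

For example, `externalEndpoint(record, "http://internal.example.org/restapis/scheduler/20141026v1")` would pick out the REST endpoint of the container records shown below.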
### Examples of Service Entries ### Examples of Service Entries
@ -524,12 +545,14 @@ overall application. It exports the URL to a load balancer.
{ {
"description" : "tomcat-based web application", "description" : "tomcat-based web application",
"registrationTime" : 1408638082444,
"external" : [ { "external" : [ {
"api" : "www", "api" : "http://internal.example.org/restapis/scheduler/20141026v1",
"addressType" : "uri", "addressType" : "uri",
"protocolType" : "REST", "protocol" : "REST",
"addresses" : [ [ "http://loadbalancer/" ] [ "http://loadbalancer2/" ] ] "addresses" : [
{ "uri" : "http://loadbalancer/" },
{ "uri" : "http://loadbalancer2/" }
]
} ], } ],
"internal" : [ ] "internal" : [ ]
} }
@ -545,21 +568,23 @@ will trigger the deletion of this entry
/users/devteam/org-apache-tomcat/test1/components/container-1408631738011-0001-01-000001 /users/devteam/org-apache-tomcat/test1/components/container-1408631738011-0001-01-000001
{ {
"registrationTime" : 1408638082445,
"yarn:id" : "container_1408631738011_0001_01_000001", "yarn:id" : "container_1408631738011_0001_01_000001",
"yarn:persistence" : "3", "yarn:persistence" : "container",
"description" : null, "description" : "",
"external" : [ { "external" : [ {
"api" : "www", "api" : "http://internal.example.org/restapis/scheduler/20141026v1",
"addressType" : "uri", "addressType" : "uri",
"protocolType" : "REST", "protocol" : "REST",
"addresses" : [ [ "http://rack4server3:43572" ] ] "addresses" : [{ "uri" : "rack4server3:43572" } ]
} ], } ],
"internal" : [ { "internal" : [ {
"api" : "jmx", "api" : "classpath:javax.management.jmx",
"addressType" : "host/port", "addressType" : "host/port",
"protocolType" : "JMX", "protocol" : "rmi",
"addresses" : [ [ "rack4server3", "43573" ] ] "addresses" : [ {
"host" : "rack4server3",
"port" : "48551"
} ]
} ] } ]
} }
@ -571,19 +596,22 @@ external endpoint, the JMX addresses as internal.
{ {
"registrationTime" : 1408638082445, "registrationTime" : 1408638082445,
"yarn:id" : "container_1408631738011_0001_01_000002", "yarn:id" : "container_1408631738011_0001_01_000002",
"yarn:persistence" : "3", "yarn:persistence" : "container",
"description" : null, "description" : null,
"external" : [ { "external" : [ {
"api" : "www", "api" : "http://internal.example.org/restapis/scheduler/20141026v1",
"addressType" : "uri", "addressType" : "uri",
"protocolType" : "REST", "protocol" : "REST",
"addresses" : [ [ "http://rack1server28:35881" ] ] "addresses" : [ [ "http://rack1server28:35881" ] ]
} ], } ],
"internal" : [ { "internal" : [ {
"api" : "jmx", "api" : "classpath:javax.management.jmx",
"addressType" : "host/port", "addressType" : "host/port",
"protocolType" : "JMX", "protocol" : "rmi",
"addresses" : [ [ "rack1server28", "35882" ] ] "addresses" : [ {
"host" : "rack1server28",
"port" : "48551"
} ]
} ] } ]
} }
@ -887,3 +915,106 @@ Implementations may throttle update operations.
**Rate of Polling** **Rate of Polling**
Clients which poll the registry may be throttled. Clients which poll the registry may be throttled.
# Complete service record example
Below is a (non-normative) example of a service record retrieved
from a YARN application.
{
"type" : "JSONServiceRecord",
"description" : "Slider Application Master",
"yarn:persistence" : "application",
"yarn:id" : "application_1414052463672_0028",
"external" : [ {
"api" : "classpath:org.apache.slider.appmaster",
"addressType" : "host/port",
"protocol" : "hadoop/IPC",
"addresses" : [ {
"port" : "48551",
"host" : "nn.example.com"
} ]
}, {
"api" : "http://",
"addressType" : "uri",
"protocol" : "web",
"addresses" : [ {
"uri" : "http://nn.example.com:40743"
} ]
}, {
"api" : "classpath:org.apache.slider.management",
"addressType" : "uri",
"protocol" : "REST",
"addresses" : [ {
"uri" : "http://nn.example.com:40743/ws/v1/slider/mgmt"
} ]
}, {
"api" : "classpath:org.apache.slider.publisher",
"addressType" : "uri",
"protocol" : "REST",
"addresses" : [ {
"uri" : "http://nn.example.com:40743/ws/v1/slider/publisher"
} ]
}, {
"api" : "classpath:org.apache.slider.registry",
"addressType" : "uri",
"protocol" : "REST",
"addresses" : [ {
"uri" : "http://nn.example.com:40743/ws/v1/slider/registry"
} ]
}, {
"api" : "classpath:org.apache.slider.publisher.configurations",
"addressType" : "uri",
"protocol" : "REST",
"addresses" : [ {
"uri" : "http://nn.example.com:40743/ws/v1/slider/publisher/slider"
} ]
}, {
"api" : "classpath:org.apache.slider.publisher.exports",
"addressType" : "uri",
"protocol" : "REST",
"addresses" : [ {
"uri" : "http://nn.example.com:40743/ws/v1/slider/publisher/exports"
} ]
} ],
"internal" : [ {
"api" : "classpath:org.apache.slider.agents.secure",
"addressType" : "uri",
"protocol" : "REST",
"addresses" : [ {
"uri" : "https://nn.example.com:52705/ws/v1/slider/agents"
} ]
}, {
"api" : "classpath:org.apache.slider.agents.oneway",
"addressType" : "uri",
"protocol" : "REST",
"addresses" : [ {
"uri" : "https://nn.example.com:33425/ws/v1/slider/agents"
} ]
} ]
}
It publishes a number of endpoints, both internal and external.
External:
1. The IPC hostname and port for client-AM communications
1. The URL of the AM's web UI
1. A series of REST URLs under the web UI for specific application services.
The details are not relevant here; note that they use application-specific API
values to ensure uniqueness.
Internal:
1. Two URLs to REST APIs offered by the AM to the containers deployed by
the application itself.
Python agents running in the containers retrieve the internal endpoint
URLs to communicate with their AM. The record is resolved on container startup
and cached until communication problems occur. At that point the registry is
queried for the current record, then an attempt is made to reconnect to the AM.
Here "connectivity problems" covers both low-level socket/IO errors and
failures in HTTPS authentication. The agents use two-way HTTPS authentication:
if the AM fails and another application starts listening on the same ports,
this will trigger an authentication failure and hence a re-read of the
service record.
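The same resolve/cache/re-resolve pattern can be expressed against the Java client API. The non-normative sketch below assumes a `RegistryOperations` instance whose `resolve()` call returns the current `ServiceRecord`; the `connect()` operation, class name and single-retry policy are purely illustrative.

    import java.io.IOException;
    import org.apache.hadoop.registry.client.api.RegistryOperations;
    import org.apache.hadoop.registry.client.types.ServiceRecord;

    /** Sketch: cache the AM's record; re-resolve and retry on failure. */
    public class AMBinding {
      private final RegistryOperations registry;
      private final String recordPath;
      private ServiceRecord cached;

      public AMBinding(RegistryOperations registry, String recordPath) {
        this.registry = registry;
        this.recordPath = recordPath;
      }

      /** Resolve lazily, then retry once with a fresh record on failure. */
      public void invoke() throws IOException {
        if (cached == null) {
          cached = registry.resolve(recordPath);
        }
        try {
          connect(cached);
        } catch (IOException connectivityProblem) {
          // socket/IO error or authentication failure: the AM may have been
          // restarted elsewhere, so re-read the record and try again
          cached = registry.resolve(recordPath);
          connect(cached);
        }
      }

      /** Placeholder for the real agent-to-AM protocol. */
      private void connect(ServiceRecord record) throws IOException {
        // illustrative only
      }
    }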