From 5924e74d550b3ac5e5d65c2fc80275095de1c0e1 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 6 Nov 2014 20:21:25 +0000 Subject: [PATCH] YARN-2768 Improved Yarn Registry service record structure (stevel) --- hadoop-yarn-project/CHANGES.txt | 2 + .../hadoop/registry/cli/RegistryCli.java | 25 +- .../registry/client/binding/JsonSerDeser.java | 144 +++++------ .../client/binding/RegistryTypeUtils.java | 166 ++++++++----- .../client/binding/RegistryUtils.java | 7 +- .../client/exceptions/NoRecordException.java | 10 +- .../impl/zk/RegistryOperationsService.java | 12 +- .../registry/client/types/AddressTypes.java | 2 + .../registry/client/types/Endpoint.java | 131 +++++++--- .../registry/client/types/ProtocolTypes.java | 7 +- .../registry/client/types/ServiceRecord.java | 26 +- .../client/types/ServiceRecordHeader.java | 59 ----- .../src/main/tla/yarnregistry.tla | 94 +++++-- .../hadoop/registry/RegistryTestHelper.java | 36 ++- .../client/binding/TestMarshalling.java | 76 ++++-- .../operations/TestRegistryOperations.java | 5 +- .../site/markdown/registry/yarn-registry.md | 233 ++++++++++++++---- 17 files changed, 626 insertions(+), 409 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8335d2b1c4f..6689d894772 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -710,6 +710,8 @@ Release 2.6.0 - UNRELEASED YARN-2677 registry punycoding of usernames doesn't fix all usernames to be DNS-valid (stevel) + YARN-2768 Improved Yarn Registry service record structure (stevel) + --- YARN-2598 GHS should show N/A instead of null for the inaccessible information diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java index 863039e2e8b..bf2b5e5a54d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/cli/RegistryCli.java @@ -24,6 +24,7 @@ import java.io.PrintStream; import java.net.URI; import java.net.URISyntaxException; import java.util.List; +import java.util.Map; import com.google.common.base.Preconditions; import org.apache.commons.cli.CommandLine; @@ -174,24 +175,22 @@ public class RegistryCli extends Configured implements Tool { ServiceRecord record = registry.resolve(argsList.get(1)); for (Endpoint endpoint : record.external) { - if ((endpoint.protocolType.equals(ProtocolTypes.PROTOCOL_WEBUI)) - || (endpoint.protocolType.equals(ProtocolTypes.PROTOCOL_REST))) { - sysout.print(" Endpoint(ProtocolType=" - + endpoint.protocolType + ", Api=" - + endpoint.api + "); Uris are: "); - } else { - sysout.print(" Endpoint(ProtocolType=" + sysout.println(" Endpoint(ProtocolType=" + endpoint.protocolType + ", Api=" + endpoint.api + ");" + " Addresses(AddressType=" + endpoint.addressType + ") are: "); - } - for (List a : endpoint.addresses) { - sysout.print(a + " "); - } - sysout.println(); - } + for (Map address : endpoint.addresses) { + sysout.println(" [ "); + for (Map.Entry entry : address.entrySet()) { + sysout.println(" " + entry.getKey() + + ": \"" + entry.getValue() + "\""); + } + sysout.println(" ]"); + } + sysout.println(); + } return 0; } catch (Exception e) { syserr.println(analyzeException("resolve", e, argsList)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java index e086e3694f6..af4e4f409c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/JsonSerDeser.java @@ -19,6 +19,7 @@ package org.apache.hadoop.registry.client.binding; import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FSDataInputStream; @@ -45,8 +46,6 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.Arrays; /** * Support for marshalling objects to and from JSON. @@ -62,30 +61,30 @@ public class JsonSerDeser { private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeser.class); private static final String UTF_8 = "UTF-8"; - public static final String E_NO_SERVICE_RECORD = "No service record at path"; + public static final String E_NO_DATA = "No data at path"; + public static final String E_DATA_TOO_SHORT = "Data at path too short"; + public static final String E_MISSING_MARKER_STRING = + "Missing marker string: "; private final Class classType; private final ObjectMapper mapper; - private final byte[] header; /** * Create an instance bound to a specific type * @param classType class to marshall - * @param header byte array to use as header */ - public JsonSerDeser(Class classType, byte[] header) { + public JsonSerDeser(Class classType) { Preconditions.checkArgument(classType != null, "null classType"); - Preconditions.checkArgument(header != null, "null header"); this.classType = classType; this.mapper = new ObjectMapper(); mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); - // make an immutable copy to keep findbugs happy. - byte[] h = new byte[header.length]; - System.arraycopy(header, 0, h, 0, header.length); - this.header = h; } + /** + * Get the simple name of the class type to be marshalled + * @return the name of the class being marshalled + */ public String getName() { return classType.getSimpleName(); } @@ -183,7 +182,7 @@ public class JsonSerDeser { if (count != len) { throw new EOFException(path.toString() + ": read finished prematurely"); } - return fromBytes(path.toString(), b, 0); + return fromBytes(path.toString(), b); } /** @@ -206,8 +205,7 @@ public class JsonSerDeser { * @throws IOException on any failure */ private void writeJsonAsBytes(T instance, - DataOutputStream dataOutputStream) throws - IOException { + DataOutputStream dataOutputStream) throws IOException { try { byte[] b = toBytes(instance); dataOutputStream.write(b); @@ -227,37 +225,51 @@ public class JsonSerDeser { return json.getBytes(UTF_8); } - /** - * Convert JSON To bytes, inserting the header - * @param instance instance to convert - * @return a byte array - * @throws IOException - */ - public byte[] toByteswithHeader(T instance) throws IOException { - byte[] body = toBytes(instance); - - ByteBuffer buffer = ByteBuffer.allocate(body.length + header.length); - buffer.put(header); - buffer.put(body); - return buffer.array(); - } - /** * Deserialize from a byte array * @param path path the data came from * @param bytes byte array - * @return offset in the array to read from * @throws IOException all problems * @throws EOFException not enough data * @throws InvalidRecordException if the parsing failed -the record is invalid */ - public T fromBytes(String path, byte[] bytes, int offset) throws IOException, + public T fromBytes(String path, byte[] bytes) throws IOException, InvalidRecordException { - int data = bytes.length - offset; - if (data <= 0) { - throw new EOFException("No data at " + path); + return fromBytes(path, bytes, ""); + } + + /** + * Deserialize from a byte array, optionally checking for a marker string. + *

+ * If the marker parameter is supplied (and not empty), then its presence + * will be verified before the JSON parsing takes place; it is a fast-fail + * check. If not found, an {@link InvalidRecordException} exception will be + * raised + * @param path path the data came from + * @param bytes byte array + * @param marker an optional string which, if set, MUST be present in the + * UTF-8 parsed payload. + * @return The parsed record + * @throws IOException all problems + * @throws EOFException not enough data + * @throws InvalidRecordException if the JSON parsing failed. + * @throws NoRecordException if the data is not considered a record: either + * it is too short or it did not contain the marker string. + */ + public T fromBytes(String path, byte[] bytes, String marker) + throws IOException, NoRecordException, InvalidRecordException { + int len = bytes.length; + if (len == 0 ) { + throw new NoRecordException(path, E_NO_DATA); + } + if (StringUtils.isNotEmpty(marker) && len < marker.length()) { + throw new NoRecordException(path, E_DATA_TOO_SHORT); + } + String json = new String(bytes, 0, len, UTF_8); + if (StringUtils.isNotEmpty(marker) + && !json.contains(marker)) { + throw new NoRecordException(path, E_MISSING_MARKER_STRING + marker); } - String json = new String(bytes, offset, data, UTF_8); try { return fromJson(json); } catch (JsonProcessingException e) { @@ -266,52 +278,7 @@ public class JsonSerDeser { } /** - * Read from a byte array to a type, checking the header first - * @param path source of data - * @param buffer buffer - * @return the parsed structure - * Null if the record was too short or the header did not match - * @throws IOException on a failure - * @throws NoRecordException if header checks implied there was no record - * @throws InvalidRecordException if record parsing failed - */ - @SuppressWarnings("unchecked") - public T fromBytesWithHeader(String path, byte[] buffer) throws IOException { - int hlen = header.length; - int blen = buffer.length; - if (hlen > 0) { - if (blen < hlen) { - throw new NoRecordException(path, E_NO_SERVICE_RECORD); - } - byte[] magic = Arrays.copyOfRange(buffer, 0, hlen); - if (!Arrays.equals(header, magic)) { - LOG.debug("start of entry does not match service record header at {}", - path); - throw new NoRecordException(path, E_NO_SERVICE_RECORD); - } - } - return fromBytes(path, buffer, hlen); - } - - /** - * Check if a buffer has a header which matches this record type - * @param buffer buffer - * @return true if there is a match - * @throws IOException - */ - public boolean headerMatches(byte[] buffer) throws IOException { - int hlen = header.length; - int blen = buffer.length; - boolean matches = false; - if (blen > hlen) { - byte[] magic = Arrays.copyOfRange(buffer, 0, hlen); - matches = Arrays.equals(header, magic); - } - return matches; - } - - /** - * Convert an object to a JSON string + * Convert an instance to a JSON string * @param instance instance to convert * @return a JSON string description * @throws JsonParseException parse problems @@ -324,4 +291,19 @@ public class JsonSerDeser { return mapper.writeValueAsString(instance); } + /** + * Convert an instance to a string form for output. This is a robust + * operation which will convert any JSON-generating exceptions into + * error text. + * @param instance non-null instance + * @return a JSON string + */ + public String toString(T instance) { + Preconditions.checkArgument(instance != null, "Null instance argument"); + try { + return toJson(instance); + } catch (IOException e) { + return "Failed to convert to a string: " + e; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java index b4254a3beba..ec59d5985a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java @@ -22,17 +22,19 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; -import org.apache.hadoop.registry.client.types.AddressTypes; +import static org.apache.hadoop.registry.client.types.AddressTypes.*; import org.apache.hadoop.registry.client.types.Endpoint; import org.apache.hadoop.registry.client.types.ProtocolTypes; +import org.apache.hadoop.registry.client.types.ServiceRecord; import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Static methods to work with registry types —primarily endpoints and the @@ -94,79 +96,66 @@ public class RegistryTypeUtils { Preconditions.checkArgument(protocolType != null, "null protocolType"); Preconditions.checkArgument(hostname != null, "null hostname"); return new Endpoint(api, - AddressTypes.ADDRESS_HOSTNAME_AND_PORT, + ADDRESS_HOSTNAME_AND_PORT, protocolType, - tuplelist(hostname, Integer.toString(port))); + hostnamePortPair(hostname, port)); } /** * Create an IPC endpoint * @param api API - * @param protobuf flag to indicate whether or not the IPC uses protocol - * buffers * @param address the address as a tuple of (hostname, port) * @return the new endpoint */ - public static Endpoint ipcEndpoint(String api, - boolean protobuf, List address) { - ArrayList> addressList = new ArrayList>(); - if (address != null) { - addressList.add(address); - } + public static Endpoint ipcEndpoint(String api, InetSocketAddress address) { return new Endpoint(api, - AddressTypes.ADDRESS_HOSTNAME_AND_PORT, - protobuf ? ProtocolTypes.PROTOCOL_HADOOP_IPC_PROTOBUF - : ProtocolTypes.PROTOCOL_HADOOP_IPC, - addressList); + ADDRESS_HOSTNAME_AND_PORT, + ProtocolTypes.PROTOCOL_HADOOP_IPC, + address== null ? null: hostnamePortPair(address)); } /** - * Create a single-element list of tuples from the input. - * that is, an input ("a","b","c") is converted into a list - * in the form [["a","b","c"]] - * @param t1 tuple elements - * @return a list containing a single tuple + * Create a single entry map + * @param key map entry key + * @param val map entry value + * @return a 1 entry map. */ - public static List> tuplelist(String... t1) { - List> outer = new ArrayList>(); - outer.add(tuple(t1)); - return outer; + public static Map map(String key, String val) { + Map map = new HashMap(1); + map.put(key, val); + return map; } /** - * Create a tuples from the input. - * that is, an input ("a","b","c") is converted into a list - * in the form ["a","b","c"] - * @param t1 tuple elements - * @return a single tuple as a list + * Create a URI + * @param uri value + * @return a 1 entry map. */ - public static List tuple(String... t1) { - return Arrays.asList(t1); + public static Map uri(String uri) { + return map(ADDRESS_URI, uri); } /** - * Create a tuples from the input, converting all to Strings in the process - * that is, an input ("a", 7, true) is converted into a list - * in the form ["a","7,"true"] - * @param t1 tuple elements - * @return a single tuple as a list + * Create a (hostname, port) address pair + * @param hostname hostname + * @param port port + * @return a 1 entry map. */ - public static List tuple(Object... t1) { - List l = new ArrayList(t1.length); - for (Object t : t1) { - l.add(t.toString()); - } - return l; + public static Map hostnamePortPair(String hostname, int port) { + Map map = + map(ADDRESS_HOSTNAME_FIELD, hostname); + map.put(ADDRESS_PORT_FIELD, Integer.toString(port)); + return map; } /** - * Convert a socket address pair into a string tuple, (host, port). - * TODO JDK7: move to InetAddress.getHostString() to avoid DNS lookups. - * @param address an address - * @return an element for the address list + * Create a (hostname, port) address pair + * @param address socket address whose hostname and port are used for the + * generated address. + * @return a 1 entry map. */ - public static List marshall(InetSocketAddress address) { - return tuple(address.getHostName(), address.getPort()); + public static Map hostnamePortPair(InetSocketAddress address) { + return hostnamePortPair(address.getHostName(), address.getPort()); } /** @@ -199,24 +188,36 @@ public class RegistryTypeUtils { if (epr == null) { return null; } - requireAddressType(AddressTypes.ADDRESS_URI, epr); - List> addresses = epr.addresses; + requireAddressType(ADDRESS_URI, epr); + List> addresses = epr.addresses; if (addresses.size() < 1) { throw new InvalidRecordException(epr.toString(), "No addresses in endpoint"); } List results = new ArrayList(addresses.size()); - for (List address : addresses) { - if (address.size() != 1) { - throw new InvalidRecordException(epr.toString(), - "Address payload invalid: wrong element count: " + - address.size()); - } - results.add(address.get(0)); + for (Map address : addresses) { + results.add(getAddressField(address, ADDRESS_URI)); } return results; } + /** + * Get a specific field from an address -raising an exception if + * the field is not present + * @param address address to query + * @param field field to resolve + * @return the resolved value. Guaranteed to be non-null. + * @throws InvalidRecordException if the field did not resolve + */ + public static String getAddressField(Map address, + String field) throws InvalidRecordException { + String val = address.get(field); + if (val == null) { + throw new InvalidRecordException("", "Missing address field: " + field); + } + return val; + } + /** * Get the address URLs. Guranteed to return at least one address. * @param epr endpoint @@ -237,4 +238,53 @@ public class RegistryTypeUtils { } return results; } + + /** + * Validate the record by checking for null fields and other invalid + * conditions + * @param path path for exceptions + * @param record record to validate. May be null + * @throws InvalidRecordException on invalid entries + */ + public static void validateServiceRecord(String path, ServiceRecord record) + throws InvalidRecordException { + if (record == null) { + throw new InvalidRecordException(path, "Null record"); + } + if (!ServiceRecord.RECORD_TYPE.equals(record.type)) { + throw new InvalidRecordException(path, + "invalid record type field: \"" + record.type + "\""); + } + + if (record.external != null) { + for (Endpoint endpoint : record.external) { + validateEndpoint(path, endpoint); + } + } + if (record.internal != null) { + for (Endpoint endpoint : record.internal) { + validateEndpoint(path, endpoint); + } + } + } + + /** + * Validate the endpoint by checking for null fields and other invalid + * conditions + * @param path path for exceptions + * @param endpoint endpoint to validate. May be null + * @throws InvalidRecordException on invalid entries + */ + public static void validateEndpoint(String path, Endpoint endpoint) + throws InvalidRecordException { + if (endpoint == null) { + throw new InvalidRecordException(path, "Null endpoint"); + } + try { + endpoint.validate(); + } catch (RuntimeException e) { + throw new InvalidRecordException(path, e.toString()); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java index 8caf4002fee..68dc84e7bf7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java @@ -33,7 +33,6 @@ import org.apache.hadoop.registry.client.exceptions.NoRecordException; import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants; import org.apache.hadoop.registry.client.types.RegistryPathStatus; import org.apache.hadoop.registry.client.types.ServiceRecord; -import org.apache.hadoop.registry.client.types.ServiceRecordHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -314,7 +313,7 @@ public class RegistryUtils { Collection stats) throws IOException { Map results = new HashMap(stats.size()); for (RegistryPathStatus stat : stats) { - if (stat.size > ServiceRecordHeader.getLength()) { + if (stat.size > ServiceRecord.RECORD_TYPE.length()) { // maybe has data String path = join(parentpath, stat.path); try { @@ -344,7 +343,6 @@ public class RegistryUtils { *

* @param operations operation support for fetches * @param parentpath path of the parent of all the entries - * @param stats a map of name:value mappings. * @return a possibly empty map of fullpath:record. * @throws IOException for any IO Operation that wasn't ignored. */ @@ -362,7 +360,6 @@ public class RegistryUtils { *

* @param operations operation support for fetches * @param parentpath path of the parent of all the entries - * @param stats a map of name:value mappings. * @return a possibly empty map of fullpath:record. * @throws IOException for any IO Operation that wasn't ignored. */ @@ -382,7 +379,7 @@ public class RegistryUtils { */ public static class ServiceRecordMarshal extends JsonSerDeser { public ServiceRecordMarshal() { - super(ServiceRecord.class, ServiceRecordHeader.getData()); + super(ServiceRecord.class); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java index 160433f0814..b81b9d41341 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java @@ -21,17 +21,11 @@ package org.apache.hadoop.registry.client.exceptions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.registry.client.types.ServiceRecord; -import org.apache.hadoop.registry.client.types.ServiceRecordHeader; /** * Raised if there is no {@link ServiceRecord} resolved at the end - * of the specified path, for reasons such as: - *

    - *
  • There wasn't enough data to contain a Service Record.
  • - *
  • The start of the data did not match the {@link ServiceRecordHeader} - * header.
  • - *
- * + * of the specified path. + *

* There may be valid data of some form at the end of the path, but it does * not appear to be a Service Record. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java index 7c01bdf433e..271ab254633 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java @@ -24,9 +24,11 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.registry.client.api.BindFlags; import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException; +import org.apache.hadoop.registry.client.exceptions.NoRecordException; import org.apache.hadoop.registry.client.types.RegistryPathStatus; import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.zookeeper.CreateMode; @@ -103,10 +105,12 @@ public class RegistryOperationsService extends CuratorService int flags) throws IOException { Preconditions.checkArgument(record != null, "null record"); validatePath(path); + // validate the record before putting it + RegistryTypeUtils.validateServiceRecord(path, record); LOG.info("Bound at {} : {}", path, record); CreateMode mode = CreateMode.PERSISTENT; - byte[] bytes = serviceRecordMarshal.toByteswithHeader(record); + byte[] bytes = serviceRecordMarshal.toBytes(record); zkSet(path, mode, bytes, getClientAcls(), ((flags & BindFlags.OVERWRITE) != 0)); } @@ -114,7 +118,11 @@ public class RegistryOperationsService extends CuratorService @Override public ServiceRecord resolve(String path) throws IOException { byte[] bytes = zkRead(path); - return serviceRecordMarshal.fromBytesWithHeader(path, bytes); + + ServiceRecord record = serviceRecordMarshal.fromBytes(path, + bytes, ServiceRecord.RECORD_TYPE); + RegistryTypeUtils.validateServiceRecord(path, record); + return record; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java index 192819c8d7d..36dbf0ce66e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java @@ -38,6 +38,8 @@ public interface AddressTypes { * */ public static final String ADDRESS_HOSTNAME_AND_PORT = "host/port"; + public static final String ADDRESS_HOSTNAME_FIELD = "host"; + public static final String ADDRESS_PORT_FIELD = "port"; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java index 51418d9c9e5..e4effb42c86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java @@ -21,14 +21,16 @@ package org.apache.hadoop.registry.client.types; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.registry.client.binding.JsonSerDeser; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.map.annotate.JsonSerialize; import java.net.URI; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Description of a single service/component endpoint. @@ -67,7 +69,7 @@ public final class Endpoint implements Cloneable { /** * a list of address tuples —tuples whose format depends on the address type */ - public List> addresses; + public List> addresses; /** * Create an empty instance. @@ -84,10 +86,11 @@ public final class Endpoint implements Cloneable { this.api = that.api; this.addressType = that.addressType; this.protocolType = that.protocolType; - this.addresses = new ArrayList>(that.addresses.size()); - for (List address : addresses) { - List addr2 = new ArrayList(address.size()); - Collections.copy(address, addr2); + this.addresses = newAddresses(that.addresses.size()); + for (Map address : that.addresses) { + Map addr2 = new HashMap(address.size()); + addr2.putAll(address); + addresses.add(addr2); } } @@ -101,16 +104,82 @@ public final class Endpoint implements Cloneable { public Endpoint(String api, String addressType, String protocolType, - List> addrs) { + List> addrs) { this.api = api; this.addressType = addressType; this.protocolType = protocolType; - this.addresses = new ArrayList>(); + this.addresses = newAddresses(0); if (addrs != null) { addresses.addAll(addrs); } } + /** + * Build an endpoint with an empty address list + * @param api API name + * @param addressType address type + * @param protocolType protocol type + */ + public Endpoint(String api, + String addressType, + String protocolType) { + this.api = api; + this.addressType = addressType; + this.protocolType = protocolType; + this.addresses = newAddresses(0); + } + + /** + * Build an endpoint with a single address entry. + *

+ * This constructor is superfluous given the varags constructor is equivalent + * for a single element argument. However, type-erasure in java generics + * causes javac to warn about unchecked generic array creation. This + * constructor, which represents the common "one address" case, does + * not generate compile-time warnings. + * @param api API name + * @param addressType address type + * @param protocolType protocol type + * @param addr address. May be null —in which case it is not added + */ + public Endpoint(String api, + String addressType, + String protocolType, + Map addr) { + this(api, addressType, protocolType); + if (addr != null) { + addresses.add(addr); + } + } + + /** + * Build an endpoint with a list of addresses + * @param api API name + * @param addressType address type + * @param protocolType protocol type + * @param addrs addresses. Null elements will be skipped + */ + public Endpoint(String api, + String addressType, + String protocolType, + Map...addrs) { + this(api, addressType, protocolType); + for (Map addr : addrs) { + if (addr!=null) { + addresses.add(addr); + } + } + } + + /** + * Create a new address structure of the requested size + * @param size size to create + * @return the new list + */ + private List> newAddresses(int size) { + return new ArrayList>(size); + } + /** * Build an endpoint from a list of URIs; each URI * is ASCII-encoded and added to the list of addresses. @@ -125,40 +194,16 @@ public final class Endpoint implements Cloneable { this.addressType = AddressTypes.ADDRESS_URI; this.protocolType = protocolType; - List> addrs = new ArrayList>(uris.length); + List> addrs = newAddresses(uris.length); for (URI uri : uris) { - addrs.add(RegistryTypeUtils.tuple(uri.toString())); + addrs.add(RegistryTypeUtils.uri(uri.toString())); } this.addresses = addrs; } @Override public String toString() { - final StringBuilder sb = new StringBuilder("Endpoint{"); - sb.append("api='").append(api).append('\''); - sb.append(", addressType='").append(addressType).append('\''); - sb.append(", protocolType='").append(protocolType).append('\''); - - sb.append(", addresses="); - if (addresses != null) { - sb.append("[ "); - for (List address : addresses) { - sb.append("[ "); - if (address == null) { - sb.append("NULL entry in address list"); - } else { - for (String elt : address) { - sb.append('"').append(elt).append("\" "); - } - } - sb.append("] "); - }; - sb.append("] "); - } else { - sb.append("(null) "); - } - sb.append('}'); - return sb.toString(); + return marshalToString.toString(this); } /** @@ -173,7 +218,7 @@ public final class Endpoint implements Cloneable { Preconditions.checkNotNull(addressType, "null addressType field"); Preconditions.checkNotNull(protocolType, "null protocolType field"); Preconditions.checkNotNull(addresses, "null addresses field"); - for (List address : addresses) { + for (Map address : addresses) { Preconditions.checkNotNull(address, "null element in address"); } } @@ -184,7 +229,19 @@ public final class Endpoint implements Cloneable { * @throws CloneNotSupportedException */ @Override - protected Object clone() throws CloneNotSupportedException { + public Object clone() throws CloneNotSupportedException { return super.clone(); } + + + /** + * Static instance of service record marshalling + */ + private static class Marshal extends JsonSerDeser { + private Marshal() { + super(Endpoint.class); + } + } + + private static final Marshal marshalToString = new Marshal(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java index f225cf08775..b836b0003c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java @@ -34,15 +34,10 @@ public interface ProtocolTypes { String PROTOCOL_FILESYSTEM = "hadoop/filesystem"; /** - * Classic Hadoop IPC : {@value}. + * Hadoop IPC, "classic" or protobuf : {@value}. */ String PROTOCOL_HADOOP_IPC = "hadoop/IPC"; - /** - * Hadoop protocol buffers IPC: {@value}. - */ - String PROTOCOL_HADOOP_IPC_PROTOBUF = "hadoop/protobuf"; - /** * Corba IIOP: {@value}. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java index 378127fc026..9403d3168e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java @@ -21,6 +21,7 @@ package org.apache.hadoop.registry.client.types; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; import org.codehaus.jackson.annotate.JsonAnyGetter; import org.codehaus.jackson.annotate.JsonAnySetter; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -40,6 +41,17 @@ import java.util.Map; @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) public class ServiceRecord implements Cloneable { + /** + * A type string which MUST be in the serialized json. This permits + * fast discarding of invalid entries + */ + public static final String RECORD_TYPE = "JSONServiceRecord"; + + /** + * The type field. This must be the string {@link #RECORD_TYPE} + */ + public String type = RECORD_TYPE; + /** * Description string */ @@ -233,17 +245,5 @@ public class ServiceRecord implements Cloneable { return super.clone(); } - /** - * Validate the record by checking for null fields and other invalid - * conditions - * @throws NullPointerException if a field is null when it - * MUST be set. - * @throws RuntimeException on invalid entries - */ - public void validate() { - for (Endpoint endpoint : external) { - Preconditions.checkNotNull("null endpoint", endpoint); - endpoint.validate(); - } - } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java deleted file mode 100644 index 2f75dba5a33..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.registry.client.types; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * Service record header; access to the byte array kept private - * to avoid findbugs warnings of mutability - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public class ServiceRecordHeader { - /** - * Header of a service record: "jsonservicerec" - * By making this over 12 bytes long, we can auto-determine which entries - * in a listing are too short to contain a record without getting their data - */ - private static final byte[] RECORD_HEADER = { - 'j', 's', 'o', 'n', - 's', 'e', 'r', 'v', 'i', 'c', 'e', - 'r', 'e', 'c' - }; - - /** - * Get the length of the record header - * @return the header length - */ - public static int getLength() { - return RECORD_HEADER.length; - } - - /** - * Get a clone of the record header - * @return the new record header. - */ - public static byte[] getData() { - byte[] h = new byte[RECORD_HEADER.length]; - System.arraycopy(RECORD_HEADER, 0, h, 0, RECORD_HEADER.length); - return h; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla index 1c19adead44..a950475f402 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla @@ -4,6 +4,7 @@ EXTENDS FiniteSets, Sequences, Naturals, TLC (* +============================================================================ * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,6 +20,7 @@ EXTENDS FiniteSets, Sequences, Naturals, TLC * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. +============================================================================ *) (* @@ -71,13 +73,22 @@ CONSTANTS MknodeActions \* all possible mkdir actions +ASSUME PathChars \in STRING +ASSUME Paths \in STRING + +(* Data in records is JSON, hence a string *) +ASSUME Data \in STRING + +---------------------------------------------------------------------------------------- (* the registry*) VARIABLE registry + (* Sequence of actions to apply to the registry *) VARIABLE actions + ---------------------------------------------------------------------------------------- (* Tuple of all variables. *) @@ -92,7 +103,6 @@ vars == << registry, actions >> (* Persistence policy *) PersistPolicySet == { - "", \* Undefined; field not present. PERMANENT is implied. "permanent", \* persists until explicitly removed "application", \* persists until the application finishes "application-attempt", \* persists until the application attempt finishes @@ -104,7 +114,6 @@ TypeInvariant == /\ \A p \in PersistPolicies: p \in PersistPolicySet - ---------------------------------------------------------------------------------------- @@ -129,6 +138,14 @@ RegistryEntry == [ ] +(* Define the set of all string to string mappings *) + +StringMap == [ + STRING |-> STRING +] + + + (* An endpoint in a service record *) @@ -140,21 +157,14 @@ Endpoint == [ addresses: Addresses ] -(* Attributes are the set of all string to string mappings *) - -Attributes == [ -STRING |-> STRING -] - (* A service record *) ServiceRecord == [ - \* ID -used when applying the persistence policy - yarn_id: STRING, - \* the persistence policy - yarn_persistence: PersistPolicySet, + \* This MUST be present: if it is not then the data is not a service record + \* This permits shortcut scan & reject of byte arrays without parsing + type: "JSONServiceRecord", \*A description description: STRING, @@ -166,9 +176,34 @@ ServiceRecord == [ internal: Endpoints, \* Attributes are a function - attributes: Attributes + attributes: StringMap ] +---------------------------------------------------------------------------------------- + +(* + There is an operation serialize whose internals are not defined, + Which converts the service records to JSON + *) + +CONSTANT serialize(_) + +(* A function which returns true iff the byte stream is considered a valid service record. *) +CONSTANT containsServiceRecord(_) + +(* A function to deserialize a string to JSON *) +CONSTANT deserialize(_) + +ASSUME \A json \in STRING: containsServiceRecord(json) \in BOOLEAN + +(* Records can be serialized *) +ASSUME \A r \in ServiceRecord : serialize(r) \in STRING /\ containsServiceRecord(serialize(r)) + +(* All strings for which containsServiceRecord() holds can be deserialized *) +ASSUME \A json \in STRING: containsServiceRecord(json) => deserialize(json) \in ServiceRecord + + + ---------------------------------------------------------------------------------------- @@ -304,8 +339,8 @@ validRegistry(R) == \* an entry must be the root entry or have a parent entry /\ \A e \in R: isRootEntry(e) \/ exists(R, parent(e.path)) - \* If the entry has data, it must be a service record - /\ \A e \in R: (e.data = << >> \/ e.data \in ServiceRecords) + \* If the entry has data, it must contain a service record + /\ \A e \in R: (e.data = << >> \/ containsServiceRecord(e.data)) ---------------------------------------------------------------------------------------- @@ -336,13 +371,13 @@ mknode() adds a new empty entry where there was none before, iff *) mknodeSimple(R, path) == - LET record == [ path |-> path, data |-> <<>> ] + LET entry == [ path |-> path, data |-> <<>> ] IN \/ exists(R, path) - \/ (exists(R, parent(path)) /\ canBind(R, record) /\ (R' = R \union {record} )) + \/ (exists(R, parent(path)) /\ canBind(R, entry) /\ (R' = R \union {entry} )) (* -For all parents, the mknodeSimpl() criteria must apply. +For all parents, the mknodeSimple() criteria must apply. This could be defined recursively, though as TLA+ does not support recursion, an alternative is required @@ -350,7 +385,8 @@ an alternative is required Because this specification is declaring the final state of a operation, not the implemental, all that is needed is to describe those parents. -It declares that the mkdirSimple state applies to the path and all its parents in the set R' +It declares that the mknodeSimple() state applies to the path and all +its parents in the set R' *) mknodeWithParents(R, path) == @@ -402,7 +438,7 @@ purge(R, path, id, persistence) == => recursiveDelete(R, p2.path) (* -resolveRecord() resolves the record at a path or fails. +resolveEntry() resolves the record entry at a path or fails. It relies on the fact that if the cardinality of a set is 1, then the CHOOSE operator is guaranteed to return the single entry of that set, iff the choice predicate holds. @@ -411,19 +447,28 @@ Using a predicate of TRUE, it always succeeds, so this function selects the sole entry of the resolve operation. *) -resolveRecord(R, path) == +resolveEntry(R, path) == LET l == resolve(R, path) IN /\ Cardinality(l) = 1 /\ CHOOSE e \in l : TRUE +(* + Resolve a record by resolving the entry and deserializing the result + *) +resolveRecord(R, path) == + deserialize(resolveEntry(R, path)) + + (* The specific action of putting an entry into a record includes validating the record *) validRecordToBind(path, record) == \* The root entry must have permanent persistence - isRootPath(path) => (record.attributes["yarn:persistence"] = "permanent" - \/ record.attributes["yarn:persistence"] = "") + isRootPath(path) => ( + record.attributes["yarn:persistence"] = "permanent" + \/ record.attributes["yarn:persistence"] + \/ record.attributes["yarn:persistence"] = {}) (* @@ -432,13 +477,12 @@ marshalled as the data in the entry *) bindRecord(R, path, record) == /\ validRecordToBind(path, record) - /\ bind(R, [path |-> path, data |-> record]) + /\ bind(R, [path |-> path, data |-> serialize(record)]) ---------------------------------------------------------------------------------------- - (* The action queue can only contain one of the sets of action types, and by giving each a unique name, those sets are guaranteed to be disjoint diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java index 460ecad876a..91602e1d3b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java @@ -20,7 +20,6 @@ package org.apache.hadoop.registry; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.registry.client.api.RegistryConstants; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; @@ -46,11 +45,7 @@ import java.net.URISyntaxException; import java.util.List; import java.util.Map; -import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.inetAddrEndpoint; -import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.ipcEndpoint; -import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.restEndpoint; -import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.tuple; -import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.webEndpoint; +import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.*; /** * This is a set of static methods to aid testing the registry operations. @@ -61,18 +56,18 @@ public class RegistryTestHelper extends Assert { public static final String SC_HADOOP = "org-apache-hadoop"; public static final String USER = "devteam/"; public static final String NAME = "hdfs"; - public static final String API_WEBHDFS = "org_apache_hadoop_namenode_webhdfs"; - public static final String API_HDFS = "org_apache_hadoop_namenode_dfs"; + public static final String API_WEBHDFS = "classpath:org.apache.hadoop.namenode.webhdfs"; + public static final String API_HDFS = "classpath:org.apache.hadoop.namenode.dfs"; public static final String USERPATH = RegistryConstants.PATH_USERS + USER; public static final String PARENT_PATH = USERPATH + SC_HADOOP + "/"; public static final String ENTRY_PATH = PARENT_PATH + NAME; - public static final String NNIPC = "nnipc"; - public static final String IPC2 = "IPC2"; + public static final String NNIPC = "uuid:423C2B93-C927-4050-AEC6-6540E6646437"; + public static final String IPC2 = "uuid:0663501D-5AD3-4F7E-9419-52F5D6636FCF"; private static final Logger LOG = LoggerFactory.getLogger(RegistryTestHelper.class); - public static final String KTUTIL = "ktutil"; private static final RegistryUtils.ServiceRecordMarshal recordMarshal = new RegistryUtils.ServiceRecordMarshal(); + public static final String HTTP_API = "http://"; /** * Assert the path is valid by ZK rules @@ -148,9 +143,9 @@ public class RegistryTestHelper extends Assert { assertEquals(API_WEBHDFS, webhdfs.api); assertEquals(AddressTypes.ADDRESS_URI, webhdfs.addressType); assertEquals(ProtocolTypes.PROTOCOL_REST, webhdfs.protocolType); - List> addressList = webhdfs.addresses; - List url = addressList.get(0); - String addr = url.get(0); + List> addressList = webhdfs.addresses; + Map url = addressList.get(0); + String addr = url.get("uri"); assertTrue(addr.contains("http")); assertTrue(addr.contains(":8020")); @@ -159,8 +154,9 @@ public class RegistryTestHelper extends Assert { nnipc.protocolType); Endpoint ipc2 = findEndpoint(record, IPC2, false, 1,2); + assertNotNull(ipc2); - Endpoint web = findEndpoint(record, "web", true, 1, 1); + Endpoint web = findEndpoint(record, HTTP_API, true, 1, 1); assertEquals(1, web.addresses.size()); assertEquals(1, web.addresses.get(0).size()); } @@ -275,14 +271,14 @@ public class RegistryTestHelper extends Assert { public static void addSampleEndpoints(ServiceRecord entry, String hostname) throws URISyntaxException { assertNotNull(hostname); - entry.addExternalEndpoint(webEndpoint("web", + entry.addExternalEndpoint(webEndpoint(HTTP_API, new URI("http", hostname + ":80", "/"))); entry.addExternalEndpoint( restEndpoint(API_WEBHDFS, new URI("http", hostname + ":8020", "/"))); - Endpoint endpoint = ipcEndpoint(API_HDFS, true, null); - endpoint.addresses.add(tuple(hostname, "8030")); + Endpoint endpoint = ipcEndpoint(API_HDFS, null); + endpoint.addresses.add(RegistryTypeUtils.hostnamePortPair(hostname, 8030)); entry.addInternalEndpoint(endpoint); InetSocketAddress localhost = new InetSocketAddress("localhost", 8050); entry.addInternalEndpoint( @@ -290,9 +286,7 @@ public class RegistryTestHelper extends Assert { 8050)); entry.addInternalEndpoint( RegistryTypeUtils.ipcEndpoint( - IPC2, - true, - RegistryTypeUtils.marshall(localhost))); + IPC2, localhost)); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java index 14e3b1fa631..f1814d30707 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/client/binding/TestMarshalling.java @@ -19,9 +19,9 @@ package org.apache.hadoop.registry.client.binding; import org.apache.hadoop.registry.RegistryTestHelper; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; import org.apache.hadoop.registry.client.exceptions.NoRecordException; import org.apache.hadoop.registry.client.types.ServiceRecord; -import org.apache.hadoop.registry.client.types.ServiceRecordHeader; import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; import org.junit.BeforeClass; import org.junit.Rule; @@ -31,8 +31,6 @@ import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.EOFException; - /** * Test record marshalling */ @@ -44,6 +42,7 @@ public class TestMarshalling extends RegistryTestHelper { public final Timeout testTimeout = new Timeout(10000); @Rule public TestName methodName = new TestName(); + private static RegistryUtils.ServiceRecordMarshal marshal; @BeforeClass @@ -55,42 +54,65 @@ public class TestMarshalling extends RegistryTestHelper { public void testRoundTrip() throws Throwable { String persistence = PersistencePolicies.PERMANENT; ServiceRecord record = createRecord(persistence); - record.set("customkey","customvalue"); - record.set("customkey2","customvalue2"); + record.set("customkey", "customvalue"); + record.set("customkey2", "customvalue2"); + RegistryTypeUtils.validateServiceRecord("", record); LOG.info(marshal.toJson(record)); byte[] bytes = marshal.toBytes(record); - ServiceRecord r2 = marshal.fromBytes("", bytes, 0); + ServiceRecord r2 = marshal.fromBytes("", bytes); assertMatches(record, r2); + RegistryTypeUtils.validateServiceRecord("", r2); } - @Test - public void testRoundTripHeaders() throws Throwable { - ServiceRecord record = createRecord(PersistencePolicies.CONTAINER); - byte[] bytes = marshal.toByteswithHeader(record); - ServiceRecord r2 = marshal.fromBytesWithHeader("", bytes); - assertMatches(record, r2); + @Test(expected = NoRecordException.class) + public void testUnmarshallNoData() throws Throwable { + marshal.fromBytes("src", new byte[]{}); } @Test(expected = NoRecordException.class) - public void testRoundTripBadHeaders() throws Throwable { - ServiceRecord record = createRecord(PersistencePolicies.APPLICATION); - byte[] bytes = marshal.toByteswithHeader(record); - bytes[1] = 0x01; - marshal.fromBytesWithHeader("src", bytes); + public void testUnmarshallNotEnoughData() throws Throwable { + // this is nominally JSON -but without the service record header + marshal.fromBytes("src", new byte[]{'{','}'}, ServiceRecord.RECORD_TYPE); } - @Test(expected = NoRecordException.class) - public void testUnmarshallHeaderTooShort() throws Throwable { - marshal.fromBytesWithHeader("src", new byte[]{'a'}); - } - - @Test(expected = EOFException.class) + @Test(expected = InvalidRecordException.class) public void testUnmarshallNoBody() throws Throwable { - byte[] bytes = ServiceRecordHeader.getData(); - marshal.fromBytesWithHeader("src", bytes); + byte[] bytes = "this is not valid JSON at all and should fail".getBytes(); + marshal.fromBytes("src", bytes); } + @Test(expected = InvalidRecordException.class) + public void testUnmarshallWrongType() throws Throwable { + byte[] bytes = "{'type':''}".getBytes(); + ServiceRecord serviceRecord = marshal.fromBytes("marshalling", bytes); + RegistryTypeUtils.validateServiceRecord("validating", serviceRecord); + } + + @Test(expected = NoRecordException.class) + public void testUnmarshallWrongLongType() throws Throwable { + ServiceRecord record = new ServiceRecord(); + record.type = "ThisRecordHasALongButNonMatchingType"; + byte[] bytes = marshal.toBytes(record); + ServiceRecord serviceRecord = marshal.fromBytes("marshalling", + bytes, ServiceRecord.RECORD_TYPE); + } + + @Test(expected = NoRecordException.class) + public void testUnmarshallNoType() throws Throwable { + ServiceRecord record = new ServiceRecord(); + record.type = "NoRecord"; + byte[] bytes = marshal.toBytes(record); + ServiceRecord serviceRecord = marshal.fromBytes("marshalling", + bytes, ServiceRecord.RECORD_TYPE); + } + + @Test(expected = InvalidRecordException.class) + public void testRecordValidationWrongType() throws Throwable { + ServiceRecord record = new ServiceRecord(); + record.type = "NotAServiceRecordType"; + RegistryTypeUtils.validateServiceRecord("validating", record); + } @Test public void testUnknownFieldsRoundTrip() throws Throwable { @@ -102,8 +124,8 @@ public class TestMarshalling extends RegistryTestHelper { assertEquals("2", record.get("intval")); assertNull(record.get("null")); assertEquals("defval", record.get("null", "defval")); - byte[] bytes = marshal.toByteswithHeader(record); - ServiceRecord r2 = marshal.fromBytesWithHeader("", bytes); + byte[] bytes = marshal.toBytes(record); + ServiceRecord r2 = marshal.fromBytes("", bytes); assertEquals("value", r2.get("key")); assertEquals("2", r2.get("intval")); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java index 7a7f88cd51c..853d7f17909 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/operations/TestRegistryOperations.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.fs.PathNotFoundException; import org.apache.hadoop.registry.AbstractRegistryTest; import org.apache.hadoop.registry.client.api.BindFlags; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.exceptions.NoRecordException; @@ -91,10 +92,8 @@ public class TestRegistryOperations extends AbstractRegistryTest { childStats.values()); assertEquals(1, records.size()); ServiceRecord record = records.get(ENTRY_PATH); - assertNotNull(record); - record.validate(); + RegistryTypeUtils.validateServiceRecord(ENTRY_PATH, record); assertMatches(written, record); - } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md index a2a5009660f..b38d9fba5ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/registry/yarn-registry.md @@ -352,6 +352,10 @@ application. Name Description + + type: String + Always: "JSONServiceRecord" + description: String Human-readable description. @@ -366,6 +370,8 @@ application. +The type field MUST be `"JSONServiceRecord"`. Mandating this string allows future record types *and* permits rapid rejection of byte arrays that lack this string before attempting JSON parsing. + ### YARN Persistence policies The YARN Resource Manager integration integrates cleanup of service records @@ -379,7 +385,6 @@ any use of the registry without the RM's participation. The attributes, `yarn:id` and `yarn:persistence` specify which records *and any child entries* may be deleted as the associated YARN components complete. - The `yarn:id` field defines the application, attempt or container ID to match; the `yarn:persistence` attribute defines the trigger for record cleanup, and implicitly the type of the contents of the `yarn:id` field. @@ -432,31 +437,32 @@ up according the lifecycle of that application. Description - addresses: List[List[String]] - a list of address tuples whose format depends on the address type - - - addressType: String - format of the binding - + api: URI as String + API implemented at the end of the binding protocol: String Protocol. Examples: `http`, `https`, `hadoop-rpc`, `zookeeper`, `web`, `REST`, `SOAP`, ... - api: String - API implemented at the end of the binding + addressType: String + format of the binding + + + addresses: List[Map[String, String]] + a list of address maps + + All string fields have a limit on size, to dissuade services from hiding complex JSON structures in the text description. -### Field: Address Type +#### Field `addressType`: Address Type -The addressType field defines the string format of entries. +The `addressType` field defines the string format of entries. Having separate types is that tools (such as a web viewer) can process binding strings without having to recognize the protocol. @@ -467,43 +473,58 @@ strings without having to recognize the protocol. binding format - `url` - `[URL]` + uri + uri:URI of endpoint - `hostname` - `[hostname]` + hostname + hostname: service host - `inetaddress` - `[hostname, port]` + inetaddress + hostname: service host, port: service port - `path` - `[/path/to/something]` + path + path: generic unix filesystem path - `zookeeper` - `[quorum-entry, path]` + zookeeper + hostname: service host, port: service port, path: ZK path -An actual zookeeper binding consists of a list of `hostname:port` bindings –the -quorum— and the path within. In the proposed schema, every quorum entry will be -listed as a triple of `[hostname, port, path]`. Client applications do not -expect the path to de be different across the quorum. The first entry in the -list of quorum hosts MUST define the path to be used by all clients. Later -entries SHOULD list the same path, though clients MUST ignore these. +In the zookeeper binding, every entry represents a single node in quorum, +the `hostname` and `port` fields defining the hostname of the ZK instance +and the port on which it is listening. The `path` field lists zookeeper path +for applications to use. For example, for HBase this would refer to the znode +containing information about the HBase cluster. + +The path MUST be identical across all address elements in the `addresses` list. +This ensures that any single address contains enough information to connect +to the quorum and connect to the relevant znode. New Address types may be defined; if not standard please prefix with the character sequence `"x-"`. -#### **Field: API** +### Field `api`: API identifier + +The API field MUST contain a URI that identifies the specific API of an endpoint. +These MUST be unique to an API to avoid confusion. + +The following strategies are suggested to provide unique URIs for an API + +1. The SOAP/WS-* convention of using the URL to where the WSDL defining the service +2. A URL to the svn/git hosted document defining a REST API +3. the `classpath` schema followed by a path to a class or package in an application. +4. The `uuid` schema with a generated UUID. + +It is hoped that standard API URIs will be defined for common APIs. Two such non-normative APIs are used in this document + +* `http://` : A web site for humans +* `classpath:javax.management.jmx`: and endpoint supporting the JMX management protocol (RMI-based) -APIs may be unique to a service class, or may be common across by service -classes. They MUST be given unique names. These MAY be based on service -packages but MAY be derived from other naming schemes: ### Examples of Service Entries @@ -524,12 +545,14 @@ overall application. It exports the URL to a load balancer. { "description" : "tomcat-based web application", - "registrationTime" : 1408638082444, "external" : [ { - "api" : "www", + "api" : "http://internal.example.org/restapis/scheduler/20141026v1", "addressType" : "uri", - "protocolType" : "REST", - "addresses" : [ [ "http://loadbalancer/" ] [ "http://loadbalancer2/" ] ] + "protocol" : "REST", + "addresses" : [ + { "uri" : "http://loadbalancer/" }, + { "uri" : "http://loadbalancer2/" } + ] } ], "internal" : [ ] } @@ -545,21 +568,23 @@ will trigger the deletion of this entry /users/devteam/org-apache-tomcat/test1/components/container-1408631738011-0001-01-000001 { - "registrationTime" : 1408638082445, "yarn:id" : "container_1408631738011_0001_01_000001", - "yarn:persistence" : "3", - "description" : null, + "yarn:persistence" : "container", + "description" : "", "external" : [ { - "api" : "www", + "api" : "http://internal.example.org/restapis/scheduler/20141026v1", "addressType" : "uri", - "protocolType" : "REST", - "addresses" : [ [ "http://rack4server3:43572" ] ] + "protocol" : "REST", + "addresses" : [{ "uri" : "rack4server3:43572" } ] } ], "internal" : [ { - "api" : "jmx", + "api" : "classpath:javax.management.jmx", "addressType" : "host/port", - "protocolType" : "JMX", - "addresses" : [ [ "rack4server3", "43573" ] ] + "protocol" : "rmi", + "addresses" : [ { + "host" : "rack4server3", + "port" : "48551" + } ] } ] } @@ -571,19 +596,22 @@ external endpoint, the JMX addresses as internal. { "registrationTime" : 1408638082445, "yarn:id" : "container_1408631738011_0001_01_000002", - "yarn:persistence" : "3", + "yarn:persistence" : "container", "description" : null, "external" : [ { - "api" : "www", + "api" : "http://internal.example.org/restapis/scheduler/20141026v1", "addressType" : "uri", - "protocolType" : "REST", + "protocol" : "REST", "addresses" : [ [ "http://rack1server28:35881" ] ] } ], "internal" : [ { - "api" : "jmx", + "api" : "classpath:javax.management.jmx", "addressType" : "host/port", - "protocolType" : "JMX", - "addresses" : [ [ "rack1server28", "35882" ] ] + "protocol" : "rmi", + "addresses" : [ { + "host" : "rack1server28", + "port" : "48551" + } ] } ] } @@ -887,3 +915,106 @@ Implementations may throttle update operations. **Rate of Polling** Clients which poll the registry may be throttled. + +# Complete service record example + +Below is a (non-normative) example of a service record retrieved +from a YARN application. + + + { + "type" : "JSONServiceRecord", + "description" : "Slider Application Master", + "yarn:persistence" : "application", + "yarn:id" : "application_1414052463672_0028", + "external" : [ { + "api" : "classpath:org.apache.slider.appmaster", + "addressType" : "host/port", + "protocol" : "hadoop/IPC", + "addresses" : [ { + "port" : "48551", + "host" : "nn.example.com" + } ] + }, { + "api" : "http://", + "addressType" : "uri", + "protocol" : "web", + "addresses" : [ { + "uri" : "http://nn.example.com:40743" + } ] + }, { + "api" : "classpath:org.apache.slider.management", + "addressType" : "uri", + "protocol" : "REST", + "addresses" : [ { + "uri" : "http://nn.example.com:40743/ws/v1/slider/mgmt" + } ] + }, { + "api" : "classpath:org.apache.slider.publisher", + "addressType" : "uri", + "protocol" : "REST", + "addresses" : [ { + "uri" : "http://nn.example.com:40743/ws/v1/slider/publisher" + } ] + }, { + "api" : "classpath:org.apache.slider.registry", + "addressType" : "uri", + "protocol" : "REST", + "addresses" : [ { + "uri" : "http://nn.example.com:40743/ws/v1/slider/registry" + } ] + }, { + "api" : "classpath:org.apache.slider.publisher.configurations", + "addressType" : "uri", + "protocol" : "REST", + "addresses" : [ { + "uri" : "http://nn.example.com:40743/ws/v1/slider/publisher/slider" + } ] + }, { + "api" : "classpath:org.apache.slider.publisher.exports", + "addressType" : "uri", + "protocol" : "REST", + "addresses" : [ { + "uri" : "http://nn.example.com:40743/ws/v1/slider/publisher/exports" + } ] + } ], + "internal" : [ { + "api" : "classpath:org.apache.slider.agents.secure", + "addressType" : "uri", + "protocol" : "REST", + "addresses" : [ { + "uri" : "https://nn.example.com:52705/ws/v1/slider/agents" + } ] + }, { + "api" : "classpath:org.apache.slider.agents.oneway", + "addressType" : "uri", + "protocol" : "REST", + "addresses" : [ { + "uri" : "https://nn.example.com:33425/ws/v1/slider/agents" + } ] + } ] + } + +It publishes a number of endpoints, both internal and external. + +External: + +1. The IPC hostname and port for client-AM communications +1. URL to the AM's web UI +1. A series of REST URLs under the web UI for specific application services. +The details are irrelevant —note that they use an application-specific API +value to ensure uniqueness. + +Internal: +1. Two URLS to REST APIs offered by the AM for containers deployed by + the application itself. + +Python agents running in the containers retrieve the internal endpoint +URLs to communicate with their AM. The record is resolved on container startup +and cached until communications problems occur. At that point the registry is +queried for the current record, then an attempt is made to reconnect to the AM. + +Here "connectivity" problems means both "low level socket/IO errors" and +"failures in HTTPS authentication". The agents use two-way HTTPS authentication +—if the AM fails and another application starts listening on the same ports +it will trigger an authentication failure and hence service record reread.