YARN-5993. Allow native services quicklinks to be exported for each component. Contributed by Billie Rinaldi

This commit is contained in:
Gour Saha 2016-12-28 15:24:36 -08:00 committed by Jian He
parent 7d685f2fb3
commit a5e20f0fc1
6 changed files with 178 additions and 174 deletions

View File

@ -183,17 +183,17 @@ public final class SliderUtils {
return !isUnset(s); return !isUnset(s);
} }
public static boolean isEmpty(List l) { public static boolean isEmpty(Collection l) {
return l == null || l.isEmpty(); return l == null || l.isEmpty();
} }
/** /**
* Probe for a list existing and not being empty * Probe for a collection existing and not being empty
* @param l list * @param l collection
* @return true if the reference is valid and it contains entries * @return true if the reference is valid and it contains entries
*/ */
public static boolean isNotEmpty(List l) { public static boolean isNotEmpty(Collection l) {
return l != null && !l.isEmpty(); return l != null && !l.isEmpty();
} }

View File

@ -106,6 +106,26 @@ public class ExportEntry {
this.validUntil = validUntil; this.validUntil = validUntil;
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExportEntry that = (ExportEntry) o;
if (value != null ? !value.equals(that.value) : that.value != null)
return false;
return containerId != null ? containerId.equals(that.containerId) :
that.containerId == null;
}
@Override
public int hashCode() {
int result = value != null ? value.hashCode() : 0;
result = 31 * result + (containerId != null ? containerId.hashCode() : 0);
return result;
}
@Override @Override
public String toString() { public String toString() {
return new StringBuilder("ExportEntry{"). return new StringBuilder("ExportEntry{").

View File

@ -26,8 +26,10 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.io.IOException; import java.io.IOException;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
/** /**
* JSON-serializable description of a published key-val configuration. * JSON-serializable description of a published key-val configuration.
@ -41,7 +43,7 @@ public class PublishedExports {
public String description; public String description;
public long updated; public long updated;
public String updatedTime; public String updatedTime;
public Map<String, List<ExportEntry>> entries = new HashMap<>(); public Map<String, Set<ExportEntry>> entries = new HashMap<>();
public PublishedExports() { public PublishedExports() {
} }
@ -62,7 +64,7 @@ public class PublishedExports {
* @param entries entries to put * @param entries entries to put
*/ */
public PublishedExports(String description, public PublishedExports(String description,
Iterable<Map.Entry<String, List<ExportEntry>>> entries) { Iterable<Entry<String, Set<ExportEntry>>> entries) {
this.description = description; this.description = description;
putValues(entries); putValues(entries);
} }
@ -87,15 +89,22 @@ public class PublishedExports {
this.updatedTime = new Date(updated).toString(); this.updatedTime = new Date(updated).toString();
} }
public Map<String, Set<ExportEntry>> sortedEntries() {
Map<String, Set<ExportEntry>> sortedEntries = new TreeMap<>();
sortedEntries.putAll(entries);
return sortedEntries;
}
/** /**
* Set the values from an iterable (this includes a Hadoop Configuration and Java properties * Set the values from an iterable (this includes a Hadoop Configuration and Java properties
* object). Any existing value set is discarded * object). Any existing value set is discarded
* *
* @param entries entries to put * @param values values to put
*/ */
public void putValues(Iterable<Map.Entry<String, List<ExportEntry>>> entries) { public void putValues(Iterable<Map.Entry<String, Set<ExportEntry>>> values) {
this.entries = new HashMap<String, List<ExportEntry>>(); this.entries = new HashMap<>();
for (Map.Entry<String, List<ExportEntry>> entry : entries) { for (Map.Entry<String, Set<ExportEntry>> entry : values) {
this.entries.put(entry.getKey(), entry.getValue()); this.entries.put(entry.getKey(), entry.getValue());
} }
} }

View File

@ -75,6 +75,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Pattern; import java.util.regex.Pattern;
/** /**
@ -1053,32 +1054,6 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
return hosts; return hosts;
} }
/**
* Return a list of hostnames based on current ClusterNodes.
* @param values cluster nodes
* @return list of hosts
*/
public Iterable<String> getHostNamesList(Collection<ClusterNode> values) {
List<String> hosts = new ArrayList<>();
for (ClusterNode cn : values) {
hosts.add(cn.hostname);
}
return hosts;
}
/**
* Return a list of IPs based on current ClusterNodes.
* @param values cluster nodes
* @return list of hosts
*/
public Iterable<String> getIPsList(Collection<ClusterNode> values) {
List<String> hosts = new ArrayList<>();
for (ClusterNode cn : values) {
hosts.add(cn.ip);
}
return hosts;
}
/** /**
* Update ServiceRecord in Registry with IP and hostname. * Update ServiceRecord in Registry with IP and hostname.
* @param amState access to AM state * @param amState access to AM state
@ -1148,27 +1123,30 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
* Publish an export group. * Publish an export group.
* @param exportGroup export groups * @param exportGroup export groups
* @param amState access to AM state * @param amState access to AM state
* @param roleGroup component group * @param groupName export group name
*/ */
public void publishExportGroup(Map<String, List<ExportEntry>> exportGroup, public void publishExportGroup(
StateAccessForProviders amState, String roleGroup) { Map<String, Set<ExportEntry>> exportGroup,
StateAccessForProviders amState, String groupName) {
// Publish in old format for the time being // Publish in old format for the time being
Map<String, String> simpleEntries = new HashMap<>(); Map<String, String> simpleEntries = new HashMap<>();
for (Entry<String, List<ExportEntry>> entry : exportGroup.entrySet()) { for (Entry<String, Set<ExportEntry>> entry : exportGroup.entrySet()) {
List<ExportEntry> exports = entry.getValue(); Set<ExportEntry> exports = entry.getValue();
if (SliderUtils.isNotEmpty(exports)) { if (SliderUtils.isNotEmpty(exports)) {
// there is no support for multiple exports per name, so extract only Set<String> values = new TreeSet<>();
// the first one for (ExportEntry export : exports) {
simpleEntries.put(entry.getKey(), entry.getValue().get(0).getValue()); values.add(export.getValue());
}
simpleEntries.put(entry.getKey(), StringUtils.join(",", values));
} }
} }
publishApplicationInstanceData(roleGroup, roleGroup, publishApplicationInstanceData(groupName, groupName,
simpleEntries.entrySet(), amState); simpleEntries.entrySet(), amState);
PublishedExports exports = new PublishedExports(roleGroup); PublishedExports exports = new PublishedExports(groupName);
exports.setUpdated(new Date().getTime()); exports.setUpdated(new Date().getTime());
exports.putValues(exportGroup.entrySet()); exports.putValues(exportGroup.entrySet());
amState.getPublishedExportsSet().put(roleGroup, exports); amState.getPublishedExportsSet().put(groupName, exports);
} }
public Map<String, String> getExports(ConfTreeOperations appConf, public Map<String, String> getExports(ConfTreeOperations appConf,
@ -1179,75 +1157,26 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
return exports; return exports;
} }
private static final String COMPONENT_TAG = "component"; public String getGroupKey(String roleGroup, ConfTreeOperations appConf) {
private static final String HOST_FOLDER_FORMAT = "%s:%s"; String rolePrefix = appConf.getComponentOpt(roleGroup, ROLE_PREFIX, "");
private static final String CONTAINER_LOGS_TAG = "container_log_dirs"; return getNameOrGroupKey(rolePrefix, roleGroup);
private static final String CONTAINER_PWDS_TAG = "container_work_dirs";
/**
* Format the folder locations and publish in the registry service.
* @param folders folder information
* @param containerId container ID
* @param hostFqdn host FQDN
* @param componentName component name
*/
public void publishFolderPaths(Map<String, String> folders,
String containerId, String componentName, String hostFqdn,
StateAccessForProviders amState,
Map<String, ExportEntry> logFolderExports,
Map<String, ExportEntry> workFolderExports) {
Date now = new Date();
for (Map.Entry<String, String> entry : folders.entrySet()) {
ExportEntry exportEntry = new ExportEntry();
exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn,
entry.getValue()));
exportEntry.setContainerId(containerId);
exportEntry.setLevel(COMPONENT_TAG);
exportEntry.setTag(componentName);
exportEntry.setUpdatedTime(now.toString());
if (entry.getKey().equals("AGENT_LOG_ROOT") ||
entry.getKey().equals("LOG_DIR")) {
synchronized (logFolderExports) {
logFolderExports.put(containerId, exportEntry);
}
} else {
synchronized (workFolderExports) {
workFolderExports.put(containerId, exportEntry);
}
}
log.info("Updating log and pwd folders for container {}", containerId);
}
PublishedExports exports = new PublishedExports(CONTAINER_LOGS_TAG);
exports.setUpdated(now.getTime());
synchronized (logFolderExports) {
updateExportsFromList(exports, logFolderExports);
}
amState.getPublishedExportsSet().put(CONTAINER_LOGS_TAG, exports);
exports = new PublishedExports(CONTAINER_PWDS_TAG);
exports.setUpdated(now.getTime());
synchronized (workFolderExports) {
updateExportsFromList(exports, workFolderExports);
}
amState.getPublishedExportsSet().put(CONTAINER_PWDS_TAG, exports);
} }
/** public String getNameKey(String roleName, String roleGroup,
* Update the export data from the map. ConfTreeOperations appConf) {
* @param exports published exports String rolePrefix = appConf.getComponentOpt(roleGroup, ROLE_PREFIX, "");
* @param folderExports folder exports return getNameOrGroupKey(rolePrefix, roleName);
*/ }
private void updateExportsFromList(PublishedExports exports,
Map<String, ExportEntry> folderExports) { public String getNameOrGroupKey(String rolePrefix, String roleNameOrGroup) {
Map<String, List<ExportEntry>> perComponentList = new HashMap<>(); if (!rolePrefix.isEmpty()) {
for(Map.Entry<String, ExportEntry> logEntry : folderExports.entrySet()) { if (!roleNameOrGroup.startsWith(rolePrefix)) {
String componentName = logEntry.getValue().getTag(); log.warn("Something went wrong, {} doesn't start with {}",
if (!perComponentList.containsKey(componentName)) { roleNameOrGroup, rolePrefix);
perComponentList.put(componentName, new ArrayList<ExportEntry>()); return null;
} }
perComponentList.get(componentName).add(logEntry.getValue()); roleNameOrGroup = roleNameOrGroup.substring(rolePrefix.length());
} }
exports.putValues(perComponentList.entrySet()); return roleNameOrGroup.toUpperCase(Locale.ENGLISH);
} }
} }

View File

@ -46,27 +46,24 @@ import org.apache.slider.providers.ProviderCore;
import org.apache.slider.providers.ProviderRole; import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderUtils; import org.apache.slider.providers.ProviderUtils;
import org.apache.slider.server.appmaster.state.RoleInstance; import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Scanner; import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.apache.slider.api.RoleKeys.ROLE_PREFIX;
public class DockerProviderService extends AbstractProviderService implements public class DockerProviderService extends AbstractProviderService implements
ProviderCore, ProviderCore,
DockerKeys, DockerKeys,
@ -77,10 +74,16 @@ public class DockerProviderService extends AbstractProviderService implements
private static final ProviderUtils providerUtils = new ProviderUtils(log); private static final ProviderUtils providerUtils = new ProviderUtils(log);
private static final String EXPORT_GROUP = "quicklinks"; private static final String EXPORT_GROUP = "quicklinks";
private static final String APPLICATION_TAG = "application"; private static final String APPLICATION_TAG = "application";
private static final String HOST_KEY_FORMAT = "${%s_HOST}";
private static final String IP_KEY_FORMAT = "${%s_IP}";
private static final String VARIABLE_INDICATOR = "${";
private String clusterName = null; private String clusterName = null;
private SliderFileSystem fileSystem = null; private SliderFileSystem fileSystem = null;
private final Map<String, Set<ExportEntry>> exportMap =
new ConcurrentHashMap<>();
protected DockerProviderService() { protected DockerProviderService() {
super("DockerProviderService"); super("DockerProviderService");
} }
@ -118,9 +121,6 @@ public class DockerProviderService extends AbstractProviderService implements
String roleName = providerRole.name; String roleName = providerRole.name;
String roleGroup = providerRole.group; String roleGroup = providerRole.group;
initializeApplicationConfiguration(instanceDefinition, fileSystem,
roleGroup);
log.info("Build launch context for Docker"); log.info("Build launch context for Docker");
log.debug(instanceDefinition.toString()); log.debug(instanceDefinition.toString());
@ -279,6 +279,23 @@ public class DockerProviderService extends AbstractProviderService implements
} }
} }
@Override
public void notifyContainerCompleted(ContainerId containerId) {
if (containerId != null) {
String containerIdStr = containerId.toString();
log.info("Removing container exports for {}", containerIdStr);
for (Set<ExportEntry> exportEntries : exportMap.values()) {
for (Iterator<ExportEntry> iter = exportEntries.iterator();
iter.hasNext();) {
ExportEntry entry = iter.next();
if (containerIdStr.equals(entry.getContainerId())) {
iter.remove();
}
}
}
}
}
@Override @Override
public boolean processContainerStatus(ContainerId containerId, public boolean processContainerStatus(ContainerId containerId,
ContainerStatus status) { ContainerStatus status) {
@ -301,7 +318,7 @@ public class DockerProviderService extends AbstractProviderService implements
containerIdStr, roleName, status.getIPs(), status.getHost()); containerIdStr, roleName, status.getIPs(), status.getHost());
publishExportGroups(containerIdStr, roleName, roleGroup, publishExportGroups(containerIdStr, roleName, roleGroup,
status.getHost()); status.getHost(), status.getIPs());
return false; return false;
} }
@ -312,17 +329,13 @@ public class DockerProviderService extends AbstractProviderService implements
* are substituted with the actual hostnames of the containers. * are substituted with the actual hostnames of the containers.
*/ */
protected void publishExportGroups(String containerId, protected void publishExportGroups(String containerId,
String roleName, String roleGroup, String thisHost) { String roleName, String roleGroup, String thisHost, List<String> ips) {
ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot(); ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot();
Map<String, String> exports = providerUtils.getExports( Map<String, String> exports = providerUtils.getExports(
getAmState().getAppConfSnapshot(), roleGroup); getAmState().getAppConfSnapshot(), roleGroup);
String hostKeyFormat = "${%s_HOST}";
String hostNameKeyFormat = "${%s_HOSTNAME}";
String ipKeyFormat = "${%s_IP}";
// publish export groups if any // publish export groups if any
Map<String, String> standardTokens = providerUtils.getStandardTokenMap( Map<String, String> standardTokens = providerUtils.getStandardTokenMap(
appConf, internalsConf, roleName, roleGroup, containerId, appConf, internalsConf, roleName, roleGroup, containerId,
@ -331,38 +344,33 @@ public class DockerProviderService extends AbstractProviderService implements
appConf.getComponent(roleGroup).options, standardTokens); appConf.getComponent(roleGroup).options, standardTokens);
replaceTokens.putAll(standardTokens); replaceTokens.putAll(standardTokens);
String rolePrefix = appConf.getComponentOpt(roleGroup, ROLE_PREFIX, ""); String roleNameKey = providerUtils.getNameKey(roleName, roleGroup,
for (Map.Entry<String, Map<String, ClusterNode>> entry : appConf);
getAmState().getRoleClusterNodeMapping().entrySet()) { String roleNameIPKey = null;
String otherRolePrefix = appConf.getComponentOpt(entry.getKey(), if (roleNameKey != null) {
ROLE_PREFIX, ""); replaceTokens.put(String.format(HOST_KEY_FORMAT, roleNameKey), thisHost);
if (!otherRolePrefix.equals(rolePrefix)) { roleNameIPKey = Pattern.quote(String.format(IP_KEY_FORMAT, roleNameKey));
// hostname replacements are only made within role prefix groups } else {
continue; // should not happen, but log if it does
log.info("Not replacing HOST or IP tokens because key was null for {}",
roleName);
}
String roleGroupKey = providerUtils.getGroupKey(roleGroup, appConf);
String roleGroupIPKey = null;
if (roleGroupKey != null) {
if (roleNameKey == null || !roleGroupKey.equals(roleNameKey)) {
replaceTokens.put(String.format(HOST_KEY_FORMAT, roleGroupKey),
thisHost);
roleGroupIPKey = Pattern.quote(String.format(IP_KEY_FORMAT,
roleGroupKey));
} }
String key = entry.getKey(); } else {
if (!rolePrefix.isEmpty()) { // should not happen, but log if it does
if (!key.startsWith(rolePrefix)) { log.info("Not replacing HOST or IP tokens because key was null for {}",
log.warn("Something went wrong, {} doesn't start with {}", key, roleGroup);
rolePrefix);
continue;
}
key = key.substring(rolePrefix.length());
}
key = key.toUpperCase(Locale.ENGLISH);
String host = providerUtils.getHostsList(
entry.getValue().values(), true).iterator().next();
replaceTokens.put(String.format(hostKeyFormat, key), host);
String hostName = providerUtils.getHostNamesList(
entry.getValue().values()).iterator().next();
replaceTokens.put(String.format(hostNameKeyFormat, key), hostName);
String ip = providerUtils.getIPsList(
entry.getValue().values()).iterator().next();
replaceTokens.put(String.format(ipKeyFormat, key), ip);
} }
replaceTokens.put("${THIS_HOST}", thisHost); replaceTokens.put("${THIS_HOST}", thisHost);
Map<String, List<ExportEntry>> entries = new HashMap<>();
for (Entry<String, String> export : exports.entrySet()) { for (Entry<String, String> export : exports.entrySet()) {
String value = export.getValue(); String value = export.getValue();
// replace host names and site properties // replace host names and site properties
@ -372,20 +380,55 @@ public class DockerProviderService extends AbstractProviderService implements
value = value.replaceAll(Pattern.quote(token), entry.getValue()); value = value.replaceAll(Pattern.quote(token), entry.getValue());
} }
} }
ExportEntry entry = new ExportEntry(); Set<String> values = new HashSet<>();
entry.setLevel(APPLICATION_TAG); for (String ip : ips) {
entry.setValue(value); values.add(substituteIP(roleNameIPKey, roleGroupIPKey, ip, value));
entry.setUpdatedTime(new Date().toString()); }
// over-write, app exports are singletons for (String exportValue : values) {
entries.put(export.getKey(), new ArrayList(Arrays.asList(entry))); if (exportValue.contains(VARIABLE_INDICATOR)) {
log.info("Preparing to publish. Key {} and Value {}", // not all variables have been substituted, so do not export
export.getKey(), value); continue;
}
ExportEntry entry = new ExportEntry();
entry.setContainerId(containerId);
entry.setLevel(APPLICATION_TAG);
entry.setValue(exportValue);
entry.setUpdatedTime(new Date().toString());
Set<ExportEntry> exportEntries = getExportEntries(export.getKey());
exportEntries.add(entry);
log.info("Preparing to publish for {}. Key {} and Value {}",
roleName, export.getKey(), entry);
}
} }
if (!entries.isEmpty()) { if (!exportMap.isEmpty()) {
providerUtils.publishExportGroup(entries, getAmState(), EXPORT_GROUP); providerUtils.publishExportGroup(exportMap, getAmState(), EXPORT_GROUP);
} }
} }
protected String substituteIP(String roleNameIPKey, String roleGroupIPKey,
String ip, String value) {
if (roleNameIPKey != null) {
value = value.replaceAll(roleNameIPKey, ip);
}
if (roleGroupIPKey != null) {
value = value.replaceAll(roleGroupIPKey, ip);
}
return value;
}
protected Set<ExportEntry> getExportEntries(String key) {
if (!this.exportMap.containsKey(key)) {
synchronized (this.exportMap) {
if (!this.exportMap.containsKey(key)) {
this.exportMap.put(key, Collections.newSetFromMap(
new ConcurrentHashMap<>()));
}
}
}
return this.exportMap.get(key);
}
@Override @Override
public Map<String, MonitorDetail> buildMonitorDetails(ClusterDescription clusterDesc) { public Map<String, MonitorDetail> buildMonitorDetails(ClusterDescription clusterDesc) {
Map<String, MonitorDetail> details = super.buildMonitorDetails(clusterDesc); Map<String, MonitorDetail> details = super.buildMonitorDetails(clusterDesc);

View File

@ -25,7 +25,6 @@ import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.UL;
import org.apache.slider.api.ClusterDescription; import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.StatusKeys; import org.apache.slider.api.StatusKeys;
import org.apache.slider.api.types.ApplicationLivenessInformation; import org.apache.slider.api.types.ApplicationLivenessInformation;
import org.apache.slider.api.types.RoleStatistics;
import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.registry.docstore.ExportEntry; import org.apache.slider.core.registry.docstore.ExportEntry;
import org.apache.slider.core.registry.docstore.PublishedExports; import org.apache.slider.core.registry.docstore.PublishedExports;
@ -42,6 +41,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_COMPONENTS; import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_COMPONENTS;
@ -291,12 +291,15 @@ public class IndexBlock extends SliderHamletBlock {
LI<UL<Hamlet>> item = ul.li(); LI<UL<Hamlet>> item = ul.li();
item.span().$class("bold")._(export.description)._(); item.span().$class("bold")._(export.description)._();
UL sublist = item.ul(); UL sublist = item.ul();
for (Entry<String, List<ExportEntry>> entry : export.entries.entrySet()) { for (Entry<String, Set<ExportEntry>> entry : export.sortedEntries()
LI sublistItem = sublist.li()._(entry.getKey()); .entrySet()) {
for (ExportEntry exportEntry : entry.getValue()) { if (SliderUtils.isNotEmpty(entry.getValue())) {
sublistItem._(exportEntry.getValue()); LI sublistItem = sublist.li()._(entry.getKey());
for (ExportEntry exportEntry : entry.getValue()) {
sublistItem._(exportEntry.getValue());
}
sublistItem._();
} }
sublistItem._();
} }
sublist._(); sublist._();
item._(); item._();