YARN-4224. Support fetching entities by UID and change the REST
interface to conform to current REST APIs' in YARN. (Varun Saxena via gtcarrera9)
This commit is contained in:
parent
6934b05c71
commit
9d40d9d34c
|
@ -504,6 +504,19 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set UID in info which will be then used for query by UI.
|
||||
* @param uidKey key for UID in info.
|
||||
* @param uId UID to be set for the key.
|
||||
*/
|
||||
public void setUID(String uidKey, String uId) {
|
||||
if (real == null) {
|
||||
info.put(uidKey, uId);
|
||||
} else {
|
||||
real.addInfo(uidKey, uId);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isValid() {
|
||||
return (getId() != null && getType() != null);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,146 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice;
|
||||
|
||||
/**
|
||||
* Encapsulates timeline context information.
|
||||
*/
|
||||
public class TimelineContext {
|
||||
|
||||
private String clusterId;
|
||||
private String userId;
|
||||
private String flowName;
|
||||
private Long flowRunId;
|
||||
private String appId;
|
||||
|
||||
public TimelineContext() {
|
||||
this(null, null, null, 0L, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + ((appId == null) ? 0 : appId.hashCode());
|
||||
result = prime * result + ((clusterId == null) ? 0 : clusterId.hashCode());
|
||||
result = prime * result + ((flowName == null) ? 0 : flowName.hashCode());
|
||||
result = prime * result + ((flowRunId == null) ? 0 : flowRunId.hashCode());
|
||||
result = prime * result + ((userId == null) ? 0 : userId.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
TimelineContext other = (TimelineContext) obj;
|
||||
if (appId == null) {
|
||||
if (other.appId != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!appId.equals(other.appId)) {
|
||||
return false;
|
||||
}
|
||||
if (clusterId == null) {
|
||||
if (other.clusterId != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!clusterId.equals(other.clusterId)) {
|
||||
return false;
|
||||
}
|
||||
if (flowName == null) {
|
||||
if (other.flowName != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!flowName.equals(other.flowName)) {
|
||||
return false;
|
||||
}
|
||||
if (flowRunId == null) {
|
||||
if (other.flowRunId != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!flowRunId.equals(other.flowRunId)) {
|
||||
return false;
|
||||
}
|
||||
if (userId == null) {
|
||||
if (other.userId != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!userId.equals(other.userId)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public TimelineContext(String clusterId, String userId, String flowName,
|
||||
Long flowRunId, String appId) {
|
||||
this.clusterId = clusterId;
|
||||
this.userId = userId;
|
||||
this.flowName = flowName;
|
||||
this.flowRunId = flowRunId;
|
||||
this.appId = appId;
|
||||
}
|
||||
|
||||
public String getClusterId() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
public void setClusterId(String cluster) {
|
||||
this.clusterId = cluster;
|
||||
}
|
||||
|
||||
public String getUserId() {
|
||||
return userId;
|
||||
}
|
||||
|
||||
public void setUserId(String user) {
|
||||
this.userId = user;
|
||||
}
|
||||
|
||||
public String getFlowName() {
|
||||
return flowName;
|
||||
}
|
||||
|
||||
public void setFlowName(String flow) {
|
||||
this.flowName = flow;
|
||||
}
|
||||
|
||||
public Long getFlowRunId() {
|
||||
return flowRunId;
|
||||
}
|
||||
|
||||
public void setFlowRunId(long runId) {
|
||||
this.flowRunId = runId;
|
||||
}
|
||||
|
||||
public String getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
public void setAppId(String app) {
|
||||
this.appId = app;
|
||||
}
|
||||
}
|
|
@ -18,74 +18,58 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
||||
|
||||
public class TimelineCollectorContext {
|
||||
import org.apache.hadoop.yarn.server.timelineservice.TimelineContext;
|
||||
|
||||
/**
|
||||
* Encapsulates context information required by collector during a put.
|
||||
*/
|
||||
public class TimelineCollectorContext extends TimelineContext {
|
||||
|
||||
private String clusterId;
|
||||
private String userId;
|
||||
private String flowName;
|
||||
private String flowVersion;
|
||||
private long flowRunId;
|
||||
private String appId;
|
||||
|
||||
public TimelineCollectorContext() {
|
||||
this(null, null, null, null, 0L, null);
|
||||
}
|
||||
|
||||
public TimelineCollectorContext(String clusterId, String userId,
|
||||
String flowName, String flowVersion, long flowRunId, String appId) {
|
||||
this.clusterId = clusterId;
|
||||
this.userId = userId;
|
||||
this.flowName = flowName;
|
||||
String flowName, String flowVersion, Long flowRunId, String appId) {
|
||||
super(clusterId, userId, flowName, flowRunId, appId);
|
||||
this.flowVersion = flowVersion;
|
||||
this.flowRunId = flowRunId;
|
||||
this.appId = appId;
|
||||
}
|
||||
|
||||
public String getClusterId() {
|
||||
return clusterId;
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result =
|
||||
prime * result + ((flowVersion == null) ? 0 : flowVersion.hashCode());
|
||||
return result + super.hashCode();
|
||||
}
|
||||
|
||||
public void setClusterId(String clusterId) {
|
||||
this.clusterId = clusterId;
|
||||
}
|
||||
|
||||
public String getUserId() {
|
||||
return userId;
|
||||
}
|
||||
|
||||
public void setUserId(String userId) {
|
||||
this.userId = userId;
|
||||
}
|
||||
|
||||
public String getFlowName() {
|
||||
return flowName;
|
||||
}
|
||||
|
||||
public void setFlowName(String flowName) {
|
||||
this.flowName = flowName;
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (!super.equals(obj)) {
|
||||
return false;
|
||||
}
|
||||
TimelineCollectorContext other = (TimelineCollectorContext) obj;
|
||||
if (flowVersion == null) {
|
||||
if (other.flowVersion != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!flowVersion.equals(other.flowVersion)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public String getFlowVersion() {
|
||||
return flowVersion;
|
||||
}
|
||||
|
||||
public void setFlowVersion(String flowVersion) {
|
||||
this.flowVersion = flowVersion;
|
||||
public void setFlowVersion(String version) {
|
||||
this.flowVersion = version;
|
||||
}
|
||||
|
||||
public long getFlowRunId() {
|
||||
return flowRunId;
|
||||
}
|
||||
|
||||
public void setFlowRunId(long flowRunId) {
|
||||
this.flowRunId = flowRunId;
|
||||
}
|
||||
|
||||
public String getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
public void setAppId(String appId) {
|
||||
this.appId = appId;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Package org.apache.hadoop.server.timelineservice contains classes to be used
|
||||
* across timeline reader and collector.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
package org.apache.hadoop.yarn.server.timelineservice;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.reader;
|
||||
|
||||
import org.apache.hadoop.yarn.server.timelineservice.TimelineContext;
|
||||
|
||||
/**
|
||||
* Encapsulates fields necessary to make a query in timeline reader.
|
||||
*/
|
||||
public class TimelineReaderContext extends TimelineContext {
|
||||
|
||||
private String entityType;
|
||||
private String entityId;
|
||||
public TimelineReaderContext(String clusterId, String userId, String flowName,
|
||||
Long flowRunId, String appId, String entityType, String entityId) {
|
||||
super(clusterId, userId, flowName, flowRunId, appId);
|
||||
this.entityType = entityType;
|
||||
this.entityId = entityId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + ((entityId == null) ? 0 : entityId.hashCode());
|
||||
result =
|
||||
prime * result + ((entityType == null) ? 0 : entityType.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (!super.equals(obj)) {
|
||||
return false;
|
||||
}
|
||||
TimelineReaderContext other = (TimelineReaderContext) obj;
|
||||
if (entityId == null) {
|
||||
if (other.entityId != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!entityId.equals(other.entityId)) {
|
||||
return false;
|
||||
}
|
||||
if (entityType == null) {
|
||||
if (other.entityType != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!entityType.equals(other.entityType)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public String getEntityType() {
|
||||
return entityType;
|
||||
}
|
||||
|
||||
public void setEntityType(String type) {
|
||||
this.entityType = type;
|
||||
}
|
||||
|
||||
public String getEntityId() {
|
||||
return entityId;
|
||||
}
|
||||
|
||||
public void setEntityId(String id) {
|
||||
this.entityId = id;
|
||||
}
|
||||
}
|
|
@ -27,15 +27,22 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class TimelineReaderManager extends AbstractService {
|
||||
|
||||
@VisibleForTesting
|
||||
public static final String UID_KEY = "UID";
|
||||
private TimelineReader reader;
|
||||
|
||||
public TimelineReaderManager(TimelineReader timelineReader) {
|
||||
|
@ -59,13 +66,63 @@ public class TimelineReaderManager extends AbstractService {
|
|||
return clusterId;
|
||||
}
|
||||
|
||||
private static TimelineEntityType getTimelineEntityType(String entityType) {
|
||||
if (entityType == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return TimelineEntityType.valueOf(entityType);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fill UID in the info field of entity based on the query(identified by
|
||||
* entity type).
|
||||
* @param entityType Entity type of query.
|
||||
* @param entity Timeline Entity.
|
||||
* @param context Context defining the query.
|
||||
*/
|
||||
private static void fillUID(TimelineEntityType entityType,
|
||||
TimelineEntity entity, TimelineReaderContext context) {
|
||||
if (entityType != null) {
|
||||
switch(entityType) {
|
||||
case YARN_FLOW_ACTIVITY:
|
||||
FlowActivityEntity activityEntity = (FlowActivityEntity)entity;
|
||||
context.setUserId(activityEntity.getUser());
|
||||
context.setFlowName(activityEntity.getFlowName());
|
||||
entity.setUID(UID_KEY,
|
||||
TimelineUIDConverter.FLOW_UID.encodeUID(context));
|
||||
return;
|
||||
case YARN_FLOW_RUN:
|
||||
FlowRunEntity runEntity = (FlowRunEntity)entity;
|
||||
context.setFlowRunId(runEntity.getRunId());
|
||||
entity.setUID(UID_KEY,
|
||||
TimelineUIDConverter.FLOWRUN_UID.encodeUID(context));
|
||||
return;
|
||||
case YARN_APPLICATION:
|
||||
context.setAppId(entity.getId());
|
||||
entity.setUID(UID_KEY,
|
||||
TimelineUIDConverter.APPLICATION_UID.encodeUID(context));
|
||||
return;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
context.setEntityType(entity.getType());
|
||||
context.setEntityId(entity.getId());
|
||||
entity.setUID(UID_KEY,
|
||||
TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a set of entities matching given predicates. The meaning of each
|
||||
* argument has been documented with {@link TimelineReader#getEntities}.
|
||||
*
|
||||
* @see TimelineReader#getEntities
|
||||
*/
|
||||
Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
public Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
|
@ -73,10 +130,20 @@ public class TimelineReaderManager extends AbstractService {
|
|||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
EnumSet<Field> fieldsToRetrieve) throws IOException {
|
||||
String cluster = getClusterID(clusterId, getConfig());
|
||||
return reader.getEntities(userId, cluster, flowName, flowRunId, appId,
|
||||
Set<TimelineEntity> entities =
|
||||
reader.getEntities(userId, cluster, flowName, flowRunId, appId,
|
||||
entityType, limit, createdTimeBegin, createdTimeEnd, relatesTo,
|
||||
isRelatedTo, infoFilters, configFilters, metricFilters, eventFilters,
|
||||
null, null, fieldsToRetrieve);
|
||||
if (entities != null) {
|
||||
TimelineEntityType type = getTimelineEntityType(entityType);
|
||||
TimelineReaderContext context = new TimelineReaderContext(cluster, userId,
|
||||
flowName, flowRunId, appId, entityType, null);
|
||||
for (TimelineEntity entity : entities) {
|
||||
fillUID(type, entity, context);
|
||||
}
|
||||
}
|
||||
return entities;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -89,7 +156,16 @@ public class TimelineReaderManager extends AbstractService {
|
|||
String flowName, Long flowRunId, String appId, String entityType,
|
||||
String entityId, EnumSet<Field> fields) throws IOException {
|
||||
String cluster = getClusterID(clusterId, getConfig());
|
||||
return reader.getEntity(userId, cluster, flowName, flowRunId, appId,
|
||||
TimelineEntity entity =
|
||||
reader.getEntity(userId, cluster, flowName, flowRunId, appId,
|
||||
entityType, entityId, null, null, fields);
|
||||
|
||||
if (entity != null) {
|
||||
TimelineEntityType type = getTimelineEntityType(entityType);
|
||||
TimelineReaderContext context = new TimelineReaderContext(cluster, userId,
|
||||
flowName, flowRunId, appId, entityType, null);
|
||||
fillUID(type, entity, context);
|
||||
}
|
||||
return entity;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.reader;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
/**
|
||||
* Set of utility methods to be used across timeline reader.
|
||||
*/
|
||||
final class TimelineReaderUtils {
|
||||
private TimelineReaderUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Split the passed string along the passed delimiter character while looking
|
||||
* for escape char to interpret the splitted parts correctly. For delimiter or
|
||||
* escape character to be interpreted as part of the string, they have to be
|
||||
* escaped by putting an escape character in front.
|
||||
* @param str string to be split.
|
||||
* @param delimiterChar delimiter used for splitting.
|
||||
* @param escapeChar delimiter and escape character will be escaped using this
|
||||
* character.
|
||||
* @return a list of strings after split.
|
||||
* @throws IllegalArgumentException if string is not properly escaped.
|
||||
*/
|
||||
static List<String> split(final String str, final char delimiterChar,
|
||||
final char escapeChar) throws IllegalArgumentException {
|
||||
if (str == null) {
|
||||
return null;
|
||||
}
|
||||
int len = str.length();
|
||||
if (len == 0) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
List<String> list = new ArrayList<String>();
|
||||
// Keeps track of offset of the passed string.
|
||||
int offset = 0;
|
||||
// Indicates start offset from which characters will be copied from original
|
||||
// string to destination string. Resets when an escape or delimiter char is
|
||||
// encountered.
|
||||
int startOffset = 0;
|
||||
StringBuilder builder = new StringBuilder(len);
|
||||
// Iterate over the string till we reach the end.
|
||||
while (offset < len) {
|
||||
if (str.charAt(offset) == escapeChar) {
|
||||
// An escape character must be followed by a delimiter or escape char
|
||||
// but we have reached the end and have no further character to look at.
|
||||
if (offset + 1 >= len) {
|
||||
throw new IllegalArgumentException(
|
||||
"Escape char not properly escaped.");
|
||||
}
|
||||
char nextChar = str.charAt(offset + 1);
|
||||
// Next character must be a delimiter or an escape char.
|
||||
if (nextChar != escapeChar && nextChar != delimiterChar) {
|
||||
throw new IllegalArgumentException(
|
||||
"Escape char or delimiter char not properly escaped.");
|
||||
}
|
||||
// Copy contents from the offset where last escape or delimiter char was
|
||||
// encountered.
|
||||
if (startOffset < offset) {
|
||||
builder.append(str.substring(startOffset, offset));
|
||||
}
|
||||
builder.append(nextChar);
|
||||
offset += 2;
|
||||
// Reset the start offset as an escape char has been encountered.
|
||||
startOffset = offset;
|
||||
continue;
|
||||
} else if (str.charAt(offset) == delimiterChar) {
|
||||
// A delimiter has been encountered without an escape character.
|
||||
// String needs to be split here. Copy remaining chars and add the
|
||||
// string to list.
|
||||
builder.append(str.substring(startOffset, offset));
|
||||
list.add(builder.toString());
|
||||
// Reset the start offset as a delimiter has been encountered.
|
||||
startOffset = ++offset;
|
||||
builder = new StringBuilder(len - offset);
|
||||
continue;
|
||||
}
|
||||
offset++;
|
||||
}
|
||||
// Copy rest of the characters.
|
||||
if (!str.isEmpty()) {
|
||||
builder.append(str.substring(startOffset));
|
||||
}
|
||||
// Add the last part of delimited string to list.
|
||||
list.add(builder.toString());
|
||||
return list;
|
||||
}
|
||||
|
||||
private static String escapeString(final String str, final char delimiterChar,
|
||||
final char escapeChar) {
|
||||
if (str == null) {
|
||||
return null;
|
||||
}
|
||||
int len = str.length();
|
||||
if (len == 0) {
|
||||
return "";
|
||||
}
|
||||
StringBuilder builder = new StringBuilder();
|
||||
// Keeps track of offset of the passed string.
|
||||
int offset = 0;
|
||||
// Indicates start offset from which characters will be copied from original
|
||||
// string to destination string. Resets when an escape or delimiter char is
|
||||
// encountered.
|
||||
int startOffset = 0;
|
||||
// Iterate over the string till we reach the end.
|
||||
while (offset < len) {
|
||||
char charAtOffset = str.charAt(offset);
|
||||
if (charAtOffset == escapeChar || charAtOffset == delimiterChar) {
|
||||
// If an escape or delimiter character is encountered, copy characters
|
||||
// from the offset where escape or delimiter was last encountered.
|
||||
if (startOffset < offset) {
|
||||
builder.append(str.substring(startOffset, offset));
|
||||
}
|
||||
// Append escape char before delimiter/escape char.
|
||||
builder.append(escapeChar).append(charAtOffset);
|
||||
// Reset start offset for copying characters when next escape/delimiter
|
||||
// char is encountered.
|
||||
startOffset = offset + 1;
|
||||
}
|
||||
offset++;
|
||||
}
|
||||
// Copy remaining characters.
|
||||
builder.append(str.substring(startOffset));
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Join different strings in the passed string array delimited by passed
|
||||
* delimiter with delimiter and escape character escaped using passed escape
|
||||
* char.
|
||||
* @param strs strings to be joined.
|
||||
* @param delimiterChar delimiter used to join strings.
|
||||
* @param escapeChar escape character used to escape delimiter and escape
|
||||
* char.
|
||||
* @return a single string joined using delimiter and properly escaped.
|
||||
*/
|
||||
static String joinAndEscapeStrings(final String[] strs,
|
||||
final char delimiterChar, final char escapeChar) {
|
||||
int len = strs.length;
|
||||
// Escape each string in string array.
|
||||
for (int index = 0; index < len; index++) {
|
||||
if (strs[index] == null) {
|
||||
return null;
|
||||
}
|
||||
strs[index] = escapeString(strs[index], delimiterChar, escapeChar);
|
||||
}
|
||||
// Join the strings after they have been escaped.
|
||||
return StringUtils.join(strs, delimiterChar);
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,222 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.reader;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
|
||||
|
||||
/**
|
||||
* Set of utility methods to be used by timeline reader web services.
|
||||
*/
|
||||
final class TimelineReaderWebServicesUtils {
|
||||
private TimelineReaderWebServicesUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a delimited string and convert it into a set of strings. For
|
||||
* instance, if delimiter is ",", then the string should be represented as
|
||||
* "value1,value2,value3".
|
||||
* @param str delimited string.
|
||||
* @param delimiter string is delimited by this delimiter.
|
||||
* @return set of strings.
|
||||
*/
|
||||
static Set<String> parseValuesStr(String str, String delimiter) {
|
||||
if (str == null || str.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
Set<String> strSet = new HashSet<String>();
|
||||
String[] strs = str.split(delimiter);
|
||||
for (String aStr : strs) {
|
||||
strSet.add(aStr.trim());
|
||||
}
|
||||
return strSet;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> void parseKeyValues(Map<String, T> map, String str,
|
||||
String pairsDelim, String keyValuesDelim, boolean stringValue,
|
||||
boolean multipleValues) {
|
||||
String[] pairs = str.split(pairsDelim);
|
||||
for (String pair : pairs) {
|
||||
if (pair == null || pair.trim().isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
String[] pairStrs = pair.split(keyValuesDelim);
|
||||
if (pairStrs.length < 2) {
|
||||
continue;
|
||||
}
|
||||
if (!stringValue) {
|
||||
try {
|
||||
Object value =
|
||||
GenericObjectMapper.OBJECT_READER.readValue(pairStrs[1].trim());
|
||||
map.put(pairStrs[0].trim(), (T) value);
|
||||
} catch (IOException e) {
|
||||
map.put(pairStrs[0].trim(), (T) pairStrs[1].trim());
|
||||
}
|
||||
} else {
|
||||
String key = pairStrs[0].trim();
|
||||
if (multipleValues) {
|
||||
Set<String> values = new HashSet<String>();
|
||||
for (int i = 1; i < pairStrs.length; i++) {
|
||||
values.add(pairStrs[i].trim());
|
||||
}
|
||||
map.put(key, (T) values);
|
||||
} else {
|
||||
map.put(key, (T) pairStrs[1].trim());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a delimited string and convert it into a map of key-values with each
|
||||
* key having a set of values. Both the key and values are interpreted as
|
||||
* strings.
|
||||
* For instance, if pairsDelim is "," and keyValuesDelim is ":", then the
|
||||
* string should be represented as
|
||||
* "key1:value11:value12:value13,key2:value21,key3:value31:value32".
|
||||
* @param str delimited string represented as multiple keys having multiple
|
||||
* values.
|
||||
* @param pairsDelim key-values pairs are delimited by this delimiter.
|
||||
* @param keyValuesDelim values for a key are delimited by this delimiter.
|
||||
* @return a map of key-values with each key having a set of values.
|
||||
*/
|
||||
static Map<String, Set<String>> parseKeyStrValuesStr(String str,
|
||||
String pairsDelim, String keyValuesDelim) {
|
||||
if (str == null) {
|
||||
return null;
|
||||
}
|
||||
Map<String, Set<String>> map = new HashMap<String, Set<String>>();
|
||||
parseKeyValues(map, str, pairsDelim, keyValuesDelim, true, true);
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a delimited string and convert it into a map of key-value pairs with
|
||||
* both the key and value interpreted as strings.
|
||||
* For instance, if pairsDelim is "," and keyValDelim is ":", then the string
|
||||
* should be represented as "key1:value1,key2:value2,key3:value3".
|
||||
* @param str delimited string represented as key-value pairs.
|
||||
* @param pairsDelim key-value pairs are delimited by this delimiter.
|
||||
* @param keyValuesDelim key and value are delimited by this delimiter.
|
||||
* @return a map of key-value pairs with both key and value being strings.
|
||||
*/
|
||||
static Map<String, String> parseKeyStrValueStr(String str,
|
||||
String pairsDelim, String keyValDelim) {
|
||||
if (str == null) {
|
||||
return null;
|
||||
}
|
||||
Map<String, String> map = new HashMap<String, String>();
|
||||
parseKeyValues(map, str, pairsDelim, keyValDelim, true, false);
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a delimited string and convert it into a map of key-value pairs with
|
||||
* key being a string and value interpreted as any object.
|
||||
* For instance, if pairsDelim is "," and keyValDelim is ":", then the string
|
||||
* should be represented as "key1:value1,key2:value2,key3:value3".
|
||||
* @param str delimited string represented as key-value pairs.
|
||||
* @param pairsDelim key-value pairs are delimited by this delimiter.
|
||||
* @param keyValuesDelim key and value are delimited by this delimiter.
|
||||
* @return a map of key-value pairs with key being a string and value amy
|
||||
* object.
|
||||
*/
|
||||
static Map<String, Object> parseKeyStrValueObj(String str,
|
||||
String pairsDelim, String keyValDelim) {
|
||||
if (str == null) {
|
||||
return null;
|
||||
}
|
||||
Map<String, Object> map = new HashMap<String, Object>();
|
||||
parseKeyValues(map, str, pairsDelim, keyValDelim, false, false);
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Interprets passed string as set of fields delimited by passed delimiter.
|
||||
* For instance, if delimiter is ",", then the passed string should be
|
||||
* represented as "METRICS,CONFIGS" where the delimited parts of the string
|
||||
* present in {@link Field}.
|
||||
* @param str passed string.
|
||||
* @param delimiter string delimiter.
|
||||
* @return a set of {@link Field}.
|
||||
*/
|
||||
static EnumSet<Field> parseFieldsStr(String str, String delimiter) {
|
||||
if (str == null) {
|
||||
return null;
|
||||
}
|
||||
String[] strs = str.split(delimiter);
|
||||
EnumSet<Field> fieldList = EnumSet.noneOf(Field.class);
|
||||
for (String s : strs) {
|
||||
fieldList.add(Field.valueOf(s.trim().toUpperCase()));
|
||||
}
|
||||
return fieldList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Interpret passed string as a long.
|
||||
* @param str Passed string.
|
||||
* @return long representation if string is not null, null otherwise.
|
||||
*/
|
||||
static Long parseLongStr(String str) {
|
||||
return str == null ? null : Long.parseLong(str.trim());
|
||||
}
|
||||
|
||||
/**
|
||||
* Trims the passed string if its not null.
|
||||
* @param str Passed string.
|
||||
* @return trimmed string if string is not null, null otherwise.
|
||||
*/
|
||||
static String parseStr(String str) {
|
||||
return str == null ? null : str.trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get UGI from HTTP request.
|
||||
* @param req HTTP request.
|
||||
* @return UGI.
|
||||
*/
|
||||
static UserGroupInformation getUser(HttpServletRequest req) {
|
||||
String remoteUser = req.getRemoteUser();
|
||||
UserGroupInformation callerUGI = null;
|
||||
if (remoteUser != null) {
|
||||
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
||||
}
|
||||
return callerUGI;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get username from caller UGI.
|
||||
* @param callerUGI caller UGI.
|
||||
* @return username.
|
||||
*/
|
||||
static String getUserName(UserGroupInformation callerUGI) {
|
||||
return ((callerUGI != null) ? callerUGI.getUserName().trim() : "");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,245 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.reader;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Used for encoding/decoding UID which will be used for query by UI.
|
||||
*/
|
||||
enum TimelineUIDConverter {
|
||||
// Flow UID should contain cluster, user and flow name.
|
||||
FLOW_UID {
|
||||
@Override
|
||||
String encodeUID(TimelineReaderContext context) {
|
||||
if (context == null) {
|
||||
return null;
|
||||
}
|
||||
if (context.getClusterId() == null || context.getUserId() == null ||
|
||||
context.getFlowName() == null) {
|
||||
return null;
|
||||
}
|
||||
String[] flowNameTupleArr = {context.getClusterId(), context.getUserId(),
|
||||
context.getFlowName()};
|
||||
return joinAndEscapeUIDParts(flowNameTupleArr);
|
||||
}
|
||||
|
||||
@Override
|
||||
TimelineReaderContext decodeUID(String uId) throws Exception {
|
||||
if (uId == null) {
|
||||
return null;
|
||||
}
|
||||
List<String> flowNameTupleList = splitUID(uId);
|
||||
// Should have 3 parts i.e. cluster, user and flow name.
|
||||
if (flowNameTupleList.size() != 3) {
|
||||
return null;
|
||||
}
|
||||
return new TimelineReaderContext(flowNameTupleList.get(0),
|
||||
flowNameTupleList.get(1), flowNameTupleList.get(2), null,
|
||||
null, null, null);
|
||||
}
|
||||
},
|
||||
|
||||
// Flowrun UID should contain cluster, user, flow name and flowrun id.
|
||||
FLOWRUN_UID{
|
||||
@Override
|
||||
String encodeUID(TimelineReaderContext context) {
|
||||
if (context == null) {
|
||||
return null;
|
||||
}
|
||||
if (context.getClusterId() == null || context.getUserId() == null ||
|
||||
context.getFlowName() == null || context.getFlowRunId() == null) {
|
||||
return null;
|
||||
}
|
||||
String[] flowRunTupleArr = {context.getClusterId(), context.getUserId(),
|
||||
context.getFlowName(), context.getFlowRunId().toString()};
|
||||
return joinAndEscapeUIDParts(flowRunTupleArr);
|
||||
}
|
||||
|
||||
@Override
|
||||
TimelineReaderContext decodeUID(String uId) throws Exception {
|
||||
if (uId == null) {
|
||||
return null;
|
||||
}
|
||||
List<String> flowRunTupleList = splitUID(uId);
|
||||
// Should have 4 parts i.e. cluster, user, flow name and flowrun id.
|
||||
if (flowRunTupleList.size() != 4) {
|
||||
return null;
|
||||
}
|
||||
return new TimelineReaderContext(flowRunTupleList.get(0),
|
||||
flowRunTupleList.get(1), flowRunTupleList.get(2),
|
||||
Long.parseLong(flowRunTupleList.get(3)), null, null, null);
|
||||
}
|
||||
},
|
||||
|
||||
// Application UID should contain cluster, user, flow name, flowrun id
|
||||
// and app id OR cluster and app id(i.e.without flow context info).
|
||||
APPLICATION_UID{
|
||||
@Override
|
||||
String encodeUID(TimelineReaderContext context) {
|
||||
if (context == null) {
|
||||
return null;
|
||||
}
|
||||
if (context.getClusterId() == null || context.getAppId() == null) {
|
||||
return null;
|
||||
}
|
||||
if (context.getUserId() != null && context.getFlowName() != null &&
|
||||
context.getFlowRunId() != null) {
|
||||
// Flow information exists.
|
||||
String[] appTupleArr = {context.getClusterId(), context.getUserId(),
|
||||
context.getFlowName(), context.getFlowRunId().toString(),
|
||||
context.getAppId()};
|
||||
return joinAndEscapeUIDParts(appTupleArr);
|
||||
} else {
|
||||
// Only cluster and app information exists. Flow info does not exist.
|
||||
String[] appTupleArr = {context.getClusterId(), context.getAppId()};
|
||||
return joinAndEscapeUIDParts(appTupleArr);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
TimelineReaderContext decodeUID(String uId) throws Exception {
|
||||
if (uId == null) {
|
||||
return null;
|
||||
}
|
||||
List<String> appTupleList = splitUID(uId);
|
||||
// Should have 5 parts i.e. cluster, user, flow name, flowrun id
|
||||
// and app id OR should have 2 parts i.e. cluster and app id.
|
||||
if (appTupleList.size() == 5) {
|
||||
// Flow information exists.
|
||||
return new TimelineReaderContext(appTupleList.get(0),
|
||||
appTupleList.get(1), appTupleList.get(2),
|
||||
Long.parseLong(appTupleList.get(3)), appTupleList.get(4),
|
||||
null, null);
|
||||
} else if (appTupleList.size() == 2) {
|
||||
// Flow information does not exist.
|
||||
return new TimelineReaderContext(appTupleList.get(0), null, null, null,
|
||||
appTupleList.get(1), null, null);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
// Generic Entity UID should contain cluster, user, flow name, flowrun id,
|
||||
// app id, entity type and entity id OR should contain cluster, appid, entity
|
||||
// type and entity id(i.e.without flow context info).
|
||||
GENERIC_ENTITY_UID {
|
||||
@Override
|
||||
String encodeUID(TimelineReaderContext context) {
|
||||
if (context == null) {
|
||||
return null;
|
||||
}
|
||||
if (context.getClusterId() == null || context.getAppId() == null ||
|
||||
context.getEntityType() == null || context.getEntityId() == null) {
|
||||
return null;
|
||||
}
|
||||
if (context.getUserId() != null && context.getFlowName() != null &&
|
||||
context.getFlowRunId() != null) {
|
||||
// Flow information exists.
|
||||
String[] entityTupleArr = {context.getClusterId(), context.getUserId(),
|
||||
context.getFlowName(), context.getFlowRunId().toString(),
|
||||
context.getAppId(), context.getEntityType(), context.getEntityId()};
|
||||
return joinAndEscapeUIDParts(entityTupleArr);
|
||||
} else {
|
||||
// Only entity and app information exists. Flow info does not exist.
|
||||
String[] entityTupleArr = {context.getClusterId(), context.getAppId(),
|
||||
context.getEntityType(), context.getEntityId()};
|
||||
return joinAndEscapeUIDParts(entityTupleArr);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
TimelineReaderContext decodeUID(String uId) throws Exception {
|
||||
if (uId == null) {
|
||||
return null;
|
||||
}
|
||||
List<String> entityTupleList = splitUID(uId);
|
||||
// Should have 7 parts i.e. cluster, user, flow name, flowrun id, app id,
|
||||
// entity type and entity id OR should have 4 parts i.e. cluster, app id,
|
||||
// entity type and entity id.
|
||||
if (entityTupleList.size() == 7) {
|
||||
// Flow information exists.
|
||||
return new TimelineReaderContext(entityTupleList.get(0),
|
||||
entityTupleList.get(1), entityTupleList.get(2),
|
||||
Long.parseLong(entityTupleList.get(3)), entityTupleList.get(4),
|
||||
entityTupleList.get(5), entityTupleList.get(6));
|
||||
} else if (entityTupleList.size() == 4) {
|
||||
// Flow information does not exist.
|
||||
return new TimelineReaderContext(entityTupleList.get(0), null, null,
|
||||
null, entityTupleList.get(1), entityTupleList.get(2),
|
||||
entityTupleList.get(3));
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Delimiter used for UID.
|
||||
*/
|
||||
public static final char UID_DELIMITER_CHAR = '!';
|
||||
|
||||
/**
|
||||
* Escape Character used if delimiter or escape character itself is part of
|
||||
* different components of UID.
|
||||
*/
|
||||
public static final char UID_ESCAPE_CHAR = '*';
|
||||
|
||||
/**
|
||||
* Split UID using {@link #UID_DELIMITER_CHAR} and {@link #UID_ESCAPE_CHAR}.
|
||||
* @param uid
|
||||
* @return a list of different parts of UID split across delimiter.
|
||||
* @throws IllegalArgumentException if UID is not properly escaped.
|
||||
*/
|
||||
private static List<String> splitUID(String uid)
|
||||
throws IllegalArgumentException {
|
||||
return TimelineReaderUtils.split(uid, UID_DELIMITER_CHAR, UID_ESCAPE_CHAR);
|
||||
}
|
||||
|
||||
/**
|
||||
* Join different parts of UID delimited by {@link #UID_DELIMITER_CHAR} with
|
||||
* delimiter and escape character escaped using {@link #UID_ESCAPE_CHAR} if
|
||||
* UID parts contain them.
|
||||
* @param parts an array of UID parts to be joined.
|
||||
* @return a string joined using the delimiter with escape and delimiter
|
||||
* characters escaped if they are part of the string parts to be joined.
|
||||
* Returns null if one of the parts is null.
|
||||
*/
|
||||
private static String joinAndEscapeUIDParts(String[] parts) {
|
||||
return TimelineReaderUtils.joinAndEscapeStrings(parts, UID_DELIMITER_CHAR,
|
||||
UID_ESCAPE_CHAR);
|
||||
}
|
||||
|
||||
/**
|
||||
* Encodes UID depending on UID implementation.
|
||||
* @param context
|
||||
* @return UID represented as a string.
|
||||
*/
|
||||
abstract String encodeUID(TimelineReaderContext context);
|
||||
|
||||
/**
|
||||
* Decodes UID depending on UID implementation.
|
||||
* @param uId
|
||||
* @return a {@link TimelineReaderContext} object if UID passed can be
|
||||
* decoded, null otherwise.
|
||||
* @throws Exception
|
||||
*/
|
||||
abstract TimelineReaderContext decodeUID(String uId) throws Exception;
|
||||
}
|
|
@ -26,6 +26,8 @@ import java.util.Set;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
|
||||
|
@ -87,10 +89,14 @@ public interface TimelineReader extends Service {
|
|||
* @param fieldsToRetrieve
|
||||
* Specifies which fields of the entity object to retrieve(optional), see
|
||||
* {@link Field}. If null, retrieves 4 fields namely entity id,
|
||||
* entity type and entity created time. All entities will be returned if
|
||||
* entity type and entity created time. All fields will be returned if
|
||||
* {@link Field#ALL} is specified.
|
||||
* @return a {@link TimelineEntity} instance or null. The entity will
|
||||
* contain the metadata plus the given fields to retrieve.
|
||||
* If entityType is YARN_FLOW_RUN, entity returned is of type
|
||||
* {@link FlowRunEntity}.
|
||||
* For all other entity types, entity returned is of type
|
||||
* {@link TimelineEntity}.
|
||||
* @throws IOException
|
||||
*/
|
||||
TimelineEntity getEntity(String userId, String clusterId, String flowName,
|
||||
|
@ -167,12 +173,18 @@ public interface TimelineReader extends Service {
|
|||
* @param fieldsToRetrieve
|
||||
* Specifies which fields of the entity object to retrieve(optional), see
|
||||
* {@link Field}. If null, retrieves 4 fields namely entity id,
|
||||
* entity type and entity created time. All entities will be returned if
|
||||
* entity type and entity created time. All fields will be returned if
|
||||
* {@link Field#ALL} is specified.
|
||||
* @return A set of {@link TimelineEntity} instances of the given entity type
|
||||
* in the given context scope which matches the given predicates
|
||||
* ordered by created time, descending. Each entity will only contain the
|
||||
* metadata(id, type and created time) plus the given fields to retrieve.
|
||||
* If entityType is YARN_FLOW_ACTIVITY, entities returned are of type
|
||||
* {@link FlowActivityEntity}.
|
||||
* If entityType is YARN_FLOW_RUN, entities returned are of type
|
||||
* {@link FlowRunEntity}.
|
||||
* For all other entity types, entities returned are of type
|
||||
* {@link TimelineEntity}.
|
||||
* @throws IOException
|
||||
*/
|
||||
Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
|
|||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
|
@ -195,9 +196,9 @@ class GenericEntityReader extends TimelineEntityReader {
|
|||
AppToFlowColumn.FLOW_ID.readResult(result).toString(),
|
||||
((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
|
||||
} else {
|
||||
throw new IOException(
|
||||
"Unable to find the context flow ID and flow run ID for clusterId=" +
|
||||
clusterId + ", appId=" + appId);
|
||||
throw new NotFoundException(
|
||||
"Unable to find the context flow ID and flow run ID for clusterId=" +
|
||||
clusterId + ", appId=" + appId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.reader;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestTimelineReaderUtils {
|
||||
|
||||
@Test
|
||||
public void testSplitUsingEscapeAndDelimChar() throws Exception {
|
||||
List<String> list =
|
||||
TimelineReaderUtils.split("*!cluster!*!b**o***!xer!oozie**", '!', '*');
|
||||
String[] arr = new String[list.size()];
|
||||
arr = list.toArray(arr);
|
||||
assertArrayEquals(new String[] {"!cluster", "!b*o*!xer", "oozie*"}, arr);
|
||||
list = TimelineReaderUtils.split("*!cluster!*!b**o***!xer!!", '!', '*');
|
||||
arr = new String[list.size()];
|
||||
arr = list.toArray(arr);
|
||||
assertArrayEquals(new String[] {"!cluster", "!b*o*!xer", "", ""}, arr);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJoinAndEscapeStrings() throws Exception {
|
||||
assertEquals("*!cluster!*!b**o***!xer!oozie**",
|
||||
TimelineReaderUtils.joinAndEscapeStrings(
|
||||
new String[] { "!cluster", "!b*o*!xer", "oozie*"}, '!', '*'));
|
||||
assertEquals("*!cluster!*!b**o***!xer!!",
|
||||
TimelineReaderUtils.joinAndEscapeStrings(
|
||||
new String[] { "!cluster", "!b*o*!xer", "", ""}, '!', '*'));
|
||||
assertNull(TimelineReaderUtils.joinAndEscapeStrings(
|
||||
new String[] { "!cluster", "!b*o*!xer", null, ""}, '!', '*'));
|
||||
}
|
||||
}
|
|
@ -166,7 +166,7 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entity/cluster1/app1/app/id_1");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app/id_1");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -188,8 +188,8 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entity/cluster1/app1/app/id_1?userid=user1&" +
|
||||
"flowname=flow1&flowrunid=1");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app/id_1?" +
|
||||
"userid=user1&flowname=flow1&flowrunid=1");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -208,7 +208,8 @@ public class TestTimelineReaderWebServices {
|
|||
try {
|
||||
// Fields are case insensitive.
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entity/cluster1/app1/app/id_1?fields=CONFIGS,Metrics,info");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app/id_1?" +
|
||||
"fields=CONFIGS,Metrics,info");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -217,7 +218,10 @@ public class TestTimelineReaderWebServices {
|
|||
assertEquals("app", entity.getType());
|
||||
assertEquals(3, entity.getConfigs().size());
|
||||
assertEquals(3, entity.getMetrics().size());
|
||||
assertEquals(1, entity.getInfo().size());
|
||||
assertTrue("UID should be present",
|
||||
entity.getInfo().containsKey(TimelineReaderManager.UID_KEY));
|
||||
// Includes UID.
|
||||
assertEquals(2, entity.getInfo().size());
|
||||
// No events will be returned as events are not part of fields.
|
||||
assertEquals(0, entity.getEvents().size());
|
||||
} finally {
|
||||
|
@ -230,7 +234,8 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entity/cluster1/app1/app/id_1?fields=ALL");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app/id_1?" +
|
||||
"fields=ALL");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -239,7 +244,10 @@ public class TestTimelineReaderWebServices {
|
|||
assertEquals("app", entity.getType());
|
||||
assertEquals(3, entity.getConfigs().size());
|
||||
assertEquals(3, entity.getMetrics().size());
|
||||
assertEquals(1, entity.getInfo().size());
|
||||
assertTrue("UID should be present",
|
||||
entity.getInfo().containsKey(TimelineReaderManager.UID_KEY));
|
||||
// Includes UID.
|
||||
assertEquals(2, entity.getInfo().size());
|
||||
assertEquals(2, entity.getEvents().size());
|
||||
} finally {
|
||||
client.destroy();
|
||||
|
@ -251,7 +259,7 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entity/cluster1/app1/app/id_10");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app/id_10");
|
||||
verifyHttpResponse(client, uri, Status.NOT_FOUND);
|
||||
} finally {
|
||||
client.destroy();
|
||||
|
@ -263,7 +271,7 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entity/app1/app/id_1");
|
||||
"timeline/apps/app1/entities/app/id_1");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -272,7 +280,7 @@ public class TestTimelineReaderWebServices {
|
|||
assertEquals("app", entity.getType());
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/app1/app");
|
||||
"timeline/apps/app1/entities/app");
|
||||
resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -289,7 +297,7 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/cluster1/app1/app");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -312,7 +320,7 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/cluster1/app1/app?limit=2");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app?limit=2");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -326,7 +334,7 @@ public class TestTimelineReaderWebServices {
|
|||
entities.contains(newEntity("app", "id_4")));
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
|
||||
"entities/cluster1/app1/app?limit=3");
|
||||
"clusters/cluster1/apps/app1/entities/app?limit=3");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -344,8 +352,8 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/cluster1/app1/app?createdtimestart=1425016502030&"
|
||||
+ "createdtimeend=1425016502060");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app?" +
|
||||
"createdtimestart=1425016502030&createdtimeend=1425016502060");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -356,7 +364,8 @@ public class TestTimelineReaderWebServices {
|
|||
entities.contains(newEntity("app", "id_4")));
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
|
||||
"entities/cluster1/app1/app?createdtimeend=1425016502010");
|
||||
"clusters/cluster1/apps/app1/entities/app?createdtimeend" +
|
||||
"=1425016502010");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -366,7 +375,8 @@ public class TestTimelineReaderWebServices {
|
|||
entities.contains(newEntity("app", "id_4")));
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
|
||||
"entities/cluster1/app1/app?createdtimestart=1425016502010");
|
||||
"clusters/cluster1/apps/app1/entities/app?createdtimestart=" +
|
||||
"1425016502010");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -384,7 +394,8 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/cluster1/app1/app?relatesto=flow:flow1");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app?relatesto=" +
|
||||
"flow:flow1");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -395,8 +406,8 @@ public class TestTimelineReaderWebServices {
|
|||
entities.contains(newEntity("app", "id_1")));
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
|
||||
"entities/cluster1/app1/app?isrelatedto=type1:tid1_2,type2:" +
|
||||
"tid2_1%60");
|
||||
"clusters/cluster1/apps/app1/entities/app?isrelatedto=" +
|
||||
"type1:tid1_2,type2:tid2_1%60");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -406,8 +417,8 @@ public class TestTimelineReaderWebServices {
|
|||
entities.contains(newEntity("app", "id_1")));
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
|
||||
"entities/cluster1/app1/app?isrelatedto=type1:tid1_1:tid1_2" +
|
||||
",type2:tid2_1%60");
|
||||
"clusters/cluster1/apps/app1/entities/app?isrelatedto=" +
|
||||
"type1:tid1_1:tid1_2,type2:tid2_1%60");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -425,8 +436,8 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/cluster1/app1/app?conffilters=config_1:123," +
|
||||
"config_3:abc");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app?" +
|
||||
"conffilters=config_1:123,config_3:abc");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -447,7 +458,8 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/cluster1/app1/app?infofilters=info2:3.5");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app?" +
|
||||
"infofilters=info2:3.5");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -466,7 +478,8 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/cluster1/app1/app?metricfilters=metric3");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app?" +
|
||||
"metricfilters=metric3");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -487,7 +500,8 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/cluster1/app1/app?eventfilters=event_2,event_4");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app?" +
|
||||
"eventfilters=event_2,event_4");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -506,10 +520,11 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/cluster1/app1/app?metricfilters=metric7&" +
|
||||
"isrelatedto=type1:tid1_1;tid1_2,type2:tid2_1%60&relatesto=" +
|
||||
"flow:flow1&eventfilters=event_2,event_4&infofilters=info2:3.5" +
|
||||
"&createdtimestart=1425016502030&createdtimeend=1425016502060");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app?" +
|
||||
"metricfilters=metric7&isrelatedto=type1:tid1_1;tid1_2,type2:tid2_1" +
|
||||
"%60&relatesto=flow:flow1&eventfilters=event_2,event_4&infofilters=" +
|
||||
"info2:3.5&createdtimestart=1425016502030&createdtimeend=" +
|
||||
"1425016502060");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -526,15 +541,15 @@ public class TestTimelineReaderWebServices {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/cluster1/app1/app?flowrunid=a23b");
|
||||
"timeline/clusters/cluster1/apps/app1/entities/app?flowrunid=a23b");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
|
||||
"entity/cluster1/app1/app/id_1?flowrunid=2ab15");
|
||||
"clusters/cluster1/apps/app1/entities/app/id_1?flowrunid=2ab15");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
|
||||
"entities/cluster1/app1/app/?limit=#$561av");
|
||||
"clusters/cluster1/apps/app1/entities/app?limit=#$561av");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
} finally {
|
||||
client.destroy();
|
||||
|
|
|
@ -28,8 +28,10 @@ import java.net.HttpURLConnection;
|
|||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.text.DateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -349,7 +351,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowrun/user1/cluster1/flow_name/1002345678919");
|
||||
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
|
||||
"1002345678919");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -366,7 +369,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
|
||||
// Query without specifying cluster ID.
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowrun/user1/flow_name/1002345678919");
|
||||
"timeline/users/user1/flows/flow_name/runs/1002345678919");
|
||||
resp = getResponse(client, uri);
|
||||
entity = resp.getEntity(FlowRunEntity.class);
|
||||
assertNotNull(entity);
|
||||
|
@ -390,7 +393,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowruns/user1/cluster1/flow_name");
|
||||
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<FlowRunEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
|
||||
|
@ -408,8 +411,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
assertEquals(0, entity.getMetrics().size());
|
||||
}
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowruns/user1/cluster1/flow_name?limit=1");
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
|
||||
"clusters/cluster1/users/user1/flows/flow_name/runs?limit=1");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -424,7 +427,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
}
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowruns/user1/cluster1/flow_name?" +
|
||||
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
|
||||
"createdtimestart=1425016501030");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
|
||||
|
@ -440,7 +443,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
}
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowruns/user1/cluster1/flow_name?" +
|
||||
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
|
||||
"createdtimestart=1425016500999&createdtimeend=1425016501035");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
|
||||
|
@ -459,7 +462,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
}
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowruns/user1/cluster1/flow_name?" +
|
||||
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
|
||||
"createdtimeend=1425016501030");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
|
||||
|
@ -475,7 +478,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
}
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowruns/user1/cluster1/flow_name?fields=metrics");
|
||||
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
|
||||
"fields=metrics");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
|
@ -497,12 +501,263 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetEntitiesByUID() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
// Query all flows.
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flows");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<FlowActivityEntity> flowEntities =
|
||||
resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
|
||||
assertNotNull(flowEntities);
|
||||
assertEquals(2, flowEntities.size());
|
||||
List<String> listFlowUIDs = new ArrayList<String>();
|
||||
for (FlowActivityEntity entity : flowEntities) {
|
||||
String flowUID =
|
||||
(String)entity.getInfo().get(TimelineReaderManager.UID_KEY);
|
||||
listFlowUIDs.add(flowUID);
|
||||
assertEquals(TimelineUIDConverter.FLOW_UID.encodeUID(
|
||||
new TimelineReaderContext(entity.getCluster(), entity.getUser(),
|
||||
entity.getFlowName(), null, null, null, null)), flowUID);
|
||||
assertTrue((entity.getId().endsWith("@flow_name") &&
|
||||
entity.getFlowRuns().size() == 2) ||
|
||||
(entity.getId().endsWith("@flow_name2") &&
|
||||
entity.getFlowRuns().size() == 1));
|
||||
}
|
||||
|
||||
// Query flowruns based on UID returned in query above.
|
||||
List<String> listFlowRunUIDs = new ArrayList<String>();
|
||||
for (String flowUID : listFlowUIDs) {
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flow-uid/" + flowUID + "/runs");
|
||||
resp = getResponse(client, uri);
|
||||
Set<FlowRunEntity> frEntities =
|
||||
resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
|
||||
assertNotNull(frEntities);
|
||||
for (FlowRunEntity entity : frEntities) {
|
||||
String flowRunUID =
|
||||
(String)entity.getInfo().get(TimelineReaderManager.UID_KEY);
|
||||
listFlowRunUIDs.add(flowRunUID);
|
||||
assertEquals(TimelineUIDConverter.FLOWRUN_UID.encodeUID(
|
||||
new TimelineReaderContext("cluster1", entity.getUser(),
|
||||
entity.getName(), entity.getRunId(), null, null, null)),
|
||||
flowRunUID);
|
||||
}
|
||||
}
|
||||
assertEquals(3, listFlowRunUIDs.size());
|
||||
|
||||
// Query single flowrun based on UIDs' returned in query to get flowruns.
|
||||
for (String flowRunUID : listFlowRunUIDs) {
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/run-uid/" + flowRunUID);
|
||||
resp = getResponse(client, uri);
|
||||
FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
|
||||
assertNotNull(entity);
|
||||
}
|
||||
|
||||
// Query apps based on UIDs' returned in query to get flowruns.
|
||||
List<String> listAppUIDs = new ArrayList<String>();
|
||||
for (String flowRunUID : listFlowRunUIDs) {
|
||||
TimelineReaderContext context =
|
||||
TimelineUIDConverter.FLOWRUN_UID.decodeUID(flowRunUID);
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/run-uid/" + flowRunUID + "/apps");
|
||||
resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> appEntities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(appEntities);
|
||||
for (TimelineEntity entity : appEntities) {
|
||||
String appUID =
|
||||
(String)entity.getInfo().get(TimelineReaderManager.UID_KEY);
|
||||
listAppUIDs.add(appUID);
|
||||
assertEquals(TimelineUIDConverter.APPLICATION_UID.encodeUID(
|
||||
new TimelineReaderContext(context.getClusterId(),
|
||||
context.getUserId(), context.getFlowName(),
|
||||
context.getFlowRunId(), entity.getId(), null, null)), appUID);
|
||||
}
|
||||
}
|
||||
assertEquals(4, listAppUIDs.size());
|
||||
|
||||
// Query single app based on UIDs' returned in query to get apps.
|
||||
for (String appUID : listAppUIDs) {
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app-uid/" + appUID);
|
||||
resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
assertNotNull(entity);
|
||||
}
|
||||
|
||||
// Query entities based on UIDs' returned in query to get apps and
|
||||
// a specific entity type(in this case type1).
|
||||
List<String> listEntityUIDs = new ArrayList<String>();
|
||||
for (String appUID : listAppUIDs) {
|
||||
TimelineReaderContext context =
|
||||
TimelineUIDConverter.APPLICATION_UID.decodeUID(appUID);
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app-uid/" + appUID + "/entities/type1");
|
||||
resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
for (TimelineEntity entity : entities) {
|
||||
String entityUID =
|
||||
(String)entity.getInfo().get(TimelineReaderManager.UID_KEY);
|
||||
listEntityUIDs.add(entityUID);
|
||||
assertEquals(TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(
|
||||
new TimelineReaderContext(context.getClusterId(),
|
||||
context.getUserId(), context.getFlowName(),
|
||||
context.getFlowRunId(), context.getAppId(), "type1",
|
||||
entity.getId())), entityUID);
|
||||
}
|
||||
}
|
||||
assertEquals(2, listEntityUIDs.size());
|
||||
|
||||
// Query single entity based on UIDs' returned in query to get entities.
|
||||
for (String entityUID : listEntityUIDs) {
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entity-uid/" + entityUID);
|
||||
resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
assertNotNull(entity);
|
||||
}
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flow-uid/dummy:flow/runs");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/run-uid/dummy:flowrun");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
|
||||
// Run Id is not a numerical value.
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/run-uid/some:dummy:flow:123v456");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/run-uid/dummy:flowrun/apps");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app-uid/dummy:app");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app-uid/dummy:app/entities/type1");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entity-uid/dummy:entity");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
String appUIDWithFlowInfo =
|
||||
"cluster1!user1!flow_name!1002345678919!application_1111111111_1111";
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/"+
|
||||
"timeline/app-uid/" + appUIDWithFlowInfo);
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity appEntity1 = resp.getEntity(TimelineEntity.class);
|
||||
assertNotNull(appEntity1);
|
||||
assertEquals(
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), appEntity1.getType());
|
||||
assertEquals("application_1111111111_1111", appEntity1.getId());
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
|
||||
"app-uid/" + appUIDWithFlowInfo + "/entities/type1");
|
||||
resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities1 =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities1);
|
||||
assertEquals(2, entities1.size());
|
||||
for (TimelineEntity entity : entities1) {
|
||||
assertNotNull(entity.getInfo());
|
||||
assertEquals(1, entity.getInfo().size());
|
||||
String uid =
|
||||
(String) entity.getInfo().get(TimelineReaderManager.UID_KEY);
|
||||
assertNotNull(uid);
|
||||
assertTrue(uid.equals(appUIDWithFlowInfo + "!type1!entity1") ||
|
||||
uid.equals(appUIDWithFlowInfo + "!type1!entity2"));
|
||||
}
|
||||
|
||||
String appUIDWithoutFlowInfo = "cluster1!application_1111111111_1111";
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
|
||||
"app-uid/" + appUIDWithoutFlowInfo);
|
||||
resp = getResponse(client, uri);;
|
||||
TimelineEntity appEntity2 = resp.getEntity(TimelineEntity.class);
|
||||
assertNotNull(appEntity2);
|
||||
assertEquals(
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), appEntity2.getType());
|
||||
assertEquals("application_1111111111_1111", appEntity2.getId());
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
|
||||
"app-uid/" + appUIDWithoutFlowInfo + "/entities/type1");
|
||||
resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities2 =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities2);
|
||||
assertEquals(2, entities2.size());
|
||||
for (TimelineEntity entity : entities2) {
|
||||
assertNotNull(entity.getInfo());
|
||||
assertEquals(1, entity.getInfo().size());
|
||||
String uid =
|
||||
(String) entity.getInfo().get(TimelineReaderManager.UID_KEY);
|
||||
assertNotNull(uid);
|
||||
assertTrue(uid.equals(appUIDWithoutFlowInfo + "!type1!entity1") ||
|
||||
uid.equals(appUIDWithoutFlowInfo + "!type1!entity2"));
|
||||
}
|
||||
|
||||
String entityUIDWithFlowInfo = appUIDWithFlowInfo + "!type1!entity1";
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
|
||||
"entity-uid/" + entityUIDWithFlowInfo);
|
||||
resp = getResponse(client, uri);;
|
||||
TimelineEntity singleEntity1 = resp.getEntity(TimelineEntity.class);
|
||||
assertNotNull(singleEntity1);
|
||||
assertEquals("type1", singleEntity1.getType());
|
||||
assertEquals("entity1", singleEntity1.getId());
|
||||
|
||||
String entityUIDWithoutFlowInfo =
|
||||
appUIDWithoutFlowInfo + "!type1!entity1";
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
|
||||
"entity-uid/" + entityUIDWithoutFlowInfo);
|
||||
resp = getResponse(client, uri);;
|
||||
TimelineEntity singleEntity2 = resp.getEntity(TimelineEntity.class);
|
||||
assertNotNull(singleEntity2);
|
||||
assertEquals("type1", singleEntity2.getType());
|
||||
assertEquals("entity1", singleEntity2.getId());
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUIDNotProperlyEscaped() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
String appUID =
|
||||
"cluster1!user*1!flow_name!1002345678919!application_1111111111_1111";
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/"+
|
||||
"timeline/app-uid/" + appUID);
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFlows() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flows/cluster1");
|
||||
"timeline/clusters/cluster1/flows");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<FlowActivityEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
|
||||
|
@ -524,7 +779,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
assertEquals(2, entities.size());
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flows/cluster1?limit=1");
|
||||
"timeline/clusters/cluster1/flows?limit=1");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
|
@ -532,8 +787,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
|
||||
DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get();
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flows/cluster1?daterange=" + fmt.format(dayTs) + "-" +
|
||||
fmt.format(dayTs + (2*86400000L)));
|
||||
"timeline/clusters/cluster1/flows?daterange=" + fmt.format(dayTs) +
|
||||
"-" + fmt.format(dayTs + (2*86400000L)));
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
|
@ -546,7 +801,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
}
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flows/cluster1?daterange=" +
|
||||
"timeline/clusters/cluster1/flows?daterange=" +
|
||||
fmt.format(dayTs + (4*86400000L)));
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
|
||||
|
@ -554,7 +809,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
assertEquals(0, entities.size());
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flows/cluster1?daterange=-" +
|
||||
"timeline/clusters/cluster1/flows?daterange=-" +
|
||||
fmt.format(dayTs + (2*86400000L)));
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
|
||||
|
@ -562,7 +817,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
assertEquals(2, entities.size());
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flows/cluster1?daterange=" +
|
||||
"timeline/clusters/cluster1/flows?daterange=" +
|
||||
fmt.format(dayTs - (2*86400000L)) + "-");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
|
||||
|
@ -570,19 +825,19 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
assertEquals(2, entities.size());
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flows/cluster1?daterange=20150711:20150714");
|
||||
"timeline/clusters/cluster1/flows?daterange=20150711:20150714");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flows/cluster1?daterange=20150714-20150711");
|
||||
"timeline/clusters/cluster1/flows?daterange=20150714-20150711");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flows/cluster1?daterange=2015071129-20150712");
|
||||
"timeline/clusters/cluster1/flows?daterange=2015071129-20150712");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flows/cluster1?daterange=20150711-2015071243");
|
||||
"timeline/clusters/cluster1/flows?daterange=20150711-2015071243");
|
||||
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
|
||||
} finally {
|
||||
client.destroy();
|
||||
|
@ -594,7 +849,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app/cluster1/application_1111111111_1111?" +
|
||||
"timeline/clusters/cluster1/apps/application_1111111111_1111?" +
|
||||
"userid=user1&fields=ALL&flowname=flow_name&flowrunid=1002345678919");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
|
@ -612,7 +867,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
}
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app/application_1111111111_2222?userid=user1" +
|
||||
"timeline/apps/application_1111111111_2222?userid=user1" +
|
||||
"&fields=metrics&flowname=flow_name&flowrunid=1002345678919");
|
||||
resp = getResponse(client, uri);
|
||||
entity = resp.getEntity(TimelineEntity.class);
|
||||
|
@ -635,7 +890,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app/cluster1/application_1111111111_1111?" +
|
||||
"timeline/clusters/cluster1/apps/application_1111111111_1111?" +
|
||||
"fields=ALL");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
|
@ -661,7 +916,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entity/cluster1/application_1111111111_1111/type1/entity1");
|
||||
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
|
||||
"entities/type1/entity1");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
assertNotNull(entity);
|
||||
|
@ -677,7 +933,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/entities/cluster1/application_1111111111_1111/type1");
|
||||
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
|
||||
"entities/type1");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -697,8 +954,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowrunapps/user1/cluster1/flow_name/1002345678919?" +
|
||||
"fields=ALL");
|
||||
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
|
||||
"1002345678919/apps?fields=ALL");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -714,14 +971,15 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
|
||||
// Query without specifying cluster ID.
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowrunapps/user1/flow_name/1002345678919");
|
||||
"timeline/users/user1/flows/flow_name/runs/1002345678919/apps");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
assertEquals(2, entities.size());
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowrunapps/user1/flow_name/1002345678919?limit=1");
|
||||
"timeline/users/user1/flows/flow_name/runs/1002345678919/" +
|
||||
"apps?limit=1");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
|
@ -736,7 +994,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/user1/cluster1/flow_name?fields=ALL");
|
||||
"timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
|
||||
"fields=ALL");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -754,14 +1013,14 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
|
||||
// Query without specifying cluster ID.
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/user1/flow_name");
|
||||
"timeline/users/user1/flows/flow_name/apps");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
assertEquals(3, entities.size());
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/user1/flow_name?limit=1");
|
||||
"timeline/users/user1/flows/flow_name/apps?limit=1");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
|
@ -777,8 +1036,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
try {
|
||||
String entityType = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/user1/cluster1/flow_name?eventfilters=" +
|
||||
ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
||||
"timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
|
||||
"eventfilters=" + ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -788,8 +1047,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
newEntity(entityType, "application_1111111111_1111")));
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/user1/cluster1/flow_name?metricfilters=" +
|
||||
"HDFS_BYTES_READ");
|
||||
"timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
|
||||
"metricfilters=HDFS_BYTES_READ");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
|
@ -798,8 +1057,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
newEntity(entityType, "application_1111111111_1111")));
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/user1/cluster1/flow_name?conffilters=" +
|
||||
"cfg1:value1");
|
||||
"timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
|
||||
"conffilters=cfg1:value1");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
|
@ -816,7 +1075,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowrun/user1/cluster1/flow_name/1002345678929");
|
||||
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
|
||||
"1002345678929");
|
||||
verifyHttpResponse(client, uri, Status.NOT_FOUND);
|
||||
} finally {
|
||||
client.destroy();
|
||||
|
@ -828,7 +1088,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flows/cluster2");
|
||||
"timeline/clusters/cluster2/flows");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<FlowActivityEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
|
||||
|
@ -845,8 +1105,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app/user1/cluster1/flow_name/1002345678919/" +
|
||||
"application_1111111111_1378");
|
||||
"timeline/clusters/cluster1/apps/application_1111111111_1378");
|
||||
verifyHttpResponse(client, uri, Status.NOT_FOUND);
|
||||
} finally {
|
||||
client.destroy();
|
||||
|
@ -858,7 +1117,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowrunapps/user1/cluster2/flow_name/1002345678919");
|
||||
"timeline/clusters/cluster2/users/user1/flows/flow_name/runs/" +
|
||||
"1002345678919/apps");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
@ -875,7 +1135,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
|
|||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/user1/cluster2/flow_name55");
|
||||
"timeline/clusters/cluster2/users/user1/flows/flow_name55/apps");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.reader;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestTimelineUIDConverter {
|
||||
|
||||
@Test
|
||||
public void testUIDEncodingDecoding() throws Exception {
|
||||
TimelineReaderContext context = new TimelineReaderContext(
|
||||
"!cluster", "!b*o*!xer", "oozie*", null, null, null, null);
|
||||
String uid = TimelineUIDConverter.FLOW_UID.encodeUID(context);
|
||||
assertEquals("*!cluster!*!b**o***!xer!oozie**", uid);
|
||||
assertEquals(context, TimelineUIDConverter.FLOW_UID.decodeUID(uid));
|
||||
|
||||
context = new TimelineReaderContext("!cluster*", "!b*o!!x!*er", "*oozie!",
|
||||
123L, null, null, null);
|
||||
uid = TimelineUIDConverter.FLOWRUN_UID.encodeUID(context);
|
||||
assertEquals("*!cluster**!*!b**o*!*!x*!**er!**oozie*!!123", uid);
|
||||
assertEquals(context, TimelineUIDConverter.FLOWRUN_UID.decodeUID(uid));
|
||||
|
||||
context = new TimelineReaderContext("yarn_cluster", "root", "hive_join",
|
||||
1234L, "application_1111111111_1111", null, null);
|
||||
uid = TimelineUIDConverter.APPLICATION_UID.encodeUID(context);
|
||||
assertEquals(
|
||||
"yarn_cluster!root!hive_join!1234!application_1111111111_1111", uid);
|
||||
assertEquals(context, TimelineUIDConverter.APPLICATION_UID.decodeUID(uid));
|
||||
context = new TimelineReaderContext("yarn_cluster", null, null, null,
|
||||
"application_1111111111_1111", null, null);
|
||||
uid = TimelineUIDConverter.APPLICATION_UID.encodeUID(context);
|
||||
assertEquals("yarn_cluster!application_1111111111_1111", uid);
|
||||
assertEquals(context, TimelineUIDConverter.APPLICATION_UID.decodeUID(uid));
|
||||
|
||||
context = new TimelineReaderContext("yarn_cluster", "root", "hive_join",
|
||||
1234L, "application_1111111111_1111", "YARN_CONTAINER",
|
||||
"container_1111111111_1111_01_000001");
|
||||
uid = TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context);
|
||||
assertEquals("yarn_cluster!root!hive_join!1234!application_1111111111_1111!"
|
||||
+ "YARN_CONTAINER!container_1111111111_1111_01_000001", uid);
|
||||
assertEquals(
|
||||
context, TimelineUIDConverter.GENERIC_ENTITY_UID.decodeUID(uid));
|
||||
context = new TimelineReaderContext("yarn_cluster",null, null, null,
|
||||
"application_1111111111_1111", "YARN_CONTAINER",
|
||||
"container_1111111111_1111_01_000001");
|
||||
uid = TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context);
|
||||
assertEquals("yarn_cluster!application_1111111111_1111!YARN_CONTAINER!" +
|
||||
"container_1111111111_1111_01_000001", uid);
|
||||
assertEquals(
|
||||
context, TimelineUIDConverter.GENERIC_ENTITY_UID.decodeUID(uid));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUIDNotProperlyEscaped() throws Exception {
|
||||
try {
|
||||
TimelineUIDConverter.FLOW_UID.decodeUID("*!cluster!*!b*o***!xer!oozie**");
|
||||
fail("UID not properly escaped. Exception should have been thrown.");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
TimelineUIDConverter.FLOW_UID.decodeUID("*!cluster!*!b**o***!xer!oozie*");
|
||||
fail("UID not properly escaped. Exception should have been thrown.");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
TimelineUIDConverter.FLOW_UID.decodeUID("*!cluster!*!b**o***xer!oozie*");
|
||||
fail("UID not properly escaped. Exception should have been thrown.");
|
||||
} catch (IllegalArgumentException e) {
|
||||
}
|
||||
assertNull(TimelineUIDConverter.
|
||||
FLOW_UID.decodeUID("!cluster!*!b**o***!xer!oozie**"));
|
||||
assertNull(TimelineUIDConverter.
|
||||
FLOW_UID.decodeUID("*!cluster!*!b**o**!xer!oozie**"));
|
||||
}
|
||||
}
|
|
@ -127,7 +127,7 @@ public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest {
|
|||
TimelineEntities te = new TimelineEntities();
|
||||
te.addEntity(getTestAggregationTimelineEntity());
|
||||
TimelineCollectorContext context = new TimelineCollectorContext("cluster_1",
|
||||
"user1", "testFlow", null, 0, null);
|
||||
"user1", "testFlow", null, 0L, null);
|
||||
storage.writeAggregatedEntity(context, te,
|
||||
aggregationInfo);
|
||||
|
||||
|
|
Loading…
Reference in New Issue