YARN-3049. [Storage Implementation] Implement storage reader interface to fetch raw data from HBase backend (Zhijie Shen via sjlee)
(cherry picked from commit 07433c2ad52df9e844dbd90020c277d3df844dcd)
This commit is contained in:
parent
9422d9b50d
commit
9e5155be36
|
@ -536,4 +536,9 @@
|
||||||
<Package name="org.apache.hadoop.yarn.api.records.impl.pb" />
|
<Package name="org.apache.hadoop.yarn.api.records.impl.pb" />
|
||||||
<Bug pattern="NP_BOOLEAN_RETURN_NULL" />
|
<Bug pattern="NP_BOOLEAN_RETURN_NULL" />
|
||||||
</Match>
|
</Match>
|
||||||
|
<!-- Object cast is based on the event type -->
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher" />
|
||||||
|
<Bug pattern="BC_UNCONFIRMED_CAST" />
|
||||||
|
</Match>
|
||||||
</FindBugsFilter>
|
</FindBugsFilter>
|
||||||
|
|
|
@ -29,7 +29,9 @@ import javax.xml.bind.annotation.XmlRootElement;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.NavigableSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The basic timeline entity data structure for timeline service v2. Timeline
|
* The basic timeline entity data structure for timeline service v2. Timeline
|
||||||
|
@ -133,7 +135,8 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
|
||||||
private HashMap<String, Object> info = new HashMap<>();
|
private HashMap<String, Object> info = new HashMap<>();
|
||||||
private HashMap<String, String> configs = new HashMap<>();
|
private HashMap<String, String> configs = new HashMap<>();
|
||||||
private Set<TimelineMetric> metrics = new HashSet<>();
|
private Set<TimelineMetric> metrics = new HashSet<>();
|
||||||
private Set<TimelineEvent> events = new HashSet<>();
|
// events should be sorted by timestamp in descending order
|
||||||
|
private NavigableSet<TimelineEvent> events = new TreeSet<>();
|
||||||
private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>();
|
private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>();
|
||||||
private HashMap<String, Set<String>> relatesToEntities = new HashMap<>();
|
private HashMap<String, Set<String>> relatesToEntities = new HashMap<>();
|
||||||
private long createdTime;
|
private long createdTime;
|
||||||
|
@ -334,7 +337,7 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@XmlElement(name = "events")
|
@XmlElement(name = "events")
|
||||||
public Set<TimelineEvent> getEvents() {
|
public NavigableSet<TimelineEvent> getEvents() {
|
||||||
if (real == null) {
|
if (real == null) {
|
||||||
return events;
|
return events;
|
||||||
} else {
|
} else {
|
||||||
|
@ -342,7 +345,7 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setEvents(Set<TimelineEvent> events) {
|
public void setEvents(NavigableSet<TimelineEvent> events) {
|
||||||
if (real == null) {
|
if (real == null) {
|
||||||
this.events = events;
|
this.events = events;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
|
||||||
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||||
import org.codehaus.jackson.JsonGenerationException;
|
import org.codehaus.jackson.JsonGenerationException;
|
||||||
import org.codehaus.jackson.map.JsonMappingException;
|
import org.codehaus.jackson.map.JsonMappingException;
|
||||||
|
@ -119,61 +120,34 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
||||||
private static void fillFields(TimelineEntity finalEntity,
|
private static void fillFields(TimelineEntity finalEntity,
|
||||||
TimelineEntity real, EnumSet<Field> fields) {
|
TimelineEntity real, EnumSet<Field> fields) {
|
||||||
if (fields.contains(Field.ALL)) {
|
if (fields.contains(Field.ALL)) {
|
||||||
finalEntity.setConfigs(real.getConfigs());
|
fields = EnumSet.allOf(Field.class);
|
||||||
finalEntity.setMetrics(real.getMetrics());
|
|
||||||
finalEntity.setInfo(real.getInfo());
|
|
||||||
finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
|
|
||||||
finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
|
|
||||||
finalEntity.setEvents(real.getEvents());
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
for (Field field : fields) {
|
for (Field field : fields) {
|
||||||
switch(field) {
|
switch(field) {
|
||||||
case CONFIGS:
|
case CONFIGS:
|
||||||
finalEntity.setConfigs(real.getConfigs());
|
finalEntity.setConfigs(real.getConfigs());
|
||||||
break;
|
break;
|
||||||
case METRICS:
|
case METRICS:
|
||||||
finalEntity.setMetrics(real.getMetrics());
|
finalEntity.setMetrics(real.getMetrics());
|
||||||
break;
|
break;
|
||||||
case INFO:
|
case INFO:
|
||||||
finalEntity.setInfo(real.getInfo());
|
finalEntity.setInfo(real.getInfo());
|
||||||
break;
|
break;
|
||||||
case IS_RELATED_TO:
|
case IS_RELATED_TO:
|
||||||
finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
|
finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
|
||||||
break;
|
break;
|
||||||
case RELATES_TO:
|
case RELATES_TO:
|
||||||
finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
|
finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
|
||||||
break;
|
break;
|
||||||
case EVENTS:
|
case EVENTS:
|
||||||
finalEntity.setEvents(real.getEvents());
|
finalEntity.setEvents(real.getEvents());
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean matchFilter(Object infoValue, Object filterValue) {
|
|
||||||
return infoValue.equals(filterValue);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean matchFilters(Map<String, ? extends Object> entityInfo,
|
|
||||||
Map<String, ? extends Object> filters) {
|
|
||||||
if (entityInfo == null || entityInfo.isEmpty()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
|
|
||||||
Object infoValue = entityInfo.get(filter.getKey());
|
|
||||||
if (infoValue == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (!matchFilter(infoValue, filter.getValue())) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getFlowRunPath(String userId, String clusterId, String flowId,
|
private String getFlowRunPath(String userId, String clusterId, String flowId,
|
||||||
Long flowRunId, String appId)
|
Long flowRunId, String appId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -186,10 +160,10 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
||||||
String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" +
|
String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" +
|
||||||
clusterId + "/" + APP_FLOW_MAPPING_FILE;
|
clusterId + "/" + APP_FLOW_MAPPING_FILE;
|
||||||
try (BufferedReader reader =
|
try (BufferedReader reader =
|
||||||
new BufferedReader(new InputStreamReader(
|
new BufferedReader(new InputStreamReader(
|
||||||
new FileInputStream(
|
new FileInputStream(
|
||||||
appFlowMappingFile), Charset.forName("UTF-8")));
|
appFlowMappingFile), Charset.forName("UTF-8")));
|
||||||
CSVParser parser = new CSVParser(reader, csvFormat)) {
|
CSVParser parser = new CSVParser(reader, csvFormat)) {
|
||||||
for (CSVRecord record : parser.getRecords()) {
|
for (CSVRecord record : parser.getRecords()) {
|
||||||
if (record.size() < 4) {
|
if (record.size() < 4) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -207,36 +181,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
||||||
throw new IOException("Unable to get flow info");
|
throw new IOException("Unable to get flow info");
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean matchMetricFilters(Set<TimelineMetric> metrics,
|
|
||||||
Set<String> metricFilters) {
|
|
||||||
Set<String> tempMetrics = new HashSet<String>();
|
|
||||||
for (TimelineMetric metric : metrics) {
|
|
||||||
tempMetrics.add(metric.getId());
|
|
||||||
}
|
|
||||||
|
|
||||||
for (String metricFilter : metricFilters) {
|
|
||||||
if (!tempMetrics.contains(metricFilter)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
|
|
||||||
Set<String> eventFilters) {
|
|
||||||
Set<String> tempEvents = new HashSet<String>();
|
|
||||||
for (TimelineEvent event : entityEvents) {
|
|
||||||
tempEvents.add(event.getId());
|
|
||||||
}
|
|
||||||
|
|
||||||
for (String eventFilter : eventFilters) {
|
|
||||||
if (!tempEvents.contains(eventFilter)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
|
private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
|
||||||
EnumSet<Field> fieldsToRetrieve) {
|
EnumSet<Field> fieldsToRetrieve) {
|
||||||
TimelineEntity entityToBeReturned = new TimelineEntity();
|
TimelineEntity entityToBeReturned = new TimelineEntity();
|
||||||
|
@ -254,23 +198,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
||||||
return (time >= timeBegin) && (time <= timeEnd);
|
return (time >= timeBegin) && (time <= timeEnd);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean matchRelations(
|
|
||||||
Map<String, Set<String>> entityRelations,
|
|
||||||
Map<String, Set<String>> relations) {
|
|
||||||
for (Map.Entry<String, Set<String>> relation : relations.entrySet()) {
|
|
||||||
Set<String> ids = entityRelations.get(relation.getKey());
|
|
||||||
if (ids == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
for (String id : relation.getValue()) {
|
|
||||||
if (!ids.contains(id)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void mergeEntities(TimelineEntity entity1,
|
private static void mergeEntities(TimelineEntity entity1,
|
||||||
TimelineEntity entity2) {
|
TimelineEntity entity2) {
|
||||||
// Ideally created time wont change except in the case of issue from client.
|
// Ideally created time wont change except in the case of issue from client.
|
||||||
|
@ -364,22 +291,22 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
||||||
// First sort the selected entities based on created/start time.
|
// First sort the selected entities based on created/start time.
|
||||||
Map<Long, Set<TimelineEntity>> sortedEntities =
|
Map<Long, Set<TimelineEntity>> sortedEntities =
|
||||||
new TreeMap<>(
|
new TreeMap<>(
|
||||||
new Comparator<Long>() {
|
new Comparator<Long>() {
|
||||||
@Override
|
@Override
|
||||||
public int compare(Long l1, Long l2) {
|
public int compare(Long l1, Long l2) {
|
||||||
return l2.compareTo(l1);
|
return l2.compareTo(l1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
);
|
);
|
||||||
for (File entityFile : dir.listFiles()) {
|
for (File entityFile : dir.listFiles()) {
|
||||||
if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
|
if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
try (BufferedReader reader =
|
try (BufferedReader reader =
|
||||||
new BufferedReader(
|
new BufferedReader(
|
||||||
new InputStreamReader(
|
new InputStreamReader(
|
||||||
new FileInputStream(
|
new FileInputStream(
|
||||||
entityFile), Charset.forName("UTF-8")))) {
|
entityFile), Charset.forName("UTF-8")))) {
|
||||||
TimelineEntity entity = readEntityFromFile(reader);
|
TimelineEntity entity = readEntityFromFile(reader);
|
||||||
if (!entity.getType().equals(entityType)) {
|
if (!entity.getType().equals(entityType)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -393,27 +320,32 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (relatesTo != null && !relatesTo.isEmpty() &&
|
if (relatesTo != null && !relatesTo.isEmpty() &&
|
||||||
!matchRelations(entity.getRelatesToEntities(), relatesTo)) {
|
!TimelineReaderUtils
|
||||||
|
.matchRelations(entity.getRelatesToEntities(), relatesTo)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
|
if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
|
||||||
!matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
|
!TimelineReaderUtils
|
||||||
|
.matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (infoFilters != null && !infoFilters.isEmpty() &&
|
if (infoFilters != null && !infoFilters.isEmpty() &&
|
||||||
!matchFilters(entity.getInfo(), infoFilters)) {
|
!TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (configFilters != null && !configFilters.isEmpty() &&
|
if (configFilters != null && !configFilters.isEmpty() &&
|
||||||
!matchFilters(entity.getConfigs(), configFilters)) {
|
!TimelineReaderUtils.matchFilters(
|
||||||
|
entity.getConfigs(), configFilters)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (metricFilters != null && !metricFilters.isEmpty() &&
|
if (metricFilters != null && !metricFilters.isEmpty() &&
|
||||||
!matchMetricFilters(entity.getMetrics(), metricFilters)) {
|
!TimelineReaderUtils.matchMetricFilters(
|
||||||
|
entity.getMetrics(), metricFilters)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (eventFilters != null && !eventFilters.isEmpty() &&
|
if (eventFilters != null && !eventFilters.isEmpty() &&
|
||||||
!matchEventFilters(entity.getEvents(), eventFilters)) {
|
!TimelineReaderUtils.matchEventFilters(
|
||||||
|
entity.getEvents(), eventFilters)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
TimelineEntity entityToBeReturned =
|
TimelineEntity entityToBeReturned =
|
||||||
|
@ -461,8 +393,8 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
||||||
File entityFile =
|
File entityFile =
|
||||||
new File(dir, entityId + TIMELINE_SERVICE_STORAGE_EXTENSION);
|
new File(dir, entityId + TIMELINE_SERVICE_STORAGE_EXTENSION);
|
||||||
try (BufferedReader reader =
|
try (BufferedReader reader =
|
||||||
new BufferedReader(new InputStreamReader(
|
new BufferedReader(new InputStreamReader(
|
||||||
new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
|
new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
|
||||||
TimelineEntity entity = readEntityFromFile(reader);
|
TimelineEntity entity = readEntityFromFile(reader);
|
||||||
return createEntityToBeReturned(entity, fieldsToRetrieve);
|
return createEntityToBeReturned(entity, fieldsToRetrieve);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,424 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
|
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.NavigableMap;
|
||||||
|
import java.util.NavigableSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
public class HBaseTimelineReaderImpl
|
||||||
|
extends AbstractService implements TimelineReader {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(HBaseTimelineReaderImpl.class);
|
||||||
|
private static final long DEFAULT_BEGIN_TIME = 0L;
|
||||||
|
private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
|
||||||
|
|
||||||
|
private Configuration hbaseConf = null;
|
||||||
|
private Connection conn;
|
||||||
|
private EntityTable entityTable;
|
||||||
|
private AppToFlowTable appToFlowTable;
|
||||||
|
|
||||||
|
public HBaseTimelineReaderImpl() {
|
||||||
|
super(HBaseTimelineReaderImpl.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serviceInit(Configuration conf) throws Exception {
|
||||||
|
super.serviceInit(conf);
|
||||||
|
hbaseConf = HBaseConfiguration.create(conf);
|
||||||
|
conn = ConnectionFactory.createConnection(hbaseConf);
|
||||||
|
entityTable = new EntityTable();
|
||||||
|
appToFlowTable = new AppToFlowTable();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
if (conn != null) {
|
||||||
|
LOG.info("closing the hbase Connection");
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimelineEntity getEntity(String userId, String clusterId,
|
||||||
|
String flowId, Long flowRunId, String appId, String entityType,
|
||||||
|
String entityId, EnumSet<Field> fieldsToRetrieve)
|
||||||
|
throws IOException {
|
||||||
|
validateParams(userId, clusterId, appId, entityType, entityId, true);
|
||||||
|
// In reality both should be null or neither should be null
|
||||||
|
if (flowId == null || flowRunId == null) {
|
||||||
|
FlowContext context = lookupFlowContext(clusterId, appId);
|
||||||
|
flowId = context.flowId;
|
||||||
|
flowRunId = context.flowRunId;
|
||||||
|
}
|
||||||
|
if (fieldsToRetrieve == null) {
|
||||||
|
fieldsToRetrieve = EnumSet.noneOf(Field.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] rowKey = EntityRowKey.getRowKey(
|
||||||
|
clusterId, userId, flowId, flowRunId, appId, entityType, entityId);
|
||||||
|
Get get = new Get(rowKey);
|
||||||
|
get.setMaxVersions(Integer.MAX_VALUE);
|
||||||
|
return parseEntity(
|
||||||
|
entityTable.getResult(hbaseConf, conn, get), fieldsToRetrieve,
|
||||||
|
false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME,
|
||||||
|
DEFAULT_END_TIME, null, null, null, null, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<TimelineEntity> getEntities(String userId, String clusterId,
|
||||||
|
String flowId, Long flowRunId, String appId, String entityType,
|
||||||
|
Long limit, Long createdTimeBegin, Long createdTimeEnd,
|
||||||
|
Long modifiedTimeBegin, Long modifiedTimeEnd,
|
||||||
|
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||||
|
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||||
|
Set<String> metricFilters, Set<String> eventFilters,
|
||||||
|
EnumSet<Field> fieldsToRetrieve) throws IOException {
|
||||||
|
validateParams(userId, clusterId, appId, entityType, null, false);
|
||||||
|
// In reality both should be null or neither should be null
|
||||||
|
if (flowId == null || flowRunId == null) {
|
||||||
|
FlowContext context = lookupFlowContext(clusterId, appId);
|
||||||
|
flowId = context.flowId;
|
||||||
|
flowRunId = context.flowRunId;
|
||||||
|
}
|
||||||
|
if (limit == null) {
|
||||||
|
limit = TimelineReader.DEFAULT_LIMIT;
|
||||||
|
}
|
||||||
|
if (createdTimeBegin == null) {
|
||||||
|
createdTimeBegin = DEFAULT_BEGIN_TIME;
|
||||||
|
}
|
||||||
|
if (createdTimeEnd == null) {
|
||||||
|
createdTimeEnd = DEFAULT_END_TIME;
|
||||||
|
}
|
||||||
|
if (modifiedTimeBegin == null) {
|
||||||
|
modifiedTimeBegin = DEFAULT_BEGIN_TIME;
|
||||||
|
}
|
||||||
|
if (modifiedTimeEnd == null) {
|
||||||
|
modifiedTimeEnd = DEFAULT_END_TIME;
|
||||||
|
}
|
||||||
|
if (fieldsToRetrieve == null) {
|
||||||
|
fieldsToRetrieve = EnumSet.noneOf(Field.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
NavigableSet<TimelineEntity> entities = new TreeSet<>();
|
||||||
|
// Scan through part of the table to find the entities belong to one app and
|
||||||
|
// one type
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
|
||||||
|
clusterId, userId, flowId, flowRunId, appId, entityType));
|
||||||
|
scan.setMaxVersions(Integer.MAX_VALUE);
|
||||||
|
ResultScanner scanner = entityTable.getResultScanner(hbaseConf, conn, scan);
|
||||||
|
for (Result result : scanner) {
|
||||||
|
TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
|
||||||
|
true, createdTimeBegin, createdTimeEnd,
|
||||||
|
true, modifiedTimeBegin, modifiedTimeEnd,
|
||||||
|
isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
|
||||||
|
metricFilters);
|
||||||
|
if (entity == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (entities.size() > limit) {
|
||||||
|
entities.pollLast();
|
||||||
|
}
|
||||||
|
entities.add(entity);
|
||||||
|
}
|
||||||
|
return entities;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FlowContext lookupFlowContext(String clusterId, String appId)
|
||||||
|
throws IOException {
|
||||||
|
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
|
||||||
|
Get get = new Get(rowKey);
|
||||||
|
Result result = appToFlowTable.getResult(hbaseConf, conn, get);
|
||||||
|
if (result != null && !result.isEmpty()) {
|
||||||
|
return new FlowContext(
|
||||||
|
AppToFlowColumn.FLOW_ID.readResult(result).toString(),
|
||||||
|
((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
|
||||||
|
} else {
|
||||||
|
throw new IOException(
|
||||||
|
"Unable to find the context flow ID and flow run ID for clusterId=" +
|
||||||
|
clusterId + ", appId=" + appId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class FlowContext {
|
||||||
|
private String flowId;
|
||||||
|
private Long flowRunId;
|
||||||
|
public FlowContext(String flowId, Long flowRunId) {
|
||||||
|
this.flowId = flowId;
|
||||||
|
this.flowRunId = flowRunId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void validateParams(String userId, String clusterId,
|
||||||
|
String appId, String entityType, String entityId, boolean checkEntityId) {
|
||||||
|
Preconditions.checkNotNull(userId, "userId shouldn't be null");
|
||||||
|
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
|
||||||
|
Preconditions.checkNotNull(appId, "appId shouldn't be null");
|
||||||
|
Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
|
||||||
|
if (checkEntityId) {
|
||||||
|
Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TimelineEntity parseEntity(
|
||||||
|
Result result, EnumSet<Field> fieldsToRetrieve,
|
||||||
|
boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd,
|
||||||
|
boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
|
||||||
|
Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
|
||||||
|
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||||
|
Set<String> eventFilters, Set<String> metricFilters)
|
||||||
|
throws IOException {
|
||||||
|
if (result == null || result.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
TimelineEntity entity = new TimelineEntity();
|
||||||
|
entity.setType(EntityColumn.TYPE.readResult(result).toString());
|
||||||
|
entity.setId(EntityColumn.ID.readResult(result).toString());
|
||||||
|
|
||||||
|
// fetch created time
|
||||||
|
entity.setCreatedTime(
|
||||||
|
((Number) EntityColumn.CREATED_TIME.readResult(result)).longValue());
|
||||||
|
if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
|
||||||
|
entity.getCreatedTime() > createdTimeEnd)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch modified time
|
||||||
|
entity.setCreatedTime(
|
||||||
|
((Number) EntityColumn.MODIFIED_TIME.readResult(result)).longValue());
|
||||||
|
if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
|
||||||
|
entity.getModifiedTime() > modifiedTimeEnd)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch is related to entities
|
||||||
|
boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
|
||||||
|
if (fieldsToRetrieve.contains(Field.ALL) ||
|
||||||
|
fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
|
||||||
|
readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO);
|
||||||
|
if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
|
||||||
|
entity.getIsRelatedToEntities(), isRelatedTo)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (!fieldsToRetrieve.contains(Field.ALL) &&
|
||||||
|
!fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
|
||||||
|
entity.getIsRelatedToEntities().clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch relates to entities
|
||||||
|
boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
|
||||||
|
if (fieldsToRetrieve.contains(Field.ALL) ||
|
||||||
|
fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
|
||||||
|
readRelationship(entity, result, EntityColumnPrefix.RELATES_TO);
|
||||||
|
if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
|
||||||
|
entity.getRelatesToEntities(), relatesTo)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (!fieldsToRetrieve.contains(Field.ALL) &&
|
||||||
|
!fieldsToRetrieve.contains(Field.RELATES_TO)) {
|
||||||
|
entity.getRelatesToEntities().clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch info
|
||||||
|
boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
|
||||||
|
if (fieldsToRetrieve.contains(Field.ALL) ||
|
||||||
|
fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
|
||||||
|
readKeyValuePairs(entity, result, EntityColumnPrefix.INFO);
|
||||||
|
if (checkInfo &&
|
||||||
|
!TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (!fieldsToRetrieve.contains(Field.ALL) &&
|
||||||
|
!fieldsToRetrieve.contains(Field.INFO)) {
|
||||||
|
entity.getInfo().clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch configs
|
||||||
|
boolean checkConfigs = configFilters != null && configFilters.size() > 0;
|
||||||
|
if (fieldsToRetrieve.contains(Field.ALL) ||
|
||||||
|
fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
|
||||||
|
readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG);
|
||||||
|
if (checkConfigs && !TimelineReaderUtils.matchFilters(
|
||||||
|
entity.getConfigs(), configFilters)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (!fieldsToRetrieve.contains(Field.ALL) &&
|
||||||
|
!fieldsToRetrieve.contains(Field.CONFIGS)) {
|
||||||
|
entity.getConfigs().clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch events
|
||||||
|
boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
|
||||||
|
if (fieldsToRetrieve.contains(Field.ALL) ||
|
||||||
|
fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
|
||||||
|
readEvents(entity, result);
|
||||||
|
if (checkEvents && !TimelineReaderUtils.matchEventFilters(
|
||||||
|
entity.getEvents(), eventFilters)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (!fieldsToRetrieve.contains(Field.ALL) &&
|
||||||
|
!fieldsToRetrieve.contains(Field.EVENTS)) {
|
||||||
|
entity.getEvents().clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch metrics
|
||||||
|
boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
|
||||||
|
if (fieldsToRetrieve.contains(Field.ALL) ||
|
||||||
|
fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
|
||||||
|
readMetrics(entity, result);
|
||||||
|
if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
|
||||||
|
entity.getMetrics(), metricFilters)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (!fieldsToRetrieve.contains(Field.ALL) &&
|
||||||
|
!fieldsToRetrieve.contains(Field.METRICS)) {
|
||||||
|
entity.getMetrics().clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void readRelationship(
|
||||||
|
TimelineEntity entity, Result result, EntityColumnPrefix prefix)
|
||||||
|
throws IOException {
|
||||||
|
// isRelatedTo and relatesTo are of type Map<String, Set<String>>
|
||||||
|
Map<String, Object> columns = prefix.readResults(result);
|
||||||
|
for (Map.Entry<String, Object> column : columns.entrySet()) {
|
||||||
|
for (String id : Separator.VALUES.splitEncoded(
|
||||||
|
column.getValue().toString())) {
|
||||||
|
if (prefix.equals(EntityColumnPrefix.IS_RELATED_TO)) {
|
||||||
|
entity.addIsRelatedToEntity(column.getKey(), id);
|
||||||
|
} else {
|
||||||
|
entity.addRelatesToEntity(column.getKey(), id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void readKeyValuePairs(
|
||||||
|
TimelineEntity entity, Result result, EntityColumnPrefix prefix)
|
||||||
|
throws IOException {
|
||||||
|
// info and configuration are of type Map<String, Object or String>
|
||||||
|
Map<String, Object> columns = prefix.readResults(result);
|
||||||
|
if (prefix.equals(EntityColumnPrefix.CONFIG)) {
|
||||||
|
for (Map.Entry<String, Object> column : columns.entrySet()) {
|
||||||
|
entity.addConfig(column.getKey(), column.getKey().toString());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
entity.addInfo(columns);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void readEvents(TimelineEntity entity, Result result)
|
||||||
|
throws IOException {
|
||||||
|
Map<String, TimelineEvent> eventsMap = new HashMap<>();
|
||||||
|
Map<String, Object> eventsResult =
|
||||||
|
EntityColumnPrefix.EVENT.readResults(result);
|
||||||
|
for (Map.Entry<String,Object> eventResult : eventsResult.entrySet()) {
|
||||||
|
Collection<String> tokens =
|
||||||
|
Separator.VALUES.splitEncoded(eventResult.getKey());
|
||||||
|
if (tokens.size() != 2 && tokens.size() != 3) {
|
||||||
|
throw new IOException(
|
||||||
|
"Invalid event column name: " + eventResult.getKey());
|
||||||
|
}
|
||||||
|
Iterator<String> idItr = tokens.iterator();
|
||||||
|
String id = idItr.next();
|
||||||
|
String tsStr = idItr.next();
|
||||||
|
// TODO: timestamp is not correct via ser/des through UTF-8 string
|
||||||
|
Long ts =
|
||||||
|
TimelineWriterUtils.invert(Bytes.toLong(tsStr.getBytes(
|
||||||
|
StandardCharsets.UTF_8)));
|
||||||
|
String key = Separator.VALUES.joinEncoded(id, ts.toString());
|
||||||
|
TimelineEvent event = eventsMap.get(key);
|
||||||
|
if (event == null) {
|
||||||
|
event = new TimelineEvent();
|
||||||
|
event.setId(id);
|
||||||
|
event.setTimestamp(ts);
|
||||||
|
eventsMap.put(key, event);
|
||||||
|
}
|
||||||
|
if (tokens.size() == 3) {
|
||||||
|
String infoKey = idItr.next();
|
||||||
|
event.addInfo(infoKey, eventResult.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
|
||||||
|
entity.addEvents(eventsSet);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void readMetrics(TimelineEntity entity, Result result)
|
||||||
|
throws IOException {
|
||||||
|
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
|
||||||
|
EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
|
||||||
|
for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
|
||||||
|
metricsResult.entrySet()) {
|
||||||
|
TimelineMetric metric = new TimelineMetric();
|
||||||
|
metric.setId(metricResult.getKey());
|
||||||
|
// Simply assume that if the value set contains more than 1 elements, the
|
||||||
|
// metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
|
||||||
|
metric.setType(metricResult.getValue().size() > 1 ?
|
||||||
|
TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
|
||||||
|
metric.addValues(metricResult.getValue());
|
||||||
|
entity.addMetric(metric);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -33,9 +33,14 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
|
||||||
|
@ -55,6 +60,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
|
|
||||||
private Connection conn;
|
private Connection conn;
|
||||||
private TypedBufferedMutator<EntityTable> entityTable;
|
private TypedBufferedMutator<EntityTable> entityTable;
|
||||||
|
private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(HBaseTimelineWriterImpl.class);
|
.getLog(HBaseTimelineWriterImpl.class);
|
||||||
|
@ -77,6 +83,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
Configuration hbaseConf = HBaseConfiguration.create(conf);
|
Configuration hbaseConf = HBaseConfiguration.create(conf);
|
||||||
conn = ConnectionFactory.createConnection(hbaseConf);
|
conn = ConnectionFactory.createConnection(hbaseConf);
|
||||||
entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
|
entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
|
||||||
|
appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -97,7 +104,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
|
|
||||||
byte[] rowKey =
|
byte[] rowKey =
|
||||||
EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
|
EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
|
||||||
te);
|
te.getType(), te.getId());
|
||||||
|
|
||||||
storeInfo(rowKey, te, flowVersion);
|
storeInfo(rowKey, te, flowVersion);
|
||||||
storeEvents(rowKey, te.getEvents());
|
storeEvents(rowKey, te.getEvents());
|
||||||
|
@ -107,11 +114,37 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
EntityColumnPrefix.IS_RELATED_TO);
|
EntityColumnPrefix.IS_RELATED_TO);
|
||||||
storeRelations(rowKey, te.getRelatesToEntities(),
|
storeRelations(rowKey, te.getRelatesToEntities(),
|
||||||
EntityColumnPrefix.RELATES_TO);
|
EntityColumnPrefix.RELATES_TO);
|
||||||
}
|
|
||||||
|
|
||||||
|
if (isApplicationCreated(te)) {
|
||||||
|
onApplicationCreated(
|
||||||
|
clusterId, userId, flowName, flowVersion, flowRunId, appId, te);
|
||||||
|
}
|
||||||
|
}
|
||||||
return putStatus;
|
return putStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean isApplicationCreated(TimelineEntity te) {
|
||||||
|
if (te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString())) {
|
||||||
|
boolean isAppCreated = false;
|
||||||
|
for (TimelineEvent event : te.getEvents()) {
|
||||||
|
if (event.getId().equals(
|
||||||
|
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onApplicationCreated(String clusterId, String userId,
|
||||||
|
String flowName, String flowVersion, long flowRunId, String appId,
|
||||||
|
TimelineEntity te) throws IOException {
|
||||||
|
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
|
||||||
|
AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
|
||||||
|
AppToFlowColumn.FLOW_RUN_ID.store(
|
||||||
|
rowKey, appToFlowTable, null, flowRunId);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stores the Relations from the {@linkplain TimelineEntity} object
|
* Stores the Relations from the {@linkplain TimelineEntity} object
|
||||||
*/
|
*/
|
||||||
|
@ -245,6 +278,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
public void flush() throws IOException {
|
public void flush() throws IOException {
|
||||||
// flush all buffered mutators
|
// flush all buffered mutators
|
||||||
entityTable.flush();
|
entityTable.flush();
|
||||||
|
appToFlowTable.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -258,6 +292,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
// The close API performs flushing and releases any resources held
|
// The close API performs flushing and releases any resources held
|
||||||
entityTable.close();
|
entityTable.close();
|
||||||
}
|
}
|
||||||
|
if (appToFlowTable != null) {
|
||||||
|
LOG.info("closing app_flow table");
|
||||||
|
// The close API performs flushing and releases any resources held
|
||||||
|
appToFlowTable.close();
|
||||||
|
}
|
||||||
if (conn != null) {
|
if (conn != null) {
|
||||||
LOG.info("closing the hbase Connection");
|
LOG.info("closing the hbase Connection");
|
||||||
conn.close();
|
conn.close();
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
import org.apache.hadoop.util.GenericOptionsParser;
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -70,6 +71,11 @@ public class TimelineSchemaCreator {
|
||||||
int metricsTTL = Integer.parseInt(entityTableTTLMetrics);
|
int metricsTTL = Integer.parseInt(entityTableTTLMetrics);
|
||||||
new EntityTable().setMetricsTTL(metricsTTL, hbaseConf);
|
new EntityTable().setMetricsTTL(metricsTTL, hbaseConf);
|
||||||
}
|
}
|
||||||
|
// Grab the appToflowTableName argument
|
||||||
|
String appToflowTableName = commandLine.getOptionValue("a2f");
|
||||||
|
if (StringUtils.isNotBlank(appToflowTableName)) {
|
||||||
|
hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName);
|
||||||
|
}
|
||||||
createAllTables(hbaseConf);
|
createAllTables(hbaseConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,6 +101,11 @@ public class TimelineSchemaCreator {
|
||||||
o.setRequired(false);
|
o.setRequired(false);
|
||||||
options.addOption(o);
|
options.addOption(o);
|
||||||
|
|
||||||
|
o = new Option("a2f", "appToflowTableName", true, "app to flow table name");
|
||||||
|
o.setArgName("appToflowTableName");
|
||||||
|
o.setRequired(false);
|
||||||
|
options.addOption(o);
|
||||||
|
|
||||||
CommandLineParser parser = new PosixParser();
|
CommandLineParser parser = new PosixParser();
|
||||||
CommandLine commandLine = null;
|
CommandLine commandLine = null;
|
||||||
try {
|
try {
|
||||||
|
@ -120,6 +131,7 @@ public class TimelineSchemaCreator {
|
||||||
throw new IOException("Cannot create table since admin is null");
|
throw new IOException("Cannot create table since admin is null");
|
||||||
}
|
}
|
||||||
new EntityTable().createTable(admin, hbaseConf);
|
new EntityTable().createTable(admin, hbaseConf);
|
||||||
|
new AppToFlowTable().createTable(admin, hbaseConf);
|
||||||
} finally {
|
} finally {
|
||||||
if (conn != null) {
|
if (conn != null) {
|
||||||
conn.close();
|
conn.close();
|
||||||
|
|
|
@ -0,0 +1,126 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Identifies fully qualified columns for the {@link AppToFlowTable}.
|
||||||
|
*/
|
||||||
|
public enum AppToFlowColumn implements Column<AppToFlowTable> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The flow ID
|
||||||
|
*/
|
||||||
|
FLOW_ID(AppToFlowColumnFamily.MAPPING, "flow_id"),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The flow run ID
|
||||||
|
*/
|
||||||
|
FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id");
|
||||||
|
|
||||||
|
private final ColumnHelper<AppToFlowTable> column;
|
||||||
|
private final ColumnFamily<AppToFlowTable> columnFamily;
|
||||||
|
private final String columnQualifier;
|
||||||
|
private final byte[] columnQualifierBytes;
|
||||||
|
|
||||||
|
AppToFlowColumn(ColumnFamily<AppToFlowTable> columnFamily,
|
||||||
|
String columnQualifier) {
|
||||||
|
this.columnFamily = columnFamily;
|
||||||
|
this.columnQualifier = columnQualifier;
|
||||||
|
// Future-proof by ensuring the right column prefix hygiene.
|
||||||
|
this.columnQualifierBytes =
|
||||||
|
Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
|
||||||
|
this.column = new ColumnHelper<AppToFlowTable>(columnFamily);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the column name value
|
||||||
|
*/
|
||||||
|
private String getColumnQualifier() {
|
||||||
|
return columnQualifier;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void store(byte[] rowKey,
|
||||||
|
TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp,
|
||||||
|
Object inputValue) throws IOException {
|
||||||
|
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
|
||||||
|
inputValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Object readResult(Result result) throws IOException {
|
||||||
|
return column.readResult(result, columnQualifierBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve an {@link AppToFlowColumn} given a name, or null if there is no
|
||||||
|
* match. The following holds true: {@code columnFor(x) == columnFor(y)} if
|
||||||
|
* and only if {@code x.equals(y)} or {@code (x == y == null)}
|
||||||
|
*
|
||||||
|
* @param columnQualifier Name of the column to retrieve
|
||||||
|
* @return the corresponding {@link AppToFlowColumn} or null
|
||||||
|
*/
|
||||||
|
public static final AppToFlowColumn columnFor(String columnQualifier) {
|
||||||
|
|
||||||
|
// Match column based on value, assume column family matches.
|
||||||
|
for (AppToFlowColumn ec : AppToFlowColumn.values()) {
|
||||||
|
// Find a match based only on name.
|
||||||
|
if (ec.getColumnQualifier().equals(columnQualifier)) {
|
||||||
|
return ec;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default to null
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve an {@link AppToFlowColumn} given a name, or null if there is no
|
||||||
|
* match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
|
||||||
|
* if and only if {@code a.equals(b) & x.equals(y)} or
|
||||||
|
* {@code (x == y == null)}
|
||||||
|
*
|
||||||
|
* @param columnFamily The columnFamily for which to retrieve the column.
|
||||||
|
* @param name Name of the column to retrieve
|
||||||
|
* @return the corresponding {@link AppToFlowColumn} or null if both arguments
|
||||||
|
* don't match.
|
||||||
|
*/
|
||||||
|
public static final AppToFlowColumn columnFor(
|
||||||
|
AppToFlowColumnFamily columnFamily, String name) {
|
||||||
|
|
||||||
|
for (AppToFlowColumn ec : AppToFlowColumn.values()) {
|
||||||
|
// Find a match based column family and on name.
|
||||||
|
if (ec.columnFamily.equals(columnFamily)
|
||||||
|
&& ec.getColumnQualifier().equals(name)) {
|
||||||
|
return ec;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default to null
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,51 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents the app_flow table column families.
|
||||||
|
*/
|
||||||
|
public enum AppToFlowColumnFamily implements ColumnFamily<AppToFlowTable> {
|
||||||
|
/**
|
||||||
|
* Mapping column family houses known columns such as flowId and flowRunId
|
||||||
|
*/
|
||||||
|
MAPPING("m");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Byte representation of this column family.
|
||||||
|
*/
|
||||||
|
private final byte[] bytes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param value create a column family with this name. Must be lower case and
|
||||||
|
* without spaces.
|
||||||
|
*/
|
||||||
|
AppToFlowColumnFamily(String value) {
|
||||||
|
// column families should be lower case and not contain any spaces.
|
||||||
|
this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getBytes() {
|
||||||
|
return Bytes.copy(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a rowkey for the app_flow table.
|
||||||
|
*/
|
||||||
|
public class AppToFlowRowKey {
|
||||||
|
/**
|
||||||
|
* Constructs a row key prefix for the app_flow table as follows:
|
||||||
|
* {@code clusterId!AppId}
|
||||||
|
*
|
||||||
|
* @param clusterId
|
||||||
|
* @param appId
|
||||||
|
* @return byte array with the row key
|
||||||
|
*/
|
||||||
|
public static byte[] getRowKey(String clusterId, String appId) {
|
||||||
|
return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,110 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The app_flow table as column families mapping. Mapping stores
|
||||||
|
* appId to flowId and flowRunId mapping information
|
||||||
|
*
|
||||||
|
* Example app_flow table record:
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
* |--------------------------------------|
|
||||||
|
* | Row | Column Family |
|
||||||
|
* | key | info |
|
||||||
|
* |--------------------------------------|
|
||||||
|
* | clusterId! | flowId: |
|
||||||
|
* | AppId | foo@daily_hive_report |
|
||||||
|
* | | |
|
||||||
|
* | | flowRunId: |
|
||||||
|
* | | 1452828720457 |
|
||||||
|
* | | |
|
||||||
|
* | | |
|
||||||
|
* | | |
|
||||||
|
* |--------------------------------------|
|
||||||
|
* </pre>
|
||||||
|
*/
|
||||||
|
public class AppToFlowTable extends BaseTable<AppToFlowTable> {
|
||||||
|
/** app_flow prefix */
|
||||||
|
private static final String PREFIX =
|
||||||
|
YarnConfiguration.TIMELINE_SERVICE_PREFIX + "app-flow";
|
||||||
|
|
||||||
|
/** config param name that specifies the app_flow table name */
|
||||||
|
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
|
||||||
|
|
||||||
|
/** default value for app_flow table name */
|
||||||
|
private static final String DEFAULT_TABLE_NAME = "timelineservice.app_flow";
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(AppToFlowTable.class);
|
||||||
|
|
||||||
|
public AppToFlowTable() {
|
||||||
|
super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* (non-Javadoc)
|
||||||
|
*
|
||||||
|
* @see
|
||||||
|
* org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
|
||||||
|
* (org.apache.hadoop.hbase.client.Admin,
|
||||||
|
* org.apache.hadoop.conf.Configuration)
|
||||||
|
*/
|
||||||
|
public void createTable(Admin admin, Configuration hbaseConf)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
TableName table = getTableName(hbaseConf);
|
||||||
|
if (admin.tableExists(table)) {
|
||||||
|
// do not disable / delete existing table
|
||||||
|
// similar to the approach taken by map-reduce jobs when
|
||||||
|
// output directory exists
|
||||||
|
throw new IOException("Table " + table.getNameAsString()
|
||||||
|
+ " already exists.");
|
||||||
|
}
|
||||||
|
|
||||||
|
HTableDescriptor appToFlowTableDescp = new HTableDescriptor(table);
|
||||||
|
HColumnDescriptor mappCF =
|
||||||
|
new HColumnDescriptor(AppToFlowColumnFamily.MAPPING.getBytes());
|
||||||
|
mappCF.setBloomFilterType(BloomType.ROWCOL);
|
||||||
|
appToFlowTableDescp.addFamily(mappCF);
|
||||||
|
|
||||||
|
appToFlowTableDescp
|
||||||
|
.setRegionSplitPolicyClassName(
|
||||||
|
"org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
|
||||||
|
appToFlowTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
|
||||||
|
TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
|
||||||
|
admin.createTable(appToFlowTableDescp,
|
||||||
|
TimelineHBaseSchemaConstants.getUsernameSplits());
|
||||||
|
LOG.info("Status of table creation for " + table.getNameAsString() + "="
|
||||||
|
+ admin.tableExists(table));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,23 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.BufferedMutator;
|
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
@ -93,6 +95,20 @@ public abstract class BaseTable<T> {
|
||||||
return table.getScanner(scan);
|
return table.getScanner(scan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param hbaseConf used to read settings that override defaults
|
||||||
|
* @param conn used to create table from
|
||||||
|
* @param get that specifies what single row you want to get from this table
|
||||||
|
* @return result of get operation
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Result getResult(Configuration hbaseConf, Connection conn, Get get)
|
||||||
|
throws IOException {
|
||||||
|
Table table = conn.getTable(getTableName(hbaseConf));
|
||||||
|
return table.get(get);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the table name for this table.
|
* Get the table name for this table.
|
||||||
*
|
*
|
||||||
|
|
|
@ -64,7 +64,7 @@ public interface ColumnPrefix<T> {
|
||||||
public Object readResult(Result result, String qualifier) throws IOException;
|
public Object readResult(Result result, String qualifier) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param resultfrom which to read columns
|
* @param result from which to read columns
|
||||||
* @return the latest values of columns in the column family with this prefix
|
* @return the latest values of columns in the column family with this prefix
|
||||||
* (or all of them if the prefix value is null).
|
* (or all of them if the prefix value is null).
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
|
|
@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class TimelineEntitySchemaConstants {
|
public class TimelineHBaseSchemaConstants {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to create a pre-split for tables starting with a username in the
|
* Used to create a pre-split for tables starting with a username in the
|
|
@ -0,0 +1,112 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
|
||||||
|
public class TimelineReaderUtils {
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param entityRelations the relations of an entity
|
||||||
|
* @param relationFilters the relations for filtering
|
||||||
|
* @return a boolean flag to indicate if both match
|
||||||
|
*/
|
||||||
|
public static boolean matchRelations(
|
||||||
|
Map<String, Set<String>> entityRelations,
|
||||||
|
Map<String, Set<String>> relationFilters) {
|
||||||
|
for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
|
||||||
|
Set<String> ids = entityRelations.get(relation.getKey());
|
||||||
|
if (ids == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (String id : relation.getValue()) {
|
||||||
|
if (!ids.contains(id)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param map the map of key/value pairs in an entity
|
||||||
|
* @param filters the map of key/value pairs for filtering
|
||||||
|
* @return a boolean flag to indicate if both match
|
||||||
|
*/
|
||||||
|
public static boolean matchFilters(Map<String, ? extends Object> map,
|
||||||
|
Map<String, ? extends Object> filters) {
|
||||||
|
for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
|
||||||
|
Object value = map.get(filter.getKey());
|
||||||
|
if (value == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!value.equals(filter.getValue())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param entityEvents the set of event objects in an entity
|
||||||
|
* @param eventFilters the set of event Ids for filtering
|
||||||
|
* @return a boolean flag to indicate if both match
|
||||||
|
*/
|
||||||
|
public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
|
||||||
|
Set<String> eventFilters) {
|
||||||
|
Set<String> eventIds = new HashSet<String>();
|
||||||
|
for (TimelineEvent event : entityEvents) {
|
||||||
|
eventIds.add(event.getId());
|
||||||
|
}
|
||||||
|
for (String eventFilter : eventFilters) {
|
||||||
|
if (!eventIds.contains(eventFilter)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param metrics the set of metric objects in an entity
|
||||||
|
* @param metricFilters the set of metric Ids for filtering
|
||||||
|
* @return a boolean flag to indicate if both match
|
||||||
|
*/
|
||||||
|
public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
|
||||||
|
Set<String> metricFilters) {
|
||||||
|
Set<String> metricIds = new HashSet<String>();
|
||||||
|
for (TimelineMetric metric : metrics) {
|
||||||
|
metricIds.add(metric.getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (String metricFilter : metricFilters) {
|
||||||
|
if (!metricIds.contains(metricFilter)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -62,7 +62,7 @@ public enum EntityColumn implements Column<EntityTable> {
|
||||||
private final String columnQualifier;
|
private final String columnQualifier;
|
||||||
private final byte[] columnQualifierBytes;
|
private final byte[] columnQualifierBytes;
|
||||||
|
|
||||||
private EntityColumn(ColumnFamily<EntityTable> columnFamily,
|
EntityColumn(ColumnFamily<EntityTable> columnFamily,
|
||||||
String columnQualifier) {
|
String columnQualifier) {
|
||||||
this.columnFamily = columnFamily;
|
this.columnFamily = columnFamily;
|
||||||
this.columnQualifier = columnQualifier;
|
this.columnQualifier = columnQualifier;
|
||||||
|
|
|
@ -53,7 +53,7 @@ public enum EntityColumnFamily implements ColumnFamily<EntityTable> {
|
||||||
* @param value create a column family with this name. Must be lower case and
|
* @param value create a column family with this name. Must be lower case and
|
||||||
* without spaces.
|
* without spaces.
|
||||||
*/
|
*/
|
||||||
private EntityColumnFamily(String value) {
|
EntityColumnFamily(String value) {
|
||||||
// column families should be lower case and not contain any spaces.
|
// column families should be lower case and not contain any spaces.
|
||||||
this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
|
this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,7 +80,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
|
||||||
* @param columnFamily that this column is stored in.
|
* @param columnFamily that this column is stored in.
|
||||||
* @param columnPrefix for this column.
|
* @param columnPrefix for this column.
|
||||||
*/
|
*/
|
||||||
private EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
|
EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
|
||||||
String columnPrefix) {
|
String columnPrefix) {
|
||||||
column = new ColumnHelper<EntityTable>(columnFamily);
|
column = new ColumnHelper<EntityTable>(columnFamily);
|
||||||
this.columnFamily = columnFamily;
|
this.columnFamily = columnFamily;
|
||||||
|
|
|
@ -55,17 +55,18 @@ public class EntityRowKey {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a row key prefix for the entity table as follows:
|
* Constructs a row key prefix for the entity table as follows:
|
||||||
* {@code userName!clusterId!flowId!flowRunId!AppId}
|
* {@code userName!clusterId!flowId!flowRunId!AppId!entityType!}
|
||||||
*
|
*
|
||||||
* @param clusterId
|
* @param clusterId
|
||||||
* @param userId
|
* @param userId
|
||||||
* @param flowId
|
* @param flowId
|
||||||
* @param flowRunId
|
* @param flowRunId
|
||||||
* @param appId
|
* @param appId
|
||||||
|
* @param entityType
|
||||||
* @return byte array with the row key prefix
|
* @return byte array with the row key prefix
|
||||||
*/
|
*/
|
||||||
public static byte[] getRowKey(String clusterId, String userId,
|
public static byte[] getRowKeyPrefix(String clusterId, String userId,
|
||||||
String flowId, Long flowRunId, String appId, TimelineEntity te) {
|
String flowId, Long flowRunId, String appId, String entityType) {
|
||||||
byte[] first =
|
byte[] first =
|
||||||
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
|
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
|
||||||
flowId));
|
flowId));
|
||||||
|
@ -73,8 +74,35 @@ public class EntityRowKey {
|
||||||
// time.
|
// time.
|
||||||
byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
|
byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
|
||||||
byte[] third =
|
byte[] third =
|
||||||
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(),
|
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, ""));
|
||||||
te.getId()));
|
return Separator.QUALIFIERS.join(first, second, third);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a row key for the entity table as follows:
|
||||||
|
* {@code userName!clusterId!flowId!flowRunId!AppId!entityType!entityId}
|
||||||
|
*
|
||||||
|
* @param clusterId
|
||||||
|
* @param userId
|
||||||
|
* @param flowId
|
||||||
|
* @param flowRunId
|
||||||
|
* @param appId
|
||||||
|
* @param entityType
|
||||||
|
* @param entityId
|
||||||
|
* @return byte array with the row key
|
||||||
|
*/
|
||||||
|
public static byte[] getRowKey(String clusterId, String userId,
|
||||||
|
String flowId, Long flowRunId, String appId, String entityType,
|
||||||
|
String entityId) {
|
||||||
|
byte[] first =
|
||||||
|
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
|
||||||
|
flowId));
|
||||||
|
// Note that flowRunId is a long, so we can't encode them all at the same
|
||||||
|
// time.
|
||||||
|
byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
|
||||||
|
byte[] third =
|
||||||
|
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType,
|
||||||
|
entityId));
|
||||||
return Separator.QUALIFIERS.join(first, second, third);
|
return Separator.QUALIFIERS.join(first, second, third);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEntitySchemaConstants;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The entity table as column families info, config and metrics. Info stores
|
* The entity table as column families info, config and metrics. Info stores
|
||||||
|
@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti
|
||||||
public class EntityTable extends BaseTable<EntityTable> {
|
public class EntityTable extends BaseTable<EntityTable> {
|
||||||
/** entity prefix */
|
/** entity prefix */
|
||||||
private static final String PREFIX =
|
private static final String PREFIX =
|
||||||
YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".entity";
|
YarnConfiguration.TIMELINE_SERVICE_PREFIX + "entity";
|
||||||
|
|
||||||
/** config param name that specifies the entity table name */
|
/** config param name that specifies the entity table name */
|
||||||
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
|
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
|
||||||
|
@ -146,9 +146,9 @@ public class EntityTable extends BaseTable<EntityTable> {
|
||||||
entityTableDescp
|
entityTableDescp
|
||||||
.setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
|
.setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
|
||||||
entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
|
entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
|
||||||
TimelineEntitySchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
|
TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
|
||||||
admin.createTable(entityTableDescp,
|
admin.createTable(entityTableDescp,
|
||||||
TimelineEntitySchemaConstants.getUsernameSplits());
|
TimelineHBaseSchemaConstants.getUsernameSplits());
|
||||||
LOG.info("Status of table creation for " + table.getNameAsString() + "="
|
LOG.info("Status of table creation for " + table.getNameAsString() + "="
|
||||||
+ admin.tableExists(table));
|
+ admin.tableExists(table));
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -38,11 +39,15 @@ import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
|
||||||
|
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
|
||||||
|
@ -71,6 +76,8 @@ public class TestHBaseTimelineWriterImpl {
|
||||||
private static void createSchema() throws IOException {
|
private static void createSchema() throws IOException {
|
||||||
new EntityTable()
|
new EntityTable()
|
||||||
.createTable(util.getHBaseAdmin(), util.getConfiguration());
|
.createTable(util.getHBaseAdmin(), util.getConfiguration());
|
||||||
|
new AppToFlowTable()
|
||||||
|
.createTable(util.getHBaseAdmin(), util.getConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -138,10 +145,15 @@ public class TestHBaseTimelineWriterImpl {
|
||||||
te.addEntity(entity);
|
te.addEntity(entity);
|
||||||
|
|
||||||
HBaseTimelineWriterImpl hbi = null;
|
HBaseTimelineWriterImpl hbi = null;
|
||||||
|
HBaseTimelineReaderImpl hbr = null;
|
||||||
try {
|
try {
|
||||||
Configuration c1 = util.getConfiguration();
|
Configuration c1 = util.getConfiguration();
|
||||||
hbi = new HBaseTimelineWriterImpl(c1);
|
hbi = new HBaseTimelineWriterImpl(c1);
|
||||||
hbi.init(c1);
|
hbi.init(c1);
|
||||||
|
hbi.start();
|
||||||
|
hbr = new HBaseTimelineReaderImpl();
|
||||||
|
hbr.init(c1);
|
||||||
|
hbr.start();
|
||||||
String cluster = "cluster1";
|
String cluster = "cluster1";
|
||||||
String user = "user1";
|
String user = "user1";
|
||||||
String flow = "some_flow_name";
|
String flow = "some_flow_name";
|
||||||
|
@ -255,9 +267,22 @@ public class TestHBaseTimelineWriterImpl {
|
||||||
assertEquals(1, rowCount);
|
assertEquals(1, rowCount);
|
||||||
assertEquals(17, colCount);
|
assertEquals(17, colCount);
|
||||||
|
|
||||||
|
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
|
||||||
|
entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
|
||||||
|
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
|
||||||
|
appName, entity.getType(), null, null, null, null, null, null, null,
|
||||||
|
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
|
||||||
|
assertNotNull(e1);
|
||||||
|
assertEquals(1, es1.size());
|
||||||
} finally {
|
} finally {
|
||||||
hbi.stop();
|
if (hbi != null) {
|
||||||
hbi.close();
|
hbi.stop();
|
||||||
|
hbi.close();
|
||||||
|
}
|
||||||
|
if (hbr != null) {
|
||||||
|
hbr.stop();
|
||||||
|
hbr.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Somewhat of a hack, not a separate test in order not to have to deal with
|
// Somewhat of a hack, not a separate test in order not to have to deal with
|
||||||
|
@ -283,7 +308,7 @@ public class TestHBaseTimelineWriterImpl {
|
||||||
|
|
||||||
private void testAdditionalEntity() throws IOException {
|
private void testAdditionalEntity() throws IOException {
|
||||||
TimelineEvent event = new TimelineEvent();
|
TimelineEvent event = new TimelineEvent();
|
||||||
String eventId = "foo_event_id";
|
String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
|
||||||
event.setId(eventId);
|
event.setId(eventId);
|
||||||
Long expTs = 1436512802000L;
|
Long expTs = 1436512802000L;
|
||||||
event.setTimestamp(expTs);
|
event.setTimestamp(expTs);
|
||||||
|
@ -291,19 +316,23 @@ public class TestHBaseTimelineWriterImpl {
|
||||||
Object expVal = "test";
|
Object expVal = "test";
|
||||||
event.addInfo(expKey, expVal);
|
event.addInfo(expKey, expVal);
|
||||||
|
|
||||||
final TimelineEntity entity = new TimelineEntity();
|
final TimelineEntity entity = new ApplicationEntity();
|
||||||
entity.setId("attempt_1329348432655_0001_m_000008_18");
|
entity.setId(ApplicationId.newInstance(0, 1).toString());
|
||||||
entity.setType("FOO_ATTEMPT");
|
|
||||||
entity.addEvent(event);
|
entity.addEvent(event);
|
||||||
|
|
||||||
TimelineEntities entities = new TimelineEntities();
|
TimelineEntities entities = new TimelineEntities();
|
||||||
entities.addEntity(entity);
|
entities.addEntity(entity);
|
||||||
|
|
||||||
HBaseTimelineWriterImpl hbi = null;
|
HBaseTimelineWriterImpl hbi = null;
|
||||||
|
HBaseTimelineReaderImpl hbr = null;
|
||||||
try {
|
try {
|
||||||
Configuration c1 = util.getConfiguration();
|
Configuration c1 = util.getConfiguration();
|
||||||
hbi = new HBaseTimelineWriterImpl(c1);
|
hbi = new HBaseTimelineWriterImpl(c1);
|
||||||
hbi.init(c1);
|
hbi.init(c1);
|
||||||
|
hbi.start();
|
||||||
|
hbr = new HBaseTimelineReaderImpl();
|
||||||
|
hbr.init(c1);
|
||||||
|
hbr.start();
|
||||||
String cluster = "cluster2";
|
String cluster = "cluster2";
|
||||||
String user = "user2";
|
String user = "user2";
|
||||||
String flow = "other_flow_name";
|
String flow = "other_flow_name";
|
||||||
|
@ -352,9 +381,31 @@ public class TestHBaseTimelineWriterImpl {
|
||||||
}
|
}
|
||||||
assertEquals(1, rowCount);
|
assertEquals(1, rowCount);
|
||||||
|
|
||||||
|
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
|
||||||
|
entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
|
||||||
|
TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
|
||||||
|
entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
|
||||||
|
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
|
||||||
|
appName, entity.getType(), null, null, null, null, null, null, null,
|
||||||
|
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
|
||||||
|
Set<TimelineEntity> es2 = hbr.getEntities(user, cluster, null, null,
|
||||||
|
appName, entity.getType(), null, null, null, null, null, null, null,
|
||||||
|
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
|
||||||
|
assertNotNull(e1);
|
||||||
|
assertNotNull(e2);
|
||||||
|
assertEquals(e1, e2);
|
||||||
|
assertEquals(1, es1.size());
|
||||||
|
assertEquals(1, es2.size());
|
||||||
|
assertEquals(es1, es2);
|
||||||
} finally {
|
} finally {
|
||||||
hbi.stop();
|
if (hbi != null) {
|
||||||
hbi.close();
|
hbi.stop();
|
||||||
|
hbi.close();
|
||||||
|
}
|
||||||
|
if (hbr != null) {
|
||||||
|
hbr.stop();
|
||||||
|
hbr.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -375,10 +426,15 @@ public class TestHBaseTimelineWriterImpl {
|
||||||
entities.addEntity(entity);
|
entities.addEntity(entity);
|
||||||
|
|
||||||
HBaseTimelineWriterImpl hbi = null;
|
HBaseTimelineWriterImpl hbi = null;
|
||||||
|
HBaseTimelineReaderImpl hbr = null;
|
||||||
try {
|
try {
|
||||||
Configuration c1 = util.getConfiguration();
|
Configuration c1 = util.getConfiguration();
|
||||||
hbi = new HBaseTimelineWriterImpl(c1);
|
hbi = new HBaseTimelineWriterImpl(c1);
|
||||||
hbi.init(c1);
|
hbi.init(c1);
|
||||||
|
hbi.start();
|
||||||
|
hbr = new HBaseTimelineReaderImpl();
|
||||||
|
hbr.init(c1);
|
||||||
|
hbr.start();
|
||||||
String cluster = "cluster_emptyeventkey";
|
String cluster = "cluster_emptyeventkey";
|
||||||
String user = "user_emptyeventkey";
|
String user = "user_emptyeventkey";
|
||||||
String flow = "other_flow_name";
|
String flow = "other_flow_name";
|
||||||
|
@ -430,13 +486,21 @@ public class TestHBaseTimelineWriterImpl {
|
||||||
}
|
}
|
||||||
assertEquals(1, rowCount);
|
assertEquals(1, rowCount);
|
||||||
|
|
||||||
|
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
|
||||||
|
entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
|
||||||
|
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
|
||||||
|
appName, entity.getType(), null, null, null, null, null, null, null,
|
||||||
|
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
|
||||||
|
assertNotNull(e1);
|
||||||
|
assertEquals(1, es1.size());
|
||||||
} finally {
|
} finally {
|
||||||
hbi.stop();
|
hbi.stop();
|
||||||
hbi.close();
|
hbi.close();
|
||||||
|
hbr.stop();;
|
||||||
|
hbr.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDownAfterClass() throws Exception {
|
public static void tearDownAfterClass() throws Exception {
|
||||||
util.shutdownMiniCluster();
|
util.shutdownMiniCluster();
|
||||||
|
|
Loading…
Reference in New Issue