YARN-3411. [Storage implementation] explore the native HBase write schema for storage (Vrushali C via sjlee)
(cherry picked from commit 7a3068854d27eadae1c57545988f5b2029bf119a)
This commit is contained in:
parent
d275677e24
commit
5a4278ccbd
|
@ -59,6 +59,13 @@ public abstract class TimelineCollectorManager extends AbstractService {
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
super.serviceStart();
|
||||||
|
if (writer != null) {
|
||||||
|
writer.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// access to this map is synchronized with the map itself
|
// access to this map is synchronized with the map itself
|
||||||
private final Map<ApplicationId, TimelineCollector> collectors =
|
private final Map<ApplicationId, TimelineCollector> collectors =
|
||||||
|
@ -147,4 +154,16 @@ public abstract class TimelineCollectorManager extends AbstractService {
|
||||||
return collectors.containsKey(appId);
|
return collectors.containsKey(appId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
if (collectors != null && collectors.size() > 1) {
|
||||||
|
for (TimelineCollector c : collectors.values()) {
|
||||||
|
c.serviceStop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (writer != null) {
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Contains the Info Column Family details like Column names, types and byte
|
||||||
|
* representations for
|
||||||
|
* {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
|
||||||
|
* object that is stored in hbase Also has utility functions for storing each of
|
||||||
|
* these to the backend
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
enum EntityColumnDetails {
|
||||||
|
ID(EntityColumnFamily.INFO, "id"),
|
||||||
|
TYPE(EntityColumnFamily.INFO, "type"),
|
||||||
|
CREATED_TIME(EntityColumnFamily.INFO, "created_time"),
|
||||||
|
MODIFIED_TIME(EntityColumnFamily.INFO, "modified_time"),
|
||||||
|
FLOW_VERSION(EntityColumnFamily.INFO, "flow_version"),
|
||||||
|
PREFIX_IS_RELATED_TO(EntityColumnFamily.INFO, "r"),
|
||||||
|
PREFIX_RELATES_TO(EntityColumnFamily.INFO, "s"),
|
||||||
|
PREFIX_EVENTS(EntityColumnFamily.INFO, "e");
|
||||||
|
|
||||||
|
private final EntityColumnFamily columnFamily;
|
||||||
|
private final String value;
|
||||||
|
private final byte[] inBytes;
|
||||||
|
|
||||||
|
private EntityColumnDetails(EntityColumnFamily columnFamily,
|
||||||
|
String value) {
|
||||||
|
this.columnFamily = columnFamily;
|
||||||
|
this.value = value;
|
||||||
|
this.inBytes = Bytes.toBytes(this.value.toLowerCase());
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] getInBytes() {
|
||||||
|
return inBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
void store(byte[] rowKey, BufferedMutator entityTable, Object inputValue)
|
||||||
|
throws IOException {
|
||||||
|
TimelineWriterUtils.store(rowKey, entityTable,
|
||||||
|
this.columnFamily.getInBytes(), null, this.getInBytes(), inputValue,
|
||||||
|
null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* stores events data with column prefix
|
||||||
|
*/
|
||||||
|
void store(byte[] rowKey, BufferedMutator entityTable, byte[] idBytes,
|
||||||
|
String key, Object inputValue) throws IOException {
|
||||||
|
TimelineWriterUtils.store(rowKey, entityTable,
|
||||||
|
this.columnFamily.getInBytes(),
|
||||||
|
// column prefix
|
||||||
|
TimelineWriterUtils.join(
|
||||||
|
TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
|
||||||
|
this.getInBytes(), idBytes),
|
||||||
|
// column qualifier
|
||||||
|
Bytes.toBytes(key),
|
||||||
|
inputValue, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* stores relation entities with a column prefix
|
||||||
|
*/
|
||||||
|
void store(byte[] rowKey, BufferedMutator entityTable, String key,
|
||||||
|
Set<String> inputValue) throws IOException {
|
||||||
|
TimelineWriterUtils.store(rowKey, entityTable,
|
||||||
|
this.columnFamily.getInBytes(),
|
||||||
|
// column prefix
|
||||||
|
this.getInBytes(),
|
||||||
|
// column qualifier
|
||||||
|
Bytes.toBytes(key),
|
||||||
|
// value
|
||||||
|
TimelineWriterUtils.getValueAsString(
|
||||||
|
TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, inputValue),
|
||||||
|
// cell timestamp
|
||||||
|
null);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO add a method that accepts a byte array,
|
||||||
|
// iterates over the enum and returns an enum from those bytes
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,95 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Contains the Column family names and byte representations for
|
||||||
|
* {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
|
||||||
|
* object that is stored in hbase
|
||||||
|
* Also has utility functions for storing each of these to the backend
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
enum EntityColumnFamily {
|
||||||
|
INFO("i"),
|
||||||
|
CONFIG("c"),
|
||||||
|
METRICS("m");
|
||||||
|
|
||||||
|
private final String value;
|
||||||
|
private final byte[] inBytes;
|
||||||
|
|
||||||
|
private EntityColumnFamily(String value) {
|
||||||
|
this.value = value;
|
||||||
|
this.inBytes = Bytes.toBytes(this.value.toLowerCase());
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] getInBytes() {
|
||||||
|
return inBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* stores the key as column and value as hbase column value in the given
|
||||||
|
* column family in the entity table
|
||||||
|
*
|
||||||
|
* @param rowKey
|
||||||
|
* @param entityTable
|
||||||
|
* @param inputValue
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void store(byte[] rowKey, BufferedMutator entityTable, String key,
|
||||||
|
String inputValue) throws IOException {
|
||||||
|
if (key == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null,
|
||||||
|
Bytes.toBytes(key), inputValue, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* stores the values along with cell timestamp
|
||||||
|
*
|
||||||
|
* @param rowKey
|
||||||
|
* @param entityTable
|
||||||
|
* @param key
|
||||||
|
* @param timestamp
|
||||||
|
* @param inputValue
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void store(byte[] rowKey, BufferedMutator entityTable, String key,
|
||||||
|
Long timestamp, Number inputValue) throws IOException {
|
||||||
|
if (key == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null,
|
||||||
|
Bytes.toBytes(key), inputValue, timestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO add a method that accepts a byte array,
|
||||||
|
// iterates over the enum and returns an enum from those bytes
|
||||||
|
}
|
|
@ -0,0 +1,225 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
|
||||||
|
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineEntitySchemaConstants;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This implements a hbase based backend for storing application timeline entity
|
||||||
|
* information.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
|
TimelineWriter {
|
||||||
|
|
||||||
|
private Connection conn;
|
||||||
|
private BufferedMutator entityTable;
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(HBaseTimelineWriterImpl.class);
|
||||||
|
|
||||||
|
public HBaseTimelineWriterImpl() {
|
||||||
|
super(HBaseTimelineWriterImpl.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
public HBaseTimelineWriterImpl(Configuration conf) throws IOException {
|
||||||
|
super(conf.get("yarn.application.id",
|
||||||
|
HBaseTimelineWriterImpl.class.getName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* initializes the hbase connection to write to the entity table
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
super.serviceInit(conf);
|
||||||
|
Configuration hbaseConf = HBaseConfiguration.create(conf);
|
||||||
|
conn = ConnectionFactory.createConnection(hbaseConf);
|
||||||
|
TableName entityTableName = TableName.valueOf(hbaseConf.get(
|
||||||
|
TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
|
||||||
|
TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME));
|
||||||
|
entityTable = conn.getBufferedMutator(entityTableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stores the entire information in TimelineEntities to the timeline store.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public TimelineWriteResponse write(String clusterId, String userId,
|
||||||
|
String flowName, String flowVersion, long flowRunId, String appId,
|
||||||
|
TimelineEntities data) throws IOException {
|
||||||
|
|
||||||
|
byte[] rowKeyPrefix = TimelineWriterUtils.getRowKeyPrefix(clusterId,
|
||||||
|
userId, flowName, flowRunId, appId);
|
||||||
|
|
||||||
|
TimelineWriteResponse putStatus = new TimelineWriteResponse();
|
||||||
|
for (TimelineEntity te : data.getEntities()) {
|
||||||
|
|
||||||
|
// a set can have at most 1 null
|
||||||
|
if (te == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// get row key
|
||||||
|
byte[] row = TimelineWriterUtils.join(
|
||||||
|
TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, rowKeyPrefix,
|
||||||
|
Bytes.toBytes(te.getType()), Bytes.toBytes(te.getId()));
|
||||||
|
|
||||||
|
storeInfo(row, te, flowVersion);
|
||||||
|
storeEvents(row, te.getEvents());
|
||||||
|
storeConfig(row, te.getConfigs());
|
||||||
|
storeMetrics(row, te.getMetrics());
|
||||||
|
storeRelations(row, te.getIsRelatedToEntities(),
|
||||||
|
EntityColumnDetails.PREFIX_IS_RELATED_TO);
|
||||||
|
storeRelations(row, te.getRelatesToEntities(),
|
||||||
|
EntityColumnDetails.PREFIX_RELATES_TO);
|
||||||
|
}
|
||||||
|
|
||||||
|
return putStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stores the Relations from the {@linkplain TimelineEntity} object
|
||||||
|
*/
|
||||||
|
private void storeRelations(byte[] rowKey,
|
||||||
|
Map<String, Set<String>> connectedEntities,
|
||||||
|
EntityColumnDetails columnNamePrefix) throws IOException {
|
||||||
|
for (Map.Entry<String, Set<String>> entry : connectedEntities.entrySet()) {
|
||||||
|
columnNamePrefix.store(rowKey, entityTable, entry.getKey(),
|
||||||
|
entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stores information from the {@linkplain TimelineEntity} object
|
||||||
|
*/
|
||||||
|
private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
EntityColumnDetails.ID.store(rowKey, entityTable, te.getId());
|
||||||
|
EntityColumnDetails.TYPE.store(rowKey, entityTable, te.getType());
|
||||||
|
EntityColumnDetails.CREATED_TIME.store(rowKey, entityTable,
|
||||||
|
te.getCreatedTime());
|
||||||
|
EntityColumnDetails.MODIFIED_TIME.store(rowKey, entityTable,
|
||||||
|
te.getModifiedTime());
|
||||||
|
EntityColumnDetails.FLOW_VERSION.store(rowKey, entityTable, flowVersion);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* stores the config information from {@linkplain TimelineEntity}
|
||||||
|
*/
|
||||||
|
private void storeConfig(byte[] rowKey, Map<String, String> config)
|
||||||
|
throws IOException {
|
||||||
|
if (config == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (Map.Entry<String, String> entry : config.entrySet()) {
|
||||||
|
EntityColumnFamily.CONFIG.store(rowKey, entityTable,
|
||||||
|
entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* stores the {@linkplain TimelineMetric} information from the
|
||||||
|
* {@linkplain TimelineEvent} object
|
||||||
|
*/
|
||||||
|
private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics)
|
||||||
|
throws IOException {
|
||||||
|
if (metrics != null) {
|
||||||
|
for (TimelineMetric metric : metrics) {
|
||||||
|
String key = metric.getId();
|
||||||
|
Map<Long, Number> timeseries = metric.getValues();
|
||||||
|
for (Map.Entry<Long, Number> entry : timeseries.entrySet()) {
|
||||||
|
EntityColumnFamily.METRICS.store(rowKey, entityTable, key,
|
||||||
|
entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stores the events from the {@linkplain TimelineEvent} object
|
||||||
|
*/
|
||||||
|
private void storeEvents(byte[] rowKey, Set<TimelineEvent> events)
|
||||||
|
throws IOException {
|
||||||
|
if (events != null) {
|
||||||
|
for (TimelineEvent event : events) {
|
||||||
|
if (event != null) {
|
||||||
|
String id = event.getId();
|
||||||
|
if (id != null) {
|
||||||
|
byte[] idBytes = Bytes.toBytes(id);
|
||||||
|
Map<String, Object> eventInfo = event.getInfo();
|
||||||
|
if (eventInfo != null) {
|
||||||
|
for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
|
||||||
|
EntityColumnDetails.PREFIX_EVENTS.store(rowKey,
|
||||||
|
entityTable, idBytes, info.getKey(), info.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimelineWriteResponse aggregate(TimelineEntity data,
|
||||||
|
TimelineAggregationTrack track) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* close the hbase connections
|
||||||
|
* The close APIs perform flushing and release any
|
||||||
|
* resources held
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
if (entityTable != null) {
|
||||||
|
LOG.info("closing entity table");
|
||||||
|
// The close API performs flushing and releases any resources held
|
||||||
|
entityTable.close();
|
||||||
|
}
|
||||||
|
if (conn != null) {
|
||||||
|
LOG.info("closing the hbase Connection");
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class Range {
|
||||||
|
private final int startIdx;
|
||||||
|
private final int endIdx;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Defines a range from start index (inclusive) to end index (exclusive).
|
||||||
|
*
|
||||||
|
* @param start
|
||||||
|
* Starting index position
|
||||||
|
* @param end
|
||||||
|
* Ending index position (exclusive)
|
||||||
|
*/
|
||||||
|
public Range(int start, int end) {
|
||||||
|
if (start < 0 || end < start) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Invalid range, required that: 0 <= start <= end; start=" + start
|
||||||
|
+ ", end=" + end);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.startIdx = start;
|
||||||
|
this.endIdx = end;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int start() {
|
||||||
|
return startIdx;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int end() {
|
||||||
|
return endIdx;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int length() {
|
||||||
|
return endIdx - startIdx;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* contains the constants used in the context of schema accesses for
|
||||||
|
* {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
|
||||||
|
* information
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class TimelineEntitySchemaConstants {
|
||||||
|
|
||||||
|
/** entity prefix */
|
||||||
|
public static final String ENTITY_PREFIX =
|
||||||
|
YarnConfiguration.TIMELINE_SERVICE_PREFIX
|
||||||
|
+ ".entity";
|
||||||
|
|
||||||
|
/** config param name that specifies the entity table name */
|
||||||
|
public static final String ENTITY_TABLE_NAME = ENTITY_PREFIX
|
||||||
|
+ ".table.name";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* config param name that specifies the TTL for metrics column family in
|
||||||
|
* entity table
|
||||||
|
*/
|
||||||
|
public static final String ENTITY_TABLE_METRICS_TTL = ENTITY_PREFIX
|
||||||
|
+ ".table.metrics.ttl";
|
||||||
|
|
||||||
|
/** default value for entity table name */
|
||||||
|
public static final String DEFAULT_ENTITY_TABLE_NAME = "timelineservice.entity";
|
||||||
|
|
||||||
|
/** in bytes default value for entity table name */
|
||||||
|
static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES = Bytes
|
||||||
|
.toBytes(DEFAULT_ENTITY_TABLE_NAME);
|
||||||
|
|
||||||
|
/** separator in row key */
|
||||||
|
public static final String ROW_KEY_SEPARATOR = "!";
|
||||||
|
|
||||||
|
/** byte representation of the separator in row key */
|
||||||
|
static final byte[] ROW_KEY_SEPARATOR_BYTES = Bytes
|
||||||
|
.toBytes(ROW_KEY_SEPARATOR);
|
||||||
|
|
||||||
|
public static final byte ZERO_BYTES = 0;
|
||||||
|
|
||||||
|
/** default TTL is 30 days for metrics timeseries */
|
||||||
|
public static final int ENTITY_TABLE_METRICS_TTL_DEFAULT = 2592000;
|
||||||
|
|
||||||
|
/** default max number of versions */
|
||||||
|
public static final int ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT = 1000;
|
||||||
|
}
|
|
@ -0,0 +1,231 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.commons.cli.CommandLine;
|
||||||
|
import org.apache.commons.cli.CommandLineParser;
|
||||||
|
import org.apache.commons.cli.HelpFormatter;
|
||||||
|
import org.apache.commons.cli.Option;
|
||||||
|
import org.apache.commons.cli.Options;
|
||||||
|
import org.apache.commons.cli.ParseException;
|
||||||
|
import org.apache.commons.cli.PosixParser;
|
||||||
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This creates the schema for a hbase based backend for storing application
|
||||||
|
* timeline information.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class TimelineSchemaCreator {
|
||||||
|
|
||||||
|
final static String NAME = TimelineSchemaCreator.class.getSimpleName();
|
||||||
|
private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class);
|
||||||
|
final static byte[][] splits = { Bytes.toBytes("a"), Bytes.toBytes("ad"),
|
||||||
|
Bytes.toBytes("an"), Bytes.toBytes("b"), Bytes.toBytes("ca"),
|
||||||
|
Bytes.toBytes("cl"), Bytes.toBytes("d"), Bytes.toBytes("e"),
|
||||||
|
Bytes.toBytes("f"), Bytes.toBytes("g"), Bytes.toBytes("h"),
|
||||||
|
Bytes.toBytes("i"), Bytes.toBytes("j"), Bytes.toBytes("k"),
|
||||||
|
Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"),
|
||||||
|
Bytes.toBytes("o"), Bytes.toBytes("q"), Bytes.toBytes("r"),
|
||||||
|
Bytes.toBytes("s"), Bytes.toBytes("se"), Bytes.toBytes("t"),
|
||||||
|
Bytes.toBytes("u"), Bytes.toBytes("v"), Bytes.toBytes("w"),
|
||||||
|
Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") };
|
||||||
|
|
||||||
|
public static final String SPLIT_KEY_PREFIX_LENGTH = "4";
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
Configuration hbaseConf = HBaseConfiguration.create();
|
||||||
|
// Grab input args and allow for -Dxyz style arguments
|
||||||
|
String[] otherArgs = new GenericOptionsParser(hbaseConf, args)
|
||||||
|
.getRemainingArgs();
|
||||||
|
|
||||||
|
// Grab the arguments we're looking for.
|
||||||
|
CommandLine commandLine = parseArgs(otherArgs);
|
||||||
|
|
||||||
|
// Grab the entityTableName argument
|
||||||
|
String entityTableName = commandLine.getOptionValue("e");
|
||||||
|
if (StringUtils.isNotBlank(entityTableName)) {
|
||||||
|
hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
|
||||||
|
entityTableName);
|
||||||
|
}
|
||||||
|
String entityTable_TTL_Metrics = commandLine.getOptionValue("m");
|
||||||
|
if (StringUtils.isNotBlank(entityTable_TTL_Metrics)) {
|
||||||
|
hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL,
|
||||||
|
entityTable_TTL_Metrics);
|
||||||
|
}
|
||||||
|
createAllTables(hbaseConf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse command-line arguments.
|
||||||
|
*
|
||||||
|
* @param args
|
||||||
|
* command line arguments passed to program.
|
||||||
|
* @return parsed command line.
|
||||||
|
* @throws ParseException
|
||||||
|
*/
|
||||||
|
private static CommandLine parseArgs(String[] args) throws ParseException {
|
||||||
|
Options options = new Options();
|
||||||
|
|
||||||
|
// Input
|
||||||
|
Option o = new Option("e", "entityTableName", true, "entity table name");
|
||||||
|
o.setArgName("entityTableName");
|
||||||
|
o.setRequired(false);
|
||||||
|
options.addOption(o);
|
||||||
|
|
||||||
|
o = new Option("m", "metricsTTL", true, "TTL for metrics column family");
|
||||||
|
o.setArgName("metricsTTL");
|
||||||
|
o.setRequired(false);
|
||||||
|
options.addOption(o);
|
||||||
|
|
||||||
|
CommandLineParser parser = new PosixParser();
|
||||||
|
CommandLine commandLine = null;
|
||||||
|
try {
|
||||||
|
commandLine = parser.parse(options, args);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("ERROR: " + e.getMessage() + "\n");
|
||||||
|
HelpFormatter formatter = new HelpFormatter();
|
||||||
|
formatter.printHelp(NAME + " ", options, true);
|
||||||
|
System.exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return commandLine;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createAllTables(Configuration hbaseConf)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
Connection conn = null;
|
||||||
|
try {
|
||||||
|
conn = ConnectionFactory.createConnection(hbaseConf);
|
||||||
|
Admin admin = conn.getAdmin();
|
||||||
|
if (admin == null) {
|
||||||
|
throw new IOException("Cannot create table since admin is null");
|
||||||
|
}
|
||||||
|
createTimelineEntityTable(admin, hbaseConf);
|
||||||
|
} finally {
|
||||||
|
if (conn != null) {
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a table with column families info, config and metrics
|
||||||
|
* info stores information about a timeline entity object
|
||||||
|
* config stores configuration data of a timeline entity object
|
||||||
|
* metrics stores the metrics of a timeline entity object
|
||||||
|
*
|
||||||
|
* Example entity table record:
|
||||||
|
* <pre>
|
||||||
|
*|------------------------------------------------------------|
|
||||||
|
*| Row | Column Family | Column Family | Column Family|
|
||||||
|
*| key | info | metrics | config |
|
||||||
|
*|------------------------------------------------------------|
|
||||||
|
*| userName! | id:entityId | metricName1: | configKey1: |
|
||||||
|
*| clusterId! | | metricValue1 | configValue1 |
|
||||||
|
*| flowId! | type:entityType| @timestamp1 | |
|
||||||
|
*| flowRunId! | | | configKey2: |
|
||||||
|
*| AppId! | created_time: | metricName1: | configValue2 |
|
||||||
|
*| entityType!| 1392993084018 | metricValue2 | |
|
||||||
|
*| entityId | | @timestamp2 | |
|
||||||
|
*| | modified_time: | | |
|
||||||
|
*| | 1392995081012 | metricName2: | |
|
||||||
|
*| | | metricValue1 | |
|
||||||
|
*| | r!relatesToKey:| @timestamp2 | |
|
||||||
|
*| | id3!id4!id5 | | |
|
||||||
|
*| | | | |
|
||||||
|
*| | s!isRelatedToKey| | |
|
||||||
|
*| | id7!id9!id5 | | |
|
||||||
|
*| | | | |
|
||||||
|
*| | e!eventKey: | | |
|
||||||
|
*| | eventValue | | |
|
||||||
|
*| | | | |
|
||||||
|
*| | flowVersion: | | |
|
||||||
|
*| | versionValue | | |
|
||||||
|
*|------------------------------------------------------------|
|
||||||
|
*</pre>
|
||||||
|
* @param admin
|
||||||
|
* @param hbaseConf
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void createTimelineEntityTable(Admin admin,
|
||||||
|
Configuration hbaseConf) throws IOException {
|
||||||
|
|
||||||
|
TableName table = TableName.valueOf(hbaseConf.get(
|
||||||
|
TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
|
||||||
|
TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME));
|
||||||
|
if (admin.tableExists(table)) {
|
||||||
|
// do not disable / delete existing table
|
||||||
|
// similar to the approach taken by map-reduce jobs when
|
||||||
|
// output directory exists
|
||||||
|
throw new IOException("Table " + table.getNameAsString()
|
||||||
|
+ " already exists.");
|
||||||
|
}
|
||||||
|
|
||||||
|
HTableDescriptor entityTableDescp = new HTableDescriptor(table);
|
||||||
|
HColumnDescriptor cf1 = new HColumnDescriptor(
|
||||||
|
EntityColumnFamily.INFO.getInBytes());
|
||||||
|
cf1.setBloomFilterType(BloomType.ROWCOL);
|
||||||
|
entityTableDescp.addFamily(cf1);
|
||||||
|
|
||||||
|
HColumnDescriptor cf2 = new HColumnDescriptor(
|
||||||
|
EntityColumnFamily.CONFIG.getInBytes());
|
||||||
|
cf2.setBloomFilterType(BloomType.ROWCOL);
|
||||||
|
cf2.setBlockCacheEnabled(true);
|
||||||
|
entityTableDescp.addFamily(cf2);
|
||||||
|
|
||||||
|
HColumnDescriptor cf3 = new HColumnDescriptor(
|
||||||
|
EntityColumnFamily.METRICS.getInBytes());
|
||||||
|
entityTableDescp.addFamily(cf3);
|
||||||
|
cf3.setBlockCacheEnabled(true);
|
||||||
|
// always keep 1 version (the latest)
|
||||||
|
cf3.setMinVersions(1);
|
||||||
|
cf3.setMaxVersions(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT);
|
||||||
|
cf3.setTimeToLive(hbaseConf.getInt(
|
||||||
|
TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL,
|
||||||
|
TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL_DEFAULT));
|
||||||
|
entityTableDescp
|
||||||
|
.setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
|
||||||
|
entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
|
||||||
|
SPLIT_KEY_PREFIX_LENGTH);
|
||||||
|
admin.createTable(entityTableDescp, splits);
|
||||||
|
LOG.info("Status of table creation for " + table.getNameAsString() + "="
|
||||||
|
+ admin.tableExists(table));
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,344 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.Range;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* bunch of utility functions used across TimelineWriter classes
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class TimelineWriterUtils {
|
||||||
|
|
||||||
|
/** empty bytes */
|
||||||
|
public static final byte[] EMPTY_BYTES = new byte[0];
|
||||||
|
private static final String SPACE = " ";
|
||||||
|
private static final String UNDERSCORE = "_";
|
||||||
|
private static final String EMPTY_STRING = "";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a single byte array containing all of the individual component
|
||||||
|
* arrays separated by the separator array.
|
||||||
|
*
|
||||||
|
* @param separator
|
||||||
|
* @param components
|
||||||
|
* @return byte array after joining the components
|
||||||
|
*/
|
||||||
|
public static byte[] join(byte[] separator, byte[]... components) {
|
||||||
|
if (components == null || components.length == 0) {
|
||||||
|
return EMPTY_BYTES;
|
||||||
|
}
|
||||||
|
|
||||||
|
int finalSize = 0;
|
||||||
|
if (separator != null) {
|
||||||
|
finalSize = separator.length * (components.length - 1);
|
||||||
|
}
|
||||||
|
for (byte[] comp : components) {
|
||||||
|
if (comp != null) {
|
||||||
|
finalSize += comp.length;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] buf = new byte[finalSize];
|
||||||
|
int offset = 0;
|
||||||
|
for (int i = 0; i < components.length; i++) {
|
||||||
|
if (components[i] != null) {
|
||||||
|
System.arraycopy(components[i], 0, buf, offset, components[i].length);
|
||||||
|
offset += components[i].length;
|
||||||
|
if (i < (components.length - 1) && separator != null
|
||||||
|
&& separator.length > 0) {
|
||||||
|
System.arraycopy(separator, 0, buf, offset, separator.length);
|
||||||
|
offset += separator.length;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Splits the source array into multiple array segments using the given
|
||||||
|
* separator, up to a maximum of count items. This will naturally produce
|
||||||
|
* copied byte arrays for each of the split segments. To identify the split
|
||||||
|
* ranges without the array copies, see
|
||||||
|
* {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
|
||||||
|
*
|
||||||
|
* @param source
|
||||||
|
* @param separator
|
||||||
|
* @return byte[] array after splitting the source
|
||||||
|
*/
|
||||||
|
public static byte[][] split(byte[] source, byte[] separator) {
|
||||||
|
return split(source, separator, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Splits the source array into multiple array segments using the given
|
||||||
|
* separator, up to a maximum of count items. This will naturally produce
|
||||||
|
* copied byte arrays for each of the split segments. To identify the split
|
||||||
|
* ranges without the array copies, see
|
||||||
|
* {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
|
||||||
|
*
|
||||||
|
* @param source
|
||||||
|
* @param separator
|
||||||
|
* @param limit
|
||||||
|
* @return byte[][] after splitting the input source
|
||||||
|
*/
|
||||||
|
public static byte[][] split(byte[] source, byte[] separator, int limit) {
|
||||||
|
List<Range> segments = splitRanges(source, separator, limit);
|
||||||
|
|
||||||
|
byte[][] splits = new byte[segments.size()][];
|
||||||
|
for (int i = 0; i < segments.size(); i++) {
|
||||||
|
Range r = segments.get(i);
|
||||||
|
byte[] tmp = new byte[r.length()];
|
||||||
|
if (tmp.length > 0) {
|
||||||
|
System.arraycopy(source, r.start(), tmp, 0, r.length());
|
||||||
|
}
|
||||||
|
splits[i] = tmp;
|
||||||
|
}
|
||||||
|
return splits;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list of ranges identifying [start, end) -- closed, open --
|
||||||
|
* positions within the source byte array that would be split using the
|
||||||
|
* separator byte array.
|
||||||
|
*/
|
||||||
|
public static List<Range> splitRanges(byte[] source, byte[] separator) {
|
||||||
|
return splitRanges(source, separator, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list of ranges identifying [start, end) -- closed, open --
|
||||||
|
* positions within the source byte array that would be split using the
|
||||||
|
* separator byte array.
|
||||||
|
* @param source the source data
|
||||||
|
* @param separator the separator pattern to look for
|
||||||
|
* @param limit the maximum number of splits to identify in the source
|
||||||
|
*/
|
||||||
|
public static List<Range> splitRanges(byte[] source, byte[] separator, int limit) {
|
||||||
|
List<Range> segments = new ArrayList<Range>();
|
||||||
|
if ((source == null) || (separator == null)) {
|
||||||
|
return segments;
|
||||||
|
}
|
||||||
|
int start = 0;
|
||||||
|
itersource: for (int i = 0; i < source.length; i++) {
|
||||||
|
for (int j = 0; j < separator.length; j++) {
|
||||||
|
if (source[i + j] != separator[j]) {
|
||||||
|
continue itersource;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// all separator elements matched
|
||||||
|
if (limit > 0 && segments.size() >= (limit-1)) {
|
||||||
|
// everything else goes in one final segment
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
segments.add(new Range(start, i));
|
||||||
|
start = i + separator.length;
|
||||||
|
// i will be incremented again in outer for loop
|
||||||
|
i += separator.length-1;
|
||||||
|
}
|
||||||
|
// add in remaining to a final range
|
||||||
|
if (start <= source.length) {
|
||||||
|
segments.add(new Range(start, source.length));
|
||||||
|
}
|
||||||
|
return segments;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* converts run id into it's inverse timestamp
|
||||||
|
* @param flowRunId
|
||||||
|
* @return inverted long
|
||||||
|
*/
|
||||||
|
public static long encodeRunId(Long flowRunId) {
|
||||||
|
return Long.MAX_VALUE - flowRunId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* return a value from the Map as a String
|
||||||
|
* @param key
|
||||||
|
* @param values
|
||||||
|
* @return value as a String or ""
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static String getValueAsString(final byte[] key,
|
||||||
|
final Map<byte[], byte[]> values) throws IOException {
|
||||||
|
if( values == null ) {
|
||||||
|
return EMPTY_STRING;
|
||||||
|
}
|
||||||
|
byte[] value = values.get(key);
|
||||||
|
if (value != null) {
|
||||||
|
return GenericObjectMapper.read(value).toString();
|
||||||
|
} else {
|
||||||
|
return EMPTY_STRING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* return a value from the Map as a long
|
||||||
|
* @param key
|
||||||
|
* @param values
|
||||||
|
* @return value as Long or 0L
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static long getValueAsLong(final byte[] key,
|
||||||
|
final Map<byte[], byte[]> values) throws IOException {
|
||||||
|
if (values == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
byte[] value = values.get(key);
|
||||||
|
if (value != null) {
|
||||||
|
Number val = (Number) GenericObjectMapper.read(value);
|
||||||
|
return val.longValue();
|
||||||
|
} else {
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* concates the values from a Set<Strings> to return a single delimited string value
|
||||||
|
* @param rowKeySeparator
|
||||||
|
* @param values
|
||||||
|
* @return Value from the set of strings as a string
|
||||||
|
*/
|
||||||
|
public static String getValueAsString(String rowKeySeparator,
|
||||||
|
Set<String> values) {
|
||||||
|
|
||||||
|
if (values == null) {
|
||||||
|
return EMPTY_STRING;
|
||||||
|
}
|
||||||
|
StringBuilder concatStrings = new StringBuilder();
|
||||||
|
for (String value : values) {
|
||||||
|
concatStrings.append(value);
|
||||||
|
concatStrings.append(rowKeySeparator);
|
||||||
|
}
|
||||||
|
// remove the last separator
|
||||||
|
if(concatStrings.length() > 1) {
|
||||||
|
concatStrings.deleteCharAt(concatStrings.lastIndexOf(rowKeySeparator));
|
||||||
|
}
|
||||||
|
return concatStrings.toString();
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Constructs a row key prefix for the entity table
|
||||||
|
* @param clusterId
|
||||||
|
* @param userId
|
||||||
|
* @param flowId
|
||||||
|
* @param flowRunId
|
||||||
|
* @param appId
|
||||||
|
* @return byte array with the row key prefix
|
||||||
|
*/
|
||||||
|
static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId,
|
||||||
|
Long flowRunId, String appId) {
|
||||||
|
return TimelineWriterUtils.join(
|
||||||
|
TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
|
||||||
|
Bytes.toBytes(cleanse(userId)), Bytes.toBytes(cleanse(clusterId)),
|
||||||
|
Bytes.toBytes(cleanse(flowId)),
|
||||||
|
Bytes.toBytes(TimelineWriterUtils.encodeRunId(flowRunId)),
|
||||||
|
Bytes.toBytes(cleanse(appId)));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Takes a string token to be used as a key or qualifier and
|
||||||
|
* cleanses out reserved tokens.
|
||||||
|
* This operation is not symmetrical.
|
||||||
|
* Logic is to replace all spaces and separator chars in input with
|
||||||
|
* underscores.
|
||||||
|
*
|
||||||
|
* @param token token to cleanse.
|
||||||
|
* @return String with no spaces and no separator chars
|
||||||
|
*/
|
||||||
|
public static String cleanse(String token) {
|
||||||
|
if (token == null || token.length() == 0) {
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
|
||||||
|
String cleansed = token.replaceAll(SPACE, UNDERSCORE);
|
||||||
|
cleansed = cleansed.replaceAll(
|
||||||
|
TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, UNDERSCORE);
|
||||||
|
|
||||||
|
return cleansed;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* stores the info to the table in hbase
|
||||||
|
*
|
||||||
|
* @param rowKey
|
||||||
|
* @param table
|
||||||
|
* @param columnFamily
|
||||||
|
* @param columnPrefix
|
||||||
|
* @param columnQualifier
|
||||||
|
* @param inputValue
|
||||||
|
* @param cellTimeStamp
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void store(byte[] rowKey, BufferedMutator table, byte[] columnFamily,
|
||||||
|
byte[] columnPrefix, byte[] columnQualifier, Object inputValue,
|
||||||
|
Long cellTimeStamp) throws IOException {
|
||||||
|
if ((rowKey == null) || (table == null) || (columnFamily == null)
|
||||||
|
|| (columnQualifier == null) || (inputValue == null)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Put p = null;
|
||||||
|
if (cellTimeStamp == null) {
|
||||||
|
if (columnPrefix != null) {
|
||||||
|
// store with prefix
|
||||||
|
p = new Put(rowKey);
|
||||||
|
p.addColumn(
|
||||||
|
columnFamily,
|
||||||
|
join(TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
|
||||||
|
columnPrefix, columnQualifier), GenericObjectMapper
|
||||||
|
.write(inputValue));
|
||||||
|
} else {
|
||||||
|
// store without prefix
|
||||||
|
p = new Put(rowKey);
|
||||||
|
p.addColumn(columnFamily, columnQualifier,
|
||||||
|
GenericObjectMapper.write(inputValue));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// store with cell timestamp
|
||||||
|
Cell cell = CellUtil.createCell(rowKey, columnFamily, columnQualifier,
|
||||||
|
// set the cell timestamp
|
||||||
|
cellTimeStamp,
|
||||||
|
// KeyValue Type minimum
|
||||||
|
TimelineEntitySchemaConstants.ZERO_BYTES,
|
||||||
|
GenericObjectMapper.write(inputValue));
|
||||||
|
p = new Put(rowKey);
|
||||||
|
p.add(cell);
|
||||||
|
}
|
||||||
|
if (p != null) {
|
||||||
|
table.mutate(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,292 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.NavigableMap;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit test HBaseTimelineWriterImpl
|
||||||
|
* YARN 3411
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public class TestHBaseTimelineWriterImpl {
|
||||||
|
|
||||||
|
private static HBaseTestingUtility util;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupBeforeClass() throws Exception {
|
||||||
|
util = new HBaseTestingUtility();
|
||||||
|
util.startMiniCluster();
|
||||||
|
createSchema();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createSchema() throws IOException {
|
||||||
|
byte[][] families = new byte[3][];
|
||||||
|
families[0] = EntityColumnFamily.INFO.getInBytes();
|
||||||
|
families[1] = EntityColumnFamily.CONFIG.getInBytes();
|
||||||
|
families[2] = EntityColumnFamily.METRICS.getInBytes();
|
||||||
|
TimelineSchemaCreator.createTimelineEntityTable(util.getHBaseAdmin(),
|
||||||
|
util.getConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWriteEntityToHBase() throws Exception {
|
||||||
|
TimelineEntities te = new TimelineEntities();
|
||||||
|
TimelineEntity entity = new TimelineEntity();
|
||||||
|
String id = "hello";
|
||||||
|
String type = "world";
|
||||||
|
entity.setId(id);
|
||||||
|
entity.setType(type);
|
||||||
|
Long cTime = 1425016501000L;
|
||||||
|
Long mTime = 1425026901000L;
|
||||||
|
entity.setCreatedTime(cTime);
|
||||||
|
entity.setModifiedTime(mTime);
|
||||||
|
|
||||||
|
// add the isRelatedToEntity info
|
||||||
|
String key = "task";
|
||||||
|
String value = "is_related_to_entity_id_here";
|
||||||
|
Set<String> isRelatedToSet = new HashSet<String>();
|
||||||
|
isRelatedToSet.add(value);
|
||||||
|
Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
|
||||||
|
isRelatedTo.put(key, isRelatedToSet);
|
||||||
|
entity.setIsRelatedToEntities(isRelatedTo);
|
||||||
|
|
||||||
|
// add the relatesTo info
|
||||||
|
key = "container";
|
||||||
|
value = "relates_to_entity_id_here";
|
||||||
|
Set<String> relatesToSet = new HashSet<String>();
|
||||||
|
relatesToSet.add(value);
|
||||||
|
value = "relates_to_entity_id_here_Second";
|
||||||
|
relatesToSet.add(value);
|
||||||
|
Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
|
||||||
|
relatesTo.put(key, relatesToSet);
|
||||||
|
entity.setRelatesToEntities(relatesTo);
|
||||||
|
|
||||||
|
// add some config entries
|
||||||
|
Map<String, String> conf = new HashMap<String, String>();
|
||||||
|
conf.put("config_param1", "value1");
|
||||||
|
conf.put("config_param2", "value2");
|
||||||
|
entity.addConfigs(conf);
|
||||||
|
|
||||||
|
// add metrics
|
||||||
|
Set<TimelineMetric> metrics = new HashSet<>();
|
||||||
|
TimelineMetric m1 = new TimelineMetric();
|
||||||
|
m1.setId("MAP_SLOT_MILLIS");
|
||||||
|
Map<Long, Number> metricValues = new HashMap<Long, Number>();
|
||||||
|
metricValues.put(1429741609000L, 100000000);
|
||||||
|
metricValues.put(1429742609000L, 200000000);
|
||||||
|
metricValues.put(1429743609000L, 300000000);
|
||||||
|
metricValues.put(1429744609000L, 400000000);
|
||||||
|
metricValues.put(1429745609000L, 50000000000L);
|
||||||
|
metricValues.put(1429746609000L, 60000000000L);
|
||||||
|
m1.setType(Type.TIME_SERIES);
|
||||||
|
m1.setValues(metricValues);
|
||||||
|
metrics.add(m1);
|
||||||
|
entity.addMetrics(metrics);
|
||||||
|
|
||||||
|
te.addEntity(entity);
|
||||||
|
|
||||||
|
HBaseTimelineWriterImpl hbi = null;
|
||||||
|
try {
|
||||||
|
Configuration c1 = util.getConfiguration();
|
||||||
|
hbi = new HBaseTimelineWriterImpl(c1);
|
||||||
|
hbi.init(c1);
|
||||||
|
String cluster = "cluster1";
|
||||||
|
String user = "user1";
|
||||||
|
String flow = "some_flow_name";
|
||||||
|
String flowVersion = "AB7822C10F1111";
|
||||||
|
long runid = 1002345678919L;
|
||||||
|
String appName = "some app name";
|
||||||
|
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
|
||||||
|
hbi.stop();
|
||||||
|
|
||||||
|
// scan the table and see that entity exists
|
||||||
|
Scan s = new Scan();
|
||||||
|
byte[] startRow = TimelineWriterUtils.getRowKeyPrefix(cluster, user, flow,
|
||||||
|
runid, appName);
|
||||||
|
s.setStartRow(startRow);
|
||||||
|
s.setMaxVersions(Integer.MAX_VALUE);
|
||||||
|
ResultScanner scanner = null;
|
||||||
|
TableName entityTableName = TableName
|
||||||
|
.valueOf(TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME);
|
||||||
|
Connection conn = ConnectionFactory.createConnection(c1);
|
||||||
|
Table entityTable = conn.getTable(entityTableName);
|
||||||
|
int rowCount = 0;
|
||||||
|
int colCount = 0;
|
||||||
|
scanner = entityTable.getScanner(s);
|
||||||
|
for (Result result : scanner) {
|
||||||
|
if (result != null && !result.isEmpty()) {
|
||||||
|
rowCount++;
|
||||||
|
colCount += result.size();
|
||||||
|
byte[] row1 = result.getRow();
|
||||||
|
assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
|
||||||
|
entity));
|
||||||
|
|
||||||
|
// check info column family
|
||||||
|
NavigableMap<byte[], byte[]> infoValues = result
|
||||||
|
.getFamilyMap(EntityColumnFamily.INFO.getInBytes());
|
||||||
|
String id1 = TimelineWriterUtils.getValueAsString(
|
||||||
|
EntityColumnDetails.ID.getInBytes(), infoValues);
|
||||||
|
assertEquals(id, id1);
|
||||||
|
String type1 = TimelineWriterUtils.getValueAsString(
|
||||||
|
EntityColumnDetails.TYPE.getInBytes(), infoValues);
|
||||||
|
assertEquals(type, type1);
|
||||||
|
Long cTime1 = TimelineWriterUtils.getValueAsLong(
|
||||||
|
EntityColumnDetails.CREATED_TIME.getInBytes(), infoValues);
|
||||||
|
assertEquals(cTime1, cTime);
|
||||||
|
Long mTime1 = TimelineWriterUtils.getValueAsLong(
|
||||||
|
EntityColumnDetails.MODIFIED_TIME.getInBytes(), infoValues);
|
||||||
|
assertEquals(mTime1, mTime);
|
||||||
|
checkRelatedEntities(isRelatedTo, infoValues,
|
||||||
|
EntityColumnDetails.PREFIX_IS_RELATED_TO.getInBytes());
|
||||||
|
checkRelatedEntities(relatesTo, infoValues,
|
||||||
|
EntityColumnDetails.PREFIX_RELATES_TO.getInBytes());
|
||||||
|
|
||||||
|
// check config column family
|
||||||
|
NavigableMap<byte[], byte[]> configValuesResult = result
|
||||||
|
.getFamilyMap(EntityColumnFamily.CONFIG.getInBytes());
|
||||||
|
checkConfigs(configValuesResult, conf);
|
||||||
|
|
||||||
|
NavigableMap<byte[], byte[]> metricsResult = result
|
||||||
|
.getFamilyMap(EntityColumnFamily.METRICS.getInBytes());
|
||||||
|
checkMetricsSizeAndKey(metricsResult, metrics);
|
||||||
|
List<Cell> metricCells = result.getColumnCells(
|
||||||
|
EntityColumnFamily.METRICS.getInBytes(),
|
||||||
|
Bytes.toBytes(m1.getId()));
|
||||||
|
checkMetricsTimeseries(metricCells, m1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(1, rowCount);
|
||||||
|
assertEquals(15, colCount);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
hbi.stop();
|
||||||
|
hbi.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkMetricsTimeseries(List<Cell> metricCells,
|
||||||
|
TimelineMetric m1) throws IOException {
|
||||||
|
Map<Long, Number> timeseries = m1.getValues();
|
||||||
|
assertEquals(metricCells.size(), timeseries.size());
|
||||||
|
for (Cell c1 : metricCells) {
|
||||||
|
assertTrue(timeseries.containsKey(c1.getTimestamp()));
|
||||||
|
assertEquals(GenericObjectMapper.read(CellUtil.cloneValue(c1)),
|
||||||
|
timeseries.get(c1.getTimestamp()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkMetricsSizeAndKey(
|
||||||
|
NavigableMap<byte[], byte[]> metricsResult, Set<TimelineMetric> metrics) {
|
||||||
|
assertEquals(metrics.size(), metricsResult.size());
|
||||||
|
for (TimelineMetric m1 : metrics) {
|
||||||
|
byte[] key = Bytes.toBytes(m1.getId());
|
||||||
|
assertTrue(metricsResult.containsKey(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkConfigs(NavigableMap<byte[], byte[]> configValuesResult,
|
||||||
|
Map<String, String> conf) throws IOException {
|
||||||
|
|
||||||
|
assertEquals(conf.size(), configValuesResult.size());
|
||||||
|
byte[] columnName;
|
||||||
|
for (String key : conf.keySet()) {
|
||||||
|
columnName = Bytes.toBytes(key);
|
||||||
|
assertTrue(configValuesResult.containsKey(columnName));
|
||||||
|
byte[] value = configValuesResult.get(columnName);
|
||||||
|
assertNotNull(value);
|
||||||
|
assertEquals(conf.get(key), GenericObjectMapper.read(value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkRelatedEntities(Map<String, Set<String>> isRelatedTo,
|
||||||
|
NavigableMap<byte[], byte[]> infoValues, byte[] columnPrefix)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
for (String key : isRelatedTo.keySet()) {
|
||||||
|
byte[] columnName = TimelineWriterUtils.join(
|
||||||
|
TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, columnPrefix,
|
||||||
|
Bytes.toBytes(key));
|
||||||
|
|
||||||
|
byte[] value = infoValues.get(columnName);
|
||||||
|
assertNotNull(value);
|
||||||
|
String isRelatedToEntities = GenericObjectMapper.read(value).toString();
|
||||||
|
assertNotNull(isRelatedToEntities);
|
||||||
|
assertEquals(
|
||||||
|
TimelineWriterUtils.getValueAsString(
|
||||||
|
TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR,
|
||||||
|
isRelatedTo.get(key)), isRelatedToEntities);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
|
||||||
|
String flow, Long runid, String appName, TimelineEntity te) {
|
||||||
|
|
||||||
|
byte[][] rowKeyComponents = TimelineWriterUtils.split(rowKey,
|
||||||
|
TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES);
|
||||||
|
|
||||||
|
assertTrue(rowKeyComponents.length == 7);
|
||||||
|
assertEquals(user, Bytes.toString(rowKeyComponents[0]));
|
||||||
|
assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
|
||||||
|
assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
|
||||||
|
assertEquals(TimelineWriterUtils.encodeRunId(runid),
|
||||||
|
Bytes.toLong(rowKeyComponents[3]));
|
||||||
|
assertEquals(TimelineWriterUtils.cleanse(appName), Bytes.toString(rowKeyComponents[4]));
|
||||||
|
assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
|
||||||
|
assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
util.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue