diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 953d9b71fe4..d54715cc4a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -59,6 +59,13 @@ public abstract class TimelineCollectorManager extends AbstractService { super.serviceInit(conf); } + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + if (writer != null) { + writer.start(); + } + } // access to this map is synchronized with the map itself private final Map collectors = @@ -147,4 +154,16 @@ public abstract class TimelineCollectorManager extends AbstractService { return collectors.containsKey(appId); } + @Override + protected void serviceStop() throws Exception { + if (collectors != null && collectors.size() > 1) { + for (TimelineCollector c : collectors.values()) { + c.serviceStop(); + } + } + if (writer != null) { + writer.close(); + } + super.serviceStop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java new file mode 100644 index 00000000000..2894c41813d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Contains the Info Column Family details like Column names, types and byte + * representations for + * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * object that is stored in hbase Also has utility functions for storing each of + * these to the backend + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +enum EntityColumnDetails { + ID(EntityColumnFamily.INFO, "id"), + TYPE(EntityColumnFamily.INFO, "type"), + CREATED_TIME(EntityColumnFamily.INFO, "created_time"), + MODIFIED_TIME(EntityColumnFamily.INFO, "modified_time"), + FLOW_VERSION(EntityColumnFamily.INFO, "flow_version"), + PREFIX_IS_RELATED_TO(EntityColumnFamily.INFO, "r"), + PREFIX_RELATES_TO(EntityColumnFamily.INFO, "s"), + PREFIX_EVENTS(EntityColumnFamily.INFO, "e"); + + private final EntityColumnFamily columnFamily; + private final String value; + private final byte[] inBytes; + + private EntityColumnDetails(EntityColumnFamily columnFamily, + String value) { + this.columnFamily = columnFamily; + this.value = value; + this.inBytes = Bytes.toBytes(this.value.toLowerCase()); + } + + public String getValue() { + return value; + } + + byte[] getInBytes() { + return inBytes; + } + + void store(byte[] rowKey, BufferedMutator entityTable, Object inputValue) + throws IOException { + TimelineWriterUtils.store(rowKey, entityTable, + this.columnFamily.getInBytes(), null, this.getInBytes(), inputValue, + null); + } + + /** + * stores events data with column prefix + */ + void store(byte[] rowKey, BufferedMutator entityTable, byte[] idBytes, + String key, Object inputValue) throws IOException { + TimelineWriterUtils.store(rowKey, entityTable, + this.columnFamily.getInBytes(), + // column prefix + TimelineWriterUtils.join( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, + this.getInBytes(), idBytes), + // column qualifier + Bytes.toBytes(key), + inputValue, null); + } + + /** + * stores relation entities with a column prefix + */ + void store(byte[] rowKey, BufferedMutator entityTable, String key, + Set inputValue) throws IOException { + TimelineWriterUtils.store(rowKey, entityTable, + this.columnFamily.getInBytes(), + // column prefix + this.getInBytes(), + // column qualifier + Bytes.toBytes(key), + // value + TimelineWriterUtils.getValueAsString( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, inputValue), + // cell timestamp + null); + } + + // TODO add a method that accepts a byte array, + // iterates over the enum and returns an enum from those bytes + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java new file mode 100644 index 00000000000..e5563517d5f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Contains the Column family names and byte representations for + * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * object that is stored in hbase + * Also has utility functions for storing each of these to the backend + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +enum EntityColumnFamily { + INFO("i"), + CONFIG("c"), + METRICS("m"); + + private final String value; + private final byte[] inBytes; + + private EntityColumnFamily(String value) { + this.value = value; + this.inBytes = Bytes.toBytes(this.value.toLowerCase()); + } + + byte[] getInBytes() { + return inBytes; + } + + public String getValue() { + return value; + } + + /** + * stores the key as column and value as hbase column value in the given + * column family in the entity table + * + * @param rowKey + * @param entityTable + * @param inputValue + * @throws IOException + */ + public void store(byte[] rowKey, BufferedMutator entityTable, String key, + String inputValue) throws IOException { + if (key == null) { + return; + } + TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null, + Bytes.toBytes(key), inputValue, null); + } + + /** + * stores the values along with cell timestamp + * + * @param rowKey + * @param entityTable + * @param key + * @param timestamp + * @param inputValue + * @throws IOException + */ + public void store(byte[] rowKey, BufferedMutator entityTable, String key, + Long timestamp, Number inputValue) throws IOException { + if (key == null) { + return; + } + TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null, + Bytes.toBytes(key), inputValue, timestamp); + } + + // TODO add a method that accepts a byte array, + // iterates over the enum and returns an enum from those bytes +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java new file mode 100644 index 00000000000..aa71c6c6316 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineEntitySchemaConstants; + +/** + * This implements a hbase based backend for storing application timeline entity + * information. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HBaseTimelineWriterImpl extends AbstractService implements + TimelineWriter { + + private Connection conn; + private BufferedMutator entityTable; + + private static final Log LOG = LogFactory + .getLog(HBaseTimelineWriterImpl.class); + + public HBaseTimelineWriterImpl() { + super(HBaseTimelineWriterImpl.class.getName()); + } + + public HBaseTimelineWriterImpl(Configuration conf) throws IOException { + super(conf.get("yarn.application.id", + HBaseTimelineWriterImpl.class.getName())); + } + + /** + * initializes the hbase connection to write to the entity table + */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + Configuration hbaseConf = HBaseConfiguration.create(conf); + conn = ConnectionFactory.createConnection(hbaseConf); + TableName entityTableName = TableName.valueOf(hbaseConf.get( + TimelineEntitySchemaConstants.ENTITY_TABLE_NAME, + TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME)); + entityTable = conn.getBufferedMutator(entityTableName); + } + + /** + * Stores the entire information in TimelineEntities to the timeline store. + */ + @Override + public TimelineWriteResponse write(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntities data) throws IOException { + + byte[] rowKeyPrefix = TimelineWriterUtils.getRowKeyPrefix(clusterId, + userId, flowName, flowRunId, appId); + + TimelineWriteResponse putStatus = new TimelineWriteResponse(); + for (TimelineEntity te : data.getEntities()) { + + // a set can have at most 1 null + if (te == null) { + continue; + } + // get row key + byte[] row = TimelineWriterUtils.join( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, rowKeyPrefix, + Bytes.toBytes(te.getType()), Bytes.toBytes(te.getId())); + + storeInfo(row, te, flowVersion); + storeEvents(row, te.getEvents()); + storeConfig(row, te.getConfigs()); + storeMetrics(row, te.getMetrics()); + storeRelations(row, te.getIsRelatedToEntities(), + EntityColumnDetails.PREFIX_IS_RELATED_TO); + storeRelations(row, te.getRelatesToEntities(), + EntityColumnDetails.PREFIX_RELATES_TO); + } + + return putStatus; + } + + /** + * Stores the Relations from the {@linkplain TimelineEntity} object + */ + private void storeRelations(byte[] rowKey, + Map> connectedEntities, + EntityColumnDetails columnNamePrefix) throws IOException { + for (Map.Entry> entry : connectedEntities.entrySet()) { + columnNamePrefix.store(rowKey, entityTable, entry.getKey(), + entry.getValue()); + } + } + + /** + * Stores information from the {@linkplain TimelineEntity} object + */ + private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion) + throws IOException { + + EntityColumnDetails.ID.store(rowKey, entityTable, te.getId()); + EntityColumnDetails.TYPE.store(rowKey, entityTable, te.getType()); + EntityColumnDetails.CREATED_TIME.store(rowKey, entityTable, + te.getCreatedTime()); + EntityColumnDetails.MODIFIED_TIME.store(rowKey, entityTable, + te.getModifiedTime()); + EntityColumnDetails.FLOW_VERSION.store(rowKey, entityTable, flowVersion); + } + + /** + * stores the config information from {@linkplain TimelineEntity} + */ + private void storeConfig(byte[] rowKey, Map config) + throws IOException { + if (config == null) { + return; + } + for (Map.Entry entry : config.entrySet()) { + EntityColumnFamily.CONFIG.store(rowKey, entityTable, + entry.getKey(), entry.getValue()); + } + } + + /** + * stores the {@linkplain TimelineMetric} information from the + * {@linkplain TimelineEvent} object + */ + private void storeMetrics(byte[] rowKey, Set metrics) + throws IOException { + if (metrics != null) { + for (TimelineMetric metric : metrics) { + String key = metric.getId(); + Map timeseries = metric.getValues(); + for (Map.Entry entry : timeseries.entrySet()) { + EntityColumnFamily.METRICS.store(rowKey, entityTable, key, + entry.getKey(), entry.getValue()); + } + } + } + } + + /** + * Stores the events from the {@linkplain TimelineEvent} object + */ + private void storeEvents(byte[] rowKey, Set events) + throws IOException { + if (events != null) { + for (TimelineEvent event : events) { + if (event != null) { + String id = event.getId(); + if (id != null) { + byte[] idBytes = Bytes.toBytes(id); + Map eventInfo = event.getInfo(); + if (eventInfo != null) { + for (Map.Entry info : eventInfo.entrySet()) { + EntityColumnDetails.PREFIX_EVENTS.store(rowKey, + entityTable, idBytes, info.getKey(), info.getValue()); + } + } + } + } + } + } + } + + @Override + public TimelineWriteResponse aggregate(TimelineEntity data, + TimelineAggregationTrack track) throws IOException { + return null; + } + + /** + * close the hbase connections + * The close APIs perform flushing and release any + * resources held + */ + @Override + protected void serviceStop() throws Exception { + if (entityTable != null) { + LOG.info("closing entity table"); + // The close API performs flushing and releases any resources held + entityTable.close(); + } + if (conn != null) { + LOG.info("closing the hbase Connection"); + conn.close(); + } + super.serviceStop(); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java new file mode 100644 index 00000000000..2a2db816b3e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class Range { + private final int startIdx; + private final int endIdx; + + /** + * Defines a range from start index (inclusive) to end index (exclusive). + * + * @param start + * Starting index position + * @param end + * Ending index position (exclusive) + */ + public Range(int start, int end) { + if (start < 0 || end < start) { + throw new IllegalArgumentException( + "Invalid range, required that: 0 <= start <= end; start=" + start + + ", end=" + end); + } + + this.startIdx = start; + this.endIdx = end; + } + + public int start() { + return startIdx; + } + + public int end() { + return endIdx; + } + + public int length() { + return endIdx - startIdx; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java new file mode 100644 index 00000000000..d95cbb27242 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * contains the constants used in the context of schema accesses for + * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * information + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TimelineEntitySchemaConstants { + + /** entity prefix */ + public static final String ENTITY_PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + + ".entity"; + + /** config param name that specifies the entity table name */ + public static final String ENTITY_TABLE_NAME = ENTITY_PREFIX + + ".table.name"; + + /** + * config param name that specifies the TTL for metrics column family in + * entity table + */ + public static final String ENTITY_TABLE_METRICS_TTL = ENTITY_PREFIX + + ".table.metrics.ttl"; + + /** default value for entity table name */ + public static final String DEFAULT_ENTITY_TABLE_NAME = "timelineservice.entity"; + + /** in bytes default value for entity table name */ + static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES = Bytes + .toBytes(DEFAULT_ENTITY_TABLE_NAME); + + /** separator in row key */ + public static final String ROW_KEY_SEPARATOR = "!"; + + /** byte representation of the separator in row key */ + static final byte[] ROW_KEY_SEPARATOR_BYTES = Bytes + .toBytes(ROW_KEY_SEPARATOR); + + public static final byte ZERO_BYTES = 0; + + /** default TTL is 30 days for metrics timeseries */ + public static final int ENTITY_TABLE_METRICS_TTL_DEFAULT = 2592000; + + /** default max number of versions */ + public static final int ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT = 1000; +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java new file mode 100644 index 00000000000..820a6d1244c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.hadoop.util.GenericOptionsParser; + +/** + * This creates the schema for a hbase based backend for storing application + * timeline information. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TimelineSchemaCreator { + + final static String NAME = TimelineSchemaCreator.class.getSimpleName(); + private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class); + final static byte[][] splits = { Bytes.toBytes("a"), Bytes.toBytes("ad"), + Bytes.toBytes("an"), Bytes.toBytes("b"), Bytes.toBytes("ca"), + Bytes.toBytes("cl"), Bytes.toBytes("d"), Bytes.toBytes("e"), + Bytes.toBytes("f"), Bytes.toBytes("g"), Bytes.toBytes("h"), + Bytes.toBytes("i"), Bytes.toBytes("j"), Bytes.toBytes("k"), + Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"), + Bytes.toBytes("o"), Bytes.toBytes("q"), Bytes.toBytes("r"), + Bytes.toBytes("s"), Bytes.toBytes("se"), Bytes.toBytes("t"), + Bytes.toBytes("u"), Bytes.toBytes("v"), Bytes.toBytes("w"), + Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") }; + + public static final String SPLIT_KEY_PREFIX_LENGTH = "4"; + + public static void main(String[] args) throws Exception { + + Configuration hbaseConf = HBaseConfiguration.create(); + // Grab input args and allow for -Dxyz style arguments + String[] otherArgs = new GenericOptionsParser(hbaseConf, args) + .getRemainingArgs(); + + // Grab the arguments we're looking for. + CommandLine commandLine = parseArgs(otherArgs); + + // Grab the entityTableName argument + String entityTableName = commandLine.getOptionValue("e"); + if (StringUtils.isNotBlank(entityTableName)) { + hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_NAME, + entityTableName); + } + String entityTable_TTL_Metrics = commandLine.getOptionValue("m"); + if (StringUtils.isNotBlank(entityTable_TTL_Metrics)) { + hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL, + entityTable_TTL_Metrics); + } + createAllTables(hbaseConf); + } + + /** + * Parse command-line arguments. + * + * @param args + * command line arguments passed to program. + * @return parsed command line. + * @throws ParseException + */ + private static CommandLine parseArgs(String[] args) throws ParseException { + Options options = new Options(); + + // Input + Option o = new Option("e", "entityTableName", true, "entity table name"); + o.setArgName("entityTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option("m", "metricsTTL", true, "TTL for metrics column family"); + o.setArgName("metricsTTL"); + o.setRequired(false); + options.addOption(o); + + CommandLineParser parser = new PosixParser(); + CommandLine commandLine = null; + try { + commandLine = parser.parse(options, args); + } catch (Exception e) { + LOG.error("ERROR: " + e.getMessage() + "\n"); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(NAME + " ", options, true); + System.exit(-1); + } + + return commandLine; + } + + private static void createAllTables(Configuration hbaseConf) + throws IOException { + + Connection conn = null; + try { + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + if (admin == null) { + throw new IOException("Cannot create table since admin is null"); + } + createTimelineEntityTable(admin, hbaseConf); + } finally { + if (conn != null) { + conn.close(); + } + } + } + + /** + * Creates a table with column families info, config and metrics + * info stores information about a timeline entity object + * config stores configuration data of a timeline entity object + * metrics stores the metrics of a timeline entity object + * + * Example entity table record: + *
+   *|------------------------------------------------------------|
+   *|  Row       | Column Family  | Column Family | Column Family|
+   *|  key       | info           | metrics       | config       |
+   *|------------------------------------------------------------|
+   *| userName!  | id:entityId    | metricName1:  | configKey1:  |
+   *| clusterId! |                | metricValue1  | configValue1 |
+   *| flowId!    | type:entityType| @timestamp1   |              |
+   *| flowRunId! |                |               | configKey2:  |
+   *| AppId!     | created_time:  | metricName1:  | configValue2 |
+   *| entityType!| 1392993084018  | metricValue2  |              |
+   *| entityId   |                | @timestamp2   |              |
+   *|            | modified_time: |               |              |
+   *|            | 1392995081012  | metricName2:  |              |
+   *|            |                | metricValue1  |              |
+   *|            | r!relatesToKey:| @timestamp2   |              |
+   *|            | id3!id4!id5    |               |              |
+   *|            |                |               |              |
+   *|            | s!isRelatedToKey|              |              |
+   *|            | id7!id9!id5    |               |              |
+   *|            |                |               |              |
+   *|            | e!eventKey:    |               |              |
+   *|            | eventValue     |               |              |
+   *|            |                |               |              |
+   *|            | flowVersion:   |               |              |
+   *|            | versionValue   |               |              |
+   *|------------------------------------------------------------|
+   *
+ * @param admin + * @param hbaseConf + * @throws IOException + */ + public static void createTimelineEntityTable(Admin admin, + Configuration hbaseConf) throws IOException { + + TableName table = TableName.valueOf(hbaseConf.get( + TimelineEntitySchemaConstants.ENTITY_TABLE_NAME, + TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME)); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor entityTableDescp = new HTableDescriptor(table); + HColumnDescriptor cf1 = new HColumnDescriptor( + EntityColumnFamily.INFO.getInBytes()); + cf1.setBloomFilterType(BloomType.ROWCOL); + entityTableDescp.addFamily(cf1); + + HColumnDescriptor cf2 = new HColumnDescriptor( + EntityColumnFamily.CONFIG.getInBytes()); + cf2.setBloomFilterType(BloomType.ROWCOL); + cf2.setBlockCacheEnabled(true); + entityTableDescp.addFamily(cf2); + + HColumnDescriptor cf3 = new HColumnDescriptor( + EntityColumnFamily.METRICS.getInBytes()); + entityTableDescp.addFamily(cf3); + cf3.setBlockCacheEnabled(true); + // always keep 1 version (the latest) + cf3.setMinVersions(1); + cf3.setMaxVersions(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT); + cf3.setTimeToLive(hbaseConf.getInt( + TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL, + TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL_DEFAULT)); + entityTableDescp + .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); + entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", + SPLIT_KEY_PREFIX_LENGTH); + admin.createTable(entityTableDescp, splits); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java new file mode 100644 index 00000000000..113935e8cee --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.storage.Range; + +/** + * bunch of utility functions used across TimelineWriter classes + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineWriterUtils { + + /** empty bytes */ + public static final byte[] EMPTY_BYTES = new byte[0]; + private static final String SPACE = " "; + private static final String UNDERSCORE = "_"; + private static final String EMPTY_STRING = ""; + + /** + * Returns a single byte array containing all of the individual component + * arrays separated by the separator array. + * + * @param separator + * @param components + * @return byte array after joining the components + */ + public static byte[] join(byte[] separator, byte[]... components) { + if (components == null || components.length == 0) { + return EMPTY_BYTES; + } + + int finalSize = 0; + if (separator != null) { + finalSize = separator.length * (components.length - 1); + } + for (byte[] comp : components) { + if (comp != null) { + finalSize += comp.length; + } + } + + byte[] buf = new byte[finalSize]; + int offset = 0; + for (int i = 0; i < components.length; i++) { + if (components[i] != null) { + System.arraycopy(components[i], 0, buf, offset, components[i].length); + offset += components[i].length; + if (i < (components.length - 1) && separator != null + && separator.length > 0) { + System.arraycopy(separator, 0, buf, offset, separator.length); + offset += separator.length; + } + } + } + return buf; + } + + /** + * Splits the source array into multiple array segments using the given + * separator, up to a maximum of count items. This will naturally produce + * copied byte arrays for each of the split segments. To identify the split + * ranges without the array copies, see + * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}. + * + * @param source + * @param separator + * @return byte[] array after splitting the source + */ + public static byte[][] split(byte[] source, byte[] separator) { + return split(source, separator, -1); + } + + /** + * Splits the source array into multiple array segments using the given + * separator, up to a maximum of count items. This will naturally produce + * copied byte arrays for each of the split segments. To identify the split + * ranges without the array copies, see + * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}. + * + * @param source + * @param separator + * @param limit + * @return byte[][] after splitting the input source + */ + public static byte[][] split(byte[] source, byte[] separator, int limit) { + List segments = splitRanges(source, separator, limit); + + byte[][] splits = new byte[segments.size()][]; + for (int i = 0; i < segments.size(); i++) { + Range r = segments.get(i); + byte[] tmp = new byte[r.length()]; + if (tmp.length > 0) { + System.arraycopy(source, r.start(), tmp, 0, r.length()); + } + splits[i] = tmp; + } + return splits; + } + + /** + * Returns a list of ranges identifying [start, end) -- closed, open -- + * positions within the source byte array that would be split using the + * separator byte array. + */ + public static List splitRanges(byte[] source, byte[] separator) { + return splitRanges(source, separator, -1); + } + + /** + * Returns a list of ranges identifying [start, end) -- closed, open -- + * positions within the source byte array that would be split using the + * separator byte array. + * @param source the source data + * @param separator the separator pattern to look for + * @param limit the maximum number of splits to identify in the source + */ + public static List splitRanges(byte[] source, byte[] separator, int limit) { + List segments = new ArrayList(); + if ((source == null) || (separator == null)) { + return segments; + } + int start = 0; + itersource: for (int i = 0; i < source.length; i++) { + for (int j = 0; j < separator.length; j++) { + if (source[i + j] != separator[j]) { + continue itersource; + } + } + // all separator elements matched + if (limit > 0 && segments.size() >= (limit-1)) { + // everything else goes in one final segment + break; + } + + segments.add(new Range(start, i)); + start = i + separator.length; + // i will be incremented again in outer for loop + i += separator.length-1; + } + // add in remaining to a final range + if (start <= source.length) { + segments.add(new Range(start, source.length)); + } + return segments; + } + + /** + * converts run id into it's inverse timestamp + * @param flowRunId + * @return inverted long + */ + public static long encodeRunId(Long flowRunId) { + return Long.MAX_VALUE - flowRunId; + } + + /** + * return a value from the Map as a String + * @param key + * @param values + * @return value as a String or "" + * @throws IOException + */ + public static String getValueAsString(final byte[] key, + final Map values) throws IOException { + if( values == null ) { + return EMPTY_STRING; + } + byte[] value = values.get(key); + if (value != null) { + return GenericObjectMapper.read(value).toString(); + } else { + return EMPTY_STRING; + } + } + + /** + * return a value from the Map as a long + * @param key + * @param values + * @return value as Long or 0L + * @throws IOException + */ + public static long getValueAsLong(final byte[] key, + final Map values) throws IOException { + if (values == null) { + return 0; + } + byte[] value = values.get(key); + if (value != null) { + Number val = (Number) GenericObjectMapper.read(value); + return val.longValue(); + } else { + return 0L; + } + } + + /** + * concates the values from a Set to return a single delimited string value + * @param rowKeySeparator + * @param values + * @return Value from the set of strings as a string + */ + public static String getValueAsString(String rowKeySeparator, + Set values) { + + if (values == null) { + return EMPTY_STRING; + } + StringBuilder concatStrings = new StringBuilder(); + for (String value : values) { + concatStrings.append(value); + concatStrings.append(rowKeySeparator); + } + // remove the last separator + if(concatStrings.length() > 1) { + concatStrings.deleteCharAt(concatStrings.lastIndexOf(rowKeySeparator)); + } + return concatStrings.toString(); + } + /** + * Constructs a row key prefix for the entity table + * @param clusterId + * @param userId + * @param flowId + * @param flowRunId + * @param appId + * @return byte array with the row key prefix + */ + static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId, + Long flowRunId, String appId) { + return TimelineWriterUtils.join( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, + Bytes.toBytes(cleanse(userId)), Bytes.toBytes(cleanse(clusterId)), + Bytes.toBytes(cleanse(flowId)), + Bytes.toBytes(TimelineWriterUtils.encodeRunId(flowRunId)), + Bytes.toBytes(cleanse(appId))); + } + + /** + * Takes a string token to be used as a key or qualifier and + * cleanses out reserved tokens. + * This operation is not symmetrical. + * Logic is to replace all spaces and separator chars in input with + * underscores. + * + * @param token token to cleanse. + * @return String with no spaces and no separator chars + */ + public static String cleanse(String token) { + if (token == null || token.length() == 0) { + return token; + } + + String cleansed = token.replaceAll(SPACE, UNDERSCORE); + cleansed = cleansed.replaceAll( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, UNDERSCORE); + + return cleansed; + } + + /** + * stores the info to the table in hbase + * + * @param rowKey + * @param table + * @param columnFamily + * @param columnPrefix + * @param columnQualifier + * @param inputValue + * @param cellTimeStamp + * @throws IOException + */ + public static void store(byte[] rowKey, BufferedMutator table, byte[] columnFamily, + byte[] columnPrefix, byte[] columnQualifier, Object inputValue, + Long cellTimeStamp) throws IOException { + if ((rowKey == null) || (table == null) || (columnFamily == null) + || (columnQualifier == null) || (inputValue == null)) { + return; + } + + Put p = null; + if (cellTimeStamp == null) { + if (columnPrefix != null) { + // store with prefix + p = new Put(rowKey); + p.addColumn( + columnFamily, + join(TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, + columnPrefix, columnQualifier), GenericObjectMapper + .write(inputValue)); + } else { + // store without prefix + p = new Put(rowKey); + p.addColumn(columnFamily, columnQualifier, + GenericObjectMapper.write(inputValue)); + } + } else { + // store with cell timestamp + Cell cell = CellUtil.createCell(rowKey, columnFamily, columnQualifier, + // set the cell timestamp + cellTimeStamp, + // KeyValue Type minimum + TimelineEntitySchemaConstants.ZERO_BYTES, + GenericObjectMapper.write(inputValue)); + p = new Put(rowKey); + p.add(cell); + } + if (p != null) { + table.mutate(p); + } + + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java new file mode 100644 index 00000000000..48bacd6f66b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; + +/** + * Unit test HBaseTimelineWriterImpl + * YARN 3411 + * + * @throws Exception + */ +public class TestHBaseTimelineWriterImpl { + + private static HBaseTestingUtility util; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + util.startMiniCluster(); + createSchema(); + } + + private static void createSchema() throws IOException { + byte[][] families = new byte[3][]; + families[0] = EntityColumnFamily.INFO.getInBytes(); + families[1] = EntityColumnFamily.CONFIG.getInBytes(); + families[2] = EntityColumnFamily.METRICS.getInBytes(); + TimelineSchemaCreator.createTimelineEntityTable(util.getHBaseAdmin(), + util.getConfiguration()); + } + + @Test + public void testWriteEntityToHBase() throws Exception { + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + String id = "hello"; + String type = "world"; + entity.setId(id); + entity.setType(type); + Long cTime = 1425016501000L; + Long mTime = 1425026901000L; + entity.setCreatedTime(cTime); + entity.setModifiedTime(mTime); + + // add the isRelatedToEntity info + String key = "task"; + String value = "is_related_to_entity_id_here"; + Set isRelatedToSet = new HashSet(); + isRelatedToSet.add(value); + Map> isRelatedTo = new HashMap>(); + isRelatedTo.put(key, isRelatedToSet); + entity.setIsRelatedToEntities(isRelatedTo); + + // add the relatesTo info + key = "container"; + value = "relates_to_entity_id_here"; + Set relatesToSet = new HashSet(); + relatesToSet.add(value); + value = "relates_to_entity_id_here_Second"; + relatesToSet.add(value); + Map> relatesTo = new HashMap>(); + relatesTo.put(key, relatesToSet); + entity.setRelatesToEntities(relatesTo); + + // add some config entries + Map conf = new HashMap(); + conf.put("config_param1", "value1"); + conf.put("config_param2", "value2"); + entity.addConfigs(conf); + + // add metrics + Set metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId("MAP_SLOT_MILLIS"); + Map metricValues = new HashMap(); + metricValues.put(1429741609000L, 100000000); + metricValues.put(1429742609000L, 200000000); + metricValues.put(1429743609000L, 300000000); + metricValues.put(1429744609000L, 400000000); + metricValues.put(1429745609000L, 50000000000L); + metricValues.put(1429746609000L, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + entity.addMetrics(metrics); + + te.addEntity(entity); + + HBaseTimelineWriterImpl hbi = null; + try { + Configuration c1 = util.getConfiguration(); + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String cluster = "cluster1"; + String user = "user1"; + String flow = "some_flow_name"; + String flowVersion = "AB7822C10F1111"; + long runid = 1002345678919L; + String appName = "some app name"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.stop(); + + // scan the table and see that entity exists + Scan s = new Scan(); + byte[] startRow = TimelineWriterUtils.getRowKeyPrefix(cluster, user, flow, + runid, appName); + s.setStartRow(startRow); + s.setMaxVersions(Integer.MAX_VALUE); + ResultScanner scanner = null; + TableName entityTableName = TableName + .valueOf(TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME); + Connection conn = ConnectionFactory.createConnection(c1); + Table entityTable = conn.getTable(entityTableName); + int rowCount = 0; + int colCount = 0; + scanner = entityTable.getScanner(s); + for (Result result : scanner) { + if (result != null && !result.isEmpty()) { + rowCount++; + colCount += result.size(); + byte[] row1 = result.getRow(); + assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName, + entity)); + + // check info column family + NavigableMap infoValues = result + .getFamilyMap(EntityColumnFamily.INFO.getInBytes()); + String id1 = TimelineWriterUtils.getValueAsString( + EntityColumnDetails.ID.getInBytes(), infoValues); + assertEquals(id, id1); + String type1 = TimelineWriterUtils.getValueAsString( + EntityColumnDetails.TYPE.getInBytes(), infoValues); + assertEquals(type, type1); + Long cTime1 = TimelineWriterUtils.getValueAsLong( + EntityColumnDetails.CREATED_TIME.getInBytes(), infoValues); + assertEquals(cTime1, cTime); + Long mTime1 = TimelineWriterUtils.getValueAsLong( + EntityColumnDetails.MODIFIED_TIME.getInBytes(), infoValues); + assertEquals(mTime1, mTime); + checkRelatedEntities(isRelatedTo, infoValues, + EntityColumnDetails.PREFIX_IS_RELATED_TO.getInBytes()); + checkRelatedEntities(relatesTo, infoValues, + EntityColumnDetails.PREFIX_RELATES_TO.getInBytes()); + + // check config column family + NavigableMap configValuesResult = result + .getFamilyMap(EntityColumnFamily.CONFIG.getInBytes()); + checkConfigs(configValuesResult, conf); + + NavigableMap metricsResult = result + .getFamilyMap(EntityColumnFamily.METRICS.getInBytes()); + checkMetricsSizeAndKey(metricsResult, metrics); + List metricCells = result.getColumnCells( + EntityColumnFamily.METRICS.getInBytes(), + Bytes.toBytes(m1.getId())); + checkMetricsTimeseries(metricCells, m1); + } + } + assertEquals(1, rowCount); + assertEquals(15, colCount); + + } finally { + hbi.stop(); + hbi.close(); + } + } + + private void checkMetricsTimeseries(List metricCells, + TimelineMetric m1) throws IOException { + Map timeseries = m1.getValues(); + assertEquals(metricCells.size(), timeseries.size()); + for (Cell c1 : metricCells) { + assertTrue(timeseries.containsKey(c1.getTimestamp())); + assertEquals(GenericObjectMapper.read(CellUtil.cloneValue(c1)), + timeseries.get(c1.getTimestamp())); + } + } + + private void checkMetricsSizeAndKey( + NavigableMap metricsResult, Set metrics) { + assertEquals(metrics.size(), metricsResult.size()); + for (TimelineMetric m1 : metrics) { + byte[] key = Bytes.toBytes(m1.getId()); + assertTrue(metricsResult.containsKey(key)); + } + } + + private void checkConfigs(NavigableMap configValuesResult, + Map conf) throws IOException { + + assertEquals(conf.size(), configValuesResult.size()); + byte[] columnName; + for (String key : conf.keySet()) { + columnName = Bytes.toBytes(key); + assertTrue(configValuesResult.containsKey(columnName)); + byte[] value = configValuesResult.get(columnName); + assertNotNull(value); + assertEquals(conf.get(key), GenericObjectMapper.read(value)); + } + } + + private void checkRelatedEntities(Map> isRelatedTo, + NavigableMap infoValues, byte[] columnPrefix) + throws IOException { + + for (String key : isRelatedTo.keySet()) { + byte[] columnName = TimelineWriterUtils.join( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, columnPrefix, + Bytes.toBytes(key)); + + byte[] value = infoValues.get(columnName); + assertNotNull(value); + String isRelatedToEntities = GenericObjectMapper.read(value).toString(); + assertNotNull(isRelatedToEntities); + assertEquals( + TimelineWriterUtils.getValueAsString( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, + isRelatedTo.get(key)), isRelatedToEntities); + } + } + + private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, + String flow, Long runid, String appName, TimelineEntity te) { + + byte[][] rowKeyComponents = TimelineWriterUtils.split(rowKey, + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES); + + assertTrue(rowKeyComponents.length == 7); + assertEquals(user, Bytes.toString(rowKeyComponents[0])); + assertEquals(cluster, Bytes.toString(rowKeyComponents[1])); + assertEquals(flow, Bytes.toString(rowKeyComponents[2])); + assertEquals(TimelineWriterUtils.encodeRunId(runid), + Bytes.toLong(rowKeyComponents[3])); + assertEquals(TimelineWriterUtils.cleanse(appName), Bytes.toString(rowKeyComponents[4])); + assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5])); + assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6])); + return true; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } +}